From be6aa4270865b00792c3ecd5deb0ee45db4f7f0d Mon Sep 17 00:00:00 2001 From: Ziye Yang Date: Thu, 16 Jan 2020 21:34:39 +0800 Subject: [PATCH] sock/vpp: Add spdk_vpp_sock_writev_async support Purpose: To support SPDK iSCSI target applications when it uses spdk_sock_writev_async. PS: for some duplicated code between posix and vpp, we should consider it later. Since if we do it early, if it cannot be abstracted in the common header, it will invoke addtional work. Signed-off-by: Ziye Yang Change-Id: I30d8ee81f80ea5e74c53ff726ee44b0612867c71 Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/481749 Reviewed-by: Shuhei Matsumoto Reviewed-by: Tomasz Zawadzki Reviewed-by: Ben Walker Community-CI: SPDK CI Jenkins Tested-by: SPDK CI Jenkins --- module/sock/vpp/vpp.c | 161 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 154 insertions(+), 7 deletions(-) diff --git a/module/sock/vpp/vpp.c b/module/sock/vpp/vpp.c index 4a7db9ef8..c873a9817 100644 --- a/module/sock/vpp/vpp.c +++ b/module/sock/vpp/vpp.c @@ -75,6 +75,7 @@ #define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX #define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL #define SPDK_VPP_SEGMENT_TIMEOUT 20 +#define IOV_BATCH_SIZE 64 /* VPP connection state */ enum spdk_vpp_state { @@ -917,7 +918,7 @@ spdk_vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) } static ssize_t -spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) { struct spdk_vpp_session *session = __vpp_session(_sock); ssize_t total = 0; @@ -962,15 +963,151 @@ spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) return total; } +static int +_sock_flush(struct spdk_sock *sock) +{ + struct iovec iovs[IOV_BATCH_SIZE]; + int iovcnt; + int retval; + struct spdk_sock_request *req; + int i; + ssize_t rc; + unsigned int offset; + size_t len; + + /* Can't flush from within a callback or we end up with recursive calls */ + if (sock->cb_cnt > 0) { + return 0; + } + + /* Gather an iov */ + iovcnt = 0; + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Consume any offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; + iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + iovcnt++; + + offset = 0; + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + } + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + + req = TAILQ_NEXT(req, internal.link); + } + + if (iovcnt == 0) { + return 0; + } + + /* Perform the vectored write */ + rc = _vpp_sock_writev(sock, iovs, iovcnt); + if (rc <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + return rc; + } + + /* Consume the requests that were actually written */ + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Advance by the offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + /* Calculate the remaining length of this element */ + len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + + if (len > (size_t)rc) { + /* This element was partially sent. */ + req->internal.offset += rc; + return 0; + } + + offset = 0; + req->internal.offset += len; + rc -= len; + } + + /* Handled a full request. */ + req->internal.offset = 0; + spdk_sock_request_pend(sock, req); + + /* The _vpp_sock_writev above isn't currently asynchronous, + * so it's already done. */ + retval = spdk_sock_request_put(sock, req, 0); + + if (rc == 0 || retval) { + break; + } + + req = TAILQ_FIRST(&sock->queued_reqs); + } + + return 0; +} + +static ssize_t +spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + int rc; + + /* In order to process a writev, we need to flush any asynchronous writes + * first. */ + rc = _sock_flush(_sock); + if (rc < 0) { + return rc; + } + + if (!TAILQ_EMPTY(&_sock->queued_reqs)) { + /* We weren't able to flush all requests */ + errno = EAGAIN; + return -1; + } + + return _vpp_sock_writev(_sock, iov, iovcnt); +} + static void spdk_vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) { - /* TODO. We don't test VPP with NVMe-oF TCP right now. This will - * need to be implemented later. */ - assert(false); + int rc; spdk_sock_request_queue(sock, req); - spdk_sock_request_put(sock, req, -ENOTSUP); + + if (sock->group_impl == NULL) { + spdk_sock_request_put(sock, req, -ENOTSUP); + return; + } + + /* If there are a sufficient number queued, just flush them out immediately. */ + if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } } static int @@ -1092,8 +1229,8 @@ static int spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, struct spdk_sock **socks) { - int num_events; - struct spdk_sock *sock; + int num_events, rc; + struct spdk_sock *sock, *tmp; struct spdk_vpp_session *session; struct spdk_vpp_sock_group_impl *group; @@ -1104,6 +1241,16 @@ spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_event group = __vpp_group_impl(_group); num_events = 0; + /* This must be a TAILQ_FOREACH_SAFE because while flushing, + * a completion callback could remove the sock from the + * group. */ + TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } + sock = group->last_sock; if (sock == NULL) { sock = TAILQ_FIRST(&group->base.socks);