diff --git a/module/sock/vpp/vpp.c b/module/sock/vpp/vpp.c
index 4a7db9ef8..c873a9817 100644
--- a/module/sock/vpp/vpp.c
+++ b/module/sock/vpp/vpp.c
@@ -75,6 +75,7 @@
 #define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX
 #define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL
 #define SPDK_VPP_SEGMENT_TIMEOUT 20
+#define IOV_BATCH_SIZE 64
 
 /* VPP connection state */
 enum spdk_vpp_state {
@@ -917,7 +918,7 @@ spdk_vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
 }
 
 static ssize_t
-spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
 {
 	struct spdk_vpp_session *session = __vpp_session(_sock);
 	ssize_t total = 0;
@@ -962,15 +963,151 @@ spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
 	return total;
 }
 
+static int
+_sock_flush(struct spdk_sock *sock)
+{
+	struct iovec iovs[IOV_BATCH_SIZE];
+	int iovcnt;
+	int retval;
+	struct spdk_sock_request *req;
+	int i;
+	ssize_t rc;
+	unsigned int offset;
+	size_t len;
+
+	/* Can't flush from within a callback or we end up with recursive calls */
+	if (sock->cb_cnt > 0) {
+		return 0;
+	}
+
+	/* Gather an iov */
+	iovcnt = 0;
+	req = TAILQ_FIRST(&sock->queued_reqs);
+	while (req) {
+		offset = req->internal.offset;
+
+		for (i = 0; i < req->iovcnt; i++) {
+			/* Consume any offset first */
+			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+				continue;
+			}
+
+			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
+			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+			iovcnt++;
+
+			offset = 0;
+
+			if (iovcnt >= IOV_BATCH_SIZE) {
+				break;
+			}
+		}
+
+		if (iovcnt >= IOV_BATCH_SIZE) {
+			break;
+		}
+
+		req = TAILQ_NEXT(req, internal.link);
+	}
+
+	if (iovcnt == 0) {
+		return 0;
+	}
+
+	/* Perform the vectored write */
+	rc = _vpp_sock_writev(sock, iovs, iovcnt);
+	if (rc <= 0) {
+		if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			return 0;
+		}
+		return rc;
+	}
+
+	/* Consume the requests that were actually written */
+	req = TAILQ_FIRST(&sock->queued_reqs);
+	while (req) {
+		offset = req->internal.offset;
+
+		for (i = 0; i < req->iovcnt; i++) {
+			/* Advance by the offset first */
+			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+				continue;
+			}
+
+			/* Calculate the remaining length of this element */
+			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+
+			if (len > (size_t)rc) {
+				/* This element was partially sent. */
+				req->internal.offset += rc;
+				return 0;
+			}
+
+			offset = 0;
+			req->internal.offset += len;
+			rc -= len;
+		}
+
+		/* Handled a full request. */
+		req->internal.offset = 0;
+		spdk_sock_request_pend(sock, req);
+
+		/* The _vpp_sock_writev above isn't currently asynchronous,
+		 * so it's already done. */
+		retval = spdk_sock_request_put(sock, req, 0);
+
+		if (rc == 0 || retval) {
+			break;
+		}
+
+		req = TAILQ_FIRST(&sock->queued_reqs);
+	}
+
+	return 0;
+}
+
+static ssize_t
+spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+	int rc;
+
+	/* In order to process a writev, we need to flush any asynchronous writes
+	 * first. */
+	rc = _sock_flush(_sock);
+	if (rc < 0) {
+		return rc;
+	}
+
+	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
+		/* We weren't able to flush all requests */
+		errno = EAGAIN;
+		return -1;
+	}
+
+	return _vpp_sock_writev(_sock, iov, iovcnt);
+}
+
 static void
 spdk_vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
 {
-	/* TODO. We don't test VPP with NVMe-oF TCP right now. This will
-	 * need to be implemented later. */
-	assert(false);
+	int rc;
 
 	spdk_sock_request_queue(sock, req);
-	spdk_sock_request_put(sock, req, -ENOTSUP);
+
+	if (sock->group_impl == NULL) {
+		spdk_sock_request_put(sock, req, -ENOTSUP);
+		return;
+	}
+
+	/* If there are a sufficient number queued, just flush them out immediately. */
+	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
+		rc = _sock_flush(sock);
+		if (rc) {
+			spdk_sock_abort_requests(sock);
+		}
+	}
 }
 
 static int
@@ -1092,8 +1229,8 @@ static int
 spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
 			      struct spdk_sock **socks)
 {
-	int num_events;
-	struct spdk_sock *sock;
+	int num_events, rc;
+	struct spdk_sock *sock, *tmp;
 	struct spdk_vpp_session *session;
 	struct spdk_vpp_sock_group_impl *group;
 
@@ -1104,6 +1241,16 @@ spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_event
 	group = __vpp_group_impl(_group);
 	num_events = 0;
 
+	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
+	 * a completion callback could remove the sock from the
+	 * group. */
+	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
+		rc = _sock_flush(sock);
+		if (rc) {
+			spdk_sock_abort_requests(sock);
+		}
+	}
+
 	sock = group->last_sock;
 	if (sock == NULL) {
 		sock = TAILQ_FIRST(&group->base.socks);
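
For review context, here is a minimal, self-contained sketch of the partial-write accounting that the consume loop in _sock_flush performs. The struct demo_request and consume_written() below are hypothetical stand-ins for spdk_sock_request and the inline loop in the patch, not SPDK API. It illustrates how req->internal.offset carries the progress of a short write across successive flushes until the whole request has been sent.

#include <stdio.h>
#include <stddef.h>
#include <sys/uio.h>

/* Hypothetical stand-in for spdk_sock_request; illustration only. */
struct demo_request {
	struct iovec	iovs[4];
	int		iovcnt;
	size_t		offset;		/* mirrors req->internal.offset */
};

/* Walk the request's iovecs the same way the consume loop does,
 * advancing req->offset by "written" bytes. Sets *done when the whole
 * request has been sent (the real code calls spdk_sock_request_put
 * there) and returns any bytes left over for the next request. */
static size_t
consume_written(struct demo_request *req, size_t written, int *done)
{
	size_t offset = req->offset;
	size_t len;
	int i;

	*done = 0;

	for (i = 0; i < req->iovcnt; i++) {
		/* Skip elements already completed by earlier flushes */
		if (offset >= req->iovs[i].iov_len) {
			offset -= req->iovs[i].iov_len;
			continue;
		}

		len = req->iovs[i].iov_len - offset;
		if (len > written) {
			/* Partial send: record the progress and stop */
			req->offset += written;
			return 0;
		}

		/* This element is now fully sent */
		req->offset += len;
		written -= len;
		offset = 0;
	}

	/* Whole request sent; reset the offset for reuse */
	req->offset = 0;
	*done = 1;
	return written;
}

int
main(void)
{
	char a[8], b[8];
	struct demo_request req = {
		.iovs = { { a, sizeof(a) }, { b, sizeof(b) } },
		.iovcnt = 2,
		.offset = 0,
	};
	int done;

	/* A short write of 10 bytes covers a[] plus 2 bytes of b[] */
	consume_written(&req, 10, &done);
	printf("done=%d offset=%zu\n", done, req.offset);	/* done=0 offset=10 */

	/* The next flush sends the remaining 6 bytes */
	consume_written(&req, 6, &done);
	printf("done=%d offset=%zu\n", done, req.offset);	/* done=1 offset=0 */

	return 0;
}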