From 7ef33c86b824b912d8d487a86fccff31d76d93aa Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Wed, 9 Oct 2019 14:06:15 -0700 Subject: [PATCH] sock/posix: Zero copy send If available, automatically use MSG_ZEROCOPY when sending on sockets. Storage workloads contain sufficient data transfer sizes that this is always a performance improvement, regardless of workload. Change-Id: I14429d78c22ad3bc036aec13c9fce6453e899c92 Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/471752 Tested-by: SPDK CI Jenkins Reviewed-by: Shuhei Matsumoto Reviewed-by: Jim Harris Reviewed-by: Or Gerlitz --- include/spdk/sock.h | 2 +- include/spdk_internal/sock.h | 2 + module/sock/posix/posix.c | 155 +++++++++++++++++++++++++--- test/unit/lib/sock/sock.c/sock_ut.c | 6 ++ 4 files changed, 151 insertions(+), 14 deletions(-) diff --git a/include/spdk/sock.h b/include/spdk/sock.h index 2ef68449e..6bbf8dcfb 100644 --- a/include/spdk/sock.h +++ b/include/spdk/sock.h @@ -70,7 +70,7 @@ struct spdk_sock_request { */ struct __sock_request_internal { TAILQ_ENTRY(spdk_sock_request) link; - unsigned int offset; + uint32_t offset; } internal; int iovcnt; diff --git a/include/spdk_internal/sock.h b/include/spdk_internal/sock.h index 5693a6ebc..42164e57e 100644 --- a/include/spdk_internal/sock.h +++ b/include/spdk_internal/sock.h @@ -148,6 +148,8 @@ spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int TAILQ_REMOVE(&sock->pending_reqs, req, internal.link); + req->internal.offset = 0; + closed = sock->flags.closed; sock->cb_cnt++; req->cb_fn(req->cb_arg, err); diff --git a/module/sock/posix/posix.c b/module/sock/posix/posix.c index c8846c93c..d5922246f 100644 --- a/module/sock/posix/posix.c +++ b/module/sock/posix/posix.c @@ -35,6 +35,7 @@ #if defined(__linux__) #include +#include #elif defined(__FreeBSD__) #include #endif @@ -49,9 +50,16 @@ #define SO_SNDBUF_SIZE (2 * 1024 * 1024) #define IOV_BATCH_SIZE 64 +#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) +#define SPDK_ZEROCOPY +#endif + struct spdk_posix_sock { struct spdk_sock base; int fd; + + uint32_t sendmsg_idx; + bool zcopy; }; struct spdk_posix_sock_group_impl { @@ -212,6 +220,9 @@ _spdk_posix_sock_alloc(int fd) { struct spdk_posix_sock *sock; int rc; +#ifdef SPDK_ZEROCOPY + int flag; +#endif sock = calloc(1, sizeof(*sock)); if (sock == NULL) { @@ -231,6 +242,15 @@ _spdk_posix_sock_alloc(int fd) /* Not fatal */ } +#ifdef SPDK_ZEROCOPY + /* Try to turn on zero copy sends */ + flag = 1; + rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); + if (rc == 0) { + sock->zcopy = true; + } +#endif + return sock; } @@ -365,6 +385,11 @@ retry: return NULL; } + /* Disable zero copy for client sockets until support is added */ + if (type == SPDK_SOCK_CREATE_CONNECT) { + sock->zcopy = false; + } + return &sock->base; } @@ -424,6 +449,8 @@ spdk_posix_sock_close(struct spdk_sock *_sock) { struct spdk_posix_sock *sock = __posix_sock(_sock); + assert(TAILQ_EMPTY(&_sock->pending_reqs)); + /* If the socket fails to close, the best choice is to * leak the fd but continue to free the rest of the sock * memory. */ @@ -434,6 +461,79 @@ spdk_posix_sock_close(struct spdk_sock *_sock) return 0; } +#ifdef SPDK_ZEROCOPY +static void +_sock_check_zcopy(struct spdk_sock *sock) +{ + struct spdk_posix_sock *psock = __posix_sock(sock); + struct msghdr msgh = {}; + uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; + ssize_t rc; + struct sock_extended_err *serr; + struct cmsghdr *cm; + uint32_t idx; + struct spdk_sock_request *req, *treq; + bool found; + + msgh.msg_control = buf; + msgh.msg_controllen = sizeof(buf); + + while (true) { + rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); + + if (rc < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return; + } + + if (!TAILQ_EMPTY(&sock->pending_reqs)) { + SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); + } else { + SPDK_WARNLOG("Recvmsg yielded an error!\n"); + } + return; + } + + cm = CMSG_FIRSTHDR(&msgh); + if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { + SPDK_WARNLOG("Unexpected cmsg level or type!\n"); + return; + } + + serr = (struct sock_extended_err *)CMSG_DATA(cm); + if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { + SPDK_WARNLOG("Unexpected extended error origin\n"); + return; + } + + /* Most of the time, the pending_reqs array is in the exact + * order we need such that all of the requests to complete are + * in order, in the front. It is guaranteed that all requests + * belonging to the same sendmsg call are sequential, so once + * we encounter one match we can stop looping as soon as a + * non-match is found. + */ + for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { + found = false; + TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { + if (req->internal.offset == idx) { + found = true; + + rc = spdk_sock_request_put(sock, req, 0); + if (rc < 0) { + return; + } + + } else if (found) { + break; + } + } + + } + } +} +#endif + static int _sock_flush(struct spdk_sock *sock) { @@ -492,7 +592,14 @@ _sock_flush(struct spdk_sock *sock) /* Perform the vectored write */ msg.msg_iov = iovs; msg.msg_iovlen = iovcnt; - flags = 0; +#ifdef SPDK_ZEROCOPY + if (psock->zcopy) { + flags = MSG_ZEROCOPY; + } else +#endif + { + flags = 0; + } rc = sendmsg(psock->fd, &msg, flags); if (rc <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -501,6 +608,8 @@ _sock_flush(struct spdk_sock *sock) return rc; } + psock->sendmsg_idx++; + /* Consume the requests that were actually written */ req = TAILQ_FIRST(&sock->queued_reqs); while (req) { @@ -528,14 +637,23 @@ _sock_flush(struct spdk_sock *sock) } /* Handled a full request. */ - req->internal.offset = 0; spdk_sock_request_pend(sock, req); - /* The sendmsg syscall above isn't currently asynchronous, - * so it's already done. */ - retval = spdk_sock_request_put(sock, req, 0); + if (!psock->zcopy) { + /* The sendmsg syscall above isn't currently asynchronous, + * so it's already done. */ + retval = spdk_sock_request_put(sock, req, 0); + if (retval) { + break; + } + } else { + /* Re-use the offset field to hold the sendmsg call index. The + * index is 0 based, so subtract one here because we've already + * incremented above. */ + req->internal.offset = psock->sendmsg_idx - 1; + } - if (rc == 0 || retval) { + if (rc == 0) { break; } @@ -759,7 +877,8 @@ spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct struct epoll_event event; memset(&event, 0, sizeof(event)); - event.events = EPOLLIN; + /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ + event.events = EPOLLIN | EPOLLERR; event.data.ptr = sock; rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); @@ -810,7 +929,7 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve { struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); struct spdk_sock *sock, *tmp; - int num_events, i, rc; + int num_events, i, j, rc; #if defined(__linux__) struct epoll_event events[MAX_EVENTS_PER_POLL]; #elif defined(__FreeBSD__) @@ -838,15 +957,25 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve return -1; } - for (i = 0; i < num_events; i++) { + for (i = 0, j = 0; i < num_events; i++) { #if defined(__linux__) - socks[i] = events[i].data.ptr; -#elif defined(__FreeBSD__) - socks[i] = events[i].udata; +#ifdef SPDK_ZEROCOPY + if (events[i].events & EPOLLERR) { + _sock_check_zcopy(events[i].data.ptr); + } #endif + + if (events[i].events & EPOLLIN) { + socks[j++] = events[i].data.ptr; + } + +#elif defined(__FreeBSD__) + socks[j++] = events[i].udata; +#endif + } - return num_events; + return j; } static int diff --git a/test/unit/lib/sock/sock.c/sock_ut.c b/test/unit/lib/sock/sock.c/sock_ut.c index a8b8567a4..7e8345054 100644 --- a/test/unit/lib/sock/sock.c/sock_ut.c +++ b/test/unit/lib/sock/sock.c/sock_ut.c @@ -785,6 +785,12 @@ _sock_close(const char *ip, int port, char *impl_name) /* Poll the socket so the writev_async's send. The first one's * callback will close the socket. */ spdk_sock_group_poll(group); + if (ctx.called == false) { + /* Sometimes the zerocopy completion isn't posted immediately. Delay slightly + * and poll one more time. */ + usleep(1000); + spdk_sock_group_poll(group); + } CU_ASSERT(ctx.called == true); CU_ASSERT(cb_arg2 == true);