diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c
index 3d8d66312..4e00c7bcb 100644
--- a/module/sock/uring/uring.c
+++ b/module/sock/uring/uring.c
@@ -34,6 +34,7 @@
 #include "spdk/stdinc.h"
 #include "spdk/config.h"
 
+#include <linux/errqueue.h>
 #include <sys/epoll.h>
 #include <liburing.h>
@@ -51,13 +52,19 @@
 #define MAX_TMPBUF 1024
 #define PORTNUMLEN 32
 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
+#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
 
 enum spdk_sock_task_type {
 	SPDK_SOCK_TASK_POLLIN = 0,
+	SPDK_SOCK_TASK_RECV,
 	SPDK_SOCK_TASK_WRITE,
 	SPDK_SOCK_TASK_CANCEL,
 };
 
+#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
+#define SPDK_ZEROCOPY
+#endif
+
 enum spdk_uring_sock_task_status {
 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
 	SPDK_URING_SOCK_TASK_IN_PROCESS,
@@ -77,16 +84,21 @@ struct spdk_uring_task {
 
 struct spdk_uring_sock {
 	struct spdk_sock			base;
 	int					fd;
+	uint32_t				sendmsg_idx;
 	struct spdk_uring_sock_group_impl	*group;
 	struct spdk_uring_task			write_task;
+	struct spdk_uring_task			recv_task;
 	struct spdk_uring_task			pollin_task;
 	struct spdk_uring_task			cancel_task;
 	struct spdk_pipe			*recv_pipe;
 	void					*recv_buf;
 	int					recv_buf_sz;
+	bool					zcopy;
 	bool					pending_recv;
+	int					zcopy_send_flags;
 	int					connection_status;
 	int					placement_id;
+	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
 	TAILQ_ENTRY(spdk_uring_sock)		link;
 };
@@ -107,6 +119,8 @@ static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
 	.enable_recv_pipe = true,
 	.enable_quickack = false,
 	.enable_placement_id = PLACEMENT_NONE,
+	.enable_zerocopy_send_server = false,
+	.enable_zerocopy_send_client = false,
 };
 
 static struct spdk_sock_map g_map = {
@@ -346,7 +360,7 @@ uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
 }
 
 static struct spdk_uring_sock *
-uring_sock_alloc(int fd)
+uring_sock_alloc(int fd, bool enable_zero_copy)
 {
 	struct spdk_uring_sock *sock;
 #if defined(__linux__)
@@ -374,6 +388,18 @@ uring_sock_alloc(int fd)
 	spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id,
 				   &sock->placement_id);
+#ifdef SPDK_ZEROCOPY
+	/* Try to turn on zero copy sends */
+	flag = 1;
+
+	if (enable_zero_copy) {
+		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
+		if (rc == 0) {
+			sock->zcopy = true;
+			sock->zcopy_send_flags = MSG_ZEROCOPY;
+		}
+	}
+#endif
 #endif
 
 	return sock;
@@ -392,6 +418,10 @@ uring_sock_create(const char *ip, int port,
 	int fd, flag;
 	int val = 1;
 	int rc;
+	bool enable_zcopy_impl_opts = false;
+	bool enable_zcopy_user_opts = true;
+
+	assert(opts != NULL);
 
 	if (ip == NULL) {
 		return NULL;
@@ -506,6 +536,7 @@ retry:
 			fd = -1;
 			break;
 		}
+		enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server;
 	} else if (type == SPDK_SOCK_CREATE_CONNECT) {
 		rc = connect(fd, res->ai_addr, res->ai_addrlen);
 		if (rc != 0) {
@@ -515,6 +546,8 @@ retry:
 			fd = -1;
 			continue;
 		}
+
+		enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client;
 	}
 
 	flag = fcntl(fd, F_GETFL);
@@ -532,7 +565,8 @@ retry:
 		return NULL;
 	}
 
-	sock = uring_sock_alloc(fd);
+	enable_zcopy_user_opts = opts->zcopy;
+	sock = uring_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
 	if (sock == NULL) {
 		SPDK_ERRLOG("sock allocation failed\n");
 		close(fd);
@@ -595,7 +629,7 @@ uring_sock_accept(struct spdk_sock *_sock)
 	}
 #endif
 
-	new_sock = uring_sock_alloc(fd);
+	new_sock = uring_sock_alloc(fd, sock->zcopy);
 	if (new_sock == NULL) {
 		close(fd);
 		return NULL;
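For readers who have not used the kernel interface this patch builds on: MSG_ZEROCOPY sends require a one-time SO_ZEROCOPY opt-in, after which the kernel pins the payload pages instead of copying them and reports completion on the socket's error queue. A minimal standalone sketch of that handshake follows; this is not SPDK code, `fd`, `data`, and `len` are hypothetical, error handling is trimmed, and it assumes Linux 4.14+ with headers that expose SO_ZEROCOPY.

#include <sys/socket.h>
#include <sys/uio.h>

/* One-time opt-in, then a pinned (not copied) send. The caller must keep
 * `data` untouched until a completion for this call arrives on the socket
 * error queue. */
static ssize_t
send_with_zcopy(int fd, void *data, size_t len)
{
	int one = 1;
	struct iovec iov = { .iov_base = data, .iov_len = len };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };

	/* Without this, sendmsg(..., MSG_ZEROCOPY) fails with EINVAL. */
	if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) != 0) {
		return -1;
	}

	return sendmsg(fd, &msg, MSG_ZEROCOPY);
}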
@@ -745,11 +779,22 @@ uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
 
 static int
 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
 {
+	struct spdk_uring_sock *sock = __uring_sock(_sock);
 	struct spdk_sock_request *req;
 	int i, retval;
 	unsigned int offset;
 	size_t len;
 
+	if (sock->zcopy) {
+		/* Handle the overflow case: we use sock->sendmsg_idx - 1 for the
+		 * req->internal.offset, so sendmsg_idx must never be zero. */
+		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
+			sock->sendmsg_idx = 1;
+		} else {
+			sock->sendmsg_idx++;
+		}
+	}
+
 	/* Consume the requests that were actually written */
 	req = TAILQ_FIRST(&_sock->queued_reqs);
 	while (req) {
@@ -779,9 +824,16 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
 
 		/* Handled a full request. */
 		spdk_sock_request_pend(_sock, req);
 
-		retval = spdk_sock_request_put(_sock, req, 0);
-		if (retval) {
-			return retval;
+		if (!sock->zcopy) {
+			retval = spdk_sock_request_put(_sock, req, 0);
+			if (retval) {
+				return retval;
+			}
+		} else {
+			/* Re-use the offset field to hold the sendmsg call index. The
+			 * index is 0 based, so subtract one here because we've already
+			 * incremented above. */
+			req->internal.offset = sock->sendmsg_idx - 1;
 		}
 
 		if (rc == 0) {
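The index bookkeeping above matters later: each request is tagged with the zero-based index of the sendmsg call that wrote it, and the kernel eventually acknowledges a whole range of call indices at once. The patch sidesteps wraparound by never letting sendmsg_idx be zero and matching each index individually, but as a toy illustration of why a naive range test would be wrong once the 32-bit counter wraps, consider this hypothetical helper (not part of the patch):

#include <stdbool.h>
#include <stdint.h>

/* True iff `tag` falls inside the completed range [lo, hi], where all three
 * counters are free-running uint32_t values that may have wrapped. A plain
 * `lo <= tag && tag <= hi` breaks once the counter wraps past UINT32_MAX;
 * modular distance does not. */
static bool
zcopy_tag_in_range(uint32_t tag, uint32_t lo, uint32_t hi)
{
	return (uint32_t)(tag - lo) <= (uint32_t)(hi - lo);
}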
@@ -794,6 +846,89 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
 	return 0;
 }
 
+#ifdef SPDK_ZEROCOPY
+static int
+_sock_check_zcopy(struct spdk_sock *_sock, int status)
+{
+	struct spdk_uring_sock *sock = __uring_sock(_sock);
+	ssize_t rc;
+	struct sock_extended_err *serr;
+	struct cmsghdr *cm;
+	uint32_t idx;
+	struct spdk_sock_request *req, *treq;
+	bool found;
+
+	assert(sock->zcopy == true);
+	if (spdk_unlikely(status < 0)) {
+		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
+			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
+				    status);
+		} else {
+			SPDK_WARNLOG("Recvmsg yielded an error!\n");
+		}
+		return 0;
+	}
+
+	cm = CMSG_FIRSTHDR(&sock->recv_task.msg);
+	if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
+		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
+		return 0;
+	}
+
+	serr = (struct sock_extended_err *)CMSG_DATA(cm);
+	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+		SPDK_WARNLOG("Unexpected extended error origin\n");
+		return 0;
+	}
+
+	/* Most of the time, the pending_reqs list is in the exact
+	 * order we need such that all of the requests to complete are
+	 * in order, in the front. It is guaranteed that all requests
+	 * belonging to the same sendmsg call are sequential, so once
+	 * we encounter one match we can stop looping as soon as a
+	 * non-match is found.
+	 */
+	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
+		found = false;
+		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
+			if (req->internal.offset == idx) {
+				found = true;
+				rc = spdk_sock_request_put(_sock, req, 0);
+				if (rc < 0) {
+					return rc;
+				}
+			} else if (found) {
+				break;
+			}
+		}
+	}
+
+	return 0;
+}
+
+static void
+_sock_prep_recv(struct spdk_sock *_sock)
+{
+	struct spdk_uring_sock *sock = __uring_sock(_sock);
+	struct spdk_uring_task *task = &sock->recv_task;
+	struct io_uring_sqe *sqe;
+
+	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
+		return;
+	}
+
+	assert(sock->group != NULL);
+	sock->group->io_queued++;
+
+	sqe = io_uring_get_sqe(&sock->group->uring);
+	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
+	io_uring_sqe_set_data(sqe, task);
+	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
+}
+
+#endif
+
 static void
 _sock_flush(struct spdk_sock *_sock)
 {
@@ -801,6 +936,7 @@ _sock_flush(struct spdk_sock *_sock)
 	struct spdk_uring_task *task = &sock->write_task;
 	uint32_t iovcnt;
 	struct io_uring_sqe *sqe;
+	int flags = MSG_DONTWAIT | sock->zcopy_send_flags;
 
 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
 		return;
@@ -819,7 +955,7 @@ _sock_flush(struct spdk_sock *_sock)
 	sock->group->io_queued++;
 
 	sqe = io_uring_get_sqe(&sock->group->uring);
-	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
+	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
 	io_uring_sqe_set_data(sqe, task);
 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
 }
@@ -832,7 +968,7 @@ _sock_prep_pollin(struct spdk_sock *_sock)
 	struct io_uring_sqe *sqe;
 
 	/* Do not prepare pollin event */
-	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
+	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
 		return;
 	}
@@ -840,7 +976,7 @@ _sock_prep_pollin(struct spdk_sock *_sock)
 	sock->group->io_queued++;
 
 	sqe = io_uring_get_sqe(&sock->group->uring);
-	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
+	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
 	io_uring_sqe_set_data(sqe, task);
 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
 }
@@ -899,13 +1035,16 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
 
 		if (spdk_unlikely(status <= 0)) {
-			if (status == -EAGAIN || status == -EWOULDBLOCK) {
+			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
 				continue;
 			}
 		}
 
 		switch (task->type) {
 		case SPDK_SOCK_TASK_POLLIN:
+			if ((status & POLLERR) == POLLERR) {
+				_sock_prep_recv(&sock->base);
+			}
 			if ((status & POLLIN) == POLLIN) {
 				if (sock->base.cb_fn != NULL &&
 				    sock->pending_recv == false) {
@@ -915,7 +1054,6 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
 			}
 			break;
 		case SPDK_SOCK_TASK_WRITE:
-			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
 			task->last_req = NULL;
 			task->iov_cnt = 0;
 			if (spdk_unlikely(status) < 0) {
@@ -926,6 +1064,15 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
 			}
 			break;
+#ifdef SPDK_ZEROCOPY
+		case SPDK_SOCK_TASK_RECV:
+			if (spdk_unlikely(status == -ECANCELED)) {
+				sock->connection_status = status;
+				break;
+			}
+			_sock_check_zcopy(&sock->base, status);
+			break;
+#endif
 		case SPDK_SOCK_TASK_CANCEL:
 			/* Do nothing */
 			break;
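The two functions added above split the classic MSG_ZEROCOPY completion loop across io_uring: _sock_prep_recv() queues the error-queue read, and _sock_check_zcopy() parses the result when the CQE arrives. For comparison, here is the same reaping logic in its plain blocking-socket form; this is a standalone sketch rather than SPDK code, `fd` is hypothetical, and it is IPv4-only since it checks SOL_IP/IP_RECVERR.

#include <errno.h>
#include <linux/errqueue.h>
#include <netinet/in.h>
#include <stdio.h>
#include <sys/socket.h>

/* Read one zero-copy completion notification from the socket error queue.
 * Returns 0 on success or when nothing is pending, -1 on unexpected input. */
static int
reap_zcopy_completion(int fd)
{
	char control[CMSG_SPACE(sizeof(struct sock_extended_err))];
	struct msghdr msg = { .msg_control = control, .msg_controllen = sizeof(control) };
	struct cmsghdr *cm;
	struct sock_extended_err *serr;

	if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
		return errno == EAGAIN ? 0 : -1;
	}

	cm = CMSG_FIRSTHDR(&msg);
	if (cm == NULL || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
		return -1;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		return -1;
	}

	/* Buffers passed to sendmsg calls ee_info..ee_data (inclusive) may now
	 * be reused or freed. */
	printf("sendmsg calls %u..%u completed\n", serr->ee_info, serr->ee_data);
	return 0;
}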
@@ -1002,6 +1149,7 @@ _sock_flush_client(struct spdk_sock *_sock)
 	struct iovec iovs[IOV_BATCH_SIZE];
 	int iovcnt;
 	ssize_t rc;
+	int flags = sock->zcopy_send_flags;
 
 	/* Can't flush from within a callback or we end up with recursive calls */
 	if (_sock->cb_cnt > 0) {
@@ -1017,7 +1165,7 @@ _sock_flush_client(struct spdk_sock *_sock)
 	/* Perform the vectored write */
 	msg.msg_iov = iovs;
 	msg.msg_iovlen = iovcnt;
-	rc = sendmsg(sock->fd, &msg, 0);
+	rc = sendmsg(sock->fd, &msg, flags);
 	if (rc <= 0) {
 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
 			return 0;
@@ -1027,6 +1175,12 @@ _sock_flush_client(struct spdk_sock *_sock)
 
 	sock_complete_reqs(_sock, rc);
 
+#ifdef SPDK_ZEROCOPY
+	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
+		_sock_check_zcopy(_sock, 0);
+	}
+#endif
+
 	return 0;
 }
@@ -1192,6 +1346,11 @@ uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
 	sock->pollin_task.sock = sock;
 	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
 
+	sock->recv_task.sock = sock;
+	sock->recv_task.type = SPDK_SOCK_TASK_RECV;
+	sock->recv_task.msg.msg_control = sock->buf;
+	sock->recv_task.msg.msg_controllen = sizeof(sock->buf);
+
 	sock->cancel_task.sock = sock;
 	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
@@ -1276,6 +1435,16 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
 		}
 	}
 
+	if (sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+		_sock_prep_cancel_task(_sock, &sock->recv_task);
+		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
+		 * we can simply poll in a loop here until the task completes. */
+		while ((sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
+		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
+			uring_sock_group_impl_poll(_group, 32, NULL);
+		}
+	}
+
 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
 		 * currently can use a while loop here. */
@@ -1285,7 +1454,6 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
 			uring_sock_group_impl_poll(_group, 32, NULL);
 		}
 	}
-
 	if (sock->pending_recv) {
 		TAILQ_REMOVE(&group->pending_recv, sock, link);
 		sock->pending_recv = false;
@@ -1344,6 +1512,8 @@ uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
 	GET_FIELD(enable_recv_pipe);
 	GET_FIELD(enable_quickack);
 	GET_FIELD(enable_placement_id);
+	GET_FIELD(enable_zerocopy_send_server);
+	GET_FIELD(enable_zerocopy_send_client);
 
 #undef GET_FIELD
 #undef FIELD_OK
@@ -1373,6 +1543,8 @@ uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
 	SET_FIELD(enable_recv_pipe);
 	SET_FIELD(enable_quickack);
 	SET_FIELD(enable_placement_id);
+	SET_FIELD(enable_zerocopy_send_server);
+	SET_FIELD(enable_zerocopy_send_client);
 
 #undef SET_FIELD
 #undef FIELD_OK
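Both new options default to false, so zero-copy sends stay off until an application opts in. One plausible way to do that, assuming the public spdk_sock_impl_get_opts()/spdk_sock_impl_set_opts() interface from spdk/sock.h (an illustrative sketch with error handling omitted):

#include "spdk/sock.h"

/* Turn zero-copy sends on for both accepted and connecting uring sockets. */
static void
uring_enable_zcopy_send(void)
{
	struct spdk_sock_impl_opts impl_opts = {};
	size_t len = sizeof(impl_opts);

	spdk_sock_impl_get_opts("uring", &impl_opts, &len);
	impl_opts.enable_zerocopy_send_server = true;
	impl_opts.enable_zerocopy_send_client = true;
	spdk_sock_impl_set_opts("uring", &impl_opts, len);
}

Note that per-socket opt-in still applies: uring_sock_create() combines these implementation options with the caller's spdk_sock_opts.zcopy flag before uring_sock_alloc() attempts the SO_ZEROCOPY setsockopt.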