From 11e25ed27e84038c8a698afa443912037d693330 Mon Sep 17 00:00:00 2001 From: Ziye Yang Date: Wed, 18 Mar 2020 08:27:50 +0800 Subject: [PATCH] uring: Add the Internally buffer reads support Do large reads from the socket and buffer into a pipe. Signed-off-by: Ziye Yang Change-Id: Ibce74917a1cd7248ff1a325204c0d338cc6a3470 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1349 Community-CI: Mellanox Build Bot Tested-by: SPDK CI Jenkins Reviewed-by: Shuhei Matsumoto Reviewed-by: Ben Walker --- module/sock/uring/uring.c | 235 +++++++++++++++++++++++++++++++++++--- 1 file changed, 220 insertions(+), 15 deletions(-) diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c index 5835e0ad9..febd4656a 100644 --- a/module/sock/uring/uring.c +++ b/module/sock/uring/uring.c @@ -40,6 +40,7 @@ #include "spdk/barrier.h" #include "spdk/likely.h" #include "spdk/log.h" +#include "spdk/pipe.h" #include "spdk/sock.h" #include "spdk/string.h" #include "spdk/util.h" @@ -82,6 +83,11 @@ struct spdk_uring_sock { struct spdk_uring_task write_task; struct spdk_uring_task pollin_task; int outstanding_io; + struct spdk_pipe *recv_pipe; + void *recv_buf; + int recv_buf_sz; + bool pending_recv; + TAILQ_ENTRY(spdk_uring_sock) link; }; struct spdk_uring_sock_group_impl { @@ -90,6 +96,7 @@ struct spdk_uring_sock_group_impl { uint32_t io_inflight; uint32_t io_queued; uint32_t io_avail; + TAILQ_HEAD(, spdk_uring_sock) pending_recv; }; #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) @@ -202,6 +209,70 @@ enum spdk_uring_sock_create_type { SPDK_SOCK_CREATE_CONNECT, }; +static int +spdk_uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) +{ + uint8_t *new_buf; + struct spdk_pipe *new_pipe; + struct iovec siov[2]; + struct iovec diov[2]; + int sbytes; + ssize_t bytes; + + if (sock->recv_buf_sz == sz) { + return 0; + } + + /* If the new size is 0, just free the pipe */ + if (sz == 0) { + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + sock->recv_pipe = NULL; + sock->recv_buf = NULL; + return 0; + } + + /* Round up to next 64 byte multiple */ + new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); + if (!new_buf) { + SPDK_ERRLOG("socket recv buf allocation failed\n"); + return -ENOMEM; + } + + new_pipe = spdk_pipe_create(new_buf, sz + 1); + if (new_pipe == NULL) { + SPDK_ERRLOG("socket pipe allocation failed\n"); + free(new_buf); + return -ENOMEM; + } + + if (sock->recv_pipe != NULL) { + /* Pull all of the data out of the old pipe */ + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes > sz) { + /* Too much data to fit into the new pipe size */ + spdk_pipe_destroy(new_pipe); + free(new_buf); + return -EINVAL; + } + + sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); + assert(sbytes == sz); + + bytes = spdk_iovcpy(siov, 2, diov, 2); + spdk_pipe_writer_advance(new_pipe, bytes); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + } + + sock->recv_buf_sz = sz; + sock->recv_buf = new_buf; + sock->recv_pipe = new_pipe; + + return 0; +} + static int spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) { @@ -210,6 +281,16 @@ spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) assert(sock != NULL); +#ifndef __aarch64__ + /* On ARM systems, this buffering does not help. Skip it. */ + /* The size of the pipe is purely derived from benchmarks. It seems to work well. */ + rc = spdk_uring_sock_alloc_pipe(sock, sz); + if (rc) { + SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); + return rc; + } +#endif + if (sz < SO_RCVBUF_SIZE) { sz = SO_RCVBUF_SIZE; } @@ -254,7 +335,6 @@ _spdk_uring_sock_alloc(int fd) } sock->fd = fd; - return sock; } @@ -469,6 +549,9 @@ spdk_uring_sock_close(struct spdk_sock *_sock) assert(TAILQ_EMPTY(&_sock->pending_reqs)); assert(sock->group == NULL); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); rc = close(sock->fd); if (rc == 0) { free(sock); @@ -478,19 +561,108 @@ spdk_uring_sock_close(struct spdk_sock *_sock) } static ssize_t -spdk_uring_sock_recv(struct spdk_sock *_sock, void *buf, size_t len) +spdk_uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) { - struct spdk_uring_sock *sock = __uring_sock(_sock); + struct iovec siov[2]; + int sbytes; + ssize_t bytes; + struct spdk_uring_sock_group_impl *group; - return recv(sock->fd, buf, len, MSG_DONTWAIT); + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes < 0) { + errno = EINVAL; + return -1; + } else if (sbytes == 0) { + errno = EAGAIN; + return -1; + } + + bytes = spdk_iovcpy(siov, 2, diov, diovcnt); + + if (bytes == 0) { + /* The only way this happens is if diov is 0 length */ + errno = EINVAL; + return -1; + } + + spdk_pipe_reader_advance(sock->recv_pipe, bytes); + + /* If we drained the pipe, take it off the level-triggered list */ + if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + group = __uring_group_impl(sock->base.group_impl); + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + + return bytes; +} + +static inline ssize_t +_spdk_uring_sock_read(struct spdk_uring_sock *sock) +{ + struct iovec iov[2]; + int bytes; + struct spdk_uring_sock_group_impl *group; + + bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); + + if (bytes > 0) { + bytes = readv(sock->fd, iov, 2); + if (bytes > 0) { + spdk_pipe_writer_advance(sock->recv_pipe, bytes); + if (sock->base.group_impl) { + group = __uring_group_impl(sock->base.group_impl); + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + sock->pending_recv = true; + } + } + } + + return bytes; } static ssize_t spdk_uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) { struct spdk_uring_sock *sock = __uring_sock(_sock); + int rc, i; + size_t len; - return readv(sock->fd, iov, iovcnt); + if (sock->recv_pipe == NULL) { + return readv(sock->fd, iov, iovcnt); + } + + len = 0; + for (i = 0; i < iovcnt; i++) { + len += iov[i].iov_len; + } + + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If the user is receiving a sufficiently large amount of data, + * receive directly to their buffers. */ + if (len >= 1024) { + return readv(sock->fd, iov, iovcnt); + } + + /* Otherwise, do a big read into our pipe */ + rc = _spdk_uring_sock_read(sock); + if (rc <= 0) { + return rc; + } + } + + return spdk_uring_sock_recv_from_pipe(sock, iov, iovcnt); +} + +static ssize_t +spdk_uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) +{ + struct iovec iov[1]; + + iov[0].iov_base = buf; + iov[0].iov_len = len; + + return spdk_uring_sock_readv(sock, iov, 1); } static ssize_t @@ -649,7 +821,8 @@ _sock_prep_pollin(struct spdk_sock *_sock) struct spdk_uring_task *task = &sock->pollin_task; struct io_uring_sqe *sqe; - if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { + /* Do not prepare pollin event */ + if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) { return; } @@ -663,18 +836,17 @@ _sock_prep_pollin(struct spdk_sock *_sock) } static int -spdk_sock_uring_group_reap(struct io_uring *ring, int max, +spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, struct spdk_sock **socks) { int i, count, ret; struct io_uring_cqe *cqe; - struct spdk_uring_sock *sock; + struct spdk_uring_sock *sock, *tmp; struct spdk_uring_task *task; int status; - count = 0; for (i = 0; i < max; i++) { - ret = io_uring_peek_cqe(ring, &cqe); + ret = io_uring_peek_cqe(&group->uring, &cqe); if (ret != 0) { break; } @@ -688,10 +860,11 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max, sock = task->sock; assert(sock != NULL); assert(sock->group != NULL); + assert(sock->group == group); sock->group->io_inflight--; sock->group->io_avail++; status = cqe->res; - io_uring_cqe_seen(ring, cqe); + io_uring_cqe_seen(&group->uring, cqe); task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; @@ -705,8 +878,9 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max, case SPDK_SOCK_TASK_POLLIN: if ((status & POLLIN) == POLLIN) { if ((socks != NULL) && (sock->base.cb_fn != NULL)) { - socks[count] = &sock->base; - count++; + assert(sock->pending_recv == false); + sock->pending_recv = true; + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); } } else { SPDK_UNREACHABLE(); @@ -736,6 +910,29 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max, } } + count = 0; + TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { + if (count == max_read_events) { + break; + } + + socks[count++] = &sock->base; + } + + /* Cycle the pending_recv list so that each time we poll things aren't + * in the same order. */ + for (i = 0; i < count; i++) { + sock = __uring_sock(socks[i]); + + TAILQ_REMOVE(&group->pending_recv, sock, link); + + if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + sock->pending_recv = false; + } else { + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + } + } + return count; } @@ -928,6 +1125,8 @@ spdk_uring_sock_group_impl_create(void) return NULL; } + TAILQ_INIT(&group_impl->pending_recv); + return &group_impl->base; } @@ -953,6 +1152,7 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) { struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { sock->outstanding_io++; @@ -962,6 +1162,12 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, sock->outstanding_io++; } + if ((sock->recv_pipe != NULL) && + spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + if (!sock->outstanding_io) { sock->group = NULL; } @@ -1001,8 +1207,7 @@ spdk_uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve count = 0; to_complete = group->io_inflight; if (to_complete > 0) { - to_complete = spdk_min(to_complete, max_events); - count = spdk_sock_uring_group_reap(&group->uring, to_complete, socks); + count = spdk_sock_uring_group_reap(group, to_complete, max_events, socks); } return count;