diff --git a/CHANGELOG.md b/CHANGELOG.md index 938286bf8..2bc612a5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,17 @@ bdevs is one after another. The concat bdev is extendable. When the free space o concat bdev is not enough, the user can deconstruct the concat bdev, then reconstruct it with an additional underlying bdev. +### sock + +Added the `zerocopy_threshold` option, which decides per send, based on data size, whether the +MSG_ZEROCOPY flag is used. zerocopy_threshold = 0 disables the threshold; +zerocopy_threshold > 0 sends data smaller than this many bytes without MSG_ZEROCOPY. + +### rpc + +Introduced `zerocopy_threshold` in `sock_impl_set_options` to enable zerocopy on send according to +the size of the data to be flushed. + ## v22.01 ### accel diff --git a/include/spdk/sock.h b/include/spdk/sock.h index 231bb2d5b..a77b56f10 100644 --- a/include/spdk/sock.h +++ b/include/spdk/sock.h @@ -75,6 +75,9 @@ struct spdk_sock_request { void *curr_list; #endif uint32_t offset; + + /* Indicate if the whole req or part of it is sent with zerocopy */ + bool is_zcopy; } internal; int iovcnt; @@ -139,6 +142,12 @@ struct spdk_sock_impl_opts { * Enable or disable use of zero copy flow on send for client sockets. Used by posix socket module. */ bool enable_zerocopy_send_client; + + /** + * Set zerocopy threshold in bytes. A consecutive sequence of requests' iovecs that fall below this + * threshold may be sent without zerocopy flag set.
+ */ + uint32_t zerocopy_threshold; }; /** diff --git a/include/spdk_internal/sock.h b/include/spdk_internal/sock.h index 39360d671..d6debdd82 100644 --- a/include/spdk_internal/sock.h +++ b/include/spdk_internal/sock.h @@ -66,6 +66,7 @@ struct spdk_sock { int cb_cnt; spdk_sock_cb cb_fn; void *cb_arg; + uint32_t zerocopy_threshold; struct { uint8_t closed : 1; uint8_t reserved : 7; @@ -174,6 +175,7 @@ spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int #endif req->internal.offset = 0; + req->internal.is_zcopy = 0; closed = sock->flags.closed; sock->cb_cnt++; @@ -245,11 +247,12 @@ spdk_sock_abort_requests(struct spdk_sock *sock) static inline int spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, - struct spdk_sock_request **last_req) + struct spdk_sock_request **last_req, int *flags) { int iovcnt, i; struct spdk_sock_request *req; unsigned int offset; + uint64_t total = 0; /* Gather an iov */ iovcnt = index; @@ -275,8 +278,9 @@ spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; - iovcnt++; + total += iovs[iovcnt].iov_len; + iovcnt++; offset = 0; if (iovcnt >= IOV_BATCH_SIZE) { @@ -294,6 +298,14 @@ spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, } end: + +#if defined(MSG_ZEROCOPY) + /* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */ + if (total < _sock->zerocopy_threshold && flags != NULL) { + *flags = *flags & (~MSG_ZEROCOPY); + } +#endif + return iovcnt; } diff --git a/lib/sock/sock.c b/lib/sock/sock.c index b61c8bd50..f77a80861 100644 --- a/lib/sock/sock.c +++ b/lib/sock/sock.c @@ -320,6 +320,8 @@ spdk_sock_connect_ext(const char *ip, int port, char *_impl_name, struct spdk_so struct spdk_sock *sock; struct spdk_sock_opts opts_local; const char *impl_name = NULL; + struct spdk_sock_impl_opts 
impl_opts = {}; + size_t len; if (opts == NULL) { SPDK_ERRLOG("the opts should not be NULL pointer\n"); @@ -346,6 +348,10 @@ spdk_sock_connect_ext(const char *ip, int port, char *_impl_name, struct spdk_so sock->net_impl = impl; TAILQ_INIT(&sock->queued_reqs); TAILQ_INIT(&sock->pending_reqs); + + len = sizeof(struct spdk_sock_impl_opts); + spdk_sock_impl_get_opts(impl->name, &impl_opts, &len); + sock->zerocopy_threshold = impl_opts.zerocopy_threshold; return sock; } } @@ -407,6 +413,8 @@ struct spdk_sock * spdk_sock_accept(struct spdk_sock *sock) { struct spdk_sock *new_sock; + struct spdk_sock_impl_opts impl_opts = {}; + size_t len; new_sock = sock->net_impl->accept(sock); if (new_sock != NULL) { @@ -416,6 +424,10 @@ spdk_sock_accept(struct spdk_sock *sock) new_sock->net_impl = sock->net_impl; TAILQ_INIT(&new_sock->queued_reqs); TAILQ_INIT(&new_sock->pending_reqs); + + len = sizeof(struct spdk_sock_impl_opts); + spdk_sock_impl_get_opts(sock->net_impl->name, &impl_opts, &len); + new_sock->zerocopy_threshold = impl_opts.zerocopy_threshold; } return new_sock; @@ -850,6 +862,7 @@ spdk_sock_write_config_json(struct spdk_json_write_ctx *w) spdk_json_write_named_uint32(w, "enable_placement_id", opts.enable_placement_id); spdk_json_write_named_bool(w, "enable_zerocopy_send_server", opts.enable_zerocopy_send_server); spdk_json_write_named_bool(w, "enable_zerocopy_send_client", opts.enable_zerocopy_send_client); + spdk_json_write_named_uint32(w, "zerocopy_threshold", opts.zerocopy_threshold); spdk_json_write_object_end(w); spdk_json_write_object_end(w); } else { diff --git a/lib/sock/sock_rpc.c b/lib/sock/sock_rpc.c index 6404b91e6..8f45a6bfb 100644 --- a/lib/sock/sock_rpc.c +++ b/lib/sock/sock_rpc.c @@ -79,6 +79,7 @@ rpc_sock_impl_get_options(struct spdk_jsonrpc_request *request, spdk_json_write_named_uint32(w, "enable_placement_id", sock_opts.enable_placement_id); spdk_json_write_named_bool(w, "enable_zerocopy_send_server", sock_opts.enable_zerocopy_send_server); 
spdk_json_write_named_bool(w, "enable_zerocopy_send_client", sock_opts.enable_zerocopy_send_client); + spdk_json_write_named_uint32(w, "zerocopy_threshold", sock_opts.zerocopy_threshold); spdk_json_write_object_end(w); spdk_jsonrpc_end_result(request, w); free(impl_name); @@ -123,6 +124,10 @@ static const struct spdk_json_object_decoder rpc_sock_impl_set_opts_decoders[] = { "enable_zerocopy_send_client", offsetof(struct spdk_rpc_sock_impl_set_opts, sock_opts.enable_zerocopy_send_client), spdk_json_decode_bool, true + }, + { + "zerocopy_threshold", offsetof(struct spdk_rpc_sock_impl_set_opts, sock_opts.zerocopy_threshold), + spdk_json_decode_uint32, true } }; diff --git a/module/sock/posix/posix.c b/module/sock/posix/posix.c index 8f06f5337..18f4c3e8e 100644 --- a/module/sock/posix/posix.c +++ b/module/sock/posix/posix.c @@ -95,7 +95,8 @@ static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { .enable_quickack = false, .enable_placement_id = PLACEMENT_NONE, .enable_zerocopy_send_server = true, - .enable_zerocopy_send_client = false + .enable_zerocopy_send_client = false, + .zerocopy_threshold = 0 }; static struct spdk_sock_map g_map = { @@ -698,14 +699,18 @@ _sock_check_zcopy(struct spdk_sock *sock) for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { found = false; TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { - if (req->internal.offset == idx) { - found = true; - + if (!req->internal.is_zcopy) { + /* This wasn't a zcopy request. 
It was just waiting in line to complete */ + rc = spdk_sock_request_put(sock, req, 0); + if (rc < 0) { + return rc; + } + } else if (req->internal.offset == idx) { + found = true; rc = spdk_sock_request_put(sock, req, 0); if (rc < 0) { return rc; } - } else if (found) { break; } @@ -731,21 +736,13 @@ _sock_flush(struct spdk_sock *sock) ssize_t rc; unsigned int offset; size_t len; + bool is_zcopy = false; /* Can't flush from within a callback or we end up with recursive calls */ if (sock->cb_cnt > 0) { return 0; } - iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL); - - if (iovcnt == 0) { - return 0; - } - - /* Perform the vectored write */ - msg.msg_iov = iovs; - msg.msg_iovlen = iovcnt; #ifdef SPDK_ZEROCOPY if (psock->zcopy) { flags = MSG_ZEROCOPY | MSG_NOSIGNAL; @@ -754,6 +751,20 @@ _sock_flush(struct spdk_sock *sock) { flags = MSG_NOSIGNAL; } + + iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags); + if (iovcnt == 0) { + return 0; + } + +#ifdef SPDK_ZEROCOPY + is_zcopy = flags & MSG_ZEROCOPY; +#endif + + /* Perform the vectored write */ + msg.msg_iov = iovs; + msg.msg_iovlen = iovcnt; + rc = sendmsg(psock->fd, &msg, flags); if (rc <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { @@ -762,7 +773,7 @@ _sock_flush(struct spdk_sock *sock) return rc; } - if (psock->zcopy) { + if (is_zcopy) { /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the * req->internal.offset, so sendmsg_idx should not be zero */ if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { @@ -777,6 +788,9 @@ _sock_flush(struct spdk_sock *sock) while (req) { offset = req->internal.offset; + /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ + req->internal.is_zcopy = is_zcopy; + for (i = 0; i < req->iovcnt; i++) { /* Advance by the offset first */ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { @@ -801,7 +815,7 @@ _sock_flush(struct spdk_sock *sock) /* Handled a full request. 
*/ spdk_sock_request_pend(sock, req); - if (!psock->zcopy) { + if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) { /* The sendmsg syscall above isn't currently asynchronous, * so it's already done. */ retval = spdk_sock_request_put(sock, req, 0); @@ -1514,6 +1528,7 @@ posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) GET_FIELD(enable_placement_id); GET_FIELD(enable_zerocopy_send_server); GET_FIELD(enable_zerocopy_send_client); + GET_FIELD(zerocopy_threshold); #undef GET_FIELD #undef FIELD_OK @@ -1546,6 +1561,7 @@ posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) SET_FIELD(enable_placement_id); SET_FIELD(enable_zerocopy_send_server); SET_FIELD(enable_zerocopy_send_client); + SET_FIELD(zerocopy_threshold); #undef SET_FIELD #undef FIELD_OK diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c index 1237163d9..a69e85cbb 100644 --- a/module/sock/uring/uring.c +++ b/module/sock/uring/uring.c @@ -79,6 +79,7 @@ struct spdk_uring_task { struct iovec iovs[IOV_BATCH_SIZE]; int iov_cnt; struct spdk_sock_request *last_req; + bool is_zcopy; STAILQ_ENTRY(spdk_uring_task) link; }; @@ -122,6 +123,7 @@ static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { .enable_placement_id = PLACEMENT_NONE, .enable_zerocopy_send_server = false, .enable_zerocopy_send_client = false, + .zerocopy_threshold = 0 }; static struct spdk_sock_map g_map = { @@ -766,7 +768,7 @@ uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) } static int -sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) +sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) { struct spdk_uring_sock *sock = __uring_sock(_sock); struct spdk_sock_request *req; @@ -774,7 +776,7 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) unsigned int offset; size_t len; - if (sock->zcopy) { + if (is_zcopy) { /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the * req->internal.offset, 
so sendmsg_idx should not be zero */ if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { @@ -789,6 +791,9 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) while (req) { offset = req->internal.offset; + /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ + req->internal.is_zcopy = is_zcopy; + for (i = 0; i < req->iovcnt; i++) { /* Advance by the offset first */ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { @@ -813,7 +818,7 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) /* Handled a full request. */ spdk_sock_request_pend(_sock, req); - if (!sock->zcopy) { + if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { retval = spdk_sock_request_put(_sock, req, 0); if (retval) { return retval; @@ -881,13 +886,18 @@ _sock_check_zcopy(struct spdk_sock *_sock, int status) for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { found = false; TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) { - if (req->internal.offset == idx) { + if (!req->internal.is_zcopy) { + /* This wasn't a zcopy request. 
It was just waiting in line to complete */ + rc = spdk_sock_request_put(_sock, req, 0); + if (rc < 0) { + return rc; + } + } else if (req->internal.offset == idx) { found = true; rc = spdk_sock_request_put(_sock, req, 0); if (rc < 0) { return rc; } - } else if (found) { break; } @@ -926,13 +936,22 @@ _sock_flush(struct spdk_sock *_sock) struct spdk_uring_task *task = &sock->write_task; uint32_t iovcnt; struct io_uring_sqe *sqe; - int flags = MSG_DONTWAIT | sock->zcopy_send_flags; + int flags; if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { return; } - iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); +#ifdef SPDK_ZEROCOPY + if (sock->zcopy) { + flags = MSG_DONTWAIT | sock->zcopy_send_flags; + } else +#endif + { + flags = MSG_DONTWAIT; + } + + iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); if (!iovcnt) { return; } @@ -941,7 +960,9 @@ _sock_flush(struct spdk_sock *_sock) assert(sock->group != NULL); task->msg.msg_iov = task->iovs; task->msg.msg_iovlen = task->iov_cnt; - +#ifdef SPDK_ZEROCOPY + task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false; +#endif sock->group->io_queued++; sqe = io_uring_get_sqe(&sock->group->uring); @@ -1048,11 +1069,12 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max case SPDK_SOCK_TASK_WRITE: task->last_req = NULL; task->iov_cnt = 0; if (spdk_unlikely(status) < 0) { sock->connection_status = status; spdk_sock_abort_requests(&sock->base); } else { - sock_complete_reqs(&sock->base, status); + sock_complete_reqs(&sock->base, status, task->is_zcopy); } + task->is_zcopy = false; break; @@ -1156,6 +1178,7 @@ _sock_flush_client(struct spdk_sock *_sock) ssize_t rc; int flags = sock->zcopy_send_flags; int retval; + bool is_zcopy = false; /* Can't flush from within a callback or we end up with recursive calls */ if (_sock->cb_cnt > 0) { @@ -1163,7 +1186,7 @@ _sock_flush_client(struct spdk_sock *_sock) } /* Gather an iov */ - iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); + iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); if (iovcnt == 0) { return 0; } @@ -1179,7 +1202,10 @@ _sock_flush_client(struct spdk_sock *_sock) return rc; } - retval = sock_complete_reqs(_sock, rc); +#ifdef SPDK_ZEROCOPY + is_zcopy = flags & MSG_ZEROCOPY; +#endif + retval = sock_complete_reqs(_sock, rc, is_zcopy); if (retval < 0) { /* if the socket is closed, return to avoid heap-use-after-free error */ return retval; @@ -1530,6 +1556,7 @@ uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) GET_FIELD(enable_placement_id); GET_FIELD(enable_zerocopy_send_server); GET_FIELD(enable_zerocopy_send_client); + GET_FIELD(zerocopy_threshold); #undef GET_FIELD #undef FIELD_OK @@ -1561,6 +1588,7 @@ uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) SET_FIELD(enable_placement_id); SET_FIELD(enable_zerocopy_send_server); SET_FIELD(enable_zerocopy_send_client); + SET_FIELD(zerocopy_threshold); #undef SET_FIELD #undef FIELD_OK diff --git a/python/spdk/rpc/sock.py b/python/spdk/rpc/sock.py index 3827368a7..63533de0f 100644 ---
a/python/spdk/rpc/sock.py +++ b/python/spdk/rpc/sock.py @@ -19,7 +19,8 @@ def sock_impl_set_options(client, enable_quickack=None, enable_placement_id=None, enable_zerocopy_send_server=None, - enable_zerocopy_send_client=None): + enable_zerocopy_send_client=None, + zerocopy_threshold=None): """Set parameters for the socket layer implementation. Args: @@ -31,6 +32,7 @@ def sock_impl_set_options(client, enable_placement_id: option for placement_id. 0:disable,1:incoming_napi,2:incoming_cpu (optional) enable_zerocopy_send_server: enable or disable zerocopy on send for server sockets(optional) enable_zerocopy_send_client: enable or disable zerocopy on send for client sockets(optional) + zerocopy_threshold: zerocopy threshold in bytes; smaller sends skip MSG_ZEROCOPY, 0 disables the threshold (optional) """ params = {} @@ -49,6 +51,8 @@ def sock_impl_set_options(client, params['enable_zerocopy_send_server'] = enable_zerocopy_send_server if enable_zerocopy_send_client is not None: params['enable_zerocopy_send_client'] = enable_zerocopy_send_client + if zerocopy_threshold is not None: + params['zerocopy_threshold'] = zerocopy_threshold return client.call('sock_impl_set_options', params) diff --git a/scripts/rpc.py b/scripts/rpc.py index 10ca0f044..fc50af266 100755 --- a/scripts/rpc.py +++ b/scripts/rpc.py @@ -2838,7 +2838,8 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse enable_quickack=args.enable_quickack, enable_placement_id=args.enable_placement_id, enable_zerocopy_send_server=args.enable_zerocopy_send_server, - enable_zerocopy_send_client=args.enable_zerocopy_send_client) + enable_zerocopy_send_client=args.enable_zerocopy_send_client, + zerocopy_threshold=args.zerocopy_threshold) p = subparsers.add_parser('sock_impl_set_options', help="""Set options of socket layer implementation""") p.add_argument('-i', '--impl', help='Socket implementation name, e.g.
posix', required=True) @@ -2861,8 +2862,11 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse action='store_true', dest='enable_zerocopy_send_client') p.add_argument('--disable-zerocopy-send-client', help='Disable zerocopy on send for client sockets', action='store_false', dest='enable_zerocopy_send_client') + p.add_argument('--zerocopy-threshold', help='Set zerocopy_threshold in bytes', + type=int, dest='zerocopy_threshold') p.set_defaults(func=sock_impl_set_options, enable_recv_pipe=None, enable_quickack=None, - enable_placement_id=None, enable_zerocopy_send_server=None, enable_zerocopy_send_client=None) + enable_placement_id=None, enable_zerocopy_send_server=None, enable_zerocopy_send_client=None, + zerocopy_threshold=None) def sock_set_default_impl(args): print_json(rpc.sock.sock_set_default_impl(args.client, diff --git a/test/unit/lib/sock/uring.c/uring_ut.c b/test/unit/lib/sock/uring.c/uring_ut.c index cbd52c700..b2cd4a578 100644 --- a/test/unit/lib/sock/uring.c/uring_ut.c +++ b/test/unit/lib/sock/uring.c/uring_ut.c @@ -212,9 +212,9 @@ flush_server(void) * that is fully completed.
*/ spdk_sock_request_queue(sock, req1); cb_arg1 = false; - rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); + rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL); CU_ASSERT(rc == 2); - sock_complete_reqs(sock, 128); + sock_complete_reqs(sock, 128, 0); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -223,9 +223,9 @@ flush_server(void) spdk_sock_request_queue(sock, req2); cb_arg1 = false; cb_arg2 = false; - rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); + rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL); CU_ASSERT(rc == 4); - sock_complete_reqs(sock, 192); + sock_complete_reqs(sock, 192, 0); CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg2 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -234,20 +234,20 @@ flush_server(void) /* One request that is partially sent. */ spdk_sock_request_queue(sock, req1); cb_arg1 = false; - rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); + rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL); CU_ASSERT(rc == 2); - sock_complete_reqs(sock, 92); + sock_complete_reqs(sock, 92, 0); CU_ASSERT(rc == 2); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); /* Get the second time partial sent result. */ - sock_complete_reqs(sock, 10); + sock_complete_reqs(sock, 10, 0); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); /* Data is finally sent. */ - sock_complete_reqs(sock, 26); + sock_complete_reqs(sock, 26, 0); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));