sock: introduce dynamic zerocopy according to data size

MSG_ZEROCOPY is not always effective as mentioned in
https://www.kernel.org/doc/html/v4.15/networking/msg_zerocopy.html.

Currently in spdk, once we enable sendmsg zerocopy, then all data
transferred through _sock_flush are sent with zerocopy, and vice
versa. Here dynamic zerocopy is introduced to allow data sent with
MSG_ZEROCOPY or not according to its size, which can be enabled by
setting "zerocopy_threshold" to a positive value (0 disables it).

Test with 16 P4610 NVMe SSD, 2 initiators, target's and initiators'
configurations are the same as spdk report:
https://ci.spdk.io/download/performance-reports/SPDK_tcp_perf_report_2104.pdf

For posix socket, rw_percent=0(randwrite), it has 1.9%~8.3% performance boost
tested with target 1~40 cpu cores and qdepth=128,256,512. And it has no obvious
influence when read percentage is greater than 50%.

For uring socket, rw_percent=0(randwrite), it has 1.8%~7.9% performance boost
tested with target 1~40 cpu cores and qdepth=128,256,512. And it still has
1%~7% improvement when read percentage is greater than 50%.

The following is part of the detailed data.

posix:
qdepth=128
rw_percent      0             |           30
cpu  origin  thisPatch  opt   | origin  thisPatch opt
1	286.5	298.5	4.19%		 307	304.15	-0.93%
4	1042.5	1107	6.19%		1135.5	1136	0.04%
8	1952.5	2058	5.40%		2170.5	2170.5	0.00%
12	2658.5	2879	8.29%		3042	3046	0.13%
16	3247.5	3460.5	6.56%		3793.5	3775	-0.49%
24	4232.5	4459.5	5.36%		4614.5	4756.5	3.08%
32	4810	5095	5.93%		4488	4845	7.95%
40	5306.5	5435	2.42%		4427.5	4902	10.72%

qdepth=512
rw_percent      0             |           30
cpu  origin  thisPatch  opt   | origin  thisPatch opt
1    275	 287	4.36%		294.4	295.45	0.36%
4	 979	1041	6.33%		1073	1083.5	0.98%
8	1822.5	1914.5	5.05%		2030.5	2018.5	-0.59%
12	2441	2598.5	6.45%		2808.5	2779.5	-1.03%
16	2920.5	3109.5	6.47%		3455	3411.5	-1.26%
24	3709	3972.5	7.10%		4483.5	4502.5	0.42%
32	4225.5	4532.5	7.27%		4463.5	4733	6.04%
40	4790.5	4884.5	1.96%		4427	4904.5	10.79%

uring:
qdepth=128
rw_percent      0             |           30
cpu  origin  thisPatch  opt   | origin  thisPatch opt
1	270.5	287.5	6.28%		295.75	304.75	3.04%
4	1018.5	1089.5	6.97%		1119.5	1156.5	3.31%
8	1907	2055	7.76%		2127	2211.5	3.97%
12	2614	2801	7.15%		2982.5	3061.5	2.65%
16	3169.5	3420	7.90%		3654.5	3781.5	3.48%
24	4109.5	4414	7.41%		4691.5	4750.5	1.26%
32	4752.5	4908	3.27%		4494	4825.5	7.38%
40	5233.5	5327	1.79%		4374.5	4891	11.81%

qdepth=512
rw_percent      0             |           30
cpu  origin  thisPatch  opt   | origin  thisPatch opt
1	259.95	 276	6.17%		286.65	294.8	2.84%
4	955 	1021	6.91%		1070.5	1100	2.76%
8	1772	1903.5	7.42%		1992.5	2077.5	4.27%
12	2380.5	2543.5	6.85%		2752.5	2860	3.91%
16	2920.5	3099	6.11%		3391.5	3540	4.38%
24	3697	3912	5.82%		4401	4637	5.36%
32	4256.5	4454.5	4.65%		4516	4777	5.78%
40	4707	4968.5	5.56%		4400.5	4933	12.10%

Signed-off-by: Richael Zhuang <richael.zhuang@arm.com>
Change-Id: I730dcf89ed2bf3efe91586421a89045fc11c81f0
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/12210
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Richael Zhuang 2021-12-07 17:37:02 +08:00 committed by Tomasz Zawadzki
parent eef6af95d1
commit 9bff828f99
10 changed files with 142 additions and 40 deletions

View File

@ -108,6 +108,17 @@ bdevs is one after another. The concat bdev is extendable. When the free space o
concat bdev is not enough, the user can deconstruct the concat bdev, then reconstruct it concat bdev is not enough, the user can deconstruct the concat bdev, then reconstruct it
with an additional underlying bdev. with an additional underlying bdev.
### sock
Allow MSG_ZEROCOPY flag to be set or not according to data size, which can be enabled and
set by setting "zerocopy_threshold". zerocopy_threshold = 0 means disable this function;
zerocopy_threshold > 0 means enable it and use this value as the threshold.
### rpc
Introduced `zerocopy_threshold` to enable zerocopy on send for server sockets according to
data size to be flushed.
## v22.01 ## v22.01
### accel ### accel

View File

@ -75,6 +75,9 @@ struct spdk_sock_request {
void *curr_list; void *curr_list;
#endif #endif
uint32_t offset; uint32_t offset;
/* Indicate if the whole req or part of it is sent with zerocopy */
bool is_zcopy;
} internal; } internal;
int iovcnt; int iovcnt;
@ -139,6 +142,12 @@ struct spdk_sock_impl_opts {
* Enable or disable use of zero copy flow on send for client sockets. Used by posix socket module. * Enable or disable use of zero copy flow on send for client sockets. Used by posix socket module.
*/ */
bool enable_zerocopy_send_client; bool enable_zerocopy_send_client;
/**
* Set zerocopy threshold in bytes. A consecutive sequence of requests' iovecs that fall below this
* threshold may be sent without zerocopy flag set.
*/
uint32_t zerocopy_threshold;
}; };
/** /**

View File

@ -66,6 +66,7 @@ struct spdk_sock {
int cb_cnt; int cb_cnt;
spdk_sock_cb cb_fn; spdk_sock_cb cb_fn;
void *cb_arg; void *cb_arg;
uint32_t zerocopy_threshold;
struct { struct {
uint8_t closed : 1; uint8_t closed : 1;
uint8_t reserved : 7; uint8_t reserved : 7;
@ -174,6 +175,7 @@ spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int
#endif #endif
req->internal.offset = 0; req->internal.offset = 0;
req->internal.is_zcopy = 0;
closed = sock->flags.closed; closed = sock->flags.closed;
sock->cb_cnt++; sock->cb_cnt++;
@ -245,11 +247,12 @@ spdk_sock_abort_requests(struct spdk_sock *sock)
static inline int static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
struct spdk_sock_request **last_req) struct spdk_sock_request **last_req, int *flags)
{ {
int iovcnt, i; int iovcnt, i;
struct spdk_sock_request *req; struct spdk_sock_request *req;
unsigned int offset; unsigned int offset;
uint64_t total = 0;
/* Gather an iov */ /* Gather an iov */
iovcnt = index; iovcnt = index;
@ -275,8 +278,9 @@ spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
iovcnt++;
total += iovs[iovcnt].iov_len;
iovcnt++;
offset = 0; offset = 0;
if (iovcnt >= IOV_BATCH_SIZE) { if (iovcnt >= IOV_BATCH_SIZE) {
@ -294,6 +298,14 @@ spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
} }
end: end:
#if defined(MSG_ZEROCOPY)
/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
if (total < _sock->zerocopy_threshold && flags != NULL) {
*flags = *flags & (~MSG_ZEROCOPY);
}
#endif
return iovcnt; return iovcnt;
} }

View File

@ -320,6 +320,8 @@ spdk_sock_connect_ext(const char *ip, int port, char *_impl_name, struct spdk_so
struct spdk_sock *sock; struct spdk_sock *sock;
struct spdk_sock_opts opts_local; struct spdk_sock_opts opts_local;
const char *impl_name = NULL; const char *impl_name = NULL;
struct spdk_sock_impl_opts impl_opts = {};
size_t len;
if (opts == NULL) { if (opts == NULL) {
SPDK_ERRLOG("the opts should not be NULL pointer\n"); SPDK_ERRLOG("the opts should not be NULL pointer\n");
@ -346,6 +348,10 @@ spdk_sock_connect_ext(const char *ip, int port, char *_impl_name, struct spdk_so
sock->net_impl = impl; sock->net_impl = impl;
TAILQ_INIT(&sock->queued_reqs); TAILQ_INIT(&sock->queued_reqs);
TAILQ_INIT(&sock->pending_reqs); TAILQ_INIT(&sock->pending_reqs);
len = sizeof(struct spdk_sock_impl_opts);
spdk_sock_impl_get_opts(impl->name, &impl_opts, &len);
sock->zerocopy_threshold = impl_opts.zerocopy_threshold;
return sock; return sock;
} }
} }
@ -407,6 +413,8 @@ struct spdk_sock *
spdk_sock_accept(struct spdk_sock *sock) spdk_sock_accept(struct spdk_sock *sock)
{ {
struct spdk_sock *new_sock; struct spdk_sock *new_sock;
struct spdk_sock_impl_opts impl_opts = {};
size_t len;
new_sock = sock->net_impl->accept(sock); new_sock = sock->net_impl->accept(sock);
if (new_sock != NULL) { if (new_sock != NULL) {
@ -416,6 +424,10 @@ spdk_sock_accept(struct spdk_sock *sock)
new_sock->net_impl = sock->net_impl; new_sock->net_impl = sock->net_impl;
TAILQ_INIT(&new_sock->queued_reqs); TAILQ_INIT(&new_sock->queued_reqs);
TAILQ_INIT(&new_sock->pending_reqs); TAILQ_INIT(&new_sock->pending_reqs);
len = sizeof(struct spdk_sock_impl_opts);
spdk_sock_impl_get_opts(sock->net_impl->name, &impl_opts, &len);
new_sock->zerocopy_threshold = impl_opts.zerocopy_threshold;
} }
return new_sock; return new_sock;
@ -850,6 +862,7 @@ spdk_sock_write_config_json(struct spdk_json_write_ctx *w)
spdk_json_write_named_uint32(w, "enable_placement_id", opts.enable_placement_id); spdk_json_write_named_uint32(w, "enable_placement_id", opts.enable_placement_id);
spdk_json_write_named_bool(w, "enable_zerocopy_send_server", opts.enable_zerocopy_send_server); spdk_json_write_named_bool(w, "enable_zerocopy_send_server", opts.enable_zerocopy_send_server);
spdk_json_write_named_bool(w, "enable_zerocopy_send_client", opts.enable_zerocopy_send_client); spdk_json_write_named_bool(w, "enable_zerocopy_send_client", opts.enable_zerocopy_send_client);
spdk_json_write_named_uint32(w, "zerocopy_threshold", opts.zerocopy_threshold);
spdk_json_write_object_end(w); spdk_json_write_object_end(w);
spdk_json_write_object_end(w); spdk_json_write_object_end(w);
} else { } else {

View File

@ -79,6 +79,7 @@ rpc_sock_impl_get_options(struct spdk_jsonrpc_request *request,
spdk_json_write_named_uint32(w, "enable_placement_id", sock_opts.enable_placement_id); spdk_json_write_named_uint32(w, "enable_placement_id", sock_opts.enable_placement_id);
spdk_json_write_named_bool(w, "enable_zerocopy_send_server", sock_opts.enable_zerocopy_send_server); spdk_json_write_named_bool(w, "enable_zerocopy_send_server", sock_opts.enable_zerocopy_send_server);
spdk_json_write_named_bool(w, "enable_zerocopy_send_client", sock_opts.enable_zerocopy_send_client); spdk_json_write_named_bool(w, "enable_zerocopy_send_client", sock_opts.enable_zerocopy_send_client);
spdk_json_write_named_uint32(w, "zerocopy_threshold", sock_opts.zerocopy_threshold);
spdk_json_write_object_end(w); spdk_json_write_object_end(w);
spdk_jsonrpc_end_result(request, w); spdk_jsonrpc_end_result(request, w);
free(impl_name); free(impl_name);
@ -123,6 +124,10 @@ static const struct spdk_json_object_decoder rpc_sock_impl_set_opts_decoders[] =
{ {
"enable_zerocopy_send_client", offsetof(struct spdk_rpc_sock_impl_set_opts, sock_opts.enable_zerocopy_send_client), "enable_zerocopy_send_client", offsetof(struct spdk_rpc_sock_impl_set_opts, sock_opts.enable_zerocopy_send_client),
spdk_json_decode_bool, true spdk_json_decode_bool, true
},
{
"zerocopy_threshold", offsetof(struct spdk_rpc_sock_impl_set_opts, sock_opts.zerocopy_threshold),
spdk_json_decode_uint32, true
} }
}; };

View File

@ -95,7 +95,8 @@ static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
.enable_quickack = false, .enable_quickack = false,
.enable_placement_id = PLACEMENT_NONE, .enable_placement_id = PLACEMENT_NONE,
.enable_zerocopy_send_server = true, .enable_zerocopy_send_server = true,
.enable_zerocopy_send_client = false .enable_zerocopy_send_client = false,
.zerocopy_threshold = 0
}; };
static struct spdk_sock_map g_map = { static struct spdk_sock_map g_map = {
@ -698,14 +699,18 @@ _sock_check_zcopy(struct spdk_sock *sock)
for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
found = false; found = false;
TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
if (req->internal.offset == idx) { if (!req->internal.is_zcopy) {
found = true; /* This wasn't a zcopy request. It was just waiting in line to complete */
rc = spdk_sock_request_put(sock, req, 0);
if (rc < 0) {
return rc;
}
} else if (req->internal.offset == idx) {
found = true;
rc = spdk_sock_request_put(sock, req, 0); rc = spdk_sock_request_put(sock, req, 0);
if (rc < 0) { if (rc < 0) {
return rc; return rc;
} }
} else if (found) { } else if (found) {
break; break;
} }
@ -731,21 +736,13 @@ _sock_flush(struct spdk_sock *sock)
ssize_t rc; ssize_t rc;
unsigned int offset; unsigned int offset;
size_t len; size_t len;
bool is_zcopy = false;
/* Can't flush from within a callback or we end up with recursive calls */ /* Can't flush from within a callback or we end up with recursive calls */
if (sock->cb_cnt > 0) { if (sock->cb_cnt > 0) {
return 0; return 0;
} }
iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL);
if (iovcnt == 0) {
return 0;
}
/* Perform the vectored write */
msg.msg_iov = iovs;
msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY #ifdef SPDK_ZEROCOPY
if (psock->zcopy) { if (psock->zcopy) {
flags = MSG_ZEROCOPY | MSG_NOSIGNAL; flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
@ -754,6 +751,20 @@ _sock_flush(struct spdk_sock *sock)
{ {
flags = MSG_NOSIGNAL; flags = MSG_NOSIGNAL;
} }
iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
if (iovcnt == 0) {
return 0;
}
#ifdef SPDK_ZEROCOPY
is_zcopy = flags & MSG_ZEROCOPY;
#endif
/* Perform the vectored write */
msg.msg_iov = iovs;
msg.msg_iovlen = iovcnt;
rc = sendmsg(psock->fd, &msg, flags); rc = sendmsg(psock->fd, &msg, flags);
if (rc <= 0) { if (rc <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
@ -762,7 +773,7 @@ _sock_flush(struct spdk_sock *sock)
return rc; return rc;
} }
if (psock->zcopy) { if (is_zcopy) {
/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
* req->internal.offset, so sendmsg_idx should not be zero */ * req->internal.offset, so sendmsg_idx should not be zero */
if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
@ -777,6 +788,9 @@ _sock_flush(struct spdk_sock *sock)
while (req) { while (req) {
offset = req->internal.offset; offset = req->internal.offset;
/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
req->internal.is_zcopy = is_zcopy;
for (i = 0; i < req->iovcnt; i++) { for (i = 0; i < req->iovcnt; i++) {
/* Advance by the offset first */ /* Advance by the offset first */
if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
@ -801,7 +815,7 @@ _sock_flush(struct spdk_sock *sock)
/* Handled a full request. */ /* Handled a full request. */
spdk_sock_request_pend(sock, req); spdk_sock_request_pend(sock, req);
if (!psock->zcopy) { if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
/* The sendmsg syscall above isn't currently asynchronous, /* The sendmsg syscall above isn't currently asynchronous,
* so it's already done. */ * so it's already done. */
retval = spdk_sock_request_put(sock, req, 0); retval = spdk_sock_request_put(sock, req, 0);
@ -1514,6 +1528,7 @@ posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
GET_FIELD(enable_placement_id); GET_FIELD(enable_placement_id);
GET_FIELD(enable_zerocopy_send_server); GET_FIELD(enable_zerocopy_send_server);
GET_FIELD(enable_zerocopy_send_client); GET_FIELD(enable_zerocopy_send_client);
GET_FIELD(zerocopy_threshold);
#undef GET_FIELD #undef GET_FIELD
#undef FIELD_OK #undef FIELD_OK
@ -1546,6 +1561,7 @@ posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
SET_FIELD(enable_placement_id); SET_FIELD(enable_placement_id);
SET_FIELD(enable_zerocopy_send_server); SET_FIELD(enable_zerocopy_send_server);
SET_FIELD(enable_zerocopy_send_client); SET_FIELD(enable_zerocopy_send_client);
SET_FIELD(zerocopy_threshold);
#undef SET_FIELD #undef SET_FIELD
#undef FIELD_OK #undef FIELD_OK

View File

@ -79,6 +79,7 @@ struct spdk_uring_task {
struct iovec iovs[IOV_BATCH_SIZE]; struct iovec iovs[IOV_BATCH_SIZE];
int iov_cnt; int iov_cnt;
struct spdk_sock_request *last_req; struct spdk_sock_request *last_req;
bool is_zcopy;
STAILQ_ENTRY(spdk_uring_task) link; STAILQ_ENTRY(spdk_uring_task) link;
}; };
@ -122,6 +123,7 @@ static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
.enable_placement_id = PLACEMENT_NONE, .enable_placement_id = PLACEMENT_NONE,
.enable_zerocopy_send_server = false, .enable_zerocopy_send_server = false,
.enable_zerocopy_send_client = false, .enable_zerocopy_send_client = false,
.zerocopy_threshold = 0
}; };
static struct spdk_sock_map g_map = { static struct spdk_sock_map g_map = {
@ -766,7 +768,7 @@ uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
} }
static int static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
{ {
struct spdk_uring_sock *sock = __uring_sock(_sock); struct spdk_uring_sock *sock = __uring_sock(_sock);
struct spdk_sock_request *req; struct spdk_sock_request *req;
@ -774,7 +776,7 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
unsigned int offset; unsigned int offset;
size_t len; size_t len;
if (sock->zcopy) { if (is_zcopy) {
/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
* req->internal.offset, so sendmsg_idx should not be zero */ * req->internal.offset, so sendmsg_idx should not be zero */
if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
@ -789,6 +791,9 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
while (req) { while (req) {
offset = req->internal.offset; offset = req->internal.offset;
/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
req->internal.is_zcopy = is_zcopy;
for (i = 0; i < req->iovcnt; i++) { for (i = 0; i < req->iovcnt; i++) {
/* Advance by the offset first */ /* Advance by the offset first */
if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
@ -813,7 +818,7 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
/* Handled a full request. */ /* Handled a full request. */
spdk_sock_request_pend(_sock, req); spdk_sock_request_pend(_sock, req);
if (!sock->zcopy) { if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
retval = spdk_sock_request_put(_sock, req, 0); retval = spdk_sock_request_put(_sock, req, 0);
if (retval) { if (retval) {
return retval; return retval;
@ -881,13 +886,18 @@ _sock_check_zcopy(struct spdk_sock *_sock, int status)
for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
found = false; found = false;
TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) { TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
if (req->internal.offset == idx) { if (!req->internal.is_zcopy) {
/* This wasn't a zcopy request. It was just waiting in line to complete */
rc = spdk_sock_request_put(_sock, req, 0);
if (rc < 0) {
return rc;
}
} else if (req->internal.offset == idx) {
found = true; found = true;
rc = spdk_sock_request_put(_sock, req, 0); rc = spdk_sock_request_put(_sock, req, 0);
if (rc < 0) { if (rc < 0) {
return rc; return rc;
} }
} else if (found) { } else if (found) {
break; break;
} }
@ -926,13 +936,22 @@ _sock_flush(struct spdk_sock *_sock)
struct spdk_uring_task *task = &sock->write_task; struct spdk_uring_task *task = &sock->write_task;
uint32_t iovcnt; uint32_t iovcnt;
struct io_uring_sqe *sqe; struct io_uring_sqe *sqe;
int flags = MSG_DONTWAIT | sock->zcopy_send_flags; int flags;
if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
return; return;
} }
iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); #ifdef SPDK_ZEROCOPY
if (sock->zcopy) {
flags = MSG_DONTWAIT | sock->zcopy_send_flags;
} else
#endif
{
flags = MSG_DONTWAIT;
}
iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
if (!iovcnt) { if (!iovcnt) {
return; return;
} }
@ -941,7 +960,9 @@ _sock_flush(struct spdk_sock *_sock)
assert(sock->group != NULL); assert(sock->group != NULL);
task->msg.msg_iov = task->iovs; task->msg.msg_iov = task->iovs;
task->msg.msg_iovlen = task->iov_cnt; task->msg.msg_iovlen = task->iov_cnt;
#ifdef SPDK_ZEROCOPY
task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
#endif
sock->group->io_queued++; sock->group->io_queued++;
sqe = io_uring_get_sqe(&sock->group->uring); sqe = io_uring_get_sqe(&sock->group->uring);
@ -1048,11 +1069,12 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
case SPDK_SOCK_TASK_WRITE: case SPDK_SOCK_TASK_WRITE:
task->last_req = NULL; task->last_req = NULL;
task->iov_cnt = 0; task->iov_cnt = 0;
if (spdk_unlikely(status) < 0) { if (spdk_unlikely(status) < 0) {
sock->connection_status = status; sock->connection_status = status;
spdk_sock_abort_requests(&sock->base); spdk_sock_abort_requests(&sock->base);
} else { } else {
sock_complete_reqs(&sock->base, status); sock_complete_reqs(&sock->base, status, task->is_zcopy);
} }
/* Reset only after sock_complete_reqs() has consumed the flag;
 * clearing it first would make every completion look non-zcopy. */
task->is_zcopy = false;
break; break;
@ -1156,6 +1178,7 @@ _sock_flush_client(struct spdk_sock *_sock)
ssize_t rc; ssize_t rc;
int flags = sock->zcopy_send_flags; int flags = sock->zcopy_send_flags;
int retval; int retval;
bool is_zcopy = false;
/* Can't flush from within a callback or we end up with recursive calls */ /* Can't flush from within a callback or we end up with recursive calls */
if (_sock->cb_cnt > 0) { if (_sock->cb_cnt > 0) {
@ -1163,7 +1186,7 @@ _sock_flush_client(struct spdk_sock *_sock)
} }
/* Gather an iov */ /* Gather an iov */
iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
if (iovcnt == 0) { if (iovcnt == 0) {
return 0; return 0;
} }
@ -1179,7 +1202,10 @@ _sock_flush_client(struct spdk_sock *_sock)
return rc; return rc;
} }
retval = sock_complete_reqs(_sock, rc); #ifdef SPDK_ZEROCOPY
is_zcopy = flags & MSG_ZEROCOPY;
#endif
retval = sock_complete_reqs(_sock, rc, is_zcopy);
if (retval < 0) { if (retval < 0) {
/* if the socket is closed, return to avoid heap-use-after-free error */ /* if the socket is closed, return to avoid heap-use-after-free error */
return retval; return retval;
@ -1530,6 +1556,7 @@ uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
GET_FIELD(enable_placement_id); GET_FIELD(enable_placement_id);
GET_FIELD(enable_zerocopy_send_server); GET_FIELD(enable_zerocopy_send_server);
GET_FIELD(enable_zerocopy_send_client); GET_FIELD(enable_zerocopy_send_client);
GET_FIELD(zerocopy_threshold);
#undef GET_FIELD #undef GET_FIELD
#undef FIELD_OK #undef FIELD_OK
@ -1561,6 +1588,7 @@ uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
SET_FIELD(enable_placement_id); SET_FIELD(enable_placement_id);
SET_FIELD(enable_zerocopy_send_server); SET_FIELD(enable_zerocopy_send_server);
SET_FIELD(enable_zerocopy_send_client); SET_FIELD(enable_zerocopy_send_client);
SET_FIELD(zerocopy_threshold);
#undef SET_FIELD #undef SET_FIELD
#undef FIELD_OK #undef FIELD_OK

View File

@ -19,7 +19,8 @@ def sock_impl_set_options(client,
enable_quickack=None, enable_quickack=None,
enable_placement_id=None, enable_placement_id=None,
enable_zerocopy_send_server=None, enable_zerocopy_send_server=None,
enable_zerocopy_send_client=None): enable_zerocopy_send_client=None,
zerocopy_threshold=None):
"""Set parameters for the socket layer implementation. """Set parameters for the socket layer implementation.
Args: Args:
@ -31,6 +32,7 @@ def sock_impl_set_options(client,
enable_placement_id: option for placement_id. 0:disable,1:incoming_napi,2:incoming_cpu (optional) enable_placement_id: option for placement_id. 0:disable,1:incoming_napi,2:incoming_cpu (optional)
enable_zerocopy_send_server: enable or disable zerocopy on send for server sockets(optional) enable_zerocopy_send_server: enable or disable zerocopy on send for server sockets(optional)
enable_zerocopy_send_client: enable or disable zerocopy on send for client sockets(optional) enable_zerocopy_send_client: enable or disable zerocopy on send for client sockets(optional)
zerocopy_threshold: set zerocopy_threshold in bytes(optional)
""" """
params = {} params = {}
@ -49,6 +51,8 @@ def sock_impl_set_options(client,
params['enable_zerocopy_send_server'] = enable_zerocopy_send_server params['enable_zerocopy_send_server'] = enable_zerocopy_send_server
if enable_zerocopy_send_client is not None: if enable_zerocopy_send_client is not None:
params['enable_zerocopy_send_client'] = enable_zerocopy_send_client params['enable_zerocopy_send_client'] = enable_zerocopy_send_client
    if zerocopy_threshold is not None:
        params['zerocopy_threshold'] = zerocopy_threshold
return client.call('sock_impl_set_options', params) return client.call('sock_impl_set_options', params)

View File

@ -2838,7 +2838,8 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse
enable_quickack=args.enable_quickack, enable_quickack=args.enable_quickack,
enable_placement_id=args.enable_placement_id, enable_placement_id=args.enable_placement_id,
enable_zerocopy_send_server=args.enable_zerocopy_send_server, enable_zerocopy_send_server=args.enable_zerocopy_send_server,
enable_zerocopy_send_client=args.enable_zerocopy_send_client) enable_zerocopy_send_client=args.enable_zerocopy_send_client,
zerocopy_threshold=args.zerocopy_threshold)
p = subparsers.add_parser('sock_impl_set_options', help="""Set options of socket layer implementation""") p = subparsers.add_parser('sock_impl_set_options', help="""Set options of socket layer implementation""")
p.add_argument('-i', '--impl', help='Socket implementation name, e.g. posix', required=True) p.add_argument('-i', '--impl', help='Socket implementation name, e.g. posix', required=True)
@ -2861,8 +2862,11 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse
action='store_true', dest='enable_zerocopy_send_client') action='store_true', dest='enable_zerocopy_send_client')
p.add_argument('--disable-zerocopy-send-client', help='Disable zerocopy on send for client sockets', p.add_argument('--disable-zerocopy-send-client', help='Disable zerocopy on send for client sockets',
action='store_false', dest='enable_zerocopy_send_client') action='store_false', dest='enable_zerocopy_send_client')
    p.add_argument('--zerocopy-threshold', help='Set zerocopy_threshold in bytes',
                   type=int, dest='zerocopy_threshold')
p.set_defaults(func=sock_impl_set_options, enable_recv_pipe=None, enable_quickack=None, p.set_defaults(func=sock_impl_set_options, enable_recv_pipe=None, enable_quickack=None,
enable_placement_id=None, enable_zerocopy_send_server=None, enable_zerocopy_send_client=None) enable_placement_id=None, enable_zerocopy_send_server=None, enable_zerocopy_send_client=None,
zerocopy_threshold=None)
def sock_set_default_impl(args): def sock_set_default_impl(args):
print_json(rpc.sock.sock_set_default_impl(args.client, print_json(rpc.sock.sock_set_default_impl(args.client,

View File

@ -212,9 +212,9 @@ flush_server(void)
* that is fully completed. */ * that is fully completed. */
spdk_sock_request_queue(sock, req1); spdk_sock_request_queue(sock, req1);
cb_arg1 = false; cb_arg1 = false;
rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL);
CU_ASSERT(rc == 2); CU_ASSERT(rc == 2);
sock_complete_reqs(sock, 128); sock_complete_reqs(sock, 128, 0);
CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg1 == true);
CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));
@ -223,9 +223,9 @@ flush_server(void)
spdk_sock_request_queue(sock, req2); spdk_sock_request_queue(sock, req2);
cb_arg1 = false; cb_arg1 = false;
cb_arg2 = false; cb_arg2 = false;
rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL);
CU_ASSERT(rc == 4); CU_ASSERT(rc == 4);
sock_complete_reqs(sock, 192); sock_complete_reqs(sock, 192, 0);
CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg1 == true);
CU_ASSERT(cb_arg2 == true); CU_ASSERT(cb_arg2 == true);
CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));
@ -234,20 +234,20 @@ flush_server(void)
/* One request that is partially sent. */ /* One request that is partially sent. */
spdk_sock_request_queue(sock, req1); spdk_sock_request_queue(sock, req1);
cb_arg1 = false; cb_arg1 = false;
rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL); rc = spdk_sock_prep_reqs(sock, usock.write_task.iovs, 0, NULL, NULL);
CU_ASSERT(rc == 2); CU_ASSERT(rc == 2);
sock_complete_reqs(sock, 92); sock_complete_reqs(sock, 92, 0);
CU_ASSERT(rc == 2); CU_ASSERT(rc == 2);
CU_ASSERT(cb_arg1 == false); CU_ASSERT(cb_arg1 == false);
CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);
/* Get the second time partial sent result. */ /* Get the second time partial sent result. */
sock_complete_reqs(sock, 10); sock_complete_reqs(sock, 10, 0);
CU_ASSERT(cb_arg1 == false); CU_ASSERT(cb_arg1 == false);
CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);
/* Data is finally sent. */ /* Data is finally sent. */
sock_complete_reqs(sock, 26); sock_complete_reqs(sock, 26, 0);
CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg1 == true);
CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));