diff --git a/include/spdk/sock.h b/include/spdk/sock.h index 79789176c..49bce28ca 100644 --- a/include/spdk/sock.h +++ b/include/spdk/sock.h @@ -319,7 +319,7 @@ int spdk_sock_close(struct spdk_sock **sock); * * \param sock Socket to flush. * - * \return 0 on success, -1 on failure. + * \return number of bytes sent on success, -1 (with errno set) on failure */ int spdk_sock_flush(struct spdk_sock *sock); diff --git a/module/sock/posix/posix.c b/module/sock/posix/posix.c index 9ed7b2e98..af064f57f 100644 --- a/module/sock/posix/posix.c +++ b/module/sock/posix/posix.c @@ -1213,14 +1213,15 @@ _sock_flush(struct spdk_sock *sock) int retval; struct spdk_sock_request *req; int i; - ssize_t rc; + ssize_t rc, sent; unsigned int offset; size_t len; bool is_zcopy = false; /* Can't flush from within a callback or we end up with recursive calls */ if (sock->cb_cnt > 0) { - return 0; + errno = EAGAIN; + return -1; } #ifdef SPDK_ZEROCOPY @@ -1251,12 +1252,14 @@ _sock_flush(struct spdk_sock *sock) rc = sendmsg(psock->fd, &msg, flags); } if (rc <= 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { - return 0; + if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { + errno = EAGAIN; } - return rc; + return -1; } + sent = rc; + if (is_zcopy) { /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the * req->internal.offset, so sendmsg_idx should not be zero */ @@ -1288,7 +1291,7 @@ _sock_flush(struct spdk_sock *sock) if (len > (size_t)rc) { /* This element was partially sent. */ req->internal.offset += rc; - return 0; + return sent; } offset = 0; @@ -1320,7 +1323,7 @@ _sock_flush(struct spdk_sock *sock) req = TAILQ_FIRST(&sock->queued_reqs); } - return 0; + return sent; } static int @@ -1532,7 +1535,7 @@ posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) /* If there are a sufficient number queued, just flush them out immediately. */ if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { rc = _sock_flush(sock); - if (rc) { + if (rc < 0 && errno != EAGAIN) { spdk_sock_abort_requests(sock); } } @@ -1876,7 +1879,7 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, * group. */ TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { rc = _sock_flush(sock); - if (rc) { + if (rc < 0 && errno != EAGAIN) { spdk_sock_abort_requests(sock); } } diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c index 291a68194..39faca11c 100644 --- a/module/sock/uring/uring.c +++ b/module/sock/uring/uring.c @@ -1271,57 +1271,7 @@ end: return count; } -static int -_sock_flush_client(struct spdk_sock *_sock) -{ - struct spdk_uring_sock *sock = __uring_sock(_sock); - struct msghdr msg = {}; - struct iovec iovs[IOV_BATCH_SIZE]; - int iovcnt; - ssize_t rc; - int flags = sock->zcopy_send_flags; - int retval; - bool is_zcopy = false; - - /* Can't flush from within a callback or we end up with recursive calls */ - if (_sock->cb_cnt > 0) { - return 0; - } - - /* Gather an iov */ - iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); - if (iovcnt == 0) { - return 0; - } - - /* Perform the vectored write */ - msg.msg_iov = iovs; - msg.msg_iovlen = iovcnt; - rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT); - if (rc <= 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; - } - return rc; - } - -#ifdef SPDK_ZEROCOPY - is_zcopy = flags & MSG_ZEROCOPY; -#endif - retval = sock_complete_write_reqs(_sock, rc, is_zcopy); - if (retval < 0) { - /* if the socket is closed, return to avoid heap-use-after-free error */ - return retval; - } - -#ifdef SPDK_ZEROCOPY - if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) { - _sock_check_zcopy(_sock, 0); - } -#endif - - return 0; -} +static int uring_sock_flush(struct spdk_sock *_sock); static void uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) @@ -1338,8 +1288,8 @@ uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) if (!sock->group) { if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { - rc = _sock_flush_client(_sock); - if (rc) { + rc = uring_sock_flush(_sock); + if (rc < 0 && errno != EAGAIN) { spdk_sock_abort_requests(_sock); } } @@ -1645,12 +1595,61 @@ static int uring_sock_flush(struct spdk_sock *_sock) { struct spdk_uring_sock *sock = __uring_sock(_sock); + struct msghdr msg = {}; + struct iovec iovs[IOV_BATCH_SIZE]; + int iovcnt; + ssize_t rc; + int flags = sock->zcopy_send_flags; + int retval; + bool is_zcopy = false; - if (!sock->group) { - return _sock_flush_client(_sock); + /* Can't flush from within a callback or we end up with recursive calls */ + if (_sock->cb_cnt > 0) { + errno = EAGAIN; + return -1; } - return 0; + /* Can't flush while a write is already outstanding */ + if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { + errno = EAGAIN; + return -1; + } + + /* Gather an iov */ + iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); + if (iovcnt == 0) { + /* Nothing to send */ + return 0; + } + + /* Perform the vectored write */ + msg.msg_iov = iovs; + msg.msg_iovlen = iovcnt; + rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT); + if (rc <= 0) { + if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) { + errno = EAGAIN; + } + return -1; + } + +#ifdef SPDK_ZEROCOPY + is_zcopy = flags & MSG_ZEROCOPY; +#endif + retval = sock_complete_write_reqs(_sock, rc, is_zcopy); + if (retval < 0) { + /* if the socket is closed, return to avoid heap-use-after-free error */ + errno = ENOTCONN; + return -1; + } + +#ifdef SPDK_ZEROCOPY + if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) { + _sock_check_zcopy(_sock, 0); + } +#endif + + return rc; } static struct spdk_net_impl g_uring_net_impl = { diff --git a/test/unit/lib/sock/posix.c/posix_ut.c b/test/unit/lib/sock/posix.c/posix_ut.c index 4253034be..0044d491e 100644 --- a/test/unit/lib/sock/posix.c/posix_ut.c +++ b/test/unit/lib/sock/posix.c/posix_ut.c @@ -72,7 +72,7 @@ flush(void) MOCK_SET(sendmsg, 64); cb_arg1 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 64); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -83,7 +83,7 @@ flush(void) cb_arg1 = false; cb_arg2 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 128); CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg2 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -95,7 +95,7 @@ flush(void) cb_arg1 = false; cb_arg2 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 64); CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg2 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req2); @@ -107,7 +107,7 @@ flush(void) MOCK_SET(sendmsg, 10); cb_arg1 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 10); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); @@ -115,7 +115,7 @@ flush(void) MOCK_SET(sendmsg, 24); cb_arg1 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 24); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); @@ -123,7 +123,7 @@ flush(void) MOCK_SET(sendmsg, 30); cb_arg1 = false; rc = _sock_flush(sock); - CU_ASSERT(rc == 0); + CU_ASSERT(rc == 30); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); diff --git a/test/unit/lib/sock/uring.c/uring_ut.c b/test/unit/lib/sock/uring.c/uring_ut.c index 7d2fed4c9..42051276a 100644 --- a/test/unit/lib/sock/uring.c/uring_ut.c +++ b/test/unit/lib/sock/uring.c/uring_ut.c @@ -79,8 +79,8 @@ flush_client(void) spdk_sock_request_queue(sock, req1); MOCK_SET(sendmsg, 192); cb_arg1 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 192); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -90,8 +90,8 @@ flush_client(void) MOCK_SET(sendmsg, 256); cb_arg1 = false; cb_arg2 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 256); CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg2 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs)); @@ -102,8 +102,8 @@ flush_client(void) MOCK_SET(sendmsg, 192); cb_arg1 = false; cb_arg2 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 192); CU_ASSERT(cb_arg1 == true); CU_ASSERT(cb_arg2 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req2); @@ -114,24 +114,24 @@ flush_client(void) spdk_sock_request_queue(sock, req1); MOCK_SET(sendmsg, 10); cb_arg1 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 10); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); /* Do a second flush that partial sends again. */ MOCK_SET(sendmsg, 52); cb_arg1 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 52); CU_ASSERT(cb_arg1 == false); CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1); /* Flush the rest of the data */ MOCK_SET(sendmsg, 130); cb_arg1 = false; - rc = _sock_flush_client(sock); - CU_ASSERT(rc == 0); + rc = uring_sock_flush(sock); + CU_ASSERT(rc == 130); CU_ASSERT(cb_arg1 == true); CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));