diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c index 24de93f91..2abdb20c9 100644 --- a/module/sock/uring/uring.c +++ b/module/sock/uring/uring.c @@ -84,11 +84,11 @@ struct spdk_uring_sock { struct spdk_uring_task write_task; struct spdk_uring_task pollin_task; struct spdk_uring_task cancel_task; - int outstanding_io; struct spdk_pipe *recv_pipe; void *recv_buf; int recv_buf_sz; bool pending_recv; + int connection_status; TAILQ_ENTRY(spdk_uring_sock) link; }; @@ -567,11 +567,6 @@ uring_sock_close(struct spdk_sock *_sock) struct spdk_uring_sock *sock = __uring_sock(_sock); int rc; - /* defer the socket close if there is outstanding I/O */ - if (sock->outstanding_io) { - return 0; - } - assert(TAILQ_EMPTY(&_sock->pending_reqs)); assert(sock->group == NULL); @@ -922,7 +917,7 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max switch (task->type) { case SPDK_SOCK_TASK_POLLIN: if ((status & POLLIN) == POLLIN) { - if ((socks != NULL) && (sock->base.cb_fn != NULL)) { + if (sock->base.cb_fn != NULL) { assert(sock->pending_recv == false); sock->pending_recv = true; TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); @@ -935,30 +930,25 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max assert(TAILQ_EMPTY(&sock->base.pending_reqs)); task->last_req = NULL; task->iov_cnt = 0; - sock_complete_reqs(&sock->base, status); - - /* For socket is removed from the group but having outstanding I/O */ - if (spdk_unlikely(task->sock->outstanding_io > 0 && - TAILQ_EMPTY(&sock->base.pending_reqs))) { - if (--sock->outstanding_io == 0) { - /* Just for sock close case */ - if (sock->base.flags.closed) { - uring_sock_close(&sock->base); - } - } + if (spdk_unlikely(status) < 0) { + sock->connection_status = status; + spdk_sock_abort_requests(&sock->base); + } else { + sock_complete_reqs(&sock->base, status); } break; case SPDK_SOCK_TASK_CANCEL: - if ((status == 0) && (sock->outstanding_io > 0)) { - sock->outstanding_io--; - } + /* Do nothing */ break; default: SPDK_UNREACHABLE(); } } + if (!socks) { + return 0; + } count = 0; TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { if (count == max_read_events) { @@ -1027,6 +1017,11 @@ uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) struct spdk_uring_sock *sock = __uring_sock(_sock); int rc; + if (spdk_unlikely(sock->connection_status)) { + req->cb_fn(req->cb_arg, sock->connection_status); + return; + } + spdk_sock_request_queue(_sock, req); if (!sock->group) { @@ -1199,8 +1194,13 @@ uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, int count, ret; int to_complete, to_submit; struct spdk_sock *_sock, *tmp; + struct spdk_uring_sock *sock; TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { + sock = __uring_sock(_sock); + if (spdk_unlikely(sock->connection_status)) { + continue; + } _sock_flush(_sock); _sock_prep_pollin(_sock); } @@ -1237,29 +1237,31 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { - /* For write, we do not need to cancel it */ - sock->outstanding_io++; + _sock_prep_cancel_task(_sock, &sock->write_task); + /* Since spdk_sock_group_remove_sock is not asynchronous interface, so + * currently can use a while loop here. */ + while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || + (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { + uring_sock_group_impl_poll(_group, 32, NULL); + } } if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { - sock->outstanding_io++; _sock_prep_cancel_task(_sock, &sock->pollin_task); - } - - - /* Since spdk_sock_group_remove_sock is not asynchronous interface, so - * currently can use a while loop here. */ - while (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { - uring_sock_group_impl_poll(_group, 32, NULL); - } - - if (sock->recv_pipe != NULL) { - if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { - TAILQ_REMOVE(&group->pending_recv, sock, link); - sock->pending_recv = false; + /* Since spdk_sock_group_remove_sock is not asynchronous interface, so + * currently can use a while loop here. */ + while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || + (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { + uring_sock_group_impl_poll(_group, 32, NULL); } - assert(sock->pending_recv == false); } + + if (sock->pending_recv) { + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + assert(sock->pending_recv == false); + sock->group = NULL; return 0; }