diff --git a/module/sock/uring/uring.c b/module/sock/uring/uring.c index cae258d03..642636445 100644 --- a/module/sock/uring/uring.c +++ b/module/sock/uring/uring.c @@ -58,6 +58,7 @@ enum spdk_sock_task_type { SPDK_SOCK_TASK_POLLIN = 0, SPDK_SOCK_TASK_WRITE, + SPDK_SOCK_TASK_CANCEL, }; enum spdk_uring_sock_task_status { @@ -82,6 +83,7 @@ struct spdk_uring_sock { struct spdk_uring_sock_group_impl *group; struct spdk_uring_task write_task; struct spdk_uring_task pollin_task; + struct spdk_uring_task cancel_task; int outstanding_io; struct spdk_pipe *recv_pipe; void *recv_buf; @@ -858,6 +860,26 @@ _sock_prep_pollin(struct spdk_sock *_sock) task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; } +static void +_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_task *task = &sock->cancel_task; + struct io_uring_sqe *sqe; + + if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { + return; + } + + assert(sock->group != NULL); + sock->group->io_queued++; + + sqe = io_uring_get_sqe(&sock->group->uring); + io_uring_prep_cancel(sqe, user_data, 0); + io_uring_sqe_set_data(sqe, task); + task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; +} + static int spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, struct spdk_sock **socks) @@ -919,7 +941,6 @@ spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, in if (spdk_unlikely(task->sock->outstanding_io > 0 && TAILQ_EMPTY(&sock->base.pending_reqs))) { if (--sock->outstanding_io == 0) { - sock->group = NULL; /* Just for sock close case */ if (sock->base.flags.closed) { spdk_uring_sock_close(&sock->base); @@ -927,6 +948,11 @@ spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, in } } + break; + case SPDK_SOCK_TASK_CANCEL: + if ((status == 0) && (sock->outstanding_io > 0)) { + sock->outstanding_io--; + } break; default: SPDK_UNREACHABLE(); @@ -1151,6 +1177,9 @@ spdk_uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, sock->pollin_task.sock = sock; sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; + sock->cancel_task.sock = sock; + sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; + /* switched from another polling group due to scheduling */ if (spdk_unlikely(sock->recv_pipe != NULL && (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { @@ -1207,13 +1236,21 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_uring_sock *sock = __uring_sock(_sock); struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); - if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { + /* For write, we do not need to cancel it */ sock->outstanding_io++; } if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { sock->outstanding_io++; + _sock_prep_cancel_task(_sock, &sock->pollin_task); + } + + + /* Since spdk_sock_group_remove_sock is not asynchronous interface, so + * currently can use a while loop here. */ + while (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { + spdk_uring_sock_group_impl_poll(_group, 32, NULL); } if (sock->recv_pipe != NULL) { @@ -1223,11 +1260,7 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, } assert(sock->pending_recv == false); } - - if (!sock->outstanding_io) { - sock->group = NULL; - } - + sock->group = NULL; return 0; } @@ -1253,7 +1286,13 @@ spdk_uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) static int spdk_uring_sock_flush(struct spdk_sock *_sock) { - return _sock_flush_client(_sock); + struct spdk_uring_sock *sock = __uring_sock(_sock); + + if (!sock->group) { + return _sock_flush_client(_sock); + } + + return 0; } static struct spdk_net_impl g_uring_net_impl = {