From e036215fe9b1c79be4ede31954b9173de2e8f8f3 Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Tue, 21 May 2019 14:59:12 -0700 Subject: [PATCH] thread: Add a mechanism to exit a lightweight thread Lightweight threads may now be exited by calling spdk_thread_exit() within the thread. The framework polling the thread can release the resources associated with that lightweight thread by calling spdk_thread_destroy(). Change-Id: I6586b9d22556b3874fb113ce5402c6b1f371786e Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/455319 Tested-by: SPDK CI Jenkins Reviewed-by: Seth Howell Reviewed-by: Changpeng Liu Reviewed-by: Darek Stojaczyk --- examples/bdev/fio_plugin/fio_plugin.c | 1 + include/spdk/thread.h | 16 +++++++++-- lib/event/reactor.c | 9 +++++- lib/thread/thread.c | 28 ++++++++++++++++++- test/common/lib/ut_multithread.c | 1 + .../blobfs/blobfs_sync_ut/blobfs_sync_ut.c | 5 ++++ test/unit/lib/ftl/common/utils.c | 2 ++ .../lib/iscsi/portal_grp.c/portal_grp_ut.c | 2 ++ test/unit/lib/nvmf/tcp.c/tcp_ut.c | 3 ++ test/unit/lib/thread/thread.c/thread_ut.c | 6 ++++ 10 files changed, 68 insertions(+), 5 deletions(-) diff --git a/examples/bdev/fio_plugin/fio_plugin.c b/examples/bdev/fio_plugin/fio_plugin.c index d8eea2e2f..c8c9b6f3e 100644 --- a/examples/bdev/fio_plugin/fio_plugin.c +++ b/examples/bdev/fio_plugin/fio_plugin.c @@ -147,6 +147,7 @@ spdk_fio_cleanup_thread(struct spdk_fio_thread *fio_thread) spdk_set_thread(fio_thread->thread); spdk_thread_exit(fio_thread->thread); + spdk_thread_destroy(fio_thread->thread); free(fio_thread->iocq); free(fio_thread); } diff --git a/include/spdk/thread.h b/include/spdk/thread.h index a6f3cb5ae..466cbf5b5 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -206,8 +206,9 @@ void spdk_thread_lib_fini(void); struct spdk_thread *spdk_thread_create(const char *name, struct spdk_cpuset *cpumask); /** - * Release any resources related to the given thread and destroy it. Execution - * continues on the current system thread after returning. + * Mark the thread as exited, failing all future spdk_thread_poll() calls. May + * only be called within an spdk poller or message. + * * * \param thread The thread to destroy. * @@ -216,6 +217,15 @@ struct spdk_thread *spdk_thread_create(const char *name, struct spdk_cpuset *cpu */ void spdk_thread_exit(struct spdk_thread *thread); +/** + * Destroy a thread, releasing all of its resources. May only be called + * on a thread previously marked as exited. + * + * \param thread The thread to destroy. + * + */ +void spdk_thread_destroy(struct spdk_thread *thread); + /** * Return a pointer to this thread's context. * @@ -255,7 +265,7 @@ struct spdk_thread *spdk_thread_get_from_ctx(void *ctx); * \param now The current time, in ticks. Optional. If 0 is passed, this * function may call spdk_get_ticks() to get the current time. * - * \return 1 if work was done. 0 if no work was done. -1 if unknown. + * \return 1 if work was done. 0 if no work was done. -1 if thread has exited. */ int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now); diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 8b19839c9..eae3d6d42 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -258,6 +258,7 @@ _spdk_reactor_run(void *arg) while (1) { uint64_t now; + int rc; /* For each loop through the reactor, capture the time. This time * is used for all threads. */ @@ -268,7 +269,11 @@ _spdk_reactor_run(void *arg) TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { thread = spdk_thread_get_from_ctx(lw_thread); - spdk_thread_poll(thread, 0, now); + rc = spdk_thread_poll(thread, 0, now); + if (rc < 0) { + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + spdk_thread_destroy(thread); + } } if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { @@ -286,7 +291,9 @@ _spdk_reactor_run(void *arg) TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { thread = spdk_thread_get_from_ctx(lw_thread); TAILQ_REMOVE(&reactor->threads, lw_thread, link); + spdk_set_thread(thread); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } return 0; diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 6f16c1ca4..a10a773cf 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -105,6 +105,8 @@ struct spdk_thread { TAILQ_ENTRY(spdk_thread) tailq; char *name; + bool exit; + struct spdk_cpuset *cpumask; uint64_t tsc_last; @@ -330,7 +332,19 @@ spdk_set_thread(struct spdk_thread *thread) void spdk_thread_exit(struct spdk_thread *thread) { - SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); + + assert(tls_thread == thread); + + thread->exit = true; +} + +void +spdk_thread_destroy(struct spdk_thread *thread) +{ + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); + + assert(thread->exit == true); if (tls_thread == thread) { tls_thread = NULL; @@ -400,6 +414,10 @@ _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) assert(msg != NULL); msg->fn(msg->arg); + if (thread->exit) { + break; + } + if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { /* Insert the messages at the head. We want to re-use the hot * ones. */ @@ -459,6 +477,10 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) active_pollers_head, tailq, tmp) { int poller_rc; + if (thread->exit) { + break; + } + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { TAILQ_REMOVE(&thread->active_pollers, poller, tailq); free(poller); @@ -491,6 +513,10 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { int timer_rc = 0; + if (thread->exit) { + break; + } + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); free(poller); diff --git a/test/common/lib/ut_multithread.c b/test/common/lib/ut_multithread.c index ee7db98be..48bd31379 100644 --- a/test/common/lib/ut_multithread.c +++ b/test/common/lib/ut_multithread.c @@ -106,6 +106,7 @@ free_threads(void) for (i = 0; i < g_ut_num_threads; i++) { set_thread(i); spdk_thread_exit(g_ut_threads[i].thread); + spdk_thread_destroy(g_ut_threads[i].thread); g_ut_threads[i].thread = NULL; } diff --git a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c index 4102a0f79..5dc3883cd 100644 --- a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c +++ b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c @@ -409,8 +409,13 @@ int main(int argc, char **argv) while (spdk_thread_poll(g_dispatch_thread, 0, 0) > 0) {} while (spdk_thread_poll(thread, 0, 0) > 0) {} + spdk_set_thread(thread); spdk_thread_exit(thread); + spdk_thread_destroy(thread); + + spdk_set_thread(g_dispatch_thread); spdk_thread_exit(g_dispatch_thread); + spdk_thread_destroy(g_dispatch_thread); spdk_thread_lib_fini(); diff --git a/test/unit/lib/ftl/common/utils.c b/test/unit/lib/ftl/common/utils.c index fba1fed0e..8e92c8c44 100644 --- a/test/unit/lib/ftl/common/utils.c +++ b/test/unit/lib/ftl/common/utils.c @@ -120,7 +120,9 @@ void test_free_ftl_dev(struct spdk_ftl_dev *dev) { SPDK_CU_ASSERT_FATAL(dev != NULL); + spdk_set_thread(dev->core_thread.thread); spdk_thread_exit(dev->core_thread.thread); + spdk_thread_destroy(dev->core_thread.thread); free(dev->punits); free(dev->bands); free(dev); diff --git a/test/unit/lib/iscsi/portal_grp.c/portal_grp_ut.c b/test/unit/lib/iscsi/portal_grp.c/portal_grp_ut.c index 1a862b965..96962a6b9 100644 --- a/test/unit/lib/iscsi/portal_grp.c/portal_grp_ut.c +++ b/test/unit/lib/iscsi/portal_grp.c/portal_grp_ut.c @@ -431,6 +431,7 @@ portal_grp_add_delete_case(void) CU_ASSERT(TAILQ_EMPTY(&g_spdk_iscsi.pg_head)); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } static void @@ -491,6 +492,7 @@ portal_grp_add_delete_twice_case(void) MOCK_CLEAR_P(spdk_sock_listen); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } int diff --git a/test/unit/lib/nvmf/tcp.c/tcp_ut.c b/test/unit/lib/nvmf/tcp.c/tcp_ut.c index 68bec99c1..41d16dc51 100644 --- a/test/unit/lib/nvmf/tcp.c/tcp_ut.c +++ b/test/unit/lib/nvmf/tcp.c/tcp_ut.c @@ -305,6 +305,7 @@ test_nvmf_tcp_create(void) CU_ASSERT_PTR_NULL(transport); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } static void @@ -334,6 +335,7 @@ test_nvmf_tcp_destroy(void) CU_ASSERT(spdk_nvmf_tcp_destroy(transport) == 0); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } static void @@ -369,6 +371,7 @@ test_nvmf_tcp_poll_group_create(void) spdk_nvmf_tcp_destroy(transport); spdk_thread_exit(thread); + spdk_thread_destroy(thread); } int main(int argc, char **argv) diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index 453f24809..8c593c494 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -57,7 +57,9 @@ thread_alloc(void) spdk_thread_lib_init(NULL, 0); thread = spdk_thread_create(NULL, NULL); SPDK_CU_ASSERT_FATAL(thread != NULL); + spdk_set_thread(thread); spdk_thread_exit(thread); + spdk_thread_destroy(thread); spdk_thread_lib_fini(); /* Schedule callback exists */ @@ -67,7 +69,9 @@ thread_alloc(void) g_sched_rc = 0; thread = spdk_thread_create(NULL, NULL); SPDK_CU_ASSERT_FATAL(thread != NULL); + spdk_set_thread(thread); spdk_thread_exit(thread); + spdk_thread_destroy(thread); /* Scheduling fails */ g_sched_rc = -1; @@ -381,6 +385,7 @@ thread_name(void) name = spdk_thread_get_name(thread); CU_ASSERT(name != NULL); spdk_thread_exit(thread); + spdk_thread_destroy(thread); /* Create thread named "test_thread" */ thread = spdk_thread_create("test_thread", NULL); @@ -391,6 +396,7 @@ thread_name(void) SPDK_CU_ASSERT_FATAL(name != NULL); CU_ASSERT(strcmp(name, "test_thread") == 0); spdk_thread_exit(thread); + spdk_thread_destroy(thread); spdk_thread_lib_fini(); }