From d7393e2e4f16e4de7e1804442bf9c951a400556b Mon Sep 17 00:00:00 2001 From: Shuhei Matsumoto Date: Mon, 27 Jan 2020 00:31:37 -0500 Subject: [PATCH] lib/thread: Stop and reap pending messages after thread is marked as exited This is a preparation to support voluntary thread termination by calling spdk_thread_exit(). Previously, the exiting thread had discarded all pending mesasges. We change this to stop accepting any new message in spdk_thread_send_msg() and reap pending messages in _spdk_msg_queue_run_batch(). Add unit test case for the new behavior. Adding g_ prefix to global variables for clarification is done together. Signed-off-by: Shuhei Matsumoto Change-Id: Ida78e7bb1b86357602aea6938dd514897b67edd6 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/482 Tested-by: SPDK CI Jenkins Reviewed-by: Ben Walker Reviewed-by: Aleksey Marchuk Reviewed-by: Changpeng Liu --- lib/thread/thread.c | 9 +- test/unit/lib/thread/thread.c/thread_ut.c | 112 +++++++++++++++++----- 2 files changed, 94 insertions(+), 27 deletions(-) diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 78c1c1558..b2ae25588 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -444,10 +444,6 @@ _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) } else { spdk_mempool_put(g_spdk_msg_mempool, msg); } - - if (thread->exit) { - break; - } } return count; @@ -716,6 +712,11 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx assert(thread != NULL); + if (spdk_unlikely(thread->exit)) { + SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); + return -EIO; + } + local_thread = _get_thread(); msg = NULL; diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index fa605efb4..407bfc05d 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -587,12 +587,12 @@ thread_name(void) spdk_thread_lib_fini(); } -static uint64_t device1; -static uint64_t device2; -static uint64_t device3; +static uint64_t g_device1; +static uint64_t g_device2; +static uint64_t g_device3; -static uint64_t ctx1 = 0x1111; -static uint64_t ctx2 = 0x2222; +static uint64_t g_ctx1 = 0x1111; +static uint64_t g_ctx2 = 0x2222; static int g_create_cb_calls = 0; static int g_destroy_cb_calls = 0; @@ -600,8 +600,8 @@ static int g_destroy_cb_calls = 0; static int create_cb_1(void *io_device, void *ctx_buf) { - CU_ASSERT(io_device == &device1); - *(uint64_t *)ctx_buf = ctx1; + CU_ASSERT(io_device == &g_device1); + *(uint64_t *)ctx_buf = g_ctx1; g_create_cb_calls++; return 0; } @@ -609,16 +609,16 @@ create_cb_1(void *io_device, void *ctx_buf) static void destroy_cb_1(void *io_device, void *ctx_buf) { - CU_ASSERT(io_device == &device1); - CU_ASSERT(*(uint64_t *)ctx_buf == ctx1); + CU_ASSERT(io_device == &g_device1); + CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx1); g_destroy_cb_calls++; } static int create_cb_2(void *io_device, void *ctx_buf) { - CU_ASSERT(io_device == &device2); - *(uint64_t *)ctx_buf = ctx2; + CU_ASSERT(io_device == &g_device2); + *(uint64_t *)ctx_buf = g_ctx2; g_create_cb_calls++; return 0; } @@ -626,8 +626,8 @@ create_cb_2(void *io_device, void *ctx_buf) static void destroy_cb_2(void *io_device, void *ctx_buf) { - CU_ASSERT(io_device == &device2); - CU_ASSERT(*(uint64_t *)ctx_buf == ctx2); + CU_ASSERT(io_device == &g_device2); + CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx2); g_destroy_cb_calls++; } @@ -640,16 +640,16 @@ channel(void) allocate_threads(1); set_thread(0); - spdk_io_device_register(&device1, create_cb_1, destroy_cb_1, sizeof(ctx1), NULL); - spdk_io_device_register(&device2, create_cb_2, destroy_cb_2, sizeof(ctx2), NULL); + spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL); + spdk_io_device_register(&g_device2, create_cb_2, destroy_cb_2, sizeof(g_ctx2), NULL); g_create_cb_calls = 0; - ch1 = spdk_get_io_channel(&device1); + ch1 = spdk_get_io_channel(&g_device1); CU_ASSERT(g_create_cb_calls == 1); SPDK_CU_ASSERT_FATAL(ch1 != NULL); g_create_cb_calls = 0; - ch2 = spdk_get_io_channel(&device1); + ch2 = spdk_get_io_channel(&g_device1); CU_ASSERT(g_create_cb_calls == 0); CU_ASSERT(ch1 == ch2); SPDK_CU_ASSERT_FATAL(ch2 != NULL); @@ -660,13 +660,13 @@ channel(void) CU_ASSERT(g_destroy_cb_calls == 0); g_create_cb_calls = 0; - ch2 = spdk_get_io_channel(&device2); + ch2 = spdk_get_io_channel(&g_device2); CU_ASSERT(g_create_cb_calls == 1); CU_ASSERT(ch1 != ch2); SPDK_CU_ASSERT_FATAL(ch2 != NULL); ctx = spdk_io_channel_get_ctx(ch2); - CU_ASSERT(*(uint64_t *)ctx == ctx2); + CU_ASSERT(*(uint64_t *)ctx == g_ctx2); g_destroy_cb_calls = 0; spdk_put_io_channel(ch1); @@ -678,12 +678,12 @@ channel(void) poll_threads(); CU_ASSERT(g_destroy_cb_calls == 1); - ch1 = spdk_get_io_channel(&device3); + ch1 = spdk_get_io_channel(&g_device3); CU_ASSERT(ch1 == NULL); - spdk_io_device_unregister(&device1, NULL); + spdk_io_device_unregister(&g_device1, NULL); poll_threads(); - spdk_io_device_unregister(&device2, NULL); + spdk_io_device_unregister(&g_device2, NULL); poll_threads(); CU_ASSERT(TAILQ_EMPTY(&g_io_devices)); free_threads(); @@ -745,6 +745,71 @@ channel_destroy_races(void) CU_ASSERT(TAILQ_EMPTY(&g_threads)); } +static void +thread_exit(void) +{ + struct spdk_thread *thread; + struct spdk_io_channel *ch; + void *ctx; + bool done1 = false, done2 = false; + int rc __attribute__((unused)); + + allocate_threads(3); + + /* Test all pending messages are reaped for the thread marked as exited. */ + set_thread(0); + thread = spdk_get_thread(); + + /* Sending message to thread 0 will be accepted. */ + set_thread(1); + rc = spdk_thread_send_msg(thread, send_msg_cb, &done1); + CU_ASSERT(rc == 0); + CU_ASSERT(!done1); + + /* Mark thread 0 as exited. */ + set_thread(0); + spdk_thread_exit(thread); + + /* Sending message to thread 0 will be rejected. */ + set_thread(1); + rc = spdk_thread_send_msg(thread, send_msg_cb, &done2); + CU_ASSERT(rc == -EIO); + + /* Thread 0 will reap pending message. */ + poll_thread(0); + CU_ASSERT(done1 == true); + CU_ASSERT(done2 == false); + + /* Test releasing I/O channel is reaped even after the thread is marked + * as exited. + */ + set_thread(2); + + spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL); + + g_create_cb_calls = 0; + ch = spdk_get_io_channel(&g_device1); + CU_ASSERT(g_create_cb_calls == 1); + SPDK_CU_ASSERT_FATAL(ch != NULL); + + ctx = spdk_io_channel_get_ctx(ch); + CU_ASSERT(*(uint64_t *)ctx == g_ctx1); + + g_destroy_cb_calls = 0; + spdk_put_io_channel(ch); + + thread = spdk_get_thread(); + spdk_thread_exit(thread); + + poll_threads(); + CU_ASSERT(g_destroy_cb_calls == 1); + + spdk_io_device_unregister(&g_device1, NULL); + poll_threads(); + + free_threads(); +} + int main(int argc, char **argv) { @@ -771,7 +836,8 @@ main(int argc, char **argv) CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL || CU_add_test(suite, "thread_name", thread_name) == NULL || CU_add_test(suite, "channel", channel) == NULL || - CU_add_test(suite, "channel_destroy_races", channel_destroy_races) == NULL + CU_add_test(suite, "channel_destroy_races", channel_destroy_races) == NULL || + CU_add_test(suite, "thread_exit", thread_exit) == NULL ) { CU_cleanup_registry(); return CU_get_error();