From 111eef8f19331e93e29842205422706d5ba10c36 Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Wed, 1 Nov 2017 15:12:13 -0700 Subject: [PATCH] channel: Add spdk_for_each_thread This function will send a message to each allocated thread asynchronously, then call a callback on the originating thread. Change-Id: I3ebe7c6c5b460a944a32487d1091b601a482a256 Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/388041 Reviewed-by: Jim Harris Reviewed-by: Daniel Verkamp Reviewed-by: Dariusz Stojaczyk Tested-by: SPDK Automated Test System --- include/spdk/io_channel.h | 12 +++++ lib/util/io_channel.c | 52 +++++++++++++++++++ .../lib/util/io_channel.c/io_channel_ut.c | 39 ++++++++++++++ 3 files changed, 103 insertions(+) diff --git a/include/spdk/io_channel.h b/include/spdk/io_channel.h index fa6ef300f..3b9af61f5 100644 --- a/include/spdk/io_channel.h +++ b/include/spdk/io_channel.h @@ -116,6 +116,18 @@ const char *spdk_thread_get_name(const struct spdk_thread *thread); */ void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx); +/** + * \brief Send a message to each thread, serially. The message + * is sent asynchronously - i.e. spdk_for_each_thread + * will return prior to `fn` being called on each thread. + * + * @param fn This is the function that will be called on each thread. + * @param ctx This context will be passed to fn when called. + * @param cpl This will be called on the originating thread after `fn` has been + * called on each thread. + */ +void spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl); + /** * \brief Register a poller on the current thread. The poller can be * unregistered by calling spdk_poller_unregister(). diff --git a/lib/util/io_channel.c b/lib/util/io_channel.c index 9c0037f1a..0c04ce946 100644 --- a/lib/util/io_channel.c +++ b/lib/util/io_channel.c @@ -252,6 +252,58 @@ spdk_poller_unregister(struct spdk_poller **ppoller) } } +struct call_thread { + struct spdk_thread *cur_thread; + spdk_thread_fn fn; + void *ctx; + + struct spdk_thread *orig_thread; + spdk_thread_fn cpl; +}; + +static void +spdk_on_thread(void *ctx) +{ + struct call_thread *ct = ctx; + + ct->fn(ct->ctx); + + pthread_mutex_lock(&g_devlist_mutex); + ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); + pthread_mutex_unlock(&g_devlist_mutex); + + if (!ct->cur_thread) { + spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); + free(ctx); + } else { + spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); + } +} + +void +spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl) +{ + struct call_thread *ct; + + ct = calloc(1, sizeof(*ct)); + if (!ct) { + SPDK_ERRLOG("Unable to perform thread iteration\n"); + cpl(ctx); + return; + } + + ct->fn = fn; + ct->ctx = ctx; + ct->cpl = cpl; + + pthread_mutex_lock(&g_devlist_mutex); + ct->orig_thread = _get_thread(); + ct->cur_thread = TAILQ_FIRST(&g_threads); + pthread_mutex_unlock(&g_devlist_mutex); + + spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); +} + void spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size) diff --git a/test/unit/lib/util/io_channel.c/io_channel_ut.c b/test/unit/lib/util/io_channel.c/io_channel_ut.c index d032bc843..e059a6fa0 100644 --- a/test/unit/lib/util/io_channel.c/io_channel_ut.c +++ b/test/unit/lib/util/io_channel.c/io_channel_ut.c @@ -97,6 +97,44 @@ thread_send_msg(void) free_threads(); } +static void +for_each_cb(void *ctx) +{ + int *count = ctx; + + (*count)++; +} + +static void +thread_for_each(void) +{ + int count = 0; + int i; + + allocate_threads(3); + set_thread(0); + + spdk_for_each_thread(for_each_cb, &count, for_each_cb); + + /* We have not polled thread 0 yet, so count should be 0 */ + CU_ASSERT(count == 0); + + /* Poll each thread to verify the message is passed to each */ + for (i = 0; i < 3; i++) { + poll_thread(i); + CU_ASSERT(count == (i + 1)); + } + + /* + * After each thread is called, the completion calls it + * one more time. + */ + poll_thread(0); + CU_ASSERT(count == 4); + + free_threads(); +} + static int channel_create(void *io_device, void *ctx_buf) { @@ -393,6 +431,7 @@ main(int argc, char **argv) if ( CU_add_test(suite, "thread_alloc", thread_alloc) == NULL || CU_add_test(suite, "thread_send_msg", thread_send_msg) == NULL || + CU_add_test(suite, "thread_for_each", thread_for_each) == NULL || CU_add_test(suite, "for_each_channel_remove", for_each_channel_remove) == NULL || CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL || CU_add_test(suite, "thread_name", thread_name) == NULL ||