channel: Add spdk_for_each_thread

This function will send a message to each allocated
thread asynchronously, then call a callback on
the originating thread.

Change-Id: I3ebe7c6c5b460a944a32487d1091b601a482a256
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/388041
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: Dariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
This commit is contained in:
Ben Walker 2017-11-01 15:12:13 -07:00 committed by Jim Harris
parent eaaddf3d48
commit 111eef8f19
3 changed files with 103 additions and 0 deletions

View File

@ -116,6 +116,18 @@ const char *spdk_thread_get_name(const struct spdk_thread *thread);
*/ */
void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx); void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx);
/**
* \brief Send a message to each thread, serially. The message
* is sent asynchronously - i.e. spdk_for_each_thread
* will return prior to `fn` being called on each thread.
*
* @param fn This is the function that will be called on each thread.
* @param ctx This context will be passed to fn when called.
* @param cpl This will be called on the originating thread after `fn` has been
* called on each thread.
*/
void spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl);
/** /**
* \brief Register a poller on the current thread. The poller can be * \brief Register a poller on the current thread. The poller can be
* unregistered by calling spdk_poller_unregister(). * unregistered by calling spdk_poller_unregister().

View File

@ -252,6 +252,58 @@ spdk_poller_unregister(struct spdk_poller **ppoller)
} }
} }
struct call_thread {
struct spdk_thread *cur_thread;
spdk_thread_fn fn;
void *ctx;
struct spdk_thread *orig_thread;
spdk_thread_fn cpl;
};
static void
spdk_on_thread(void *ctx)
{
struct call_thread *ct = ctx;
ct->fn(ct->ctx);
pthread_mutex_lock(&g_devlist_mutex);
ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
pthread_mutex_unlock(&g_devlist_mutex);
if (!ct->cur_thread) {
spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
free(ctx);
} else {
spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
}
}
void
spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
{
struct call_thread *ct;
ct = calloc(1, sizeof(*ct));
if (!ct) {
SPDK_ERRLOG("Unable to perform thread iteration\n");
cpl(ctx);
return;
}
ct->fn = fn;
ct->ctx = ctx;
ct->cpl = cpl;
pthread_mutex_lock(&g_devlist_mutex);
ct->orig_thread = _get_thread();
ct->cur_thread = TAILQ_FIRST(&g_threads);
pthread_mutex_unlock(&g_devlist_mutex);
spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
void void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size) spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size)

View File

@ -97,6 +97,44 @@ thread_send_msg(void)
free_threads(); free_threads();
} }
static void
for_each_cb(void *ctx)
{
int *count = ctx;
(*count)++;
}
static void
thread_for_each(void)
{
int count = 0;
int i;
allocate_threads(3);
set_thread(0);
spdk_for_each_thread(for_each_cb, &count, for_each_cb);
/* We have not polled thread 0 yet, so count should be 0 */
CU_ASSERT(count == 0);
/* Poll each thread to verify the message is passed to each */
for (i = 0; i < 3; i++) {
poll_thread(i);
CU_ASSERT(count == (i + 1));
}
/*
* After each thread is called, the completion calls it
* one more time.
*/
poll_thread(0);
CU_ASSERT(count == 4);
free_threads();
}
static int static int
channel_create(void *io_device, void *ctx_buf) channel_create(void *io_device, void *ctx_buf)
{ {
@ -393,6 +431,7 @@ main(int argc, char **argv)
if ( if (
CU_add_test(suite, "thread_alloc", thread_alloc) == NULL || CU_add_test(suite, "thread_alloc", thread_alloc) == NULL ||
CU_add_test(suite, "thread_send_msg", thread_send_msg) == NULL || CU_add_test(suite, "thread_send_msg", thread_send_msg) == NULL ||
CU_add_test(suite, "thread_for_each", thread_for_each) == NULL ||
CU_add_test(suite, "for_each_channel_remove", for_each_channel_remove) == NULL || CU_add_test(suite, "for_each_channel_remove", for_each_channel_remove) == NULL ||
CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL || CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL ||
CU_add_test(suite, "thread_name", thread_name) == NULL || CU_add_test(suite, "thread_name", thread_name) == NULL ||