diff --git a/include/spdk/io_channel.h b/include/spdk/io_channel.h index 69f81a4b3..b5fe56c46 100644 --- a/include/spdk/io_channel.h +++ b/include/spdk/io_channel.h @@ -45,13 +45,23 @@ struct spdk_thread; struct spdk_io_channel; +typedef void (*spdk_thread_fn)(void *ctx); +typedef void (*spdk_thread_pass_msg)(spdk_thread_fn fn, void *ctx, + void *thread_ctx); + typedef int (*io_channel_create_cb_t)(void *io_device, void *ctx_buf); typedef void (*io_channel_destroy_cb_t)(void *io_device, void *ctx_buf); /** * \brief Initializes the calling thread for I/O channel allocation. + * + * @param fn A function that may be called from any thread and is + * passed a function pointer (spdk_thread_fn) that must be + * called on the same thread that spdk_allocate_thread + * was called from. + * @param thread_ctx Context that will be passed to fn. */ -struct spdk_thread *spdk_allocate_thread(void); +struct spdk_thread *spdk_allocate_thread(spdk_thread_pass_msg fn, void *thread_ctx); /** * \brief Releases any resources related to the calling thread for I/O channel allocation. @@ -62,10 +72,22 @@ struct spdk_thread *spdk_allocate_thread(void); void spdk_free_thread(void); /** - * \brief Get a handle to the current thread. + * \brief Get a handle to the current thread. This handle may be passed + * to other threads and used as the target of spdk_thread_send_msg(). */ struct spdk_thread *spdk_get_thread(void); +/** + * \brief Send a message to the given thread. The message + * may be sent asynchronously - i.e. spdk_thread_send_msg + * may return prior to `fn` being called. + * + * @param thread The target thread. + * @param fn This function will be called on the given thread. + * @param ctx This context will be passed to fn when called. + */ +void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx); + /** * \brief Register the opaque io_device context as an I/O device. * diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 8e1b6934b..361e81b4f 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -264,6 +264,27 @@ _spdk_poller_unregister_complete(struct spdk_poller *poller) free(poller); } +static void +_spdk_reactor_msg_passed(void *arg1, void *arg2) +{ + spdk_thread_fn fn = arg1; + + fn(arg2); +} + +static void +_spdk_reactor_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) +{ + uint32_t core; + struct spdk_event *event; + + core = *(uint32_t *)thread_ctx; + + event = spdk_event_allocate(core, _spdk_reactor_msg_passed, fn, ctx); + + spdk_event_call(event); +} + /** * * \brief This is the main function of the reactor thread. @@ -296,7 +317,7 @@ _spdk_reactor_run(void *arg) uint32_t sleep_us; uint32_t timer_poll_count; - spdk_allocate_thread(); + spdk_allocate_thread(_spdk_reactor_send_msg, &reactor->lcore); set_reactor_thread_name(reactor->lcore); SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore, reactor->socket_id); diff --git a/lib/util/io_channel.c b/lib/util/io_channel.c index 7b372cd83..7654ae601 100644 --- a/lib/util/io_channel.c +++ b/lib/util/io_channel.c @@ -64,14 +64,18 @@ struct spdk_io_channel { }; struct spdk_thread { + spdk_thread_pass_msg fn; + void *thread_ctx; TAILQ_HEAD(, spdk_io_channel) io_channels; }; static __thread struct spdk_thread g_thread; struct spdk_thread * -spdk_allocate_thread(void) +spdk_allocate_thread(spdk_thread_pass_msg fn, void *thread_ctx) { + g_thread.fn = fn; + g_thread.thread_ctx = thread_ctx; TAILQ_INIT(&g_thread.io_channels); return &g_thread; } @@ -88,6 +92,12 @@ spdk_get_thread(void) return &g_thread; } +void +spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx) +{ + thread->fn(fn, ctx, thread->thread_ctx); +} + void spdk_io_device_register(void *io_device, io_channel_create_cb_t create_cb, io_channel_destroy_cb_t destroy_cb, uint32_t ctx_size) diff --git a/test/lib/blob/blob_ut/blob_ut.c b/test/lib/blob/blob_ut/blob_ut.c index 69b9772f2..52eb4c1f8 100644 --- a/test/lib/blob/blob_ut/blob_ut.c +++ b/test/lib/blob/blob_ut/blob_ut.c @@ -48,6 +48,12 @@ int g_bserrno; struct spdk_xattr_names *g_names; int g_done; +static void +_bs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) +{ + fn(ctx); +} + static void bs_op_complete(void *cb_arg, int bserrno) { @@ -963,7 +969,7 @@ int main(int argc, char **argv) } g_dev_buffer = calloc(1, DEV_BUFFER_SIZE); - spdk_allocate_thread(); + spdk_allocate_thread(_bs_send_msg, NULL); CU_basic_set_mode(CU_BRM_VERBOSE); CU_basic_run_tests(); num_failures = CU_get_number_of_failures(); diff --git a/test/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c b/test/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c index 18c31e617..c678bfb09 100644 --- a/test/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c +++ b/test/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c @@ -47,6 +47,12 @@ struct spdk_filesystem *g_fs; struct spdk_file *g_file; int g_fserrno; +static void +_fs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) +{ + fn(ctx); +} + static void fs_op_complete(void *ctx, int fserrno) { @@ -67,7 +73,7 @@ fs_init(void) struct spdk_bs_dev dev; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); CU_ASSERT(g_fs != NULL); @@ -109,7 +115,7 @@ fs_open(void) struct spdk_file *file; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); CU_ASSERT(g_fs != NULL); @@ -171,7 +177,7 @@ fs_truncate(void) struct spdk_bs_dev dev; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); SPDK_CU_ASSERT_FATAL(g_fs != NULL); @@ -224,7 +230,7 @@ fs_rename(void) struct spdk_bs_dev dev; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); SPDK_CU_ASSERT_FATAL(g_fs != NULL); @@ -377,7 +383,7 @@ channel_ops(void) struct spdk_io_channel *channel; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); SPDK_CU_ASSERT_FATAL(g_fs != NULL); @@ -405,7 +411,7 @@ channel_ops_sync(void) struct spdk_io_channel *channel; init_dev(&dev); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL); SPDK_CU_ASSERT_FATAL(g_fs != NULL); diff --git a/test/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c b/test/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c index 09bf3a1fd..432e727a5 100644 --- a/test/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c +++ b/test/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c @@ -51,6 +51,12 @@ int g_fserrno; struct spdk_bs_dev g_dev; +static void +_fs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) +{ + fn(ctx); +} + struct ut_request { fs_request_fn fn; void *arg; @@ -144,7 +150,7 @@ cache_write(void) ut_send_request(_fs_init, NULL); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); channel = spdk_fs_alloc_io_channel_sync(g_fs); rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file); @@ -182,7 +188,7 @@ cache_write_null_buffer(void) ut_send_request(_fs_init, NULL); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); channel = spdk_fs_alloc_io_channel_sync(g_fs); rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file); @@ -214,7 +220,7 @@ cache_append_no_cache(void) ut_send_request(_fs_init, NULL); - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); channel = spdk_fs_alloc_io_channel_sync(g_fs); rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file); @@ -256,7 +262,7 @@ spdk_thread(void *arg) { struct ut_request *req; - spdk_allocate_thread(); + spdk_allocate_thread(_fs_send_msg, NULL); while (1) { pthread_mutex_lock(&g_mutex); diff --git a/test/lib/util/io_channel/io_channel_ut.c b/test/lib/util/io_channel/io_channel_ut.c index f0c3e9782..85d15e40d 100644 --- a/test/lib/util/io_channel/io_channel_ut.c +++ b/test/lib/util/io_channel/io_channel_ut.c @@ -37,10 +37,16 @@ #include "util/io_channel.c" +static void +_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) +{ + fn(ctx); +} + static void thread_alloc(void) { - spdk_allocate_thread(); + spdk_allocate_thread(_send_msg, NULL); spdk_free_thread(); } @@ -100,7 +106,7 @@ channel(void) struct spdk_io_channel *ch1, *ch2; void *ctx; - spdk_allocate_thread(); + spdk_allocate_thread(_send_msg, NULL); spdk_io_device_register(&device1, create_cb_1, destroy_cb_1, sizeof(ctx1)); spdk_io_device_register(&device2, create_cb_2, destroy_cb_2, sizeof(ctx2)); spdk_io_device_register(&device3, create_cb_null, NULL, 0);