io_channel: Add a message passing callback

When a thread is registered, the user must provide
a function pointer that can pass a message to that thread.

Change-Id: I743b5e0d6e3b5118c0a68d2fcedbccdd6fb237f9
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/362067
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
This commit is contained in:
Ben Walker 2017-05-17 08:58:16 -07:00
parent f1c5344b32
commit 2ef5722f3c
7 changed files with 94 additions and 17 deletions

View File

@ -45,13 +45,23 @@
struct spdk_thread;
struct spdk_io_channel;
typedef void (*spdk_thread_fn)(void *ctx);
typedef void (*spdk_thread_pass_msg)(spdk_thread_fn fn, void *ctx,
void *thread_ctx);
typedef int (*io_channel_create_cb_t)(void *io_device, void *ctx_buf);
typedef void (*io_channel_destroy_cb_t)(void *io_device, void *ctx_buf);
/**
* \brief Initializes the calling thread for I/O channel allocation.
*
* @param fn A function that may be called from any thread and is
* passed a function pointer (spdk_thread_fn) that must be
* called on the same thread that spdk_allocate_thread
* was called from.
* @param thread_ctx Context that will be passed to fn.
*/
struct spdk_thread *spdk_allocate_thread(void);
struct spdk_thread *spdk_allocate_thread(spdk_thread_pass_msg fn, void *thread_ctx);
/**
* \brief Releases any resources related to the calling thread for I/O channel allocation.
@ -62,10 +72,22 @@ struct spdk_thread *spdk_allocate_thread(void);
void spdk_free_thread(void);
/**
* \brief Get a handle to the current thread.
* \brief Get a handle to the current thread. This handle may be passed
* to other threads and used as the target of spdk_thread_send_msg().
*/
struct spdk_thread *spdk_get_thread(void);
/**
* \brief Send a message to the given thread. The message
* may be sent asynchronously - i.e. spdk_thread_send_msg
* may return prior to `fn` being called.
*
* @param thread The target thread.
* @param fn This function will be called on the given thread.
* @param ctx This context will be passed to fn when called.
*/
void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx);
/**
* \brief Register the opaque io_device context as an I/O device.
*

View File

@ -264,6 +264,27 @@ _spdk_poller_unregister_complete(struct spdk_poller *poller)
free(poller);
}
static void
_spdk_reactor_msg_passed(void *arg1, void *arg2)
{
spdk_thread_fn fn = arg1;
fn(arg2);
}
static void
_spdk_reactor_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
uint32_t core;
struct spdk_event *event;
core = *(uint32_t *)thread_ctx;
event = spdk_event_allocate(core, _spdk_reactor_msg_passed, fn, ctx);
spdk_event_call(event);
}
/**
*
* \brief This is the main function of the reactor thread.
@ -296,7 +317,7 @@ _spdk_reactor_run(void *arg)
uint32_t sleep_us;
uint32_t timer_poll_count;
spdk_allocate_thread();
spdk_allocate_thread(_spdk_reactor_send_msg, &reactor->lcore);
set_reactor_thread_name(reactor->lcore);
SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore,
reactor->socket_id);

View File

@ -64,14 +64,18 @@ struct spdk_io_channel {
};
struct spdk_thread {
spdk_thread_pass_msg fn;
void *thread_ctx;
TAILQ_HEAD(, spdk_io_channel) io_channels;
};
static __thread struct spdk_thread g_thread;
struct spdk_thread *
spdk_allocate_thread(void)
spdk_allocate_thread(spdk_thread_pass_msg fn, void *thread_ctx)
{
g_thread.fn = fn;
g_thread.thread_ctx = thread_ctx;
TAILQ_INIT(&g_thread.io_channels);
return &g_thread;
}
@ -88,6 +92,12 @@ spdk_get_thread(void)
return &g_thread;
}
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
{
thread->fn(fn, ctx, thread->thread_ctx);
}
void
spdk_io_device_register(void *io_device, io_channel_create_cb_t create_cb,
io_channel_destroy_cb_t destroy_cb, uint32_t ctx_size)

View File

@ -48,6 +48,12 @@ int g_bserrno;
struct spdk_xattr_names *g_names;
int g_done;
static void
_bs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
fn(ctx);
}
static void
bs_op_complete(void *cb_arg, int bserrno)
{
@ -963,7 +969,7 @@ int main(int argc, char **argv)
}
g_dev_buffer = calloc(1, DEV_BUFFER_SIZE);
spdk_allocate_thread();
spdk_allocate_thread(_bs_send_msg, NULL);
CU_basic_set_mode(CU_BRM_VERBOSE);
CU_basic_run_tests();
num_failures = CU_get_number_of_failures();

View File

@ -47,6 +47,12 @@ struct spdk_filesystem *g_fs;
struct spdk_file *g_file;
int g_fserrno;
static void
_fs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
fn(ctx);
}
static void
fs_op_complete(void *ctx, int fserrno)
{
@ -67,7 +73,7 @@ fs_init(void)
struct spdk_bs_dev dev;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
CU_ASSERT(g_fs != NULL);
@ -109,7 +115,7 @@ fs_open(void)
struct spdk_file *file;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
CU_ASSERT(g_fs != NULL);
@ -171,7 +177,7 @@ fs_truncate(void)
struct spdk_bs_dev dev;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
@ -224,7 +230,7 @@ fs_rename(void)
struct spdk_bs_dev dev;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
@ -377,7 +383,7 @@ channel_ops(void)
struct spdk_io_channel *channel;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
@ -405,7 +411,7 @@ channel_ops_sync(void)
struct spdk_io_channel *channel;
init_dev(&dev);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
spdk_fs_init(&dev, NULL, fs_op_with_handle_complete, NULL);
SPDK_CU_ASSERT_FATAL(g_fs != NULL);

View File

@ -51,6 +51,12 @@ int g_fserrno;
struct spdk_bs_dev g_dev;
static void
_fs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
fn(ctx);
}
struct ut_request {
fs_request_fn fn;
void *arg;
@ -144,7 +150,7 @@ cache_write(void)
ut_send_request(_fs_init, NULL);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
channel = spdk_fs_alloc_io_channel_sync(g_fs);
rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file);
@ -182,7 +188,7 @@ cache_write_null_buffer(void)
ut_send_request(_fs_init, NULL);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
channel = spdk_fs_alloc_io_channel_sync(g_fs);
rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file);
@ -214,7 +220,7 @@ cache_append_no_cache(void)
ut_send_request(_fs_init, NULL);
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
channel = spdk_fs_alloc_io_channel_sync(g_fs);
rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file);
@ -256,7 +262,7 @@ spdk_thread(void *arg)
{
struct ut_request *req;
spdk_allocate_thread();
spdk_allocate_thread(_fs_send_msg, NULL);
while (1) {
pthread_mutex_lock(&g_mutex);

View File

@ -37,10 +37,16 @@
#include "util/io_channel.c"
static void
_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
fn(ctx);
}
static void
thread_alloc(void)
{
spdk_allocate_thread();
spdk_allocate_thread(_send_msg, NULL);
spdk_free_thread();
}
@ -100,7 +106,7 @@ channel(void)
struct spdk_io_channel *ch1, *ch2;
void *ctx;
spdk_allocate_thread();
spdk_allocate_thread(_send_msg, NULL);
spdk_io_device_register(&device1, create_cb_1, destroy_cb_1, sizeof(ctx1));
spdk_io_device_register(&device2, create_cb_2, destroy_cb_2, sizeof(ctx2));
spdk_io_device_register(&device3, create_cb_null, NULL, 0);