From ff87d29cc3139c3a1f79a928960c7922edd4bb1e Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Tue, 23 May 2017 11:56:40 -0700 Subject: [PATCH] io_channel: Add mechanism to call a function on each channel For a given I/O device, call a user function on the correct thread for each channel. Change-Id: I568a443184ac834c80c5e36b80aa9b6f8bb0ac99 Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/362256 Tested-by: SPDK Automated Test System Reviewed-by: Jim Harris --- include/spdk/io_channel.h | 17 ++++++++ lib/util/io_channel.c | 86 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/include/spdk/io_channel.h b/include/spdk/io_channel.h index 86b9fd065..aea084a75 100644 --- a/include/spdk/io_channel.h +++ b/include/spdk/io_channel.h @@ -52,6 +52,10 @@ typedef void (*spdk_thread_pass_msg)(spdk_thread_fn fn, void *ctx, typedef int (*spdk_io_channel_create_cb)(void *io_device, void *ctx_buf); typedef void (*spdk_io_channel_destroy_cb)(void *io_device, void *ctx_buf); +typedef void (*spdk_channel_msg)(void *io_device, struct spdk_io_channel *ch, + void *ctx); +typedef void (*spdk_channel_for_each_cpl)(void *io_device, void *ctx); + /** * \brief Initializes the calling thread for I/O channel allocation. * @@ -136,4 +140,17 @@ void spdk_put_io_channel(struct spdk_io_channel *ch); */ void *spdk_io_channel_get_ctx(struct spdk_io_channel *ch); +/** + * \brief Call 'fn' on each channel associated with io_device. This happens + * asynchronously, so fn may be called after spdk_for_each_channel returns. + * 'fn' will be called on the correct thread for each channel. 'fn' will be + * called for each channel serially, such that two calls to 'fn' will not + * overlap in time. + * + * Once 'fn' has been called on each channel, 'cpl' will be called + * on the thread that spdk_for_each_channel was initially called from. + */ +void spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, + spdk_channel_for_each_cpl cpl); + #endif /* SPDK_IO_CHANNEL_H_ */ diff --git a/lib/util/io_channel.c b/lib/util/io_channel.c index 7bd76cc8c..8d1f30f62 100644 --- a/lib/util/io_channel.c +++ b/lib/util/io_channel.c @@ -295,3 +295,89 @@ spdk_io_channel_get_ctx(struct spdk_io_channel *ch) { return (uint8_t *)ch + sizeof(*ch); } + +struct call_channel { + void *io_device; + spdk_channel_msg fn; + void *ctx; + + struct spdk_thread *cur_thread; + struct spdk_io_channel *cur_ch; + + struct spdk_thread *orig_thread; + spdk_channel_for_each_cpl cpl; +}; + +static void +_call_channel(void *ctx) +{ + struct call_channel *ch_ctx = ctx; + struct spdk_thread *thread; + struct spdk_io_channel *ch; + + thread = ch_ctx->cur_thread; + ch = ch_ctx->cur_ch; + + ch_ctx->fn(ch_ctx->io_device, ch, ch_ctx->ctx); + + pthread_mutex_lock(&g_devlist_mutex); + thread = TAILQ_NEXT(thread, tailq); + while (thread) { + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->io_device == ch_ctx->io_device) { + ch_ctx->cur_thread = thread; + ch_ctx->cur_ch = ch; + pthread_mutex_unlock(&g_devlist_mutex); + spdk_thread_send_msg(thread, _call_channel, ch_ctx); + return; + } + } + thread = TAILQ_NEXT(thread, tailq); + } + + pthread_mutex_unlock(&g_devlist_mutex); + + ch_ctx->cpl(ch_ctx->io_device, ch_ctx->ctx); + free(ch_ctx); +} + +void +spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, + spdk_channel_for_each_cpl cpl) +{ + struct spdk_thread *thread; + struct spdk_io_channel *ch; + struct call_channel *ch_ctx; + + ch_ctx = calloc(1, sizeof(*ch_ctx)); + if (!ch_ctx) { + SPDK_ERRLOG("Unable to allocate context\n"); + return; + } + + ch_ctx->io_device = io_device; + ch_ctx->fn = fn; + ch_ctx->ctx = ctx; + ch_ctx->cpl = cpl; + + pthread_mutex_lock(&g_devlist_mutex); + ch_ctx->orig_thread = _get_thread(); + + TAILQ_FOREACH(thread, &g_threads, tailq) { + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->io_device == io_device) { + ch_ctx->cur_thread = thread; + ch_ctx->cur_ch = ch; + pthread_mutex_unlock(&g_devlist_mutex); + spdk_thread_send_msg(thread, _call_channel, ch_ctx); + return; + } + } + } + + free(ch_ctx); + + pthread_mutex_unlock(&g_devlist_mutex); + + cpl(io_device, ctx); +}