From a20745541e41c2d473de428f38f93832e037b9e0 Mon Sep 17 00:00:00 2001 From: Liu Xiaodong Date: Sat, 9 Jan 2021 09:46:21 -0500 Subject: [PATCH] reactor: add spdk_reactor_set_interrupt_mode func spdk_reactor_set_interrupt_mode will send event to set reactor into interrupt mode or poll mode, also set every notify_cpuset on all reactors for consistency. It can be used by RPC method or scheduler to set reactor to interrupt mode while workload is lightweight. Currently, this function is limited that the specific reactor should have no attached spdk_thread. Change-Id: I7e8f449bff1184b9a7948f80b9572066a19da60f Signed-off-by: Liu Xiaodong Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/5853 Reviewed-by: Tomasz Zawadzki Reviewed-by: Jim Harris Tested-by: SPDK CI Jenkins --- include/spdk_internal/event.h | 35 ++++++ lib/event/app.c | 6 ++ lib/event/reactor.c | 119 +++++++++++++++++++++ lib/event/spdk_event.map | 1 + test/unit/lib/event/reactor.c/reactor_ut.c | 2 + 5 files changed, 163 insertions(+) diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index c3a55081d..89ccbfb81 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -71,6 +71,13 @@ struct spdk_lw_thread { struct spdk_thread_stats last_stats; }; +/** + * Completion callback to set reactor into interrupt mode or poll mode. + * + * \param cb_arg Argument to pass to the callback function. + */ +typedef void (*spdk_reactor_set_interrupt_mode_cb)(void *cb_arg); + struct spdk_reactor { /* Lightweight threads running on this reactor */ TAILQ_HEAD(, spdk_lw_thread) threads; @@ -101,6 +108,11 @@ struct spdk_reactor { struct spdk_cpuset notify_cpuset; /* Indicate whether this reactor currently runs in interrupt */ bool in_interrupt; + bool set_interrupt_mode_in_progress; + bool new_in_interrupt; + spdk_reactor_set_interrupt_mode_cb set_interrupt_mode_cb_fn; + void *set_interrupt_mode_cb_arg; + struct spdk_fd_group *fgrp; int resched_fd; } __attribute__((aligned(SPDK_CACHE_LINE_SIZE))); @@ -127,6 +139,29 @@ struct spdk_reactor *spdk_reactor_get(uint32_t lcore); */ void spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl); +/** + * Set reactor into interrupt mode or back to poll mode. + * + * Currently, this function is only permitted within spdk application thread. + * Also it requires the corresponding reactor does not have any spdk_thread. + * + * \param lcore CPU core index of specified reactor. + * \param new_in_interrupt Set interrupt mode for true, or poll mode for false. + * \param cb_fn This will be called on spdk application thread after setting interupt mode. + * \param cb_arg Argument will be passed to cb_fn when called. + * + * \return 0 on success, negtive errno on failure. + */ +int spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, + spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg); + +/** + * Get a handle to spdk application thread. + * + * \return a pointer to spdk application thread on success or NULL on failure. + */ +struct spdk_thread *_spdk_get_app_thread(void); + struct spdk_subsystem { const char *name; /* User must call spdk_subsystem_init_next() when they are done with their initialization. */ diff --git a/lib/event/app.c b/lib/event/app.c index 3fc3aef4a..9ba3050cd 100644 --- a/lib/event/app.c +++ b/lib/event/app.c @@ -624,6 +624,12 @@ spdk_app_stop(int rc) spdk_thread_send_msg(g_app_thread, app_stop, (void *)(intptr_t)rc); } +struct spdk_thread * +_spdk_get_app_thread(void) +{ + return g_app_thread; +} + static void usage(void (*app_usage)(void)) { diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 8c4f3749b..1a0da3334 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -332,6 +332,125 @@ spdk_reactors_fini(void) g_core_infos = NULL; } +static void _reactor_set_interrupt_mode(void *arg1, void *arg2); + +static void +_reactor_set_notify_cpuset(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); + + spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt); +} + +static void +_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + + if (target->new_in_interrupt == false) { + target->set_interrupt_mode_in_progress = false; + spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn, + target->set_interrupt_mode_cb_arg); + } else { + struct spdk_event *ev; + + ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL); + assert(ev); + spdk_event_call(ev); + } +} + +static void +_reactor_set_interrupt_mode(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); + + assert(target != NULL); + assert(target == reactor); + assert(target->in_interrupt != target->new_in_interrupt); + assert(TAILQ_EMPTY(&target->threads)); + SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n", + target->lcore, !target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll"); + + target->in_interrupt = target->new_in_interrupt; + + if (target->new_in_interrupt == false) { + spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); + } else { + uint64_t notify = 1; + int rc = 0; + + /* Always trigger spdk_event and resched event in case of race condition */ + rc = write(target->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); + } + rc = write(target->resched_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); + } + + target->set_interrupt_mode_in_progress = false; + spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn, + target->set_interrupt_mode_cb_arg); + } +} + +int +spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, + spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg) +{ + struct spdk_reactor *target; + + target = spdk_reactor_get(lcore); + if (target == NULL) { + return -EINVAL; + } + + if (spdk_get_thread() != _spdk_get_app_thread()) { + SPDK_ERRLOG("It is only permitted within spdk application thread.\n"); + return -EPERM; + } + + if (target->in_interrupt == new_in_interrupt) { + return 0; + } + + if (target->set_interrupt_mode_in_progress) { + SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore); + return -EBUSY; + } + target->set_interrupt_mode_in_progress = true; + + target->new_in_interrupt = new_in_interrupt; + target->set_interrupt_mode_cb_fn = cb_fn; + target->set_interrupt_mode_cb_arg = cb_arg; + + SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n", + spdk_env_get_current_core(), lcore); + + if (new_in_interrupt == false) { + /* For potential race cases, when setting the reactor to poll mode, + * first change the mode of the reactor and then clear the corresponding + * bit of the notify_cpuset of each reactor. + */ + struct spdk_event *ev; + + ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL); + assert(ev); + spdk_event_call(ev); + } else { + /* For race caces, when setting the reactor to interrupt mode, first set the + * corresponding bit of the notify_cpuset of each reactor and then change the mode. + */ + spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); + } + + return 0; +} + struct spdk_event * spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) { diff --git a/lib/event/spdk_event.map b/lib/event/spdk_event.map index 57c7471b2..9a4ba5603 100644 --- a/lib/event/spdk_event.map +++ b/lib/event/spdk_event.map @@ -24,6 +24,7 @@ spdk_reactors_stop; spdk_reactor_get; spdk_for_each_reactor; + spdk_reactor_set_interrupt_mode; spdk_subsystem_find; spdk_subsystem_get_first; spdk_subsystem_get_next; diff --git a/test/unit/lib/event/reactor.c/reactor_ut.c b/test/unit/lib/event/reactor.c/reactor_ut.c index 677fd7371..3d5acbb37 100644 --- a/test/unit/lib/event/reactor.c/reactor_ut.c +++ b/test/unit/lib/event/reactor.c/reactor_ut.c @@ -40,6 +40,8 @@ #include "event/scheduler_static.c" #include "event/scheduler_dynamic.c" +DEFINE_STUB(_spdk_get_app_thread, struct spdk_thread *, (void), NULL); + static void test_create_reactor(void) {