diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index c3a55081d..89ccbfb81 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -71,6 +71,13 @@ struct spdk_lw_thread { struct spdk_thread_stats last_stats; }; +/** + * Completion callback to set reactor into interrupt mode or poll mode. + * + * \param cb_arg Argument to pass to the callback function. + */ +typedef void (*spdk_reactor_set_interrupt_mode_cb)(void *cb_arg); + struct spdk_reactor { /* Lightweight threads running on this reactor */ TAILQ_HEAD(, spdk_lw_thread) threads; @@ -101,6 +108,11 @@ struct spdk_reactor { struct spdk_cpuset notify_cpuset; /* Indicate whether this reactor currently runs in interrupt */ bool in_interrupt; + bool set_interrupt_mode_in_progress; + bool new_in_interrupt; + spdk_reactor_set_interrupt_mode_cb set_interrupt_mode_cb_fn; + void *set_interrupt_mode_cb_arg; + struct spdk_fd_group *fgrp; int resched_fd; } __attribute__((aligned(SPDK_CACHE_LINE_SIZE))); @@ -127,6 +139,29 @@ struct spdk_reactor *spdk_reactor_get(uint32_t lcore); */ void spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl); +/** + * Set reactor into interrupt mode or back to poll mode. + * + * Currently, this function is only permitted within spdk application thread. + * Also it requires the corresponding reactor does not have any spdk_thread. + * + * \param lcore CPU core index of specified reactor. + * \param new_in_interrupt Set interrupt mode for true, or poll mode for false. + * \param cb_fn This will be called on spdk application thread after setting interupt mode. + * \param cb_arg Argument will be passed to cb_fn when called. + * + * \return 0 on success, negtive errno on failure. + */ +int spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, + spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg); + +/** + * Get a handle to spdk application thread. + * + * \return a pointer to spdk application thread on success or NULL on failure. + */ +struct spdk_thread *_spdk_get_app_thread(void); + struct spdk_subsystem { const char *name; /* User must call spdk_subsystem_init_next() when they are done with their initialization. */ diff --git a/lib/event/app.c b/lib/event/app.c index 3fc3aef4a..9ba3050cd 100644 --- a/lib/event/app.c +++ b/lib/event/app.c @@ -624,6 +624,12 @@ spdk_app_stop(int rc) spdk_thread_send_msg(g_app_thread, app_stop, (void *)(intptr_t)rc); } +struct spdk_thread * +_spdk_get_app_thread(void) +{ + return g_app_thread; +} + static void usage(void (*app_usage)(void)) { diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 8c4f3749b..1a0da3334 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -332,6 +332,125 @@ spdk_reactors_fini(void) g_core_infos = NULL; } +static void _reactor_set_interrupt_mode(void *arg1, void *arg2); + +static void +_reactor_set_notify_cpuset(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); + + spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt); +} + +static void +_reactor_set_notify_cpuset_cpl(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + + if (target->new_in_interrupt == false) { + target->set_interrupt_mode_in_progress = false; + spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn, + target->set_interrupt_mode_cb_arg); + } else { + struct spdk_event *ev; + + ev = spdk_event_allocate(target->lcore, _reactor_set_interrupt_mode, target, NULL); + assert(ev); + spdk_event_call(ev); + } +} + +static void +_reactor_set_interrupt_mode(void *arg1, void *arg2) +{ + struct spdk_reactor *target = arg1; + struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core()); + + assert(target != NULL); + assert(target == reactor); + assert(target->in_interrupt != target->new_in_interrupt); + assert(TAILQ_EMPTY(&target->threads)); + SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n", + target->lcore, !target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll"); + + target->in_interrupt = target->new_in_interrupt; + + if (target->new_in_interrupt == false) { + spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); + } else { + uint64_t notify = 1; + int rc = 0; + + /* Always trigger spdk_event and resched event in case of race condition */ + rc = write(target->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); + } + rc = write(target->resched_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); + } + + target->set_interrupt_mode_in_progress = false; + spdk_thread_send_msg(_spdk_get_app_thread(), target->set_interrupt_mode_cb_fn, + target->set_interrupt_mode_cb_arg); + } +} + +int +spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt, + spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg) +{ + struct spdk_reactor *target; + + target = spdk_reactor_get(lcore); + if (target == NULL) { + return -EINVAL; + } + + if (spdk_get_thread() != _spdk_get_app_thread()) { + SPDK_ERRLOG("It is only permitted within spdk application thread.\n"); + return -EPERM; + } + + if (target->in_interrupt == new_in_interrupt) { + return 0; + } + + if (target->set_interrupt_mode_in_progress) { + SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore); + return -EBUSY; + } + target->set_interrupt_mode_in_progress = true; + + target->new_in_interrupt = new_in_interrupt; + target->set_interrupt_mode_cb_fn = cb_fn; + target->set_interrupt_mode_cb_arg = cb_arg; + + SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n", + spdk_env_get_current_core(), lcore); + + if (new_in_interrupt == false) { + /* For potential race cases, when setting the reactor to poll mode, + * first change the mode of the reactor and then clear the corresponding + * bit of the notify_cpuset of each reactor. + */ + struct spdk_event *ev; + + ev = spdk_event_allocate(lcore, _reactor_set_interrupt_mode, target, NULL); + assert(ev); + spdk_event_call(ev); + } else { + /* For race caces, when setting the reactor to interrupt mode, first set the + * corresponding bit of the notify_cpuset of each reactor and then change the mode. + */ + spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl); + } + + return 0; +} + struct spdk_event * spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) { diff --git a/lib/event/spdk_event.map b/lib/event/spdk_event.map index 57c7471b2..9a4ba5603 100644 --- a/lib/event/spdk_event.map +++ b/lib/event/spdk_event.map @@ -24,6 +24,7 @@ spdk_reactors_stop; spdk_reactor_get; spdk_for_each_reactor; + spdk_reactor_set_interrupt_mode; spdk_subsystem_find; spdk_subsystem_get_first; spdk_subsystem_get_next; diff --git a/test/unit/lib/event/reactor.c/reactor_ut.c b/test/unit/lib/event/reactor.c/reactor_ut.c index 677fd7371..3d5acbb37 100644 --- a/test/unit/lib/event/reactor.c/reactor_ut.c +++ b/test/unit/lib/event/reactor.c/reactor_ut.c @@ -40,6 +40,8 @@ #include "event/scheduler_static.c" #include "event/scheduler_dynamic.c" +DEFINE_STUB(_spdk_get_app_thread, struct spdk_thread *, (void), NULL); + static void test_create_reactor(void) {