diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index 154c997d3..9e6254dcb 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -62,6 +62,7 @@ enum spdk_reactor_state { struct spdk_lw_thread { TAILQ_ENTRY(spdk_lw_thread) link; + bool resched; }; struct spdk_reactor { diff --git a/lib/event/reactor.c b/lib/event/reactor.c index a6112ebdd..e64c83b27 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -305,6 +305,8 @@ _set_thread_name(const char *thread_name) #endif } +static int _reactor_schedule_thread(struct spdk_thread *thread); + static int _spdk_reactor_run(void *arg) { @@ -339,10 +341,18 @@ _spdk_reactor_run(void *arg) thread = spdk_thread_get_from_ctx(lw_thread); spdk_thread_poll(thread, 0, now); + if (spdk_unlikely(lw_thread->resched)) { + lw_thread->resched = false; + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + _reactor_schedule_thread(thread); + continue; + } + if (spdk_unlikely(spdk_thread_is_exited(thread) && spdk_thread_is_idle(thread))) { TAILQ_REMOVE(&reactor->threads, lw_thread, link); spdk_thread_destroy(thread); + continue; } } @@ -516,12 +526,27 @@ _reactor_schedule_thread(struct spdk_thread *thread) return 0; } +static void +_reactor_request_thread_reschedule(struct spdk_thread *thread) +{ + struct spdk_lw_thread *lw_thread; + + assert(thread == spdk_get_thread()); + + lw_thread = spdk_thread_get_ctx(thread); + + lw_thread->resched = true; +} + static int spdk_reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) { switch (op) { case SPDK_THREAD_OP_NEW: return _reactor_schedule_thread(thread); + case SPDK_THREAD_OP_RESCHED: + _reactor_request_thread_reschedule(thread); + return 0; default: return -ENOTSUP; } @@ -532,6 +557,7 @@ spdk_reactor_thread_op_supported(enum spdk_thread_op op) { switch (op) { case SPDK_THREAD_OP_NEW: + case SPDK_THREAD_OP_RESCHED: return true; default: return false; diff --git a/test/unit/lib/event/reactor.c/reactor_ut.c b/test/unit/lib/event/reactor.c/reactor_ut.c index 9fe448e6a..ec198f950 100644 --- a/test/unit/lib/event/reactor.c/reactor_ut.c +++ b/test/unit/lib/event/reactor.c/reactor_ut.c @@ -155,6 +155,85 @@ test_schedule_thread(void) free_cores(); } +static void +test_reschedule_thread(void) +{ + struct spdk_cpuset cpuset = {}; + struct spdk_thread *thread; + struct spdk_reactor *reactor; + struct spdk_lw_thread *lw_thread; + + allocate_cores(3); + + CU_ASSERT(spdk_reactors_init() == 0); + + spdk_cpuset_set_cpu(&g_reactor_core_mask, 0, true); + spdk_cpuset_set_cpu(&g_reactor_core_mask, 1, true); + spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true); + g_next_core = 0; + + /* Create and schedule the thread to core 1. */ + spdk_cpuset_set_cpu(&cpuset, 1, true); + + thread = spdk_thread_create(NULL, &cpuset); + CU_ASSERT(thread != NULL); + lw_thread = spdk_thread_get_ctx(thread); + + reactor = spdk_reactor_get(1); + CU_ASSERT(reactor != NULL); + MOCK_SET(spdk_env_get_current_core, 1); + + CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 1); + CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread); + + spdk_set_thread(thread); + + /* Call spdk_thread_set_cpumask() twice with different cpumask values. + * The cpumask of the 2nd call will be used in reschedule operation. + */ + + spdk_cpuset_zero(&cpuset); + spdk_cpuset_set_cpu(&cpuset, 0, true); + CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0); + + spdk_cpuset_zero(&cpuset); + spdk_cpuset_set_cpu(&cpuset, 2, true); + CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0); + + CU_ASSERT(lw_thread->resched == true); + + _spdk_reactor_run(reactor); + + CU_ASSERT(lw_thread->resched == false); + CU_ASSERT(TAILQ_EMPTY(&reactor->threads)); + + reactor = spdk_reactor_get(0); + CU_ASSERT(reactor != NULL); + MOCK_SET(spdk_env_get_current_core, 0); + + CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 0); + + reactor = spdk_reactor_get(2); + CU_ASSERT(reactor != NULL); + MOCK_SET(spdk_env_get_current_core, 2); + + CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 1); + + CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread); + + MOCK_CLEAR(spdk_env_get_current_core); + + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + spdk_set_thread(thread); + spdk_thread_exit(thread); + spdk_thread_destroy(thread); + spdk_set_thread(NULL); + + spdk_reactors_fini(); + + free_cores(); +} + static void for_each_reactor_done(void *arg1, void *arg2) { @@ -239,6 +318,7 @@ main(int argc, char **argv) CU_add_test(suite, "test_init_reactors", test_init_reactors) == NULL || CU_add_test(suite, "test_event_call", test_event_call) == NULL || CU_add_test(suite, "test_schedule_thread", test_schedule_thread) == NULL || + CU_add_test(suite, "test_reschedule_thread", test_reschedule_thread) == NULL || CU_add_test(suite, "test_for_each_reactor", test_for_each_reactor) == NULL ) { CU_cleanup_registry();