diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index 37a4e2173..c3a55081d 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -97,7 +97,10 @@ struct spdk_reactor { uint64_t busy_tsc; uint64_t idle_tsc; - bool interrupt_mode; + /* Each bit of cpuset indicates whether a reactor probably requires event notification */ + struct spdk_cpuset notify_cpuset; + /* Indicate whether this reactor currently runs in interrupt */ + bool in_interrupt; struct spdk_fd_group *fgrp; int resched_fd; } __attribute__((aligned(SPDK_CACHE_LINE_SIZE))); diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 0d195bd28..d8243e1db 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -172,6 +172,7 @@ reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) TAILQ_INIT(&reactor->threads); reactor->thread_count = 0; + spdk_cpuset_zero(&reactor->notify_cpuset); reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); if (reactor->events == NULL) { @@ -179,8 +180,26 @@ reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) assert(false); } + /* Always initialize interrupt facilities for reactor */ + if (reactor_interrupt_init(reactor) != 0) { + /* Reactor interrupt facilities are necessary if seting app to interrupt mode. */ + if (spdk_interrupt_mode_is_enabled()) { + SPDK_ERRLOG("Failed to prepare intr facilities\n"); + assert(false); + } + return; + } + + /* If application runs with full interrupt ability, + * all reactors are going to run in interrupt mode. + */ if (spdk_interrupt_mode_is_enabled()) { - reactor_interrupt_init(reactor); + uint32_t i; + + SPDK_ENV_FOREACH_CORE(i) { + spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true); + } + reactor->in_interrupt = true; } } @@ -298,9 +317,7 @@ spdk_reactors_fini(void) spdk_ring_free(reactor->events); } - if (reactor->interrupt_mode) { - reactor_interrupt_fini(reactor); - } + reactor_interrupt_fini(reactor); if (g_core_infos != NULL) { free(g_core_infos[i].threads); @@ -345,6 +362,8 @@ spdk_event_call(struct spdk_event *event) { int rc; struct spdk_reactor *reactor; + struct spdk_reactor *local_reactor = NULL; + uint32_t current_core = spdk_env_get_current_core(); reactor = spdk_reactor_get(event->lcore); @@ -356,7 +375,16 @@ spdk_event_call(struct spdk_event *event) assert(false); } - if (reactor->interrupt_mode) { + if (current_core != SPDK_ENV_LCORE_ID_ANY) { + local_reactor = spdk_reactor_get(current_core); + } + + /* If spdk_event_call isn't called on a reactor, always send a notification. + * If it is called on a reactor, send a notification if the destination reactor + * is indicated in interrupt mode state. + */ + if (spdk_unlikely(local_reactor == NULL) || + spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) { uint64_t notify = 1; rc = write(reactor->events_fd, ¬ify, sizeof(notify)); @@ -383,7 +411,8 @@ event_queue_run_batch(struct spdk_reactor *reactor) memset(events, 0, sizeof(events)); #endif - if (reactor->interrupt_mode) { + /* Operate event notification if this reactor currently runs in interrupt state */ + if (spdk_unlikely(reactor->in_interrupt)) { uint64_t notify = 1; int rc; @@ -638,7 +667,8 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre assert(reactor->thread_count > 0); reactor->thread_count--; - if (reactor->interrupt_mode) { + /* Operate thread intr if running with full interrupt ability */ + if (spdk_interrupt_mode_is_enabled()) { efd = spdk_thread_get_interrupt_fd(thread); spdk_fd_group_remove(reactor->fgrp, efd); } @@ -653,7 +683,8 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre assert(reactor->thread_count > 0); reactor->thread_count--; - if (reactor->interrupt_mode) { + /* Operate thread intr if running with full interrupt ability */ + if (spdk_interrupt_mode_is_enabled()) { efd = spdk_thread_get_interrupt_fd(thread); spdk_fd_group_remove(reactor->fgrp, efd); } @@ -728,7 +759,8 @@ reactor_run(void *arg) reactor->tsc_last = spdk_get_ticks(); while (1) { - if (spdk_unlikely(reactor->interrupt_mode)) { + /* Execute interrupt process fn if this reactor currently runs in interrupt state */ + if (spdk_unlikely(reactor->in_interrupt)) { reactor_interrupt_run(reactor); } else { _reactor_run(reactor); @@ -769,7 +801,9 @@ reactor_run(void *arg) TAILQ_REMOVE(&reactor->threads, lw_thread, link); assert(reactor->thread_count > 0); reactor->thread_count--; - if (reactor->interrupt_mode) { + + /* Operate thread intr if running with full interrupt ability */ + if (spdk_interrupt_mode_is_enabled()) { int efd = spdk_thread_get_interrupt_fd(thread); spdk_fd_group_remove(reactor->fgrp, efd); @@ -851,12 +885,18 @@ spdk_reactors_stop(void *arg1) uint32_t i; int rc; struct spdk_reactor *reactor; + struct spdk_reactor *local_reactor; uint64_t notify = 1; g_reactor_state = SPDK_REACTOR_STATE_EXITING; + local_reactor = spdk_reactor_get(spdk_env_get_current_core()); - if (spdk_interrupt_mode_is_enabled()) { - SPDK_ENV_FOREACH_CORE(i) { + SPDK_ENV_FOREACH_CORE(i) { + /* If spdk_event_call isn't called on a reactor, always send a notification. + * If it is called on a reactor, send a notification if the destination reactor + * is indicated in interrupt mode state. + */ + if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) { reactor = spdk_reactor_get(i); assert(reactor != NULL); rc = write(reactor->events_fd, ¬ify, sizeof(notify)); @@ -888,14 +928,14 @@ _schedule_thread(void *arg1, void *arg2) int efd; current_core = spdk_env_get_current_core(); - reactor = spdk_reactor_get(current_core); assert(reactor != NULL); TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); reactor->thread_count++; - if (reactor->interrupt_mode) { + /* Operate thread intr if running with full interrupt ability */ + if (spdk_interrupt_mode_is_enabled()) { int rc; struct spdk_thread *thread; @@ -975,7 +1015,9 @@ _reactor_request_thread_reschedule(struct spdk_thread *thread) current_core = spdk_env_get_current_core(); reactor = spdk_reactor_get(current_core); assert(reactor != NULL); - if (reactor->interrupt_mode) { + + /* Send a notification if the destination reactor is indicated in intr mode state */ + if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) { uint64_t notify = 1; if (write(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { @@ -1086,7 +1128,7 @@ reactor_schedule_thread_event(void *arg) uint32_t count = 0; uint64_t notify = 1; - assert(reactor->interrupt_mode); + assert(reactor->in_interrupt); if (read(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno)); @@ -1141,11 +1183,11 @@ reactor_interrupt_init(struct spdk_reactor *reactor) goto err; } - reactor->interrupt_mode = true; return 0; err: spdk_fd_group_destroy(reactor->fgrp); + reactor->fgrp = NULL; return rc; } #else diff --git a/test/unit/lib/event/reactor.c/reactor_ut.c b/test/unit/lib/event/reactor.c/reactor_ut.c index 56941b331..677fd7371 100644 --- a/test/unit/lib/event/reactor.c/reactor_ut.c +++ b/test/unit/lib/event/reactor.c/reactor_ut.c @@ -53,6 +53,7 @@ test_create_reactor(void) CU_ASSERT(spdk_reactor_get(0) == &reactor); spdk_ring_free(reactor.events); + reactor_interrupt_fini(&reactor); g_reactors = NULL; } @@ -105,6 +106,8 @@ test_event_call(void) evt = spdk_event_allocate(0, ut_event_fn, &test1, &test2); CU_ASSERT(evt != NULL); + MOCK_SET(spdk_env_get_current_core, 0); + spdk_event_call(evt); reactor = spdk_reactor_get(0); @@ -114,6 +117,8 @@ test_event_call(void) CU_ASSERT(test1 == 1); CU_ASSERT(test2 == 0xFF); + MOCK_CLEAR(spdk_env_get_current_core); + spdk_reactors_fini(); free_cores(); @@ -138,6 +143,8 @@ test_schedule_thread(void) spdk_cpuset_set_cpu(&cpuset, 3, true); g_next_core = 4; + MOCK_SET(spdk_env_get_current_core, 3); + /* _reactor_schedule_thread() will be called in spdk_thread_create() * at its end because it is passed to SPDK thread library by * spdk_thread_lib_init(). @@ -148,8 +155,6 @@ test_schedule_thread(void) reactor = spdk_reactor_get(3); CU_ASSERT(reactor != NULL); - MOCK_SET(spdk_env_get_current_core, 3); - CU_ASSERT(event_queue_run_batch(reactor) == 1); MOCK_CLEAR(spdk_env_get_current_core); @@ -192,6 +197,7 @@ test_reschedule_thread(void) spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true); g_next_core = 0; + MOCK_SET(spdk_env_get_current_core, 1); /* Create and schedule the thread to core 1. */ spdk_cpuset_set_cpu(&cpuset, 1, true); @@ -201,7 +207,6 @@ test_reschedule_thread(void) reactor = spdk_reactor_get(1); CU_ASSERT(reactor != NULL); - MOCK_SET(spdk_env_get_current_core, 1); CU_ASSERT(event_queue_run_batch(reactor) == 1); CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread); @@ -302,12 +307,15 @@ test_for_each_reactor(void) for (i = 0; i < 5; i++) { reactor = spdk_reactor_get(i); CU_ASSERT(reactor != NULL); + MOCK_SET(spdk_env_get_current_core, i); event_queue_run_batch(reactor); CU_ASSERT(count == (i + 1)); CU_ASSERT(done == false); + MOCK_CLEAR(spdk_env_get_current_core); } + MOCK_SET(spdk_env_get_current_core, 0); /* After each reactor is called, the completion calls it one more time. */ reactor = spdk_reactor_get(0); CU_ASSERT(reactor != NULL); @@ -315,6 +323,7 @@ test_for_each_reactor(void) event_queue_run_batch(reactor); CU_ASSERT(count == 6); CU_ASSERT(done == true); + MOCK_CLEAR(spdk_env_get_current_core); spdk_reactors_fini();