diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 98e897c4c..38af3b7b2 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -53,10 +53,17 @@ enum spdk_reactor_state { SPDK_REACTOR_STATE_SHUTDOWN = 4, }; +struct spdk_lw_thread { + TAILQ_ENTRY(spdk_lw_thread) link; +}; + struct spdk_reactor { /* Logical core number for this reactor. */ uint32_t lcore; + /* Lightweight threads running on this reactor */ + TAILQ_HEAD(, spdk_lw_thread) threads; + /* Poller for get the rusage for the reactor. */ struct spdk_poller *rusage_poller; @@ -204,29 +211,47 @@ static int _spdk_reactor_run(void *arg) { struct spdk_reactor *reactor = arg; - struct spdk_thread *thread; + struct spdk_thread *orig_thread, *thread; uint64_t last_rusage = 0; + struct spdk_lw_thread *lw_thread, *tmp; char thread_name[32]; snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); - thread = spdk_thread_create(thread_name); - if (!thread) { + orig_thread = spdk_thread_create(thread_name); + if (!orig_thread) { return -1; } + + lw_thread = (struct spdk_lw_thread *)spdk_thread_get_ctx(orig_thread); + if (!lw_thread) { + spdk_thread_exit(orig_thread); + return -ENOMEM; + } + + TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); + SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); while (1) { - _spdk_event_queue_run_batch(reactor, thread); + uint64_t now; - spdk_thread_poll(thread, 0, 0); + /* For each loop through the reactor, capture the time. This time + * is used for all threads. */ + now = spdk_get_ticks(); + + TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { + thread = spdk_thread_get_from_ctx(lw_thread); + + _spdk_event_queue_run_batch(reactor, thread); + + spdk_thread_poll(thread, 0, now); + } if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { break; } if (g_context_switch_monitor_enabled) { - uint64_t now = spdk_get_ticks(); - if ((last_rusage + CONTEXT_SWITCH_MONITOR_PERIOD) < now) { get_rusage(reactor); last_rusage = now; @@ -234,6 +259,12 @@ _spdk_reactor_run(void *arg) } } + lw_thread = spdk_thread_get_ctx(orig_thread); + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + assert(TAILQ_EMPTY(&reactor->threads)); + + spdk_thread_exit(orig_thread); + return 0; } @@ -242,6 +273,8 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) { reactor->lcore = lcore; + TAILQ_INIT(&reactor->threads); + reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); assert(reactor->events != NULL); } @@ -343,7 +376,7 @@ spdk_reactors_init(void) memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); - spdk_thread_lib_init(NULL, 0); + spdk_thread_lib_init(NULL, sizeof(struct spdk_lw_thread)); SPDK_ENV_FOREACH_CORE(i) { reactor = spdk_reactor_get(i);