diff --git a/lib/event/reactor.c b/lib/event/reactor.c
index b8acbefd6..813df6066 100644
--- a/lib/event/reactor.c
+++ b/lib/event/reactor.c
@@ -66,7 +66,9 @@ TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
 	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);
 
 static struct spdk_scheduler *g_scheduler;
+static struct spdk_reactor *g_scheduling_reactor;
 static uint32_t g_scheduler_period;
+static struct spdk_scheduler_core_info *g_core_infos = NULL;
 
 static int reactor_interrupt_init(struct spdk_reactor *reactor);
 static void reactor_interrupt_fini(struct spdk_reactor *reactor);
@@ -197,6 +199,14 @@ spdk_reactors_init(void)
 		return -1;
 	}
 
+	g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos));
+	if (g_core_infos == NULL) {
+		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
+		spdk_mempool_free(g_spdk_event_mempool);
+		free(g_reactors);
+		return -ENOMEM;
+	}
+
 	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
 
 	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
@@ -238,12 +248,18 @@ spdk_reactors_fini(void)
 		if (reactor->interrupt_mode) {
 			reactor_interrupt_fini(reactor);
 		}
+
+		if (g_core_infos != NULL) {
+			free(g_core_infos[i].threads);
+		}
 	}
 
 	spdk_mempool_free(g_spdk_event_mempool);
 
 	free(g_reactors);
 	g_reactors = NULL;
+	free(g_core_infos);
+	g_core_infos = NULL;
 }
 
 struct spdk_event *
@@ -423,6 +439,158 @@ _set_thread_name(const char *thread_name)
 #endif
 }
 
+/*
+ * Tag the lightweight thread with the reactor's core, make its spdk_thread
+ * current and fill lw_thread->current_stats via spdk_thread_get_stats().
+ */
+static void
+_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
+{
+	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
+
+	lw_thread->lcore = reactor->lcore;
+
+	spdk_set_thread(thread);
+	spdk_thread_get_stats(&lw_thread->current_stats);
+}
+
+/*
+ * Walk every core's gathered thread list and call _spdk_lw_thread_set_core()
+ * for each thread whose new_lcore differs from its current lcore.
+ */
+static void
+_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
+{
+	struct spdk_scheduler_core_info *core;
+	struct spdk_lw_thread *lw_thread;
+	uint32_t i, j;
+
+	SPDK_ENV_FOREACH_CORE(i) {
+		core = &cores_info[i];
+		for (j = 0; j < core->threads_count; j++) {
+			lw_thread = core->threads[j];
+			if (lw_thread->lcore != lw_thread->new_lcore) {
+				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
+			}
+		}
+	}
+}
+
+/*
+ * Abort the current scheduling round: clear is_scheduling on every reactor
+ * without invoking the balancer or moving any thread.
+ */
+static void
+_reactors_scheduler_cancel(void *arg1, void *arg2)
+{
+	struct spdk_reactor *reactor;
+	uint32_t i;
+
+	SPDK_ENV_FOREACH_CORE(i) {
+		reactor = spdk_reactor_get(i);
+		reactor->flags.is_scheduling = false;
+	}
+}
+
+/*
+ * Final phase of a scheduling round. While the reactors are still running,
+ * run the scheduler's balance callback over the gathered core infos, apply
+ * the resulting moves and clear is_scheduling on every reactor.
+ */
+static void
+_reactors_scheduler_fini(void *arg1, void *arg2)
+{
+	struct spdk_reactor *reactor;
+	uint32_t last_core;
+	uint32_t i;
+
+	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
+		last_core = spdk_env_get_last_core();
+		g_scheduler->balance(g_core_infos, last_core + 1);
+
+		/* Reschedule based on the balancing output */
+		_threads_reschedule(g_core_infos);
+
+		SPDK_ENV_FOREACH_CORE(i) {
+			reactor = spdk_reactor_get(i);
+			reactor->flags.is_scheduling = false;
+		}
+	}
+}
+
+/*
+ * Phase 1 of thread scheduling is to gather metrics on the existing threads.
+ * Runs on each reactor in turn: records the core's idle/busy tsc and the list
+ * of threads on it, then forwards itself to the next core. Once the walk is
+ * back at the scheduling reactor, _reactors_scheduler_fini is queued instead.
+ */
+static void
+_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
+{
+	struct spdk_scheduler_core_info *core_info;
+	struct spdk_lw_thread *lw_thread;
+	struct spdk_reactor *reactor;
+	struct spdk_event *evt;
+	uint32_t next_core;
+	uint32_t i;
+
+	reactor = spdk_reactor_get(spdk_env_get_current_core());
+	reactor->flags.is_scheduling = true;
+	core_info = &g_core_infos[reactor->lcore];
+	core_info->lcore = reactor->lcore;
+	core_info->core_idle_tsc = reactor->idle_tsc;
+	core_info->core_busy_tsc = reactor->busy_tsc;
+
+	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);
+
+	free(core_info->threads);
+	core_info->threads = NULL;
+
+	i = 0;
+
+	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
+		_init_thread_stats(reactor, lw_thread);
+		i++;
+	}
+
+	core_info->threads_count = i;
+
+	if (core_info->threads_count > 0) {
+		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
+		if (core_info->threads == NULL) {
+			SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);
+			core_info->threads_count = 0;
+
+			/* Cancel this round of schedule work */
+			evt = spdk_event_allocate(g_scheduling_reactor->lcore, _reactors_scheduler_cancel, NULL, NULL);
+			spdk_event_call(evt);
+			return;
+		}
+
+		i = 0;
+		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
+			core_info->threads[i] = lw_thread;
+			i++;
+		}
+	}
+
+	next_core = spdk_env_get_next_core(reactor->lcore);
+	if (next_core == UINT32_MAX) {
+		next_core = spdk_env_get_first_core();
+	}
+
+	/* If we've looped back around to the scheduler thread, move to the next phase */
+	if (next_core == g_scheduling_reactor->lcore) {
+		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
+		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
+		spdk_event_call(evt);
+		return;
+	}
+
+	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
+	spdk_event_call(evt);
+}
+
 static int _reactor_schedule_thread(struct spdk_thread *thread);
 static uint64_t g_rusage_period;
 
@@ -448,16 +616,20 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre
 
 	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
 			  spdk_thread_is_idle(thread))) {
-		TAILQ_REMOVE(&reactor->threads, lw_thread, link);
-		assert(reactor->thread_count > 0);
-		reactor->thread_count--;
+		/* A scheduling round keeps lw_thread pointers gathered from this
+		 * reactor, so leave the thread in place until the round is over. */
+		if (reactor->flags.is_scheduling == false) {
+			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
+			assert(reactor->thread_count > 0);
+			reactor->thread_count--;
 
-		if (reactor->interrupt_mode) {
-			efd = spdk_thread_get_interrupt_fd(thread);
-			spdk_fd_group_remove(reactor->fgrp, efd);
+			if (reactor->interrupt_mode) {
+				efd = spdk_thread_get_interrupt_fd(thread);
+				spdk_fd_group_remove(reactor->fgrp, efd);
+			}
+			spdk_thread_destroy(thread);
+			return true;
 		}
-		spdk_thread_destroy(thread);
-		return true;
 	}
 
 	return false;
@@ -513,6 +685,7 @@ reactor_run(void *arg)
 	struct spdk_thread	*thread;
 	struct spdk_lw_thread	*lw_thread, *tmp;
 	char			thread_name[32];
+	uint64_t		last_sched = 0;
 
 	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
 
@@ -531,6 +704,15 @@ reactor_run(void *arg)
 			_reactor_run(reactor);
 		}
 
+		/* Only the scheduling reactor starts a round, and never while one is in flight */
+		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
+				  reactor == g_scheduling_reactor &&
+				  !reactor->flags.is_scheduling &&
+				  g_scheduler->balance)) {
+			last_sched = reactor->tsc_last;
+			_reactors_scheduler_gather_metrics(NULL, NULL);
+		}
+
 		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
 			break;
 		}
@@ -629,6 +811,7 @@ spdk_reactors_start(void)
 	/* Start the master reactor */
 	reactor = spdk_reactor_get(current_core);
 	assert(reactor != NULL);
+	g_scheduling_reactor = reactor;
 	reactor_run(reactor);
 
 	spdk_env_thread_wait_all();
@@ -712,21 +895,28 @@ _reactor_schedule_thread(struct spdk_thread *thread)
 
 	lw_thread = spdk_thread_get_ctx(thread);
 	assert(lw_thread != NULL);
+	/* Remember the requested core: the memset below wipes the whole ctx */
+	core = lw_thread->lcore;
 	memset(lw_thread, 0, sizeof(*lw_thread));
 
 	pthread_mutex_lock(&g_scheduler_mtx);
-	for (i = 0; i < spdk_env_get_core_count(); i++) {
-		if (g_next_core > spdk_env_get_last_core()) {
-			g_next_core = spdk_env_get_first_core();
-		}
-		core = g_next_core;
-		g_next_core = spdk_env_get_next_core(g_next_core);
+	/* No core was requested, so take the next core that cpumask allows */
+	if (core == SPDK_ENV_LCORE_ID_ANY) {
+		for (i = 0; i < spdk_env_get_core_count(); i++) {
+			if (g_next_core > spdk_env_get_last_core()) {
+				g_next_core = spdk_env_get_first_core();
+			}
+			core = g_next_core;
+			g_next_core = spdk_env_get_next_core(g_next_core);
 
-		if (spdk_cpuset_get_cpu(cpumask, core)) {
-			evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
-			break;
+			if (spdk_cpuset_get_cpu(cpumask, core)) {
+				break;
+			}
 		}
 	}
+
+	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
+
 	pthread_mutex_unlock(&g_scheduler_mtx);
 
 	assert(evt != NULL);
@@ -770,8 +960,13 @@ _reactor_request_thread_reschedule(struct spdk_thread *thread)
 static int
 reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
 {
+	struct spdk_lw_thread *lw_thread;
+
 	switch (op) {
 	case SPDK_THREAD_OP_NEW:
+		/* A new thread has no core yet; _reactor_schedule_thread picks one */
+		lw_thread = spdk_thread_get_ctx(thread);
+		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
 		return _reactor_schedule_thread(thread);
 	case SPDK_THREAD_OP_RESCHED:
 		_reactor_request_thread_reschedule(thread);