From b74b6133fa1cdced18d9d04ea7304aed1db99245 Mon Sep 17 00:00:00 2001 From: Tomasz Zawadzki Date: Fri, 11 Jun 2021 07:09:12 -0400 Subject: [PATCH] lib/event: remove scheduler dependency on lw_thread Removing dependency on schedulers to directly modify lw_thread field structures will help making schedulers truly plugable. Instead of using lw_thread, new structure is created that holds copy of stats and refer to the thread by spdk_thread id. As an added benefit of not changing lw_thread directly, we won't run into issue of balancing function changing it while other reactor removes and frees it. In the future an API will be added for scheduler to call in order to move the thread directly. Rather than for event framework to rely on modified core_info/thread_info structure. Signed-off-by: Tomasz Zawadzki Change-Id: I8f85bb8dc080fd13b78b07ee9ef8e8be7051659b Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/8411 Community-CI: Broadcom CI Community-CI: Mellanox Build Bot Reviewed-by: Konrad Sztyber Reviewed-by: Jim Harris Reviewed-by: Shuhei Matsumoto Tested-by: SPDK CI Jenkins --- include/spdk_internal/event.h | 14 ++++++++- lib/event/reactor.c | 43 ++++++++++++++++++++------ lib/event/scheduler_dynamic.c | 58 +++++++++++++++++++---------------- 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index 7d5088a56..85c11e03c 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -255,6 +255,18 @@ struct spdk_governor *_spdk_governor_get(void); _spdk_governor_list_add(governor); \ } \ +/** + * Structure representing thread used for scheduling. + */ +struct spdk_scheduler_thread_info { + uint32_t lcore; + uint64_t thread_id; + /* stats over a lifetime of a thread */ + struct spdk_thread_stats total_stats; + /* stats during the last scheduling period */ + struct spdk_thread_stats current_stats; +}; + /** * A list of cores and threads which is used for scheduling. */ @@ -269,7 +281,7 @@ struct spdk_scheduler_core_info { uint32_t lcore; uint32_t threads_count; bool interrupt_mode; - struct spdk_lw_thread **threads; + struct spdk_scheduler_thread_info *thread_infos; }; /** diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 9f2606a8b..36ec7ee44 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -333,7 +333,7 @@ spdk_reactors_fini(void) reactor_interrupt_fini(reactor); if (g_core_infos != NULL) { - free(g_core_infos[i].threads); + free(g_core_infos[i].thread_infos); } } @@ -687,19 +687,37 @@ _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_threa lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc; } +static void +_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info) +{ + struct spdk_lw_thread *lw_thread; + struct spdk_thread *thread; + + thread = spdk_thread_get_by_id(thread_info->thread_id); + if (thread == NULL) { + /* Thread no longer exists. */ + return; + } + lw_thread = spdk_thread_get_ctx(thread); + assert(lw_thread != NULL); + + lw_thread->lcore = thread_info->lcore; + lw_thread->resched = true; +} + static void _threads_reschedule(struct spdk_scheduler_core_info *cores_info) { struct spdk_scheduler_core_info *core; - struct spdk_lw_thread *lw_thread; + struct spdk_scheduler_thread_info *thread_info; uint32_t i, j; SPDK_ENV_FOREACH_CORE(i) { core = &cores_info[i]; for (j = 0; j < core->threads_count; j++) { - lw_thread = core->threads[j]; - if (lw_thread->lcore != i) { - lw_thread->resched = true; + thread_info = &core->thread_infos[j]; + if (thread_info->lcore != i) { + _threads_reschedule_thread(thread_info); } } } @@ -782,6 +800,7 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2) { struct spdk_scheduler_core_info *core_info; struct spdk_lw_thread *lw_thread; + struct spdk_thread *thread; struct spdk_reactor *reactor; struct spdk_event *evt; uint32_t next_core; @@ -800,8 +819,8 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2) SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore); - free(core_info->threads); - core_info->threads = NULL; + free(core_info->thread_infos); + core_info->thread_infos = NULL; i = 0; @@ -813,8 +832,8 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2) core_info->threads_count = i; if (core_info->threads_count > 0) { - core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *)); - if (core_info->threads == NULL) { + core_info->thread_infos = calloc(core_info->threads_count, sizeof(*core_info->thread_infos)); + if (core_info->thread_infos == NULL) { SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore); /* Cancel this round of schedule work */ @@ -825,7 +844,11 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2) i = 0; TAILQ_FOREACH(lw_thread, &reactor->threads, link) { - core_info->threads[i] = lw_thread; + core_info->thread_infos[i].lcore = lw_thread->lcore; + thread = spdk_thread_get_from_ctx(lw_thread); + core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread); + core_info->thread_infos[i].total_stats = lw_thread->total_stats; + core_info->thread_infos[i].current_stats = lw_thread->current_stats; i++; } } diff --git a/lib/event/scheduler_dynamic.c b/lib/event/scheduler_dynamic.c index 59779dbec..37fd5c3e1 100644 --- a/lib/event/scheduler_dynamic.c +++ b/lib/event/scheduler_dynamic.c @@ -56,12 +56,12 @@ static struct core_stats *g_cores; #define SCHEDULER_CORE_LIMIT 95 static uint8_t -_get_thread_load(struct spdk_lw_thread *lw_thread) +_get_thread_load(struct spdk_scheduler_thread_info *thread_info) { uint64_t busy, idle; - busy = lw_thread->current_stats.busy_tsc; - idle = lw_thread->current_stats.idle_tsc; + busy = thread_info->current_stats.busy_tsc; + idle = thread_info->current_stats.idle_tsc; if (busy == 0) { /* No work was done, exit before possible division by 0. */ @@ -71,7 +71,7 @@ _get_thread_load(struct spdk_lw_thread *lw_thread) return busy * 100 / (busy + idle); } -typedef void (*_foreach_fn)(struct spdk_lw_thread *lw_thread); +typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info); static void _foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn) @@ -82,18 +82,18 @@ _foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn) SPDK_ENV_FOREACH_CORE(i) { core = &cores_info[i]; for (j = 0; j < core->threads_count; j++) { - fn(core->threads[j]); + fn(&core->thread_infos[j]); } } } static void -_move_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core) +_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core) { struct core_stats *dst = &g_cores[dst_core]; - struct core_stats *src = &g_cores[lw_thread->lcore]; - uint64_t busy_tsc = lw_thread->current_stats.busy_tsc; - uint64_t idle_tsc = lw_thread->current_stats.idle_tsc; + struct core_stats *src = &g_cores[thread_info->lcore]; + uint64_t busy_tsc = thread_info->current_stats.busy_tsc; + uint64_t idle_tsc = thread_info->current_stats.idle_tsc; if (src == dst) { /* Don't modify stats if thread is already on that core. */ @@ -111,7 +111,7 @@ _move_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core) assert(src->thread_count > 0); src->thread_count--; - lw_thread->lcore = dst_core; + thread_info->lcore = dst_core; } static bool @@ -142,12 +142,12 @@ _is_core_over_limit(uint32_t core_id) } static bool -_can_core_fit_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core) +_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core) { struct core_stats *dst = &g_cores[dst_core]; /* Thread can always fit on the core it's currently on. */ - if (lw_thread->lcore == dst_core) { + if (thread_info->lcore == dst_core) { return true; } @@ -162,22 +162,28 @@ _can_core_fit_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core) return true; } - if (lw_thread->current_stats.busy_tsc <= dst->idle) { + if (thread_info->current_stats.busy_tsc <= dst->idle) { return true; } return false; } static uint32_t -_find_optimal_core(struct spdk_lw_thread *lw_thread) +_find_optimal_core(struct spdk_scheduler_thread_info *thread_info) { uint32_t i; - uint32_t current_lcore = lw_thread->lcore; - uint32_t least_busy_lcore = lw_thread->lcore; - struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); - struct spdk_cpuset *cpumask = spdk_thread_get_cpumask(thread); + uint32_t current_lcore = thread_info->lcore; + uint32_t least_busy_lcore = thread_info->lcore; + struct spdk_thread *thread; + struct spdk_cpuset *cpumask; bool core_over_limit = _is_core_over_limit(current_lcore); + thread = spdk_thread_get_by_id(thread_info->thread_id); + if (thread == NULL) { + return current_lcore; + } + cpumask = spdk_thread_get_cpumask(thread); + /* Find a core that can fit the thread. */ SPDK_ENV_FOREACH_CORE(i) { /* Ignore cores outside cpumask. */ @@ -191,7 +197,7 @@ _find_optimal_core(struct spdk_lw_thread *lw_thread) } /* Skip cores that cannot fit the thread and current one. */ - if (!_can_core_fit_thread(lw_thread, i) || i == current_lcore) { + if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) { continue; } @@ -263,27 +269,27 @@ deinit(struct spdk_governor *governor) } static void -_balance_idle(struct spdk_lw_thread *lw_thread) +_balance_idle(struct spdk_scheduler_thread_info *thread_info) { - if (_get_thread_load(lw_thread) >= SCHEDULER_LOAD_LIMIT) { + if (_get_thread_load(thread_info) >= SCHEDULER_LOAD_LIMIT) { return; } /* This thread is idle, move it to the main core. */ - _move_thread(lw_thread, g_main_lcore); + _move_thread(thread_info, g_main_lcore); } static void -_balance_active(struct spdk_lw_thread *lw_thread) +_balance_active(struct spdk_scheduler_thread_info *thread_info) { uint32_t target_lcore; - if (_get_thread_load(lw_thread) < SCHEDULER_LOAD_LIMIT) { + if (_get_thread_load(thread_info) < SCHEDULER_LOAD_LIMIT) { return; } /* This thread is active. */ - target_lcore = _find_optimal_core(lw_thread); - _move_thread(lw_thread, target_lcore); + target_lcore = _find_optimal_core(thread_info); + _move_thread(thread_info, target_lcore); } static void