From 6ade44c25f0d9ff227f54a03d56a2cb83c54027a Mon Sep 17 00:00:00 2001
From: Maciej Szwed
Date: Fri, 22 Jan 2021 10:38:38 +0100
Subject: [PATCH] event: Implement new scheduler

This scheduler will group idle threads on first
available core and balance busy threads on other cores.

Change-Id: Ia0425c767dc3da2a66a9d82a20a0012fac83163c
Signed-off-by: Vitaliy Mysak
Signed-off-by: Maciej Szwed
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3901
Community-CI: Broadcom CI
Tested-by: SPDK CI Jenkins
Reviewed-by: Tomasz Zawadzki
Reviewed-by: Jim Harris
Reviewed-by: Paul Luse
---
 lib/event/Makefile                         |   2 +-
 lib/event/scheduler_dynamic.c              | 159 ++++++++++++++++++++
 test/unit/lib/event/reactor.c/reactor_ut.c | 165 +++++++++++++++++++++
 3 files changed, 325 insertions(+), 1 deletion(-)
 create mode 100644 lib/event/scheduler_dynamic.c

diff --git a/lib/event/Makefile b/lib/event/Makefile
index a017020f8..415c058ea 100644
--- a/lib/event/Makefile
+++ b/lib/event/Makefile
@@ -41,7 +41,7 @@ CFLAGS += $(ENV_CFLAGS)
 
 LIBNAME = event
 C_SRCS = app.c reactor.c rpc.c subsystem.c json_config.c log_rpc.c \
-	app_rpc.c subsystem_rpc.c scheduler_static.c
+	app_rpc.c subsystem_rpc.c scheduler_static.c scheduler_dynamic.c
 
 ifeq ($(OS),Linux)
 C_SRCS += gscheduler.c dpdk_governor.c
diff --git a/lib/event/scheduler_dynamic.c b/lib/event/scheduler_dynamic.c
new file mode 100644
index 000000000..7f7fac613
--- /dev/null
+++ b/lib/event/scheduler_dynamic.c
@@ -0,0 +1,159 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright (c) Intel Corporation.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+#include "spdk/likely.h"
+#include "spdk/event.h"
+#include "spdk/log.h"
+#include "spdk/env.h"
+
+#include "spdk_internal/thread.h"
+#include "spdk_internal/event.h"
+
+static uint32_t g_next_lcore = SPDK_ENV_LCORE_ID_ANY;
+static uint32_t g_main_lcore;
+
+#define SCHEDULER_THREAD_BUSY 100
+/* Threads whose load (in percent) is below this limit are treated as idle. */
+#define SCHEDULER_LOAD_LIMIT 50
+
+/*
+ * Compute how busy a thread was since the previous call, as a percentage
+ * of busy TSC over busy + idle TSC, and store the current snapshot as the
+ * baseline for the next call.
+ *
+ * A thread whose baseline is still all zeroes is reported as
+ * SCHEDULER_THREAD_BUSY; a thread with nothing to divide by is reported as 0.
+ */
+static uint8_t
+_get_thread_load(struct spdk_lw_thread *lw_thread)
+{
+	uint64_t busy, idle;
+
+	if (lw_thread->last_stats.busy_tsc == 0 && lw_thread->last_stats.idle_tsc == 0) {
+		lw_thread->last_stats.busy_tsc = lw_thread->snapshot_stats.busy_tsc;
+		lw_thread->last_stats.idle_tsc = lw_thread->snapshot_stats.idle_tsc;
+		return SCHEDULER_THREAD_BUSY;
+	}
+
+	busy = lw_thread->snapshot_stats.busy_tsc - lw_thread->last_stats.busy_tsc;
+	idle = lw_thread->snapshot_stats.idle_tsc - lw_thread->last_stats.idle_tsc;
+
+	lw_thread->last_stats.busy_tsc = lw_thread->snapshot_stats.busy_tsc;
+	lw_thread->last_stats.idle_tsc = lw_thread->snapshot_stats.idle_tsc;
+
+	if (busy + idle == 0) {
+		/* Nothing to divide by; report the thread as idle instead of dividing by zero. */
+		return 0;
+	}
+
+	/* return percentage of time thread was busy */
+	return busy * 100 / (busy + idle);
+}
+
+/* Remember the core init() runs on as the main core; balance() groups idle threads there. */
+static int
+init(struct spdk_governor *governor)
+{
+	g_main_lcore = spdk_env_get_current_core();
+
+	return 0;
+}
+
+/*
+ * Set new_lcore for every thread listed in cores_info: idle threads are sent
+ * to the main core when their cpumask allows it, busy threads found on the
+ * main core are handed out round-robin to cores permitted by their cpumask,
+ * and every other thread keeps its current core.
+ */
+static void
+balance(struct spdk_scheduler_core_info *cores_info, int cores_count,
+	struct spdk_governor *governor)
+{
+	struct spdk_lw_thread *lw_thread;
+	struct spdk_thread *thread;
+	struct spdk_scheduler_core_info *core;
+	struct spdk_cpuset *cpumask;
+	uint32_t target_lcore;
+	uint32_t i, j, k;
+
+	/* Distribute active threads across all cores except first one
+	 * and move idle threads to first core */
+	SPDK_ENV_FOREACH_CORE(i) {
+		core = &cores_info[i];
+		for (j = 0; j < core->threads_count; j++) {
+			lw_thread = core->threads[j];
+			lw_thread->new_lcore = lw_thread->lcore;
+			thread = spdk_thread_get_from_ctx(lw_thread);
+			cpumask = spdk_thread_get_cpumask(thread);
+
+			if (_get_thread_load(lw_thread) < SCHEDULER_LOAD_LIMIT) {
+				/* Group idle threads on the main core, but never
+				 * on a core outside of the thread's cpumask */
+				if (spdk_cpuset_get_cpu(cpumask, g_main_lcore)) {
+					lw_thread->new_lcore = g_main_lcore;
+				}
+				continue;
+			}
+
+			if (i != g_main_lcore) {
+				/* Do not move active thread if it is not on the main core */
+				continue;
+			}
+
+			/* Find a suitable reactor */
+			for (k = 0; k < spdk_env_get_core_count(); k++) {
+				if (g_next_lcore == SPDK_ENV_LCORE_ID_ANY) {
+					g_next_lcore = spdk_env_get_first_core();
+				}
+
+				target_lcore = g_next_lcore;
+				g_next_lcore = spdk_env_get_next_core(g_next_lcore);
+
+				if (spdk_cpuset_get_cpu(cpumask, target_lcore)) {
+					lw_thread->new_lcore = target_lcore;
+					break;
+				}
+			}
+		}
+	}
+}
+
+static struct spdk_scheduler scheduler_dynamic = {
+	.name = "dynamic",
+	.init = init,
+	.deinit = NULL,
+	.balance = balance,
+};
+
+SPDK_SCHEDULER_REGISTER(scheduler_dynamic);
diff --git a/test/unit/lib/event/reactor.c/reactor_ut.c b/test/unit/lib/event/reactor.c/reactor_ut.c
index b5e31e08f..0ca51dd17 100644
--- a/test/unit/lib/event/reactor.c/reactor_ut.c
+++ b/test/unit/lib/event/reactor.c/reactor_ut.c
@@ -38,6 +38,7 @@
 #include "event/reactor.c"
 #include "spdk_internal/thread.h"
 #include "event/scheduler_static.c"
+#include "event/scheduler_dynamic.c"
 
 static void
 test_create_reactor(void)
@@ -445,6 +446,169 @@ test_reactor_stats(void)
 	MOCK_CLEAR(spdk_env_get_current_core);
 }
 
+static void
+test_scheduler(void)
+{
+	struct spdk_cpuset cpuset = {};
+	struct spdk_thread *thread[3];
+	struct spdk_lw_thread *lw_thread;
+	struct spdk_reactor *reactor;
+	struct spdk_poller *busy, *idle;
+	int i;
+
+	MOCK_SET(spdk_env_get_current_core, 0);
+
+	allocate_cores(3);
+
+	CU_ASSERT(spdk_reactors_init() == 0);
+
+	_spdk_scheduler_set("dynamic");
+
+	for (i = 0; i < 3; i++) {
+		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
+	}
+	g_next_core = 0;
+
+	/* Create threads. */
+	for (i = 0; i < 3; i++) {
+		spdk_cpuset_set_cpu(&cpuset, i, true);
+		thread[i] = spdk_thread_create(NULL, &cpuset);
+		CU_ASSERT(thread[i] != NULL);
+	}
+
+	for (i = 0; i < 3; i++) {
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		MOCK_SET(spdk_env_get_current_core, i);
+		CU_ASSERT(event_queue_run_batch(reactor) == 1);
+		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
+	}
+
+	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
+
+	MOCK_SET(spdk_env_get_current_core, 0);
+
+	/* Init threads stats (low load) */
+	for (i = 0; i < 3; i++) {
+		spdk_set_thread(thread[i]);
+		busy = spdk_poller_register(poller_run_busy, (void *)10, 0);
+		idle = spdk_poller_register(poller_run_idle, (void *)90, 0);
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		reactor->tsc_last = 100;
+		_reactor_run(reactor);
+		spdk_poller_unregister(&busy);
+		spdk_poller_unregister(&idle);
+
+		/* Update last stats so that we don't have to call scheduler twice */
+		lw_thread = spdk_thread_get_ctx(thread[i]);
+		lw_thread->last_stats.busy_tsc = UINT32_MAX;
+		lw_thread->last_stats.idle_tsc = UINT32_MAX;
+	}
+
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+
+	_reactors_scheduler_gather_metrics(NULL, NULL);
+
+	/* Gather metrics for all cores */
+	reactor = spdk_reactor_get(1);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 1);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+	reactor = spdk_reactor_get(2);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 2);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 0);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+
+	/* Threads were idle, so all of them should be placed on core 0 */
+	for (i = 0; i < 3; i++) {
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		_reactor_run(reactor);
+	}
+
+	/* 2 threads should be scheduled to core 0 */
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 0);
+	CU_ASSERT(event_queue_run_batch(reactor) == 2);
+
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+	CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
+	reactor = spdk_reactor_get(1);
+	CU_ASSERT(reactor != NULL);
+	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
+	reactor = spdk_reactor_get(2);
+	CU_ASSERT(reactor != NULL);
+	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
+
+	/* Make threads busy */
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+	reactor->tsc_last = 100;
+
+	for (i = 0; i < 3; i++) {
+		spdk_set_thread(thread[i]);
+		busy = spdk_poller_register(poller_run_busy, (void *)100, 0);
+		_reactor_run(reactor);
+		spdk_poller_unregister(&busy);
+	}
+
+	/* Run scheduler again, this time all threads are busy */
+	_reactors_scheduler_gather_metrics(NULL, NULL);
+
+	/* Gather metrics for all cores */
+	reactor = spdk_reactor_get(1);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 1);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+	reactor = spdk_reactor_get(2);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 2);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+	reactor = spdk_reactor_get(0);
+	CU_ASSERT(reactor != NULL);
+	MOCK_SET(spdk_env_get_current_core, 0);
+	CU_ASSERT(event_queue_run_batch(reactor) == 1);
+
+	/* Threads were busy, so they should be distributed evenly across cores */
+	for (i = 0; i < 3; i++) {
+		MOCK_SET(spdk_env_get_current_core, i);
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		_reactor_run(reactor);
+	}
+
+	for (i = 0; i < 3; i++) {
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
+	}
+
+	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
+
+	/* Destroy threads */
+	for (i = 0; i < 3; i++) {
+		reactor = spdk_reactor_get(i);
+		CU_ASSERT(reactor != NULL);
+		reactor_run(reactor);
+	}
+
+	spdk_set_thread(NULL);
+
+	MOCK_CLEAR(spdk_env_get_current_core);
+
+	spdk_reactors_fini();
+
+	free_cores();
+}
+
 int
 main(int argc, char **argv)
 {
@@ -463,6 +627,7 @@ main(int argc, char **argv)
 	CU_ADD_TEST(suite, test_reschedule_thread);
 	CU_ADD_TEST(suite, test_for_each_reactor);
 	CU_ADD_TEST(suite, test_reactor_stats);
+	CU_ADD_TEST(suite, test_scheduler);
 
 	CU_basic_set_mode(CU_BRM_VERBOSE);
 	CU_basic_run_tests();