diff --git a/lib/event/app.c b/lib/event/app.c index 4ccc61bef..a54c0bb1a 100644 --- a/lib/event/app.c +++ b/lib/event/app.c @@ -70,7 +70,7 @@ static struct spdk_app g_spdk_app; static spdk_msg_fn g_start_fn = NULL; static void *g_start_arg = NULL; static struct spdk_event *g_shutdown_event = NULL; -static uint32_t g_init_lcore; +static struct spdk_thread *g_app_thread = NULL; static bool g_delay_subsystem_init = false; static bool g_shutdown_sig_received = false; static char *g_executable_name; @@ -350,7 +350,7 @@ spdk_app_start_application(void) { spdk_rpc_set_state(SPDK_RPC_RUNTIME); - assert(spdk_env_get_current_core() == g_init_lcore); + assert(spdk_get_thread() == g_app_thread); g_start_fn(g_start_arg); } @@ -559,7 +559,7 @@ spdk_app_setup_trace(struct spdk_app_opts *opts) } static void -bootstrap_fn(void *arg1, void *arg2) +bootstrap_fn(void *arg1) { if (g_spdk_app.json_config_file) { g_delay_subsystem_init = false; @@ -581,7 +581,6 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn, struct spdk_conf *config = NULL; int rc; char *tty; - struct spdk_event *event; if (!opts) { SPDK_ERRLOG("opts should not be NULL\n"); @@ -645,6 +644,14 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn, goto app_start_log_close_err; } + /* Now that the reactors have been initialized, we can create an + * initialization thread. */ + g_app_thread = spdk_thread_create("app_thread"); + if (!g_app_thread) { + SPDK_ERRLOG("Unable to create an spdk_thread for initialization\n"); + goto app_start_log_close_err; + } + /* * Note the call to spdk_app_setup_trace() is located here * ahead of spdk_app_setup_signal_handlers(). @@ -668,14 +675,11 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn, g_spdk_app.shutdown_cb = opts->shutdown_cb; g_spdk_app.rc = 0; - g_init_lcore = spdk_env_get_current_core(); g_delay_subsystem_init = opts->delay_subsystem_init; g_start_fn = start_fn; g_start_arg = arg1; - event = spdk_event_allocate(g_init_lcore, bootstrap_fn, NULL, NULL); - - spdk_event_call(event); + spdk_thread_send_msg(g_app_thread, bootstrap_fn, NULL); /* This blocks until spdk_app_stop is called */ spdk_reactors_start(); @@ -703,7 +707,7 @@ spdk_app_fini(void) } static void -_spdk_app_stop(void *arg1, void *arg2) +_spdk_app_stop(void *arg1) { spdk_rpc_finish(); spdk_subsystem_fini(spdk_reactors_stop, NULL); @@ -717,10 +721,10 @@ spdk_app_stop(int rc) } g_spdk_app.rc = rc; /* - * We want to run spdk_subsystem_fini() from the same lcore where spdk_subsystem_init() + * We want to run spdk_subsystem_fini() from the same thread where spdk_subsystem_init() * was called. */ - spdk_event_call(spdk_event_allocate(g_init_lcore, _spdk_app_stop, NULL, NULL)); + spdk_thread_send_msg(g_app_thread, _spdk_app_stop, NULL); } static void @@ -1052,7 +1056,7 @@ spdk_rpc_start_subsystem_init_cpl(void *arg1) struct spdk_jsonrpc_request *request = arg1; struct spdk_json_write_ctx *w; - assert(spdk_env_get_current_core() == g_init_lcore); + assert(spdk_get_thread() == g_app_thread); spdk_app_start_application(); diff --git a/lib/event/reactor.c b/lib/event/reactor.c index a58c5695e..62702402f 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -223,24 +223,9 @@ static int _spdk_reactor_run(void *arg) { struct spdk_reactor *reactor = arg; - struct spdk_thread *orig_thread, *thread; + struct spdk_thread *thread; uint64_t last_rusage = 0; struct spdk_lw_thread *lw_thread, *tmp; - char thread_name[32]; - - snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); - orig_thread = spdk_thread_create(thread_name); - if (!orig_thread) { - return -1; - } - - lw_thread = (struct spdk_lw_thread *)spdk_thread_get_ctx(orig_thread); - if (!lw_thread) { - spdk_thread_exit(orig_thread); - return -ENOMEM; - } - - TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); @@ -271,11 +256,11 @@ _spdk_reactor_run(void *arg) } } - lw_thread = spdk_thread_get_ctx(orig_thread); - TAILQ_REMOVE(&reactor->threads, lw_thread, link); - assert(TAILQ_EMPTY(&reactor->threads)); - - spdk_thread_exit(orig_thread); + TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { + thread = spdk_thread_get_from_ctx(lw_thread); + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + spdk_thread_exit(thread); + } return 0; } @@ -320,6 +305,7 @@ spdk_reactors_start(void) struct spdk_reactor *reactor; uint32_t i, current_core; int rc; + char thread_name[32]; g_reactor_state = SPDK_REACTOR_STATE_RUNNING; g_spdk_app_core_mask = spdk_cpuset_alloc(); @@ -334,6 +320,10 @@ spdk_reactors_start(void) assert(false); return; } + + /* For now, for each reactor spawn one thread. */ + snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); + spdk_thread_create(thread_name); } spdk_cpuset_set_cpu(g_spdk_app_core_mask, i, true); } @@ -355,6 +345,43 @@ spdk_reactors_stop(void *arg1) g_reactor_state = SPDK_REACTOR_STATE_EXITING; } +static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; +static uint32_t g_next_core = UINT32_MAX; + +static void +_schedule_thread(void *arg1, void *arg2) +{ + struct spdk_lw_thread *lw_thread = arg1; + struct spdk_reactor *reactor; + + reactor = spdk_reactor_get(spdk_env_get_current_core()); + + TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); +} + +static void +spdk_reactor_schedule_thread(struct spdk_thread *thread) +{ + uint32_t core; + struct spdk_lw_thread *lw_thread; + struct spdk_event *evt; + + lw_thread = spdk_thread_get_ctx(thread); + assert(lw_thread != NULL); + memset(lw_thread, 0, sizeof(*lw_thread)); + + pthread_mutex_lock(&g_scheduler_mtx); + if (g_next_core > spdk_env_get_core_count()) { + g_next_core = spdk_env_get_first_core(); + } + core = g_next_core; + g_next_core = spdk_env_get_next_core(g_next_core); + pthread_mutex_unlock(&g_scheduler_mtx); + + evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); + spdk_event_call(evt); +} + int spdk_reactors_init(void) { @@ -388,7 +415,7 @@ spdk_reactors_init(void) memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); - spdk_thread_lib_init(NULL, sizeof(struct spdk_lw_thread)); + spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread)); SPDK_ENV_FOREACH_CORE(i) { reactor = spdk_reactor_get(i);