thread: Implement a thread scheduler
Change-Id: Ie1cad80a071f9de066fe18093a4c4c8e726e5a77 Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/446998 Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com> Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
parent
b71bebe88a
commit
032920f250
@ -70,7 +70,7 @@ static struct spdk_app g_spdk_app;
|
|||||||
static spdk_msg_fn g_start_fn = NULL;
|
static spdk_msg_fn g_start_fn = NULL;
|
||||||
static void *g_start_arg = NULL;
|
static void *g_start_arg = NULL;
|
||||||
static struct spdk_event *g_shutdown_event = NULL;
|
static struct spdk_event *g_shutdown_event = NULL;
|
||||||
static uint32_t g_init_lcore;
|
static struct spdk_thread *g_app_thread = NULL;
|
||||||
static bool g_delay_subsystem_init = false;
|
static bool g_delay_subsystem_init = false;
|
||||||
static bool g_shutdown_sig_received = false;
|
static bool g_shutdown_sig_received = false;
|
||||||
static char *g_executable_name;
|
static char *g_executable_name;
|
||||||
@ -350,7 +350,7 @@ spdk_app_start_application(void)
|
|||||||
{
|
{
|
||||||
spdk_rpc_set_state(SPDK_RPC_RUNTIME);
|
spdk_rpc_set_state(SPDK_RPC_RUNTIME);
|
||||||
|
|
||||||
assert(spdk_env_get_current_core() == g_init_lcore);
|
assert(spdk_get_thread() == g_app_thread);
|
||||||
|
|
||||||
g_start_fn(g_start_arg);
|
g_start_fn(g_start_arg);
|
||||||
}
|
}
|
||||||
@ -559,7 +559,7 @@ spdk_app_setup_trace(struct spdk_app_opts *opts)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
bootstrap_fn(void *arg1, void *arg2)
|
bootstrap_fn(void *arg1)
|
||||||
{
|
{
|
||||||
if (g_spdk_app.json_config_file) {
|
if (g_spdk_app.json_config_file) {
|
||||||
g_delay_subsystem_init = false;
|
g_delay_subsystem_init = false;
|
||||||
@ -581,7 +581,6 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn,
|
|||||||
struct spdk_conf *config = NULL;
|
struct spdk_conf *config = NULL;
|
||||||
int rc;
|
int rc;
|
||||||
char *tty;
|
char *tty;
|
||||||
struct spdk_event *event;
|
|
||||||
|
|
||||||
if (!opts) {
|
if (!opts) {
|
||||||
SPDK_ERRLOG("opts should not be NULL\n");
|
SPDK_ERRLOG("opts should not be NULL\n");
|
||||||
@ -645,6 +644,14 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn,
|
|||||||
goto app_start_log_close_err;
|
goto app_start_log_close_err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Now that the reactors have been initialized, we can create an
|
||||||
|
* initialization thread. */
|
||||||
|
g_app_thread = spdk_thread_create("app_thread");
|
||||||
|
if (!g_app_thread) {
|
||||||
|
SPDK_ERRLOG("Unable to create an spdk_thread for initialization\n");
|
||||||
|
goto app_start_log_close_err;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note the call to spdk_app_setup_trace() is located here
|
* Note the call to spdk_app_setup_trace() is located here
|
||||||
* ahead of spdk_app_setup_signal_handlers().
|
* ahead of spdk_app_setup_signal_handlers().
|
||||||
@ -668,14 +675,11 @@ spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn,
|
|||||||
g_spdk_app.shutdown_cb = opts->shutdown_cb;
|
g_spdk_app.shutdown_cb = opts->shutdown_cb;
|
||||||
g_spdk_app.rc = 0;
|
g_spdk_app.rc = 0;
|
||||||
|
|
||||||
g_init_lcore = spdk_env_get_current_core();
|
|
||||||
g_delay_subsystem_init = opts->delay_subsystem_init;
|
g_delay_subsystem_init = opts->delay_subsystem_init;
|
||||||
g_start_fn = start_fn;
|
g_start_fn = start_fn;
|
||||||
g_start_arg = arg1;
|
g_start_arg = arg1;
|
||||||
|
|
||||||
event = spdk_event_allocate(g_init_lcore, bootstrap_fn, NULL, NULL);
|
spdk_thread_send_msg(g_app_thread, bootstrap_fn, NULL);
|
||||||
|
|
||||||
spdk_event_call(event);
|
|
||||||
|
|
||||||
/* This blocks until spdk_app_stop is called */
|
/* This blocks until spdk_app_stop is called */
|
||||||
spdk_reactors_start();
|
spdk_reactors_start();
|
||||||
@ -703,7 +707,7 @@ spdk_app_fini(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_spdk_app_stop(void *arg1, void *arg2)
|
_spdk_app_stop(void *arg1)
|
||||||
{
|
{
|
||||||
spdk_rpc_finish();
|
spdk_rpc_finish();
|
||||||
spdk_subsystem_fini(spdk_reactors_stop, NULL);
|
spdk_subsystem_fini(spdk_reactors_stop, NULL);
|
||||||
@ -717,10 +721,10 @@ spdk_app_stop(int rc)
|
|||||||
}
|
}
|
||||||
g_spdk_app.rc = rc;
|
g_spdk_app.rc = rc;
|
||||||
/*
|
/*
|
||||||
* We want to run spdk_subsystem_fini() from the same lcore where spdk_subsystem_init()
|
* We want to run spdk_subsystem_fini() from the same thread where spdk_subsystem_init()
|
||||||
* was called.
|
* was called.
|
||||||
*/
|
*/
|
||||||
spdk_event_call(spdk_event_allocate(g_init_lcore, _spdk_app_stop, NULL, NULL));
|
spdk_thread_send_msg(g_app_thread, _spdk_app_stop, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1052,7 +1056,7 @@ spdk_rpc_start_subsystem_init_cpl(void *arg1)
|
|||||||
struct spdk_jsonrpc_request *request = arg1;
|
struct spdk_jsonrpc_request *request = arg1;
|
||||||
struct spdk_json_write_ctx *w;
|
struct spdk_json_write_ctx *w;
|
||||||
|
|
||||||
assert(spdk_env_get_current_core() == g_init_lcore);
|
assert(spdk_get_thread() == g_app_thread);
|
||||||
|
|
||||||
spdk_app_start_application();
|
spdk_app_start_application();
|
||||||
|
|
||||||
|
@ -223,24 +223,9 @@ static int
|
|||||||
_spdk_reactor_run(void *arg)
|
_spdk_reactor_run(void *arg)
|
||||||
{
|
{
|
||||||
struct spdk_reactor *reactor = arg;
|
struct spdk_reactor *reactor = arg;
|
||||||
struct spdk_thread *orig_thread, *thread;
|
struct spdk_thread *thread;
|
||||||
uint64_t last_rusage = 0;
|
uint64_t last_rusage = 0;
|
||||||
struct spdk_lw_thread *lw_thread, *tmp;
|
struct spdk_lw_thread *lw_thread, *tmp;
|
||||||
char thread_name[32];
|
|
||||||
|
|
||||||
snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
|
|
||||||
orig_thread = spdk_thread_create(thread_name);
|
|
||||||
if (!orig_thread) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
lw_thread = (struct spdk_lw_thread *)spdk_thread_get_ctx(orig_thread);
|
|
||||||
if (!lw_thread) {
|
|
||||||
spdk_thread_exit(orig_thread);
|
|
||||||
return -ENOMEM;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
|
|
||||||
|
|
||||||
SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
|
SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
|
||||||
|
|
||||||
@ -271,11 +256,11 @@ _spdk_reactor_run(void *arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lw_thread = spdk_thread_get_ctx(orig_thread);
|
TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
|
||||||
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
|
thread = spdk_thread_get_from_ctx(lw_thread);
|
||||||
assert(TAILQ_EMPTY(&reactor->threads));
|
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
|
||||||
|
spdk_thread_exit(thread);
|
||||||
spdk_thread_exit(orig_thread);
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -320,6 +305,7 @@ spdk_reactors_start(void)
|
|||||||
struct spdk_reactor *reactor;
|
struct spdk_reactor *reactor;
|
||||||
uint32_t i, current_core;
|
uint32_t i, current_core;
|
||||||
int rc;
|
int rc;
|
||||||
|
char thread_name[32];
|
||||||
|
|
||||||
g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
|
g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
|
||||||
g_spdk_app_core_mask = spdk_cpuset_alloc();
|
g_spdk_app_core_mask = spdk_cpuset_alloc();
|
||||||
@ -334,6 +320,10 @@ spdk_reactors_start(void)
|
|||||||
assert(false);
|
assert(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* For now, for each reactor spawn one thread. */
|
||||||
|
snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
|
||||||
|
spdk_thread_create(thread_name);
|
||||||
}
|
}
|
||||||
spdk_cpuset_set_cpu(g_spdk_app_core_mask, i, true);
|
spdk_cpuset_set_cpu(g_spdk_app_core_mask, i, true);
|
||||||
}
|
}
|
||||||
@ -355,6 +345,43 @@ spdk_reactors_stop(void *arg1)
|
|||||||
g_reactor_state = SPDK_REACTOR_STATE_EXITING;
|
g_reactor_state = SPDK_REACTOR_STATE_EXITING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
static uint32_t g_next_core = UINT32_MAX;
|
||||||
|
|
||||||
|
static void
|
||||||
|
_schedule_thread(void *arg1, void *arg2)
|
||||||
|
{
|
||||||
|
struct spdk_lw_thread *lw_thread = arg1;
|
||||||
|
struct spdk_reactor *reactor;
|
||||||
|
|
||||||
|
reactor = spdk_reactor_get(spdk_env_get_current_core());
|
||||||
|
|
||||||
|
TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
spdk_reactor_schedule_thread(struct spdk_thread *thread)
|
||||||
|
{
|
||||||
|
uint32_t core;
|
||||||
|
struct spdk_lw_thread *lw_thread;
|
||||||
|
struct spdk_event *evt;
|
||||||
|
|
||||||
|
lw_thread = spdk_thread_get_ctx(thread);
|
||||||
|
assert(lw_thread != NULL);
|
||||||
|
memset(lw_thread, 0, sizeof(*lw_thread));
|
||||||
|
|
||||||
|
pthread_mutex_lock(&g_scheduler_mtx);
|
||||||
|
if (g_next_core > spdk_env_get_core_count()) {
|
||||||
|
g_next_core = spdk_env_get_first_core();
|
||||||
|
}
|
||||||
|
core = g_next_core;
|
||||||
|
g_next_core = spdk_env_get_next_core(g_next_core);
|
||||||
|
pthread_mutex_unlock(&g_scheduler_mtx);
|
||||||
|
|
||||||
|
evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
|
||||||
|
spdk_event_call(evt);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
spdk_reactors_init(void)
|
spdk_reactors_init(void)
|
||||||
{
|
{
|
||||||
@ -388,7 +415,7 @@ spdk_reactors_init(void)
|
|||||||
|
|
||||||
memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
|
memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
|
||||||
|
|
||||||
spdk_thread_lib_init(NULL, sizeof(struct spdk_lw_thread));
|
spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread));
|
||||||
|
|
||||||
SPDK_ENV_FOREACH_CORE(i) {
|
SPDK_ENV_FOREACH_CORE(i) {
|
||||||
reactor = spdk_reactor_get(i);
|
reactor = spdk_reactor_get(i);
|
||||||
|
Loading…
Reference in New Issue
Block a user