accel_perf: update to create worker threads on its own

As reactors no longer have a thread created with them.

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: Ie9e9411c52c215b8cffd894fef6394448ae8167d
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/6312
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
paul luse 2021-02-07 13:46:50 -05:00 committed by Tomasz Zawadzki
parent 68ff34bc66
commit eea826a276

View File

@ -194,6 +194,7 @@ parse_args(int argc, char *argv)
return 0; return 0;
} }
static int dump_result(void);
static void static void
unregister_worker(void *arg1) unregister_worker(void *arg1)
{ {
@ -206,6 +207,7 @@ unregister_worker(void *arg1)
assert(g_num_workers >= 1); assert(g_num_workers >= 1);
if (--g_num_workers == 0) { if (--g_num_workers == 0) {
pthread_mutex_unlock(&g_workers_lock); pthread_mutex_unlock(&g_workers_lock);
dump_result();
spdk_app_stop(0); spdk_app_stop(0);
} }
pthread_mutex_unlock(&g_workers_lock); pthread_mutex_unlock(&g_workers_lock);
@ -660,11 +662,6 @@ _worker_stop(void *arg)
return 0; return 0;
} }
static void
_init_thread_done(void *ctx)
{
}
static void static void
_init_thread(void *arg1) _init_thread(void *arg1)
{ {
@ -685,7 +682,11 @@ _init_thread(void *arg1)
worker->core = spdk_env_get_current_core(); worker->core = spdk_env_get_current_core();
worker->thread = spdk_get_thread(); worker->thread = spdk_get_thread();
pthread_mutex_lock(&g_workers_lock);
g_num_workers++;
worker->next = g_workers; worker->next = g_workers;
g_workers = worker;
pthread_mutex_unlock(&g_workers_lock);
worker->ch = spdk_accel_engine_get_io_channel(); worker->ch = spdk_accel_engine_get_io_channel();
TAILQ_INIT(&worker->tasks_pool); TAILQ_INIT(&worker->tasks_pool);
@ -753,11 +754,6 @@ _init_thread(void *arg1)
worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
g_time_in_sec * 1000000ULL); g_time_in_sec * 1000000ULL);
g_workers = worker;
pthread_mutex_lock(&g_workers_lock);
g_num_workers++;
pthread_mutex_unlock(&g_workers_lock);
/* If batching is enabled load up to the full Q depth before /* If batching is enabled load up to the full Q depth before
* processing any completions, then ping pong between two batches, * processing any completions, then ping pong between two batches,
* one processing and one being built up for when the other completes. * one processing and one being built up for when the other completes.
@ -844,6 +840,10 @@ static void
accel_perf_start(void *arg1) accel_perf_start(void *arg1)
{ {
struct spdk_io_channel *accel_ch; struct spdk_io_channel *accel_ch;
struct spdk_cpuset tmp_cpumask = {};
char thread_name[32];
uint32_t i;
struct spdk_thread *thread;
accel_ch = spdk_accel_engine_get_io_channel(); accel_ch = spdk_accel_engine_get_io_channel();
g_capabilites = spdk_accel_get_capabilities(accel_ch); g_capabilites = spdk_accel_get_capabilities(accel_ch);
@ -861,7 +861,14 @@ accel_perf_start(void *arg1)
printf("Running for %d seconds...\n", g_time_in_sec); printf("Running for %d seconds...\n", g_time_in_sec);
fflush(stdout); fflush(stdout);
spdk_for_each_thread(_init_thread, NULL, _init_thread_done); /* Create worker threads for each core that was specified. */
SPDK_ENV_FOREACH_CORE(i) {
snprintf(thread_name, sizeof(thread_name), "ap_worker");
spdk_cpuset_zero(&tmp_cpumask);
spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
thread = spdk_thread_create(thread_name, &tmp_cpumask);
spdk_thread_send_msg(thread, _init_thread, NULL);
}
} }
int int
@ -901,8 +908,6 @@ main(int argc, char **argv)
rc = spdk_app_start(&opts, accel_perf_start, NULL); rc = spdk_app_start(&opts, accel_perf_start, NULL);
if (rc) { if (rc) {
SPDK_ERRLOG("ERROR starting application\n"); SPDK_ERRLOG("ERROR starting application\n");
} else {
dump_result();
} }
pthread_mutex_destroy(&g_workers_lock); pthread_mutex_destroy(&g_workers_lock);