bdevperf: Create and use per lcore thread pool

If bdevperf runs with -C option and two or more NVMe bdevs,
two or more poll groups are created on each CPU core because bdevperf
creates one SPDK thread per job per CPU core in this case.

However, generally SPDK application creates one SPDK thread per CPU
core. We want bdevperf to immitate closely generic SPDK application
pattern.

For this requirement, create a thread poll made of per core threads.
Then, each job gets one of the threads whose lcore matches instead of
creating a new thread.

This feature is available only if bdevperf's config file is not used
because bdevperf's config file can specify cpumap per job.

Fixes issue #2933

Signed-off-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
Change-Id: I0bd5a72f9ee8deade6c4e8ca0cdea3c89ff2541f
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16994
Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Shuhei Matsumoto 2023-03-01 16:45:37 +09:00 committed by Tomasz Zawadzki
parent 72da547aa2
commit 8e373044bf

View File

@ -80,6 +80,8 @@ static void rpc_perform_tests_cb(void);
static uint32_t g_bdev_count = 0; static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level; static uint32_t g_latency_display_level;
static bool g_one_thread_per_lcore = false;
static const double g_latency_cutoffs[] = { static const double g_latency_cutoffs[] = {
0.01, 0.01,
0.10, 0.10,
@ -184,6 +186,7 @@ struct job_config {
int bs; int bs;
int iodepth; int iodepth;
int rwmixread; int rwmixread;
uint32_t lcore;
int64_t offset; int64_t offset;
uint64_t length; uint64_t length;
enum job_config_rw rw; enum job_config_rw rw;
@ -211,6 +214,15 @@ struct bdevperf_aggregate_stats {
static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX}; static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};
struct lcore_thread {
struct spdk_thread *thread;
uint32_t lcore;
TAILQ_ENTRY(lcore_thread) link;
};
TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);
/* /*
* Cumulative Moving Average (CMA): average of all data up to current * Cumulative Moving Average (CMA): average of all data up to current
* Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent * Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent
@ -506,6 +518,7 @@ bdevperf_test_done(void *ctx)
{ {
struct bdevperf_job *job, *jtmp; struct bdevperf_job *job, *jtmp;
struct bdevperf_task *task, *ttmp; struct bdevperf_task *task, *ttmp;
struct lcore_thread *lthread, *lttmp;
double average_latency = 0.0; double average_latency = 0.0;
uint64_t time_in_usec; uint64_t time_in_usec;
int rc; int rc;
@ -589,7 +602,9 @@ clean:
TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) { TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
TAILQ_REMOVE(&g_bdevperf.jobs, job, link); TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
spdk_thread_send_msg(job->thread, job_thread_exit, NULL); if (!g_one_thread_per_lcore) {
spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
}
TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) { TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
TAILQ_REMOVE(&job->task_list, task, link); TAILQ_REMOVE(&job->task_list, task, link);
@ -601,6 +616,14 @@ clean:
bdevperf_job_free(job); bdevperf_job_free(job);
} }
if (g_one_thread_per_lcore) {
TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
free(lthread);
}
}
rc = g_run_rc; rc = g_run_rc;
if (g_request && !g_shutdown) { if (g_request && !g_shutdown) {
rpc_perform_tests_cb(); rpc_perform_tests_cb();
@ -1852,6 +1875,20 @@ config_filename_next(const char *filename, char *out)
return filename + i; return filename + i;
} }
static struct spdk_thread *
get_lcore_thread(uint32_t lcore)
{
struct lcore_thread *lthread;
TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
if (lthread->lcore == lcore) {
return lthread->thread;
}
}
return NULL;
}
static void static void
bdevperf_construct_jobs(void) bdevperf_construct_jobs(void)
{ {
@ -1865,7 +1902,11 @@ bdevperf_construct_jobs(void)
TAILQ_FOREACH(config, &job_config_list, link) { TAILQ_FOREACH(config, &job_config_list, link) {
filenames = config->filename; filenames = config->filename;
thread = construct_job_thread(&config->cpumask, config->name); if (!g_one_thread_per_lcore) {
thread = construct_job_thread(&config->cpumask, config->name);
} else {
thread = get_lcore_thread(config->lcore);
}
assert(thread); assert(thread);
while (filenames) { while (filenames) {
@ -1902,8 +1943,9 @@ make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
config->name = filename; config->name = filename;
config->filename = filename; config->filename = filename;
config->lcore = _get_next_core();
spdk_cpuset_zero(&config->cpumask); spdk_cpuset_zero(&config->cpumask);
spdk_cpuset_set_cpu(&config->cpumask, _get_next_core(), true); spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
config->bs = g_io_size; config->bs = g_io_size;
config->iodepth = g_queue_depth; config->iodepth = g_queue_depth;
config->rwmixread = g_rw_percentage; config->rwmixread = g_rw_percentage;
@ -1980,10 +2022,32 @@ bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0); return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
} }
static void
create_lcore_thread(uint32_t lcore)
{
struct lcore_thread *lthread;
struct spdk_cpuset cpumask = {};
char name[32];
lthread = calloc(1, sizeof(*lthread));
assert(lthread != NULL);
lthread->lcore = lcore;
snprintf(name, sizeof(name), "lcore_%u", lcore);
spdk_cpuset_set_cpu(&cpumask, lcore, true);
lthread->thread = spdk_thread_create(name, &cpumask);
assert(lthread->thread != NULL);
TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
}
static void static void
bdevperf_construct_job_configs(void) bdevperf_construct_job_configs(void)
{ {
struct spdk_bdev *bdev; struct spdk_bdev *bdev;
uint32_t i;
/* There are three different modes for allocating jobs. Standard mode /* There are three different modes for allocating jobs. Standard mode
* (the default) creates one spdk_thread per bdev and runs the I/O job there. * (the default) creates one spdk_thread per bdev and runs the I/O job there.
@ -1996,16 +2060,25 @@ bdevperf_construct_job_configs(void)
* In "FIO" mode, threads are spawned per-job instead of per-bdev. * In "FIO" mode, threads are spawned per-job instead of per-bdev.
* Each FIO job can be individually parameterized by filename, cpu mask, etc, * Each FIO job can be individually parameterized by filename, cpu mask, etc,
* which is different from other modes in that they only support global options. * which is different from other modes in that they only support global options.
*
* Both for standard mode and "multithread" mode, if the -E flag is specified,
* it creates one spdk_thread PER CORE. On each core, one spdk_thread is shared by
* multiple jobs.
*/ */
if (g_bdevperf_conf) { if (g_bdevperf_conf) {
goto end; goto end;
} else if (g_multithread_mode) {
bdevperf_construct_multithread_job_configs();
goto end;
} }
if (g_job_bdev_name != NULL) { if (g_one_thread_per_lcore) {
SPDK_ENV_FOREACH_CORE(i) {
create_lcore_thread(i);
}
}
if (g_multithread_mode) {
bdevperf_construct_multithread_job_configs();
} else if (g_job_bdev_name != NULL) {
bdev = spdk_bdev_get_by_name(g_job_bdev_name); bdev = spdk_bdev_get_by_name(g_job_bdev_name);
if (bdev) { if (bdev) {
/* Construct the job */ /* Construct the job */
@ -2381,6 +2454,8 @@ bdevperf_parse_arg(int ch, char *arg)
g_latency_display_level++; g_latency_display_level++;
} else if (ch == 'D') { } else if (ch == 'D') {
g_random_map = true; g_random_map = true;
} else if (ch == 'E') {
g_one_thread_per_lcore = true;
} else { } else {
tmp = spdk_strtoll(optarg, 10); tmp = spdk_strtoll(optarg, 10);
if (tmp < 0) { if (tmp < 0) {
@ -2446,6 +2521,7 @@ bdevperf_usage(void)
printf(" -j <filename> use job config file\n"); printf(" -j <filename> use job config file\n");
printf(" -l display latency histogram, default: disable. -l display summary, -ll display details\n"); printf(" -l display latency histogram, default: disable. -l display summary, -ll display details\n");
printf(" -D use a random map for picking offsets not previously read or written (for all jobs)\n"); printf(" -D use a random map for picking offsets not previously read or written (for all jobs)\n");
printf(" -E share per lcore thread among jobs. Available only if -j is not used.\n");
} }
static int static int
@ -2467,6 +2543,10 @@ verify_test_params(struct spdk_app_opts *opts)
if (!g_bdevperf_conf_file && !g_workload_type) { if (!g_bdevperf_conf_file && !g_workload_type) {
goto out; goto out;
} }
if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
printf("If bdevperf's config file is used, per lcore thread cannot be used\n");
goto out;
}
if (g_time_in_sec <= 0) { if (g_time_in_sec <= 0) {
goto out; goto out;
} }
@ -2568,7 +2648,7 @@ main(int argc, char **argv)
opts.rpc_addr = NULL; opts.rpc_addr = NULL;
opts.shutdown_cb = spdk_bdevperf_shutdown_cb; opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CF:M:P:S:T:Xlj:D", NULL, if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:M:P:S:T:Xlj:D", NULL,
bdevperf_parse_arg, bdevperf_usage)) != bdevperf_parse_arg, bdevperf_usage)) !=
SPDK_APP_PARSE_ARGS_SUCCESS) { SPDK_APP_PARSE_ARGS_SUCCESS) {
return rc; return rc;