bdevperf: give every job its own spdk_thread and drop the per-reactor
io_channel layer.

Jobs now live on one global list (g_bdevperf.jobs).  Starting I/O, draining
and the periodic performance dump are driven by spdk_thread_send_msg() to each
job's thread instead of spdk_for_each_channel() over reactors.

Review amendment: the offset/length parameters of bdevperf_construct_job() and
the blocks_per_job/offset locals of bdevperf_construct_multithread_jobs() are
uint64_t instead of uint32_t.  They carry values derived from
spdk_bdev_get_num_blocks(), so 32-bit storage truncated the per-core slice and
wrapped the running offset on bdevs with 2^32 or more blocks.

diff --git a/test/bdev/bdevperf/bdevperf.c b/test/bdev/bdevperf/bdevperf.c
index a3b53feee..11daad91c 100644
--- a/test/bdev/bdevperf/bdevperf.c
+++ b/test/bdev/bdevperf/bdevperf.c
@@ -85,8 +85,6 @@ static const char *g_job_bdev_name;
 static bool g_wait_for_tests = false;
 static struct spdk_jsonrpc_request *g_request = NULL;
 static bool g_multithread_mode = false;
-static uint32_t g_core_ordinal = 0;
-pthread_mutex_t g_ordinal_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static struct spdk_poller *g_perf_timer = NULL;
 
@@ -99,7 +97,8 @@ struct bdevperf_job {
 	struct spdk_bdev_desc *bdev_desc;
 	struct spdk_io_channel *ch;
 	TAILQ_ENTRY(bdevperf_job) link;
-	struct bdevperf_reactor *reactor;
+	struct spdk_thread *thread;
+
 	uint64_t io_completed;
 	uint64_t prev_io_completed;
 	double ema_io_per_second;
@@ -117,31 +116,20 @@ struct bdevperf_job {
 	TAILQ_HEAD(, bdevperf_task) task_list;
 };
 
-struct bdevperf_reactor {
-	struct spdk_thread *thread;
-	TAILQ_HEAD(, bdevperf_job) jobs;
-	uint32_t lcore;
-	uint32_t multiplier;
-	TAILQ_ENTRY(bdevperf_reactor) link;
-};
-
 struct spdk_bdevperf {
-	TAILQ_HEAD(, bdevperf_reactor) reactors;
-	uint32_t num_reactors;
+	TAILQ_HEAD(, bdevperf_job) jobs;
 	uint32_t running_jobs;
 };
 
 static struct spdk_bdevperf g_bdevperf = {
-	.reactors = TAILQ_HEAD_INITIALIZER(g_bdevperf.reactors),
-	.num_reactors = 0,
+	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
 	.running_jobs = 0,
 };
 
-struct bdevperf_reactor *g_next_reactor;
-
 static bool g_performance_dump_active = false;
 
 struct bdevperf_aggregate_stats {
+	struct bdevperf_job *current_job;
 	uint64_t io_time_in_usec;
 	uint64_t ema_period;
 	double total_io_per_second;
@@ -183,8 +171,8 @@ performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job
 {
 	double io_per_second, mb_per_second;
 
-	printf("\r Thread name: %s\n", spdk_thread_get_name(job->reactor->thread));
-	printf("\r Core Mask: 0x%s\n", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->reactor->thread)));
+	printf("\r Thread name: %s\n", spdk_thread_get_name(job->thread));
+	printf("\r Core Mask: 0x%s\n", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
 
 	if (stats->ema_period == 0) {
 		io_per_second = get_cma_io_per_second(job, stats->io_time_in_usec);
@@ -290,41 +278,9 @@ verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int bloc
 	return true;
 }
 
-static void
-_bdevperf_fini_thread_done(struct spdk_io_channel_iter *i, int status)
-{
-	spdk_io_device_unregister(&g_bdevperf, NULL);
-
-	spdk_app_stop(g_run_rc);
-}
-
-static void
-_bdevperf_fini_thread(struct spdk_io_channel_iter *i)
-{
-	struct spdk_io_channel *ch;
-	struct bdevperf_reactor *reactor;
-
-	ch = spdk_io_channel_iter_get_channel(i);
-	reactor = spdk_io_channel_get_ctx(ch);
-
-	TAILQ_REMOVE(&g_bdevperf.reactors, reactor, link);
-
-	spdk_put_io_channel(ch);
-
-	spdk_for_each_channel_continue(i, 0);
-}
-
-static void
-bdevperf_fini(void)
-{
-	spdk_for_each_channel(&g_bdevperf, _bdevperf_fini_thread, NULL,
-			      _bdevperf_fini_thread_done);
-}
-
 static void
 bdevperf_test_done(void *ctx)
 {
-	struct bdevperf_reactor *reactor;
 	struct bdevperf_job *job, *jtmp;
 	struct bdevperf_task *task, *ttmp;
 
@@ -349,26 +305,24 @@ bdevperf_test_done(void *ctx)
 		       (double)g_time_in_usec / 1000000);
 	}
 
-	TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
-		TAILQ_FOREACH_SAFE(job, &reactor->jobs, link, jtmp) {
-			TAILQ_REMOVE(&reactor->jobs, job, link);
+	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
+		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
 
-			performance_dump_job(&g_stats, job);
+		performance_dump_job(&g_stats, job);
 
-			TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
-				TAILQ_REMOVE(&job->task_list, task, link);
-				spdk_free(task->buf);
-				spdk_free(task->md_buf);
-				free(task);
-			}
-
-			if (g_verify) {
-				spdk_bit_array_free(&job->outstanding);
-			}
-
-			free(job->name);
-			free(job);
+		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
+			TAILQ_REMOVE(&job->task_list, task, link);
+			spdk_free(task->buf);
+			spdk_free(task->md_buf);
+			free(task);
 		}
+
+		if (g_verify) {
+			spdk_bit_array_free(&job->outstanding);
+		}
+
+		free(job->name);
+		free(job);
 	}
 
 	printf("\r =====================================================\n");
@@ -379,7 +333,7 @@ bdevperf_test_done(void *ctx)
 	if (g_request && !g_shutdown) {
 		rpc_perform_tests_cb();
 	} else {
-		bdevperf_fini();
+		spdk_app_stop(g_run_rc);
 	}
 }
 
@@ -864,8 +818,9 @@ reset_job(void *arg)
 }
 
 static void
-bdevperf_job_run(struct bdevperf_job *job)
+bdevperf_job_run(void *ctx)
 {
+	struct bdevperf_job *job = ctx;
 	struct bdevperf_task *task;
 	int i;
 
@@ -886,30 +841,9 @@ bdevperf_job_run(struct bdevperf_job *job)
 }
 
 static void
-bdevperf_submit_on_reactor(struct spdk_io_channel_iter *i)
+_performance_dump_done(void *ctx)
 {
-	struct spdk_io_channel *ch;
-	struct bdevperf_reactor *reactor;
-	struct bdevperf_job *job;
-
-	ch = spdk_io_channel_iter_get_channel(i);
-	reactor = spdk_io_channel_get_ctx(ch);
-
-	/* Submit initial I/O for each block device. Each time one
-	 * completes, another will be submitted. */
-	TAILQ_FOREACH(job, &reactor->jobs, link) {
-		bdevperf_job_run(job);
-	}
-
-	spdk_for_each_channel_continue(i, 0);
-}
-
-static void
-_performance_dump_done(struct spdk_io_channel_iter *i, int status)
-{
-	struct bdevperf_aggregate_stats *stats;
-
-	stats = spdk_io_channel_iter_get_ctx(i);
+	struct bdevperf_aggregate_stats *stats = ctx;
 
 	printf("\r =====================================================\n");
 	printf("\r %-20s: %10.2f IOPS %10.2f MiB/s\n",
@@ -922,29 +856,20 @@ _performance_dump_done(struct spdk_io_channel_iter *i, int status)
 }
 
 static void
-_performance_dump(struct spdk_io_channel_iter *i)
+_performance_dump(void *ctx)
 {
-	struct bdevperf_aggregate_stats *stats;
-	struct spdk_io_channel *ch;
-	struct bdevperf_reactor *reactor;
-	struct bdevperf_job *job;
+	struct bdevperf_aggregate_stats *stats = ctx;
 
-	stats = spdk_io_channel_iter_get_ctx(i);
-	ch = spdk_io_channel_iter_get_channel(i);
-	reactor = spdk_io_channel_get_ctx(ch);
+	performance_dump_job(stats, stats->current_job);
 
-	if (TAILQ_EMPTY(&reactor->jobs)) {
-		goto exit;
+	/* This assumes the jobs list is static after start up time.
+	 * That's true right now, but if that ever changed this would need a lock. */
+	stats->current_job = TAILQ_NEXT(stats->current_job, link);
+	if (stats->current_job == NULL) {
+		spdk_thread_send_msg(g_master_thread, _performance_dump_done, stats);
+	} else {
+		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
 	}
-
-	TAILQ_FOREACH(job, &reactor->jobs, link) {
-		performance_dump_job(stats, job);
-	}
-
-	fflush(stdout);
-
-exit:
-	spdk_for_each_channel_continue(i, 0);
 }
 
 static int
@@ -968,14 +893,25 @@ performance_statistics_thread(void *arg)
 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
 	stats->ema_period = g_show_performance_ema_period;
 
-	spdk_for_each_channel(&g_bdevperf, _performance_dump, stats,
-			      _performance_dump_done);
+	/* Iterate all of the jobs to gather stats
+	 * These jobs will not get removed here until a final performance dump is run,
+	 * so this should be safe without locking.
+	 */
+	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
+	if (stats->current_job == NULL) {
+		spdk_thread_send_msg(g_master_thread, _performance_dump_done, stats);
+	} else {
+		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
+	}
+
 	return -1;
 }
 
 static void
 bdevperf_test(void)
 {
+	struct bdevperf_job *job;
+
 	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / 1000000);
 	fflush(stdout);
 
@@ -986,8 +922,11 @@ bdevperf_test(void)
 						   g_show_performance_period_in_usec);
 	}
 
-	/* Iterate reactors to start all I/O */
-	spdk_for_each_channel(&g_bdevperf, bdevperf_submit_on_reactor, NULL, NULL);
+	/* Iterate jobs to start all I/O */
+	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
+		g_bdevperf.running_jobs++;
+		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
+	}
 }
 
 static void
@@ -995,9 +934,6 @@ bdevperf_bdev_removed(void *arg)
 {
 	struct bdevperf_job *job = arg;
 
-	assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(job->reactor)) ==
-	       spdk_get_thread());
-
 	bdevperf_job_drain(job);
 }
 
@@ -1006,10 +942,8 @@ static uint32_t g_construct_job_count = 0;
 static void
 _bdevperf_construct_job_done(void *ctx)
 {
-	/* Update g_bdevperf.running_jobs on the master thread. */
-	g_bdevperf.running_jobs++;
-
 	if (--g_construct_job_count == 0) {
+
 		if (g_run_rc != 0) {
 			/* Something failed. */
 			bdevperf_test_done(NULL);
@@ -1047,17 +981,27 @@ end:
 }
 
 static int
-bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
+bdevperf_construct_job(struct spdk_bdev *bdev, struct spdk_cpuset *cpumask,
+		       uint64_t offset, uint64_t length)
 {
 	struct bdevperf_job *job;
 	struct bdevperf_task *task;
 	int block_size, data_block_size;
 	int rc;
 	int task_num, n;
+	char thread_name[32];
+	struct spdk_thread *thread;
 
 	/* This function runs on the master thread. */
 	assert(g_master_thread == spdk_get_thread());
 
+	snprintf(thread_name, sizeof(thread_name), "%s_%s", spdk_bdev_get_name(bdev),
+		 spdk_cpuset_fmt(cpumask));
+
+	/* Create a new thread for the job */
+	thread = spdk_thread_create(thread_name, cpumask);
+	assert(thread != NULL);
+
 	block_size = spdk_bdev_get_block_size(bdev);
 	data_block_size = spdk_bdev_get_data_block_size(bdev);
 
@@ -1096,13 +1040,15 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
 	}
 
-	job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
 	job->offset_in_ios = 0;
 
-	if (g_multithread_mode) {
-		job->size_in_ios = job->size_in_ios / g_bdevperf.num_reactors;
-		job->ios_base = reactor->multiplier * job->size_in_ios;
+	if (length != 0) {
+		/* Use subset of disk */
+		job->size_in_ios = length / job->io_size_blocks;
+		job->ios_base = offset / job->io_size_blocks;
 	} else {
+		/* Use whole disk */
+		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
 		job->ios_base = 0;
 	}
 
@@ -1124,7 +1070,7 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
 		task_num += 1;
 	}
 
-	TAILQ_INSERT_TAIL(&reactor->jobs, job, link);
+	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
 
 	for (n = 0; n < task_num; n++) {
 		task = calloc(1, sizeof(struct bdevperf_task));
@@ -1157,11 +1103,11 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
 	}
 
-	job->reactor = reactor;
+	job->thread = thread;
 
 	g_construct_job_count++;
 
-	rc = spdk_thread_send_msg(reactor->thread, _bdevperf_construct_job, job);
+	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
 	assert(rc == 0);
 
 	return rc;
@@ -1171,9 +1117,23 @@ static void
 bdevperf_construct_multithread_jobs(void)
 {
 	struct spdk_bdev *bdev;
-	struct bdevperf_reactor *reactor;
+	uint32_t i;
+	struct spdk_cpuset cpumask;
+	uint32_t num_cores;
+	uint64_t blocks_per_job;
+	uint64_t offset;
 	int rc;
 
+	num_cores = 0;
+	SPDK_ENV_FOREACH_CORE(i) {
+		num_cores++;
+	}
+
+	if (num_cores == 0) {
+		g_run_rc = -EINVAL;
+		return;
+	}
+
 	if (g_job_bdev_name != NULL) {
 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
 		if (!bdev) {
@@ -1181,24 +1141,40 @@ bdevperf_construct_multithread_jobs(void)
 			return;
 		}
 
-		/* Build a job for each reactor */
-		TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
-			rc = bdevperf_construct_job(bdev, reactor);
+		blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
+		offset = 0;
+
+		SPDK_ENV_FOREACH_CORE(i) {
+			spdk_cpuset_zero(&cpumask);
+			spdk_cpuset_set_cpu(&cpumask, i, true);
+
+			/* Construct the job */
+			rc = bdevperf_construct_job(bdev, &cpumask, offset, blocks_per_job);
 			if (rc < 0) {
 				g_run_rc = rc;
 				break;
 			}
+
+			offset += blocks_per_job;
 		}
 	} else {
 		bdev = spdk_bdev_first_leaf();
 		while (bdev != NULL) {
-			/* Build a job for each reactor */
-			TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
-				rc = bdevperf_construct_job(bdev, reactor);
+			blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
+			offset = 0;
+
+			SPDK_ENV_FOREACH_CORE(i) {
+				spdk_cpuset_zero(&cpumask);
+				spdk_cpuset_set_cpu(&cpumask, i, true);
+
+				/* Construct the job */
+				rc = bdevperf_construct_job(bdev, &cpumask, offset, blocks_per_job);
 				if (rc < 0) {
 					g_run_rc = rc;
 					break;
 				}
+
+				offset += blocks_per_job;
 			}
 
 			if (g_run_rc != 0) {
@@ -1210,35 +1186,37 @@ bdevperf_construct_multithread_jobs(void)
 	}
 }
 
-
-static struct bdevperf_reactor *
-get_next_bdevperf_reactor(void)
+static uint32_t
+_get_next_core(void)
 {
-	struct bdevperf_reactor *reactor;
+	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
 
-	if (g_next_reactor == NULL) {
-		g_next_reactor = TAILQ_FIRST(&g_bdevperf.reactors);
-		assert(g_next_reactor != NULL);
+	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
+		current_core = spdk_env_get_first_core();
+		return current_core;
 	}
 
-	reactor = g_next_reactor;
-	g_next_reactor = TAILQ_NEXT(g_next_reactor, link);
+	current_core = spdk_env_get_next_core(current_core);
+	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
+		current_core = spdk_env_get_first_core();
+	}
 
-	return reactor;
+	return current_core;
 }
 
 static void
 bdevperf_construct_jobs(void)
 {
 	struct spdk_bdev *bdev;
-	struct bdevperf_reactor *reactor;
+	uint32_t lcore;
+	struct spdk_cpuset cpumask;
 	int rc;
 
 	/* There are two entirely separate modes for allocating jobs. Standard mode
-	 * (the default) creates one job per bdev and assigns them to reactors round-robin.
+	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
 	 *
 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
-	 * one job per bdev per REACTOR.
+	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
 	 * This runs multiple threads per bdev, effectively.
 	 */
 
@@ -1255,11 +1233,13 @@ bdevperf_construct_jobs(void)
 	if (g_job_bdev_name != NULL) {
 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
 		if (bdev) {
-			/* Select the reactor for this job */
-			reactor = get_next_bdevperf_reactor();
+			lcore = _get_next_core();
+
+			spdk_cpuset_zero(&cpumask);
+			spdk_cpuset_set_cpu(&cpumask, lcore, true);
 
 			/* Construct the job */
-			rc = bdevperf_construct_job(bdev, reactor);
+			rc = bdevperf_construct_job(bdev, &cpumask, 0, 0);
 			if (rc < 0) {
 				g_run_rc = rc;
 			}
@@ -1268,12 +1248,15 @@ bdevperf_construct_jobs(void)
 		}
 	} else {
 		bdev = spdk_bdev_first_leaf();
+
 		while (bdev != NULL) {
-			/* Select the reactor for this job */
-			reactor = get_next_bdevperf_reactor();
+			lcore = _get_next_core();
+
+			spdk_cpuset_zero(&cpumask);
+			spdk_cpuset_set_cpu(&cpumask, lcore, true);
 
 			/* Construct the job */
-			rc = bdevperf_construct_job(bdev, reactor);
+			rc = bdevperf_construct_job(bdev, &cpumask, 0, 0);
 			if (rc < 0) {
 				g_run_rc = rc;
 				break;
@@ -1295,48 +1278,10 @@ end:
 	}
 }
 
-static int
-bdevperf_reactor_create(void *io_device, void *ctx_buf)
-{
-	struct bdevperf_reactor *reactor = ctx_buf;
-
-	TAILQ_INIT(&reactor->jobs);
-	reactor->lcore = spdk_env_get_current_core();
-	pthread_mutex_lock(&g_ordinal_lock);
-	reactor->multiplier = g_core_ordinal++;
-	pthread_mutex_unlock(&g_ordinal_lock);
-	reactor->thread = spdk_get_thread();
-
-	return 0;
-}
-
 static void
-bdevperf_reactor_destroy(void *io_device, void *ctx_buf)
+bdevperf_run(void *arg1)
 {
-	struct bdevperf_reactor *reactor = ctx_buf;
-	struct spdk_io_channel *ch;
-	struct spdk_thread *thread;
-
-	ch = spdk_io_channel_from_ctx(reactor);
-	thread = spdk_io_channel_get_thread(ch);
-
-	assert(thread == spdk_get_thread());
-
-	spdk_thread_exit(thread);
-}
-
-static void
-_bdevperf_init_thread_done(void *ctx)
-{
-	struct bdevperf_reactor *reactor = ctx;
-
-	TAILQ_INSERT_TAIL(&g_bdevperf.reactors, reactor, link);
-
-	assert(g_bdevperf.num_reactors < spdk_env_get_core_count());
-
-	if (++g_bdevperf.num_reactors < spdk_env_get_core_count()) {
-		return;
-	}
+	g_master_thread = spdk_get_thread();
 
 	if (g_wait_for_tests) {
 		/* Do not perform any tests until RPC is received */
@@ -1346,46 +1291,6 @@ _bdevperf_init_thread_done(void *ctx)
 	bdevperf_construct_jobs();
 }
 
-static void
-_bdevperf_init_thread(void *ctx)
-{
-	struct spdk_io_channel *ch;
-	struct bdevperf_reactor *reactor;
-
-	ch = spdk_get_io_channel(&g_bdevperf);
-	reactor = spdk_io_channel_get_ctx(ch);
-
-	spdk_thread_send_msg(g_master_thread, _bdevperf_init_thread_done, reactor);
-}
-
-static void
-bdevperf_run(void *arg1)
-{
-	struct spdk_cpuset tmp_cpumask = {};
-	uint32_t i;
-	char thread_name[32];
-	struct spdk_thread *thread;
-
-	g_master_thread = spdk_get_thread();
-
-	spdk_io_device_register(&g_bdevperf, bdevperf_reactor_create, bdevperf_reactor_destroy,
-				sizeof(struct bdevperf_reactor), "bdevperf");
-
-	/* Create threads for CPU cores active for this application, and send a
-	 * message to each thread to create a reactor on it.
-	 */
-	SPDK_ENV_FOREACH_CORE(i) {
-		spdk_cpuset_zero(&tmp_cpumask);
-		spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
-		snprintf(thread_name, sizeof(thread_name), "bdevperf_reactor_%u", i);
-
-		thread = spdk_thread_create(thread_name, &tmp_cpumask);
-		assert(thread != NULL);
-
-		spdk_thread_send_msg(thread, _bdevperf_init_thread, NULL);
-	}
-}
-
 static void
 rpc_perform_tests_cb(void)
 {
@@ -1428,32 +1333,16 @@ rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_v
 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
 
 static void
-bdevperf_stop_io_on_reactor(struct spdk_io_channel_iter *i)
+_bdevperf_job_drain(void *ctx)
 {
-	struct spdk_io_channel *ch;
-	struct bdevperf_reactor *reactor;
-	struct bdevperf_job *job;
-
-	ch = spdk_io_channel_iter_get_channel(i);
-	reactor = spdk_io_channel_get_ctx(ch);
-
-	/* Stop I/O for each block device. */
-	TAILQ_FOREACH(job, &reactor->jobs, link) {
-		bdevperf_job_drain(job);
-	}
-
-	spdk_for_each_channel_continue(i, 0);
+	bdevperf_job_drain(ctx);
 }
 
 static void
 spdk_bdevperf_shutdown_cb(void)
 {
 	g_shutdown = true;
-
-	if (TAILQ_EMPTY(&g_bdevperf.reactors)) {
-		spdk_app_stop(0);
-		return;
-	}
+	struct bdevperf_job *job, *tmp;
 
 	if (g_bdevperf.running_jobs == 0) {
 		bdevperf_test_done(NULL);
@@ -1462,8 +1351,10 @@ spdk_bdevperf_shutdown_cb(void)
 
 	g_shutdown_tsc = spdk_get_ticks() - g_shutdown_tsc;
 
-	/* Send events to stop all I/O on each reactor */
-	spdk_for_each_channel(&g_bdevperf, bdevperf_stop_io_on_reactor, NULL, NULL);
+	/* Iterate jobs to stop all I/O */
+	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
+		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
+	}
 }
 
 static int