diff --git a/examples/accel/perf/accel_perf.c b/examples/accel/perf/accel_perf.c
index 3daaaed52..216df26e4 100644
--- a/examples/accel/perf/accel_perf.c
+++ b/examples/accel/perf/accel_perf.c
@@ -49,6 +49,7 @@ static uint64_t g_tsc_us_rate;
 static uint64_t g_tsc_end;
 static int g_xfer_size_bytes = 4096;
 static int g_queue_depth = 32;
+static int g_ops_per_batch = 0;
 static int g_time_in_sec = 5;
 static uint32_t g_crc32c_seed = 0;
 static int g_fail_percent_goal = 0;
@@ -74,6 +75,14 @@ struct ap_task {
 	TAILQ_ENTRY(ap_task) link;
 };
 
+struct accel_batch {
+	int status;
+	int cmd_count;
+	struct spdk_accel_batch *batch;
+	struct worker_thread *worker;
+	TAILQ_ENTRY(accel_batch) link;
+};
+
 struct worker_thread {
 	struct spdk_io_channel *ch;
 	uint64_t xfer_completed;
@@ -88,6 +97,10 @@ struct worker_thread {
 	struct spdk_poller *is_draining_poller;
 	struct spdk_poller *stop_poller;
 	void *task_base;
+	struct accel_batch *batch_base;
+	TAILQ_HEAD(, accel_batch) in_prep_batches;
+	TAILQ_HEAD(, accel_batch) in_use_batches;
+	TAILQ_HEAD(, accel_batch) to_submit_batches;
 };
 
 static void
@@ -107,6 +120,11 @@ dump_user_config(struct spdk_app_opts *opts)
 	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
 	printf("Queue depth:    %u\n", g_queue_depth);
 	printf("Run time:       %u seconds\n", g_time_in_sec);
+	if (g_ops_per_batch > 0) {
+		printf("Batching:       %u operations\n", g_ops_per_batch);
+	} else {
+		printf("Batching:       Disabled\n");
+	}
 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
 }
 
@@ -115,7 +133,7 @@ usage(void)
 {
 	printf("accel_perf options:\n");
 	printf("\t[-h help message]\n");
-	printf("\t[-q queue depth]\n");
+	printf("\t[-q queue depth per core]\n");
 	printf("\t[-n number of channels]\n");
 	printf("\t[-o transfer size in bytes]\n");
 	printf("\t[-t time in seconds]\n");
@@ -124,12 +142,16 @@ usage(void)
 	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n");
 	printf("\t[-f for fill workload, use this BYTE value (default 255)\n");
 	printf("\t[-y verify result if this switch is on]\n");
+	printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n");
 }
 
 static int
 parse_args(int argc, char *argv)
 {
 	switch (argc) {
+	case 'b':
+		g_ops_per_batch = spdk_strtol(optarg, 10);
+		break;
 	case 'f':
 		g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10);
 		break;
@@ -178,6 +200,7 @@ unregister_worker(void *arg1)
 	struct worker_thread *worker = arg1;
 
 	free(worker->task_base);
+	free(worker->batch_base);
 	spdk_put_io_channel(worker->ch);
 	pthread_mutex_lock(&g_workers_lock);
 	assert(g_num_workers >= 1);
@@ -255,8 +278,7 @@ _get_task(struct worker_thread *worker)
 	return task;
 }
 
-static void accel_done(void *ref, int status);
-
+/* Submit one operation using the same ap task that just completed. */
 static void
 _submit_single(struct worker_thread *worker, struct ap_task *task)
 {
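A note on the new '-b' handling above: spdk_strtol() returns a negative errno on malformed input, and that value would land in g_ops_per_batch unchecked until the modulo test in main(). A minimal hardening sketch, assuming the usual contract that a nonzero return from the spdk_app_parse_args() option callback fails parsing (this is a suggestion, not part of the patch):

	case 'b':
		g_ops_per_batch = spdk_strtol(optarg, 10);
		if (g_ops_per_batch < 0) {
			fprintf(stderr, "invalid -b value\n");
			return 1;
		}
		break;
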
@@ -308,10 +330,15 @@ _submit_single(struct worker_thread *worker, struct ap_task *task)
 }
 
 static int
-_batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, struct spdk_accel_batch *batch)
+_batch_prep_cmd(struct worker_thread *worker, struct ap_task *task,
+		struct accel_batch *worker_batch)
 {
+	struct spdk_accel_batch *batch = worker_batch->batch;
 	int rc = 0;
 
+	worker_batch->cmd_count++;
+	assert(worker_batch->cmd_count <= g_ops_per_batch);
+
 	switch (g_workload_selection) {
 	case ACCEL_COPY:
 		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
@@ -352,14 +379,125 @@ _free_task(struct ap_task *task)
 	}
 }
 
+static void _batch_done(void *cb_arg);
+static void
+_build_batch(struct worker_thread *worker, struct ap_task *task)
+{
+	struct accel_batch *worker_batch = NULL;
+	int rc;
+
+	assert(!TAILQ_EMPTY(&worker->in_prep_batches));
+
+	worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
+
+	/* If an accel batch hasn't been created yet, do so now. */
+	if (worker_batch->batch == NULL) {
+		worker_batch->batch = spdk_accel_batch_create(worker->ch);
+		if (worker_batch->batch == NULL) {
+			fprintf(stderr, "error unable to create new batch\n");
+			return;
+		}
+	}
+
+	/* Prep the command, re-using the last completed command's task. */
+	rc = _batch_prep_cmd(worker, task, worker_batch);
+	if (rc) {
+		fprintf(stderr, "error prepping command for batch\n");
+		goto error;
+	}
+
+	/* If this batch is full, move it to the to_submit list so it gets
+	 * submitted as batches complete.
+	 */
+	if (worker_batch->cmd_count == g_ops_per_batch) {
+		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
+		TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link);
+	}
+
+	return;
+error:
+	spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
+
+}
+
+static void batch_done(void *cb_arg, int status);
+static void
+_drain_batch(struct worker_thread *worker)
+{
+	struct accel_batch *worker_batch, *tmp;
+	int rc;
+
+	/* Submit any batches that were still being built up. */
+	TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) {
+		if (worker_batch->cmd_count == 0) {
+			continue;
+		}
+		worker->current_queue_depth += worker_batch->cmd_count + 1;
+
+		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
+		TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
+		rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
+		if (rc == 0) {
+			worker_batch->cmd_count = 0;
+		} else {
+			fprintf(stderr, "error sending final batch\n");
+			worker->current_queue_depth -= worker_batch->cmd_count + 1;
+			break;
+		}
+	}
+}
+
+static void
+_batch_done(void *cb_arg)
+{
+	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
+	struct worker_thread *worker = worker_batch->worker;
+	int rc;
+
+	assert(!TAILQ_EMPTY(&worker->in_use_batches));
+
+	if (worker_batch->status) {
+		SPDK_ERRLOG("error %d\n", worker_batch->status);
+	}
+
+	worker->current_queue_depth--;
+	TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
+	TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
+	worker_batch->batch = NULL;
+	worker_batch->cmd_count = 0;
+
+	if (!worker->is_draining) {
+		worker_batch = TAILQ_FIRST(&worker->to_submit_batches);
+		if (worker_batch != NULL) {
+
+			assert(worker_batch->cmd_count == g_ops_per_batch);
+
+			/* Add one for the batch command itself. */
+			worker->current_queue_depth += g_ops_per_batch + 1;
+			TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link);
+			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
+
+			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
+			if (rc) {
+				fprintf(stderr, "error submitting batch\n");
+				worker->current_queue_depth -= g_ops_per_batch + 1;
+				return;
+			}
+		}
+	} else {
+		_drain_batch(worker);
+	}
+}
+
 static void
 batch_done(void *cb_arg, int status)
 {
-	struct ap_task *task = (struct ap_task *)cb_arg;
-	struct worker_thread *worker = task->worker;
+	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
 
-	worker->current_queue_depth--;
-	TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
+	assert(worker_batch->worker);
+
+	worker_batch->status = status;
+	spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch);
 }
 
 static void
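_drain_batch() above (like the error path later in this patch) unlinks entries from a list while walking it, which is exactly what the _SAFE iteration variants exist for: the extra cursor holds the next element before the current one is removed. A standalone illustration of the idiom with hypothetical types and no SPDK dependencies (glibc's sys/queue.h may lack the _SAFE macros, hence the guard):

	#include <stdio.h>
	#include <sys/queue.h>

	#ifndef TAILQ_FOREACH_SAFE
	#define TAILQ_FOREACH_SAFE(var, head, field, tvar)		\
		for ((var) = TAILQ_FIRST((head));			\
		     (var) && ((tvar) = TAILQ_NEXT((var), field), 1);	\
		     (var) = (tvar))
	#endif

	struct node {
		int id;
		TAILQ_ENTRY(node) link;
	};

	int
	main(void)
	{
		TAILQ_HEAD(, node) head;
		struct node nodes[3] = {{.id = 0}, {.id = 1}, {.id = 2}};
		struct node *n, *tmp;
		int i;

		TAILQ_INIT(&head);
		for (i = 0; i < 3; i++) {
			TAILQ_INSERT_TAIL(&head, &nodes[i], link);
		}

		/* plain TAILQ_FOREACH would read n->link after n was unlinked */
		TAILQ_FOREACH_SAFE(n, &head, link, tmp) {
			TAILQ_REMOVE(&head, n, link);
			printf("drained node %d\n", n->id);
		}
		return 0;
	}
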
@@ -416,7 +554,7 @@ _accel_done(void *arg1)
 		assert(task->status != 0);
 		worker->injected_miscompares++;
 	} else if (task->status) {
-		/* Expected to pass but API reported error. */
+		/* Expected to pass but the accel engine reported an error (e.g. a COMPARE miscompare). */
 		worker->xfer_failed++;
 	}
 
@@ -424,9 +562,16 @@ _accel_done(void *arg1)
 	worker->current_queue_depth--;
 
 	if (!worker->is_draining) {
-		_submit_single(worker, task);
-		worker->current_queue_depth++;
+		if (g_ops_per_batch == 0) {
+			_submit_single(worker, task);
+			worker->current_queue_depth++;
+		} else {
+			_build_batch(worker, task);
+		}
+	} else if (g_ops_per_batch > 0) {
+		_drain_batch(worker);
 	}
+
 }
 
 static int
@@ -516,9 +661,12 @@ _init_thread(void *arg1)
 {
 	struct worker_thread *worker;
 	struct ap_task *task;
-	int i, rc, max_per_batch, batch_count, num_tasks;
+	int i, rc, num_batches;
+	int max_per_batch;
 	int remaining = g_queue_depth;
-	struct spdk_accel_batch *batch, *new_batch;
+	int num_tasks = g_queue_depth;
+	struct accel_batch *tmp;
+	struct accel_batch *worker_batch = NULL;
 
 	worker = calloc(1, sizeof(*worker));
 	if (worker == NULL) {
@@ -531,11 +679,51 @@ _init_thread(void *arg1)
 	worker->next = g_workers;
 	worker->ch = spdk_accel_engine_get_io_channel();
 
-	max_per_batch = spdk_accel_batch_get_max(worker->ch);
-	assert(max_per_batch > 0);
-	num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch);
-
 	TAILQ_INIT(&worker->tasks_pool);
+
+	if (g_ops_per_batch > 0) {
+
+		max_per_batch = spdk_accel_batch_get_max(worker->ch);
+		assert(max_per_batch > 0);
+
+		if (g_ops_per_batch > max_per_batch) {
+			fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch);
+			g_ops_per_batch = max_per_batch;
+		}
+
+		if (g_ops_per_batch > g_queue_depth) {
+			fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth);
+			g_ops_per_batch = g_queue_depth;
+		}
+
+		TAILQ_INIT(&worker->in_prep_batches);
+		TAILQ_INIT(&worker->to_submit_batches);
+		TAILQ_INIT(&worker->in_use_batches);
+
+		/* A worker_batch lives on one of three lists:
+		 * IN_PREP: as individual IOs complete, new commands are built up on a
+		 *          worker_batch on this list until it reaches g_ops_per_batch.
+		 * TO_SUBMIT: batches are moved to this list once they are full; this
+		 *            list is consumed on batch completion to start new
+		 *            batches.
+		 * IN_USE: the worker_batch is outstanding and is moved back to the
+		 *         in_prep list when the batch completes.
+		 *
+		 * So we need enough batches to cover the queue depth at load time,
+		 * one to replace each of those as they complete, and one extra to
+		 * build up while everything is outstanding but the last batch has
+		 * not yet completed its batch command.
+		 */
+		num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1;
+		worker->batch_base = calloc(num_batches, sizeof(struct accel_batch));
+		worker_batch = worker->batch_base;
+		for (i = 0; i < num_batches; i++) {
+			worker_batch->worker = worker;
+			TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
+			worker_batch++;
+		}
+	}
+
 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
 	if (worker->task_base == NULL) {
 		fprintf(stderr, "Could not allocate task base.\n");
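To make the sizing comment above concrete, here is the arithmetic as a worked example (the values shown are hypothetical invocations, not tool defaults):

	/* Worked example of the batch pool sizing used above. */
	static int
	batch_pool_size(int queue_depth, int ops_per_batch)
	{
		/* enough to cover the queue depth, one replacement for each,
		 * plus one spare to build up while everything is in flight
		 */
		return (queue_depth / ops_per_batch * 2) + 1;
	}

	/* e.g. -q 32 -b 8:   (32 / 8) * 2 + 1  = 9 batches per worker
	 *      -q 128 -b 32: (128 / 32) * 2 + 1 = 9 batches per worker
	 */
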
@@ -561,61 +749,53 @@ _init_thread(void *arg1)
 	g_num_workers++;
 	pthread_mutex_unlock(&g_workers_lock);
 
-	/* Batching is only possible if there is at least 2 operations. */
-	if (g_queue_depth > 1) {
-
-		/* Outter loop sets up each batch command, inner loop populates the
-		 * batch descriptors.
-		 */
+	/* If batching is enabled, load up to the full queue depth before
+	 * processing any completions, then ping-pong between two batches,
+	 * one processing and one being built up for when the other completes.
+	 */
+	if (g_ops_per_batch > 0) {
 		do {
-			new_batch = spdk_accel_batch_create(worker->ch);
-			if (new_batch == NULL) {
+			worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
+			if (worker_batch == NULL) {
+				goto error;
+			}
+
+			worker_batch->batch = spdk_accel_batch_create(worker->ch);
+			if (worker_batch->batch == NULL) {
+				fprintf(stderr, "error unable to create new batch\n");
 				break;
 			}
 
-			batch = new_batch;
-			batch_count = 0;
-
-			do {
+			for (i = 0; i < g_ops_per_batch; i++) {
 				task = _get_task(worker);
 				if (task == NULL) {
 					goto error;
 				}
 
-				rc = _batch_prep_cmd(worker, task, batch);
+				rc = _batch_prep_cmd(worker, task, worker_batch);
 				if (rc) {
 					fprintf(stderr, "error preping command\n");
 					goto error;
 				}
-				remaining--;
-				batch_count++;
-			} while (batch_count < max_per_batch && remaining > 0);
-
-			/* Now send the batch command. */
-			task = _get_task(worker);
-			if (task == NULL) {
-				goto error;
 			}
 
-			rc = spdk_accel_batch_submit(worker->ch, batch, batch_done, task);
+			/* Add one for the batch operation itself. */
+			worker->current_queue_depth++;
+			TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
+			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
+
+			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
 			if (rc) {
-				fprintf(stderr, "error ending batch %d\n", rc);
+				fprintf(stderr, "error submitting batch %d\n", rc);
 				goto error;
 			}
-			/* We can't build a batch unless it has 2 descriptors (per spec). */
-		} while (remaining > 1);
-
-		/* If there are no more left, we're done. */
-		if (remaining == 0) {
-			return;
-		}
+			assert(remaining >= g_ops_per_batch);
+			remaining -= g_ops_per_batch;
+		} while (remaining > 0);
 	}
 
-	/* For engines that don't support batch or for the odd event that
-	 * a batch ends with only one descriptor left.
-	 */
+	/* Submit as singles when batching is disabled or we could not create more batches. */
 	for (i = 0; i < remaining; i++) {
-
 		task = _get_task(worker);
 		if (task == NULL) {
 			goto error;
 		}
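Queue-depth accounting through this path: each operation is counted once when its task is taken from the pool, and every submitted batch adds one more for the batch command itself, matching the cmd_count + 1 adjustments in _drain_batch() and the g_ops_per_batch + 1 adjustments in _batch_done(). A sketch of the steady-state math, with hypothetical values:

	/* With -q 32 -b 8, four batches are in flight at full load:
	 *   4 batches * (8 ops + 1 batch command) = 36
	 * so current_queue_depth peaks above the nominal -q value.
	 */
	static int
	inflight_depth(int queue_depth, int ops_per_batch)
	{
		return queue_depth + (queue_depth / ops_per_batch);
	}
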
@@ -625,10 +805,17 @@
 	}
 	return;
 error:
+	if (worker_batch && worker_batch->batch) {
+		TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) {
+			spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
+			TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
+		}
+	}
 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
 		_free_task(task);
 	}
+	free(worker->batch_base);
 	free(worker->task_base);
 	free(worker);
 	spdk_app_stop(-1);
@@ -680,7 +867,7 @@ main(int argc, char **argv)
 	pthread_mutex_init(&g_workers_lock, NULL);
 	spdk_app_opts_init(&opts, sizeof(opts));
 	opts.reactor_mask = "0x1";
-	if (spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:P:f:", NULL, parse_args,
+	if (spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:P:f:b:", NULL, parse_args,
 			usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
 		rc = -1;
 		goto cleanup;
@@ -696,6 +883,13 @@
 		goto cleanup;
 	}
 
+	if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) {
+		fprintf(stdout, "queue depth must be a multiple of batch size\n");
+		usage();
+		rc = -1;
+		goto cleanup;
+	}
+
 	dump_user_config(&opts);
 	rc = spdk_app_start(&opts, accel_perf_start, NULL);
 	if (rc) {
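Taken together, each worker_batch cycles in_prep -> to_submit -> in_use -> in_prep. A self-contained sketch of that three-list lifecycle using plain sys/queue.h (hypothetical names, no SPDK calls):

	#include <stdio.h>
	#include <sys/queue.h>

	struct demo_batch {
		int cmd_count;
		TAILQ_ENTRY(demo_batch) link;
	};

	TAILQ_HEAD(demo_list, demo_batch);

	int
	main(void)
	{
		struct demo_list in_prep, to_submit, in_use;
		struct demo_batch b = {.cmd_count = 0};
		const int ops_per_batch = 8;

		TAILQ_INIT(&in_prep);
		TAILQ_INIT(&to_submit);
		TAILQ_INIT(&in_use);
		TAILQ_INSERT_TAIL(&in_prep, &b, link);

		/* in_prep -> to_submit: completions add commands until full */
		while (b.cmd_count < ops_per_batch) {
			b.cmd_count++;
		}
		TAILQ_REMOVE(&in_prep, &b, link);
		TAILQ_INSERT_TAIL(&to_submit, &b, link);

		/* to_submit -> in_use: the batch command is submitted */
		TAILQ_REMOVE(&to_submit, &b, link);
		TAILQ_INSERT_TAIL(&in_use, &b, link);

		/* in_use -> in_prep: batch completed, recycle it empty */
		TAILQ_REMOVE(&in_use, &b, link);
		b.cmd_count = 0;
		TAILQ_INSERT_TAIL(&in_prep, &b, link);

		printf("recycled, cmd_count=%d\n", b.cmd_count);
		return 0;
	}

With the constraint enforced in main(), a typical batched run pairs a queue depth that is a multiple of the batch size, e.g. -q 32 -b 8 (exact binary path and remaining flags depend on the build).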