diff --git a/examples/accel/perf/accel_perf.c b/examples/accel/perf/accel_perf.c index 1b386054f..dcf39a84d 100644 --- a/examples/accel/perf/accel_perf.c +++ b/examples/accel/perf/accel_perf.c @@ -58,6 +58,7 @@ static enum accel_capability g_workload_selection; static struct worker_thread *g_workers = NULL; static int g_num_workers = 0; static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; +uint64_t g_capabilites; struct worker_thread { struct spdk_io_channel *ch; @@ -318,6 +319,16 @@ _accel_done(void *arg1) } } +static void +batch_done(void *ref, int status) +{ + struct ap_task *task = __ap_task_from_accel_task(ref); + struct worker_thread *worker = task->worker; + + worker->current_queue_depth--; + spdk_mempool_put(worker->task_pool, task); +} + static int dump_result(void) { @@ -395,21 +406,11 @@ _init_thread_done(void *ctx) { } -static void -_init_thread(void *arg1) +static int +_get_task_data_bufs(struct ap_task *task) { - struct worker_thread *worker; - char task_pool_name[30]; - struct ap_task *task; - int i; uint32_t align = 0; - worker = calloc(1, sizeof(*worker)); - if (worker == NULL) { - fprintf(stderr, "Unable to allocate worker\n"); - return; - } - /* For dualcast, the DSA HW requires 4K alignment on destination addresses but * we do this for all engines to keep it simple. */ @@ -417,6 +418,73 @@ _init_thread(void *arg1) align = ALIGN_4K; } + task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); + if (task->src == NULL) { + fprintf(stderr, "Unable to alloc src buffer\n"); + return -ENOMEM; + } + memset(task->src, DATA_PATTERN, g_xfer_size_bytes); + + task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); + if (task->dst == NULL) { + fprintf(stderr, "Unable to alloc dst buffer\n"); + return -ENOMEM; + } + + /* For compare we want the buffers to match, otherwise not. */ + if (g_workload_selection == ACCEL_COMPARE) { + memset(task->dst, DATA_PATTERN, g_xfer_size_bytes); + } else { + memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes); + } + + if (g_workload_selection == ACCEL_DUALCAST) { + task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); + if (task->dst2 == NULL) { + fprintf(stderr, "Unable to alloc dst buffer\n"); + return -ENOMEM; + } + memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); + } + + return 0; +} + +static int +_batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, struct spdk_accel_batch *batch) +{ + int rc = 0; + + switch (g_workload_selection) { + case ACCEL_COPY: + rc = spdk_accel_batch_prep_copy(__accel_task_from_ap_task(task), + worker->ch, batch, task->dst, + task->src, g_xfer_size_bytes, accel_done); + break; + default: + assert(false); + break; + } + + return rc; +} + +static void +_init_thread(void *arg1) +{ + struct worker_thread *worker; + char task_pool_name[30]; + struct ap_task *task; + int i, rc, max_per_batch, batch_count; + int remaining = g_queue_depth; + struct spdk_accel_batch *batch, *new_batch; + + worker = calloc(1, sizeof(*worker)); + if (worker == NULL) { + fprintf(stderr, "Unable to allocate worker\n"); + return; + } + worker->core = spdk_env_get_current_core(); worker->thread = spdk_get_thread(); worker->next = g_workers; @@ -424,7 +492,7 @@ _init_thread(void *arg1) snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers); worker->task_pool = spdk_mempool_create(task_pool_name, - g_queue_depth, + g_queue_depth * 2, spdk_accel_task_size() + sizeof(struct ap_task), SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, SPDK_ENV_SOCKET_ID_ANY); @@ -443,44 +511,100 @@ _init_thread(void *arg1) g_num_workers++; pthread_mutex_unlock(&g_workers_lock); - for (i = 0; i < g_queue_depth; i++) { + /* TODO: remove the check for ACCEL_COPY as other workloads are enabled for + * batching. It's a lot of code per workload so they are beeing added in + * separate patches. + */ + if (g_workload_selection == ACCEL_COPY && ((g_capabilites & ACCEL_BATCH) == ACCEL_BATCH) && + g_queue_depth > 1) { + + /* Selected engine supports batching and we have enough, so do it. */ + max_per_batch = spdk_accel_batch_get_max(worker->ch); + + /* Outter loop sets up each batch command, inner loop populates the + * batch descriptors. + */ + do { + new_batch = spdk_accel_batch_create(worker->ch); + if (new_batch == NULL) { + break; + } + + batch = new_batch; + batch_count = 0; + + do { + task = spdk_mempool_get(worker->task_pool); + if (!task) { + fprintf(stderr, "Unable to get accel_task\n"); + goto error; + } + task->worker = worker; + task->worker->current_queue_depth++; + + if (_get_task_data_bufs(task)) { + fprintf(stderr, "Unable to get data bufs\n"); + goto error; + } + + rc = _batch_prep_cmd(worker, task, batch); + if (rc) { + fprintf(stderr, "error preping command\n"); + goto error; + } + remaining--; + batch_count++; + } while (batch_count < max_per_batch && remaining > 0); + + /* Now send the batch command. */ + task = spdk_mempool_get(worker->task_pool); + if (!task) { + fprintf(stderr, "Unable to get accel_task\n"); + goto error; + } + task->worker = worker; + task->worker->current_queue_depth++; + + rc = spdk_accel_batch_submit(__accel_task_from_ap_task(task), + worker->ch, batch, batch_done); + if (rc) { + fprintf(stderr, "error ending batch %d\n", rc); + goto error; + } + /* We can't build a batch unless it has 2 descriptors (per spec). */ + } while (remaining > 1); + + /* If there are no more left, we're done. */ + if (remaining == 0) { + return; + } + } + + /* For engines that don't support batch or for the odd event that + * a batch ends with only one descriptor left. + */ + for (i = 0; i < remaining; i++) { + task = spdk_mempool_get(worker->task_pool); if (!task) { fprintf(stderr, "Unable to get accel_task\n"); - return; + goto error; } - task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); - if (task->src == NULL) { - fprintf(stderr, "Unable to alloc src buffer\n"); - return; - } - memset(task->src, DATA_PATTERN, g_xfer_size_bytes); - - task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); - if (task->dst == NULL) { - fprintf(stderr, "Unable to alloc dst buffer\n"); - return; - } - - if (g_workload_selection == ACCEL_DUALCAST) { - task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); - if (task->dst2 == NULL) { - fprintf(stderr, "Unable to alloc dst buffer\n"); - return; - } - memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); - } - - /* For compare we want the buffers to match, otherwise not. */ - if (g_workload_selection == ACCEL_COMPARE) { - memset(task->dst, DATA_PATTERN, g_xfer_size_bytes); - } else { - memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes); + if (_get_task_data_bufs(task)) { + fprintf(stderr, "Unable to get data bufs\n"); + goto error; } _submit_single(worker, task); } + return; +error: + /* TODO clean exit */ + raise(SIGINT); + spdk_mempool_free(worker->task_pool); + free(worker); + spdk_app_stop(-1); } static void @@ -498,14 +622,13 @@ accel_done(void *ref, int status) static void accel_perf_start(void *arg1) { - uint64_t capabilites; struct spdk_io_channel *accel_ch; accel_ch = spdk_accel_engine_get_io_channel(); - capabilites = spdk_accel_get_capabilities(accel_ch); + g_capabilites = spdk_accel_get_capabilities(accel_ch); spdk_put_io_channel(accel_ch); - if ((capabilites & g_workload_selection) != g_workload_selection) { + if ((g_capabilites & g_workload_selection) != g_workload_selection) { SPDK_ERRLOG("Selected workload is not supported by the current engine\n"); SPDK_NOTICELOG("Software engine is selected by default, enable a HW engine via RPC\n\n"); spdk_app_stop(-1);