diff --git a/examples/accel/perf/accel_perf.c b/examples/accel/perf/accel_perf.c index 9eebfd6b5..8df008a53 100644 --- a/examples/accel/perf/accel_perf.c +++ b/examples/accel/perf/accel_perf.c @@ -80,13 +80,14 @@ struct worker_thread { uint64_t xfer_failed; uint64_t injected_miscompares; uint64_t current_queue_depth; - TAILQ_HEAD(, ap_task) tasks; + TAILQ_HEAD(, ap_task) tasks_pool; struct worker_thread *next; unsigned core; struct spdk_thread *thread; bool is_draining; struct spdk_poller *is_draining_poller; struct spdk_poller *stop_poller; + void *task_base; }; static void @@ -175,13 +176,8 @@ static void unregister_worker(void *arg1) { struct worker_thread *worker = arg1; - struct ap_task *task; - while (!TAILQ_EMPTY(&worker->tasks)) { - task = TAILQ_FIRST(&worker->tasks); - TAILQ_REMOVE(&worker->tasks, task, link); - free(task); - } + free(worker->task_base); spdk_put_io_channel(worker->ch); pthread_mutex_lock(&g_workers_lock); assert(g_num_workers >= 1); @@ -241,20 +237,34 @@ _get_task_data_bufs(struct ap_task *task) return 0; } +inline static struct ap_task * +_get_task(struct worker_thread *worker) +{ + struct ap_task *task; + + if (!TAILQ_EMPTY(&worker->tasks_pool)) { + task = TAILQ_FIRST(&worker->tasks_pool); + TAILQ_REMOVE(&worker->tasks_pool, task, link); + } else { + fprintf(stderr, "Unable to get ap_task\n"); + return NULL; + } + + task->worker = worker; + task->worker->current_queue_depth++; + return task; +} + static void accel_done(void *ref, int status); static void -_submit_single(void *arg1, void *arg2) +_submit_single(struct worker_thread *worker, struct ap_task *task) { - struct worker_thread *worker = arg1; - struct ap_task *task = arg2; int random_num; int rc = 0; assert(worker); - task->worker = worker; - task->worker->current_queue_depth++; switch (g_workload_selection) { case ACCEL_COPY: rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, @@ -297,6 +307,16 @@ _submit_single(void *arg1, void *arg2) } } +static void +_free_task(struct ap_task *task) +{ + spdk_dma_free(task->src); + spdk_dma_free(task->dst); + if (g_workload_selection == ACCEL_DUALCAST) { + spdk_dma_free(task->dst2); + } +} + static void _accel_done(void *arg1) { @@ -360,13 +380,7 @@ _accel_done(void *arg1) if (!worker->is_draining) { _submit_single(worker, task); - } else { - spdk_free(task->src); - spdk_free(task->dst); - if (g_workload_selection == ACCEL_DUALCAST) { - spdk_free(task->dst2); - } - TAILQ_INSERT_TAIL(&worker->tasks, task, link); + worker->current_queue_depth++; } } @@ -377,7 +391,6 @@ batch_done(void *cb_arg, int status) struct worker_thread *worker = task->worker; worker->current_queue_depth--; - TAILQ_INSERT_TAIL(&worker->tasks, task, link); } static int @@ -425,10 +438,15 @@ static int _check_draining(void *arg) { struct worker_thread *worker = arg; + struct ap_task *task; assert(worker); if (worker->current_queue_depth == 0) { + while ((task = TAILQ_FIRST(&worker->tasks_pool))) { + TAILQ_REMOVE(&worker->tasks_pool, task, link); + _free_task(task); + } spdk_poller_unregister(&worker->is_draining_poller); unregister_worker(worker); } @@ -516,15 +534,21 @@ _init_thread(void *arg1) assert(max_per_batch > 0); num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch); - TAILQ_INIT(&worker->tasks); + TAILQ_INIT(&worker->tasks_pool); + worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); + if (worker->task_base == NULL) { + fprintf(stderr, "Could not allocate task base.\n"); + goto error; + } + + task = worker->task_base; for (i = 0; i < num_tasks; i++) { - task = calloc(1, sizeof(struct ap_task)); - if (task == NULL) { - fprintf(stderr, "Could not allocate task.\n"); - return; - /* TODO cleanup */ + TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); + if (_get_task_data_bufs(task)) { + fprintf(stderr, "Unable to get data bufs\n"); + goto error; } - TAILQ_INSERT_TAIL(&worker->tasks, task, link); + task++; } /* Register a poller that will stop the worker at time elapsed */ @@ -552,18 +576,8 @@ _init_thread(void *arg1) batch_count = 0; do { - if (!TAILQ_EMPTY(&worker->tasks)) { - task = TAILQ_FIRST(&worker->tasks); - TAILQ_REMOVE(&worker->tasks, task, link); - } else { - fprintf(stderr, "Unable to get accel_task\n"); - goto error; - } - task->worker = worker; - task->worker->current_queue_depth++; - - if (_get_task_data_bufs(task)) { - fprintf(stderr, "Unable to get data bufs\n"); + task = _get_task(worker); + if (task == NULL) { goto error; } @@ -577,15 +591,10 @@ _init_thread(void *arg1) } while (batch_count < max_per_batch && remaining > 0); /* Now send the batch command. */ - if (!TAILQ_EMPTY(&worker->tasks)) { - task = TAILQ_FIRST(&worker->tasks); - TAILQ_REMOVE(&worker->tasks, task, link); - } else { - fprintf(stderr, "Unable to get accel_task\n"); + task = _get_task(worker); + if (task == NULL) { goto error; } - task->worker = worker; - task->worker->current_queue_depth++; rc = spdk_accel_batch_submit(worker->ch, batch, batch_done, task); if (rc) { @@ -606,16 +615,8 @@ _init_thread(void *arg1) */ for (i = 0; i < remaining; i++) { - if (!TAILQ_EMPTY(&worker->tasks)) { - task = TAILQ_FIRST(&worker->tasks); - TAILQ_REMOVE(&worker->tasks, task, link); - } else { - fprintf(stderr, "Unable to get accel_task\n"); - goto error; - } - - if (_get_task_data_bufs(task)) { - fprintf(stderr, "Unable to get data bufs\n"); + task = _get_task(worker); + if (task == NULL) { goto error; } @@ -623,13 +624,11 @@ _init_thread(void *arg1) } return; error: - /* TODO clean exit */ - raise(SIGINT); - while (!TAILQ_EMPTY(&worker->tasks)) { - task = TAILQ_FIRST(&worker->tasks); - TAILQ_REMOVE(&worker->tasks, task, link); - free(task); + while ((task = TAILQ_FIRST(&worker->tasks_pool))) { + TAILQ_REMOVE(&worker->tasks_pool, task, link); + _free_task(task); } + free(worker->task_base); free(worker); spdk_app_stop(-1); }