diff --git a/examples/accel/perf/accel_perf.c b/examples/accel/perf/accel_perf.c index ed541272d..2093253c1 100644 --- a/examples/accel/perf/accel_perf.c +++ b/examples/accel/perf/accel_perf.c @@ -39,6 +39,7 @@ #include "spdk/string.h" #include "spdk/accel_engine.h" #include "spdk/crc32.h" +#include "spdk/util.h" #define DATA_PATTERN 0x5a #define ALIGN_4K 0x1000 @@ -59,6 +60,7 @@ static struct worker_thread *g_workers = NULL; static int g_num_workers = 0; static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; uint64_t g_capabilites; +struct ap_task; struct worker_thread { struct spdk_io_channel *ch; @@ -66,7 +68,7 @@ struct worker_thread { uint64_t xfer_failed; uint64_t injected_miscompares; uint64_t current_queue_depth; - struct spdk_mempool *task_pool; + TAILQ_HEAD(, ap_task) tasks; struct worker_thread *next; unsigned core; struct spdk_thread *thread; @@ -82,6 +84,7 @@ struct ap_task { struct worker_thread *worker; int status; int expected_status; /* used for compare */ + TAILQ_ENTRY(ap_task) link; }; static void @@ -170,8 +173,13 @@ static void unregister_worker(void *arg1) { struct worker_thread *worker = arg1; + struct ap_task *task; - spdk_mempool_free(worker->task_pool); + while (!TAILQ_EMPTY(&worker->tasks)) { + task = TAILQ_FIRST(&worker->tasks); + TAILQ_REMOVE(&worker->tasks, task, link); + free(task); + } spdk_put_io_channel(worker->ch); pthread_mutex_lock(&g_workers_lock); assert(g_num_workers >= 1); @@ -307,7 +315,7 @@ _accel_done(void *arg1) if (g_workload_selection == ACCEL_DUALCAST) { spdk_free(task->dst2); } - spdk_mempool_put(worker->task_pool, task); + TAILQ_INSERT_TAIL(&worker->tasks, task, link); } } @@ -318,7 +326,7 @@ batch_done(void *cb_arg, int status) struct worker_thread *worker = task->worker; worker->current_queue_depth--; - spdk_mempool_put(worker->task_pool, task); + TAILQ_INSERT_TAIL(&worker->tasks, task, link); } static int @@ -486,9 +494,8 @@ static void _init_thread(void *arg1) { struct worker_thread *worker; - char task_pool_name[30]; struct ap_task *task; - int i, rc, max_per_batch, batch_count; + int i, rc, max_per_batch, batch_count, num_tasks; int remaining = g_queue_depth; struct spdk_accel_batch *batch, *new_batch; @@ -503,16 +510,19 @@ _init_thread(void *arg1) worker->next = g_workers; worker->ch = spdk_accel_engine_get_io_channel(); - snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers); - worker->task_pool = spdk_mempool_create(task_pool_name, - g_queue_depth * 2, - sizeof(struct ap_task), - SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, - SPDK_ENV_SOCKET_ID_ANY); - if (!worker->task_pool) { - fprintf(stderr, "Could not allocate buffer pool.\n"); - free(worker); - return; + max_per_batch = spdk_accel_batch_get_max(worker->ch); + assert(max_per_batch > 0); + num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch); + + TAILQ_INIT(&worker->tasks); + for (i = 0; i < num_tasks; i++) { + task = calloc(1, sizeof(struct ap_task)); + if (task == NULL) { + fprintf(stderr, "Could not allocate task.\n"); + return; + /* TODO cleanup */ + } + TAILQ_INSERT_TAIL(&worker->tasks, task, link); } /* Register a poller that will stop the worker at time elapsed */ @@ -527,9 +537,6 @@ _init_thread(void *arg1) /* Batching is only possible if there is at least 2 operations. */ if (g_queue_depth > 1) { - /* Selected engine supports batching and we have enough, so do it. */ - max_per_batch = spdk_accel_batch_get_max(worker->ch); - /* Outter loop sets up each batch command, inner loop populates the * batch descriptors. */ @@ -543,8 +550,10 @@ _init_thread(void *arg1) batch_count = 0; do { - task = spdk_mempool_get(worker->task_pool); - if (!task) { + if (!TAILQ_EMPTY(&worker->tasks)) { + task = TAILQ_FIRST(&worker->tasks); + TAILQ_REMOVE(&worker->tasks, task, link); + } else { fprintf(stderr, "Unable to get accel_task\n"); goto error; } @@ -566,8 +575,10 @@ _init_thread(void *arg1) } while (batch_count < max_per_batch && remaining > 0); /* Now send the batch command. */ - task = spdk_mempool_get(worker->task_pool); - if (!task) { + if (!TAILQ_EMPTY(&worker->tasks)) { + task = TAILQ_FIRST(&worker->tasks); + TAILQ_REMOVE(&worker->tasks, task, link); + } else { fprintf(stderr, "Unable to get accel_task\n"); goto error; } @@ -593,8 +604,10 @@ _init_thread(void *arg1) */ for (i = 0; i < remaining; i++) { - task = spdk_mempool_get(worker->task_pool); - if (!task) { + if (!TAILQ_EMPTY(&worker->tasks)) { + task = TAILQ_FIRST(&worker->tasks); + TAILQ_REMOVE(&worker->tasks, task, link); + } else { fprintf(stderr, "Unable to get accel_task\n"); goto error; } @@ -610,7 +623,11 @@ _init_thread(void *arg1) error: /* TODO clean exit */ raise(SIGINT); - spdk_mempool_free(worker->task_pool); + while (!TAILQ_EMPTY(&worker->tasks)) { + task = TAILQ_FIRST(&worker->tasks); + TAILQ_REMOVE(&worker->tasks, task, link); + free(task); + } free(worker); spdk_app_stop(-1); }