examples/accel_perf: add batching capability for copy

This patch uses the IDXD batch feature to load the initial queue depth.
As there is a good amount of code needed to support batching on a
per-command basis, this starts with copy only. As other commands are
enabled for batching, they will be added to accel_perf. If batching is
not available, the perf tool simply submits the initial queue depth in
a loop, the same way it did before batching.
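For reference, the flow this patch adds condenses to the sketch below. This is a minimal illustration, not the committed code: it reuses the helpers and globals from the diff (_batch_prep_cmd(), _submit_single(), batch_done(), __accel_task_from_ap_task(), g_capabilites, g_queue_depth) and drops all error handling.

#include "spdk/accel_engine.h"

static void
load_initial_queue_depth(struct worker_thread *worker)
{
	struct spdk_accel_batch *batch;
	struct ap_task *task;
	int remaining = g_queue_depth;
	int batch_count, max_per_batch;

	if ((g_capabilites & ACCEL_BATCH) && remaining > 1) {
		max_per_batch = spdk_accel_batch_get_max(worker->ch);
		do {
			/* One batch per outer pass; the inner loop fills it. */
			batch = spdk_accel_batch_create(worker->ch);
			batch_count = 0;
			do {
				task = spdk_mempool_get(worker->task_pool);
				_batch_prep_cmd(worker, task, batch);
				remaining--;
				batch_count++;
			} while (batch_count < max_per_batch && remaining > 0);

			/* A dedicated task tracks completion of the batch itself. */
			task = spdk_mempool_get(worker->task_pool);
			spdk_accel_batch_submit(__accel_task_from_ap_task(task),
						worker->ch, batch, batch_done);
		} while (remaining > 1); /* a batch needs at least 2 descriptors */
	}

	/* No batch support, or a single command left over: submit it directly. */
	while (remaining-- > 0) {
		task = spdk_mempool_get(worker->task_pool);
		_submit_single(worker, task);
	}
}

With a hardware engine loaded, an invocation along the lines of `accel_perf -w copy -q 128 -o 4096 -t 5` (flag names assumed from the tool's usage text; verify locally) exercises this path.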

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I2669936e4da9b31a1d8fa8f0c71e9f5a6fcae412
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/2745
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
paul luse, 2020-06-02 14:27:47 -04:00; committed by Tomasz Zawadzki
parent e63eb0375d
commit a34fc12be3


@@ -58,6 +58,7 @@ static enum accel_capability g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
uint64_t g_capabilites;
struct worker_thread {
struct spdk_io_channel *ch;
@@ -318,6 +319,16 @@ _accel_done(void *arg1)
}
}
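/* Completion callback for the batch command itself; the batched copies
 * complete individually through accel_done().
 */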
static void
batch_done(void *ref, int status)
{
struct ap_task *task = __ap_task_from_accel_task(ref);
struct worker_thread *worker = task->worker;
worker->current_queue_depth--;
spdk_mempool_put(worker->task_pool, task);
}
static int
dump_result(void)
{
@@ -395,21 +406,11 @@ _init_thread_done(void *ctx)
{
}
static void
_init_thread(void *arg1)
static int
_get_task_data_bufs(struct ap_task *task)
{
struct worker_thread *worker;
char task_pool_name[30];
struct ap_task *task;
int i;
uint32_t align = 0;
worker = calloc(1, sizeof(*worker));
if (worker == NULL) {
fprintf(stderr, "Unable to allocate worker\n");
return;
}
/* For dualcast, the DSA HW requires 4K alignment on destination addresses, but
* we do this for all engines to keep it simple.
*/
@@ -417,6 +418,73 @@ _init_thread(void *arg1)
align = ALIGN_4K;
}
task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
if (task->src == NULL) {
fprintf(stderr, "Unable to alloc src buffer\n");
return -ENOMEM;
}
memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
if (task->dst == NULL) {
fprintf(stderr, "Unable to alloc dst buffer\n");
return -ENOMEM;
}
/* For compare we want the buffers to match, otherwise not. */
if (g_workload_selection == ACCEL_COMPARE) {
memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
} else {
memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
}
if (g_workload_selection == ACCEL_DUALCAST) {
task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
if (task->dst2 == NULL) {
fprintf(stderr, "Unable to alloc dst buffer\n");
return -ENOMEM;
}
memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
}
return 0;
}
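/* Append one command of the selected workload type to the given batch;
 * only copy is wired up for batching so far.
 */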
static int
_batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, struct spdk_accel_batch *batch)
{
int rc = 0;
switch (g_workload_selection) {
case ACCEL_COPY:
rc = spdk_accel_batch_prep_copy(__accel_task_from_ap_task(task),
worker->ch, batch, task->dst,
task->src, g_xfer_size_bytes, accel_done);
break;
default:
assert(false);
break;
}
return rc;
}
static void
_init_thread(void *arg1)
{
struct worker_thread *worker;
char task_pool_name[30];
struct ap_task *task;
int i, rc, max_per_batch, batch_count;
int remaining = g_queue_depth;
struct spdk_accel_batch *batch, *new_batch;
worker = calloc(1, sizeof(*worker));
if (worker == NULL) {
fprintf(stderr, "Unable to allocate worker\n");
return;
}
worker->core = spdk_env_get_current_core();
worker->thread = spdk_get_thread();
worker->next = g_workers;
@@ -424,7 +492,7 @@ _init_thread(void *arg1)
snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers);
worker->task_pool = spdk_mempool_create(task_pool_name,
g_queue_depth,
g_queue_depth * 2,
spdk_accel_task_size() + sizeof(struct ap_task),
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
SPDK_ENV_SOCKET_ID_ANY);
@@ -443,44 +511,100 @@ _init_thread(void *arg1)
g_num_workers++;
pthread_mutex_unlock(&g_workers_lock);
for (i = 0; i < g_queue_depth; i++) {
/* TODO: remove the check for ACCEL_COPY as other workloads are enabled for
* batching. It's a lot of code per workload so they are being added in
* separate patches.
*/
if (g_workload_selection == ACCEL_COPY && ((g_capabilites & ACCEL_BATCH) == ACCEL_BATCH) &&
g_queue_depth > 1) {
/* Selected engine supports batching and we have enough, so do it. */
max_per_batch = spdk_accel_batch_get_max(worker->ch);
/* Outer loop sets up each batch command, inner loop populates the
* batch descriptors.
*/
do {
new_batch = spdk_accel_batch_create(worker->ch);
if (new_batch == NULL) {
break;
}
batch = new_batch;
batch_count = 0;
do {
task = spdk_mempool_get(worker->task_pool);
if (!task) {
fprintf(stderr, "Unable to get accel_task\n");
goto error;
}
task->worker = worker;
task->worker->current_queue_depth++;
if (_get_task_data_bufs(task)) {
fprintf(stderr, "Unable to get data bufs\n");
goto error;
}
rc = _batch_prep_cmd(worker, task, batch);
if (rc) {
fprintf(stderr, "error preping command\n");
goto error;
}
remaining--;
batch_count++;
} while (batch_count < max_per_batch && remaining > 0);
/* Now send the batch command. */
task = spdk_mempool_get(worker->task_pool);
if (!task) {
fprintf(stderr, "Unable to get accel_task\n");
goto error;
}
task->worker = worker;
task->worker->current_queue_depth++;
rc = spdk_accel_batch_submit(__accel_task_from_ap_task(task),
worker->ch, batch, batch_done);
if (rc) {
fprintf(stderr, "error ending batch %d\n", rc);
goto error;
}
/* We can't build a batch unless it has at least 2 descriptors (per spec). */
} while (remaining > 1);
/* If there are no more left, we're done. */
if (remaining == 0) {
return;
}
}
/* For engines that don't support batching, or for the odd case where
* batching leaves only one descriptor remaining.
*/
for (i = 0; i < remaining; i++) {
task = spdk_mempool_get(worker->task_pool);
if (!task) {
fprintf(stderr, "Unable to get accel_task\n");
return;
goto error;
}
task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
if (task->src == NULL) {
fprintf(stderr, "Unable to alloc src buffer\n");
return;
}
memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
if (task->dst == NULL) {
fprintf(stderr, "Unable to alloc dst buffer\n");
return;
}
if (g_workload_selection == ACCEL_DUALCAST) {
task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
if (task->dst2 == NULL) {
fprintf(stderr, "Unable to alloc dst buffer\n");
return;
}
memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
}
/* For compare we want the buffers to match, otherwise not. */
if (g_workload_selection == ACCEL_COMPARE) {
memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
} else {
memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
if (_get_task_data_bufs(task)) {
fprintf(stderr, "Unable to get data bufs\n");
goto error;
}
_submit_single(worker, task);
}
return;
error:
/* TODO clean exit */
raise(SIGINT);
spdk_mempool_free(worker->task_pool);
free(worker);
spdk_app_stop(-1);
}
static void
@@ -498,14 +622,13 @@ accel_done(void *ref, int status)
static void
accel_perf_start(void *arg1)
{
uint64_t capabilites;
struct spdk_io_channel *accel_ch;
accel_ch = spdk_accel_engine_get_io_channel();
capabilites = spdk_accel_get_capabilities(accel_ch);
g_capabilites = spdk_accel_get_capabilities(accel_ch);
spdk_put_io_channel(accel_ch);
if ((capabilites & g_workload_selection) != g_workload_selection) {
if ((g_capabilites & g_workload_selection) != g_workload_selection) {
SPDK_ERRLOG("Selected workload is not supported by the current engine\n");
SPDK_NOTICELOG("Software engine is selected by default, enable a HW engine via RPC\n\n");
spdk_app_stop(-1);