diff --git a/module/accel/idxd/accel_engine_idxd.c b/module/accel/idxd/accel_engine_idxd.c index 440ff7697..9ac8ffc72 100644 --- a/module/accel/idxd/accel_engine_idxd.c +++ b/module/accel/idxd/accel_engine_idxd.c @@ -144,12 +144,8 @@ static int idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task) { struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch); - struct spdk_accel_task *task, *tmp, *batch_task; - struct idxd_batch *idxd_batch; - uint8_t fill_pattern; - TAILQ_HEAD(, spdk_accel_task) batch_tasks; + struct spdk_accel_task *task, *tmp; int rc = 0; - uint32_t task_count = 0; task = first_task; @@ -164,8 +160,14 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task return 0; } - /* If this is just a single task handle it here. */ - if (!TAILQ_NEXT(task, link)) { + /* The caller will either submit a single task or a group of tasks that are + * linked together but they cannot be on a list. For example, see idxd_poll() + * where a list of queued tasks is being resubmitted, the list they are on + * is initialized after saving off the first task from the list which is then + * passed in here. Similar thing is done in the accel framework. + */ + while (task) { + tmp = TAILQ_NEXT(task, link); rc = _process_single_task(ch, task); if (rc == -EBUSY) { @@ -173,93 +175,9 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task } else if (rc) { spdk_accel_task_complete(task, rc); } - - return 0; + task = tmp; } - /* TODO: The DSA batch interface has performance tradeoffs that we need to measure with real - * silicon. It's unclear which workloads will benefit from batching and with what sizes. Or - * if there are some cases where batching is more beneficial on the completion side by turning - * off completion notifications for elements within the batch. Need to run some experiments where - * we use the batching interface versus not to help provide guidance on how to use these batching - * API. Here below is one such place, currently those batching using the framework will end up - * also using the DSA batch interface. We could send each of these operations as single commands - * to the low level library. - */ - - /* More than one task, create IDXD batch(es). */ - do { - idxd_batch = spdk_idxd_batch_create(chan->chan); - if (idxd_batch == NULL) { - /* Queue them all and try again later */ - goto queue_tasks; - } - - task_count = 0; - - /* Keep track of each batch's tasks in case we need to cancel. */ - TAILQ_INIT(&batch_tasks); - do { - switch (task->op_code) { - case ACCEL_OPCODE_MEMMOVE: - rc = spdk_idxd_batch_prep_copy(chan->chan, idxd_batch, task->dst, task->src, task->nbytes, - idxd_done, task); - break; - case ACCEL_OPCODE_DUALCAST: - rc = spdk_idxd_batch_prep_dualcast(chan->chan, idxd_batch, task->dst, task->dst2, - task->src, task->nbytes, idxd_done, task); - break; - case ACCEL_OPCODE_COMPARE: - rc = spdk_idxd_batch_prep_compare(chan->chan, idxd_batch, task->src, task->src2, - task->nbytes, idxd_done, task); - break; - case ACCEL_OPCODE_MEMFILL: - fill_pattern = (uint8_t)task->fill_pattern; - memset(&task->fill_pattern, fill_pattern, sizeof(uint64_t)); - rc = spdk_idxd_batch_prep_fill(chan->chan, idxd_batch, task->dst, task->fill_pattern, - task->nbytes, idxd_done, task); - break; - case ACCEL_OPCODE_CRC32C: - rc = spdk_idxd_batch_prep_crc32c(chan->chan, idxd_batch, task->dst, task->src, - task->seed, task->nbytes, idxd_done, task); - break; - default: - assert(false); - break; - } - - tmp = TAILQ_NEXT(task, link); - - if (rc == 0) { - TAILQ_INSERT_TAIL(&batch_tasks, task, link); - } else { - assert(rc != -EBUSY); - spdk_accel_task_complete(task, rc); - } - - task_count++; - task = tmp; - } while (task && task_count < g_batch_max); - - if (!TAILQ_EMPTY(&batch_tasks)) { - rc = spdk_idxd_batch_submit(chan->chan, idxd_batch, NULL, NULL); - - if (rc) { - /* Cancel the batch, requeue the items in the batch adn then - * any tasks that still hadn't been processed yet. - */ - spdk_idxd_batch_cancel(chan->chan, idxd_batch); - - while (!TAILQ_EMPTY(&batch_tasks)) { - batch_task = TAILQ_FIRST(&batch_tasks); - TAILQ_REMOVE(&batch_tasks, batch_task, link); - TAILQ_INSERT_TAIL(&chan->queued_tasks, batch_task, link); - } - goto queue_tasks; - } - } - } while (task); - return 0; queue_tasks: