accel/dpdk_cryptodev: Handle OP_STATUS_SUCCESS

SW PMD might process a crypto operation but failed
to submit it to a completions ring.
Such operation can't be retried if crypto operation
is inplace.
Handle such crypto op as a completed.
Verified by integrating rte openssl driver and
adding additional logs to check that SUCCESS
status received and completed as expected.

Signed-off-by: Alexey Marchuk <alexeymar@nvidia.com>
Change-Id: Ida161cec045167af752ebd5b57f41b2bbfe8b97c
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16995
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
This commit is contained in:
Alexey Marchuk 2023-03-01 10:04:01 +01:00 committed by Ben Walker
parent 56eced4280
commit 6b7cca1542
2 changed files with 142 additions and 16 deletions

View File

@ -153,8 +153,11 @@ struct accel_dpdk_cryptodev_io_channel {
struct spdk_poller *poller;
/* Array of qpairs for each available device. The specific device will be selected depending on the crypto key */
struct accel_dpdk_cryptodev_qp *device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_LAST];
/* Used to queue tasks when qpair is full. No crypto operation was submitted to the driver by the task */
/* Used to queue tasks when qpair is full or only part of crypto ops was submitted to the PMD */
TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks;
/* Used to queue tasks that were completed in submission path - to avoid calling cpl_cb and possibly overflow
* call stack */
TAILQ_HEAD(, accel_dpdk_cryptodev_task) completed_tasks;
};
struct accel_dpdk_cryptodev_task {
@ -291,6 +294,11 @@ accel_dpdk_cryptodev_poll_qp(struct accel_dpdk_cryptodev_qp *qp,
if (rc == -ENOMEM) {
TAILQ_INSERT_TAIL(&crypto_ch->queued_tasks, task, link);
continue;
} else if (rc == -EALREADY) {
/* -EALREADY means that a task is completed, but it might be unsafe to complete
* it if we are in the submission path. Since we are in the poller context, we can
* complete th task immediately */
rc = 0;
}
spdk_accel_task_complete(&task->base, rc);
}
@ -321,7 +329,7 @@ accel_dpdk_cryptodev_poller(void *args)
struct accel_dpdk_cryptodev_qp *qp;
struct accel_dpdk_cryptodev_task *task, *task_tmp;
TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks_tmp;
uint32_t num_dequeued_ops = 0, num_enqueued_ops = 0;
uint32_t num_dequeued_ops = 0, num_enqueued_ops = 0, num_completed_tasks = 0;
int i, rc;
for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
@ -344,8 +352,14 @@ accel_dpdk_cryptodev_poller(void *args)
/* Other queued tasks may belong to other qpairs,
* so process the whole list */
continue;
} else if (rc == -EALREADY) {
/* -EALREADY means that a task is completed, but it might be unsafe to complete
* it if we are in the submission path. Since we are in the poller context, we can
* complete th task immediately */
rc = 0;
}
spdk_accel_task_complete(&task->base, rc);
num_completed_tasks++;
} else {
num_enqueued_ops++;
}
@ -354,7 +368,13 @@ accel_dpdk_cryptodev_poller(void *args)
TAILQ_SWAP(&crypto_ch->queued_tasks, &queued_tasks_tmp, accel_dpdk_cryptodev_task, link);
}
return !!(num_dequeued_ops + num_enqueued_ops);
TAILQ_FOREACH_SAFE(task, &crypto_ch->completed_tasks, link, task_tmp) {
TAILQ_REMOVE(&crypto_ch->completed_tasks, task, link);
spdk_accel_task_complete(&task->base, rc);
num_completed_tasks++;
}
return !!(num_dequeued_ops + num_enqueued_ops + num_completed_tasks);
}
/* Allocate the new mbuf of @remainder size with data pointed by @addr and attach
@ -541,6 +561,18 @@ accel_dpdk_cryptodev_op_set_iv(struct rte_crypto_op *crypto_op, uint64_t iv)
rte_memcpy(iv_ptr, &iv, sizeof(uint64_t));
}
static inline void
accel_dpdk_cryptodev_update_resources_from_pools(struct rte_crypto_op **crypto_ops,
struct rte_mbuf **src_mbufs, struct rte_mbuf **dst_mbufs,
uint32_t num_enqueued_ops, uint32_t cryop_cnt)
{
memmove(crypto_ops, &crypto_ops[num_enqueued_ops], sizeof(crypto_ops[0]) * cryop_cnt);
memmove(src_mbufs, &src_mbufs[num_enqueued_ops], sizeof(src_mbufs[0]) * cryop_cnt);
if (dst_mbufs) {
memmove(dst_mbufs, &dst_mbufs[num_enqueued_ops], sizeof(dst_mbufs[0]) * cryop_cnt);
}
}
static int
accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
struct accel_dpdk_cryptodev_task *task)
@ -563,6 +595,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
struct accel_dpdk_cryptodev_device *dev;
struct spdk_iov_sgl src, dst = {};
int rc;
bool inplace = task->inplace;
if (spdk_unlikely(!task->base.crypto_key ||
task->base.crypto_key->module_if != &g_accel_dpdk_cryptodev_module)) {
@ -636,7 +669,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
return -EINVAL;
}
rc = accel_dpdk_cryptodev_task_alloc_resources(src_mbufs, task->inplace ? NULL : dst_mbufs,
rc = accel_dpdk_cryptodev_task_alloc_resources(src_mbufs, inplace ? NULL : dst_mbufs,
crypto_ops, cryop_cnt);
if (rc) {
return rc;
@ -651,7 +684,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
*/
spdk_iov_sgl_init(&src, task->base.s.iovs, task->base.s.iovcnt, 0);
spdk_iov_sgl_advance(&src, sgl_offset);
if (!task->inplace) {
if (!inplace) {
spdk_iov_sgl_init(&dst, task->base.d.iovs, task->base.d.iovcnt, 0);
spdk_iov_sgl_advance(&dst, sgl_offset);
}
@ -672,7 +705,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
/* link the mbuf to the crypto op. */
crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
if (task->inplace) {
if (inplace) {
crypto_ops[crypto_index]->sym->m_dst = NULL;
} else {
#ifndef __clang_analyzer__
@ -698,6 +731,29 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
*/
if (num_enqueued_ops < cryop_cnt) {
switch (crypto_ops[num_enqueued_ops]->status) {
case RTE_CRYPTO_OP_STATUS_SUCCESS:
/* Crypto operation might be completed successfully but enqueuing to a completion ring might fail.
* That might happen with SW PMDs like openssl
* We can't retry such operation on next turn since if crypto operation was inplace, we can encrypt/
* decrypt already processed buffer. See github issue #2907 for more details.
* Handle this case as the crypto op was completed successfully - increment cryop_submitted and
* cryop_completed.
* We won't receive a completion for such operation, so we need to cleanup mbufs and crypto_ops */
assert(task->cryop_total > task->cryop_completed);
task->cryop_completed++;
task->cryop_submitted++;
if (task->cryop_completed == task->cryop_total) {
assert(num_enqueued_ops == 0);
/* All crypto ops are completed. We can't complete the task immediately since this function might be
* called in scope of spdk_accel_submit_* function and user's logic in the completion callback
* might lead to stack overflow */
cryop_cnt -= num_enqueued_ops;
accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
num_enqueued_ops, cryop_cnt);
rc = -EALREADY;
goto free_ops;
}
/* fallthrough */
case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
if (num_enqueued_ops == 0) {
/* Nothing was submitted. Free crypto ops and mbufs, treat this case as NOMEM */
@ -707,12 +763,8 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
/* Part of the crypto operations were not submitted, release mbufs and crypto ops.
* The rest crypto ops will be submitted again once current batch is completed */
cryop_cnt -= num_enqueued_ops;
memmove(crypto_ops, &crypto_ops[num_enqueued_ops], sizeof(crypto_ops[0]) * cryop_cnt);
memmove(src_mbufs, &src_mbufs[num_enqueued_ops], sizeof(src_mbufs[0]) * cryop_cnt);
if (!task->inplace) {
memmove(dst_mbufs, &dst_mbufs[num_enqueued_ops], sizeof(dst_mbufs[0]) * cryop_cnt);
}
accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
num_enqueued_ops, cryop_cnt);
rc = 0;
goto free_ops;
default:
@ -735,7 +787,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
/* Error cleanup paths. */
free_ops:
if (!task->inplace) {
if (!inplace) {
/* This also releases chained mbufs if any. */
rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
}
@ -857,6 +909,7 @@ _accel_dpdk_cryptodev_create_cb(void *io_device, void *ctx_buf)
/* We use this to queue tasks when qpair is full or no resources in pools */
TAILQ_INIT(&crypto_ch->queued_tasks);
TAILQ_INIT(&crypto_ch->completed_tasks);
return 0;
}
@ -909,9 +962,17 @@ accel_dpdk_cryptodev_submit_tasks(struct spdk_io_channel *_ch, struct spdk_accel
}
rc = accel_dpdk_cryptodev_process_task(ch, task);
if (spdk_unlikely(rc == -ENOMEM)) {
TAILQ_INSERT_TAIL(&ch->queued_tasks, task, link);
rc = 0;
if (spdk_unlikely(rc)) {
if (rc == -ENOMEM) {
TAILQ_INSERT_TAIL(&ch->queued_tasks, task, link);
rc = 0;
} else if (rc == -EALREADY) {
/* -EALREADY means that a task is completed, but it might be unsafe to complete
* it if we are in the submission path. Hence put it into a dedicated queue to and
* process it during polling */
TAILQ_INSERT_TAIL(&ch->completed_tasks, task, link);
rc = 0;
}
}
return rc;

View File

@ -296,6 +296,7 @@ test_setup(void)
g_io_ch = calloc(1, sizeof(*g_io_ch) + sizeof(struct accel_dpdk_cryptodev_io_channel));
g_crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)spdk_io_channel_get_ctx(g_io_ch);
TAILQ_INIT(&g_crypto_ch->queued_tasks);
TAILQ_INIT(&g_crypto_ch->completed_tasks);
g_aesni_crypto_dev.type = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
g_aesni_crypto_dev.qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
@ -952,6 +953,7 @@ test_dev_full(void)
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_submitted == 1);
CU_ASSERT(task.cryop_total == 2);
sym_op = g_test_crypto_ops[0]->sym;
CU_ASSERT(sym_op->m_src->buf_addr == src_iov.iov_base);
CU_ASSERT(sym_op->m_src->data_len == 512);
@ -996,6 +998,61 @@ test_dev_full(void)
g_aesni_qp.num_enqueued_ops = 0;
TAILQ_INIT(&g_crypto_ch->queued_tasks);
/* Two element block size decryption, 2nd op was not submitted, but has RTE_CRYPTO_OP_STATUS_SUCCESS status */
g_aesni_qp.num_enqueued_ops = 0;
g_enqueue_mock = g_dequeue_mock = 1;
ut_rte_crypto_op_bulk_alloc = 2;
g_test_crypto_ops[1]->status = RTE_CRYPTO_OP_STATUS_SUCCESS;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_total == 2);
CU_ASSERT(task.cryop_submitted == 2);
CU_ASSERT(task.cryop_completed == 1);
sym_op = g_test_crypto_ops[0]->sym;
CU_ASSERT(sym_op->m_src->buf_addr == src_iov.iov_base);
CU_ASSERT(sym_op->m_src->data_len == 512);
CU_ASSERT(sym_op->m_src->next == NULL);
CU_ASSERT(sym_op->cipher.data.length == 512);
CU_ASSERT(sym_op->cipher.data.offset == 0);
CU_ASSERT(*RTE_MBUF_DYNFIELD(sym_op->m_src, g_mbuf_offset, uint64_t *) == (uint64_t)&task);
CU_ASSERT(sym_op->m_dst == NULL);
/* op which was not submitted is already released */
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
/* Two element block size decryption, 1st op was not submitted, but has RTE_CRYPTO_OP_STATUS_SUCCESS status */
g_aesni_qp.num_enqueued_ops = 0;
g_enqueue_mock = g_dequeue_mock = 0;
ut_rte_crypto_op_bulk_alloc = 2;
g_test_crypto_ops[0]->status = RTE_CRYPTO_OP_STATUS_SUCCESS;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_total == 2);
CU_ASSERT(task.cryop_submitted == 1);
CU_ASSERT(task.cryop_completed == 1);
CU_ASSERT(!TAILQ_EMPTY(&g_crypto_ch->queued_tasks));
CU_ASSERT(TAILQ_FIRST(&g_crypto_ch->queued_tasks) == &task);
TAILQ_INIT(&g_crypto_ch->queued_tasks);
/* Single element block size decryption, 1st op was not submitted, but has RTE_CRYPTO_OP_STATUS_SUCCESS status.
* Task should be queued in the completed_tasks list */
src_iov.iov_len = 512;
dst_iov.iov_len = 512;
g_aesni_qp.num_enqueued_ops = 0;
g_enqueue_mock = g_dequeue_mock = 0;
ut_rte_crypto_op_bulk_alloc = 1;
g_test_crypto_ops[0]->status = RTE_CRYPTO_OP_STATUS_SUCCESS;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_total == 1);
CU_ASSERT(task.cryop_submitted == 1);
CU_ASSERT(task.cryop_completed == 1);
CU_ASSERT(!TAILQ_EMPTY(&g_crypto_ch->completed_tasks));
CU_ASSERT(TAILQ_FIRST(&g_crypto_ch->completed_tasks) == &task);
TAILQ_INIT(&g_crypto_ch->completed_tasks);
}
static void
@ -1388,6 +1445,14 @@ test_poller(void)
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 1);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks));
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
/* Complete tasks in the dedicated list */
g_dequeue_mock = g_enqueue_mock = 0;
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->completed_tasks));
TAILQ_INSERT_TAIL(&g_crypto_ch->completed_tasks, &task, link);
rc = accel_dpdk_cryptodev_poller(g_crypto_ch);
CU_ASSERT(rc == 1);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->completed_tasks));
}
/* Helper function for accel_dpdk_cryptodev_assign_device_qps() */