dpdk_cryptodev: Check queue capacity before submitting a task

When more tasks are submitted than the qpair supports,
the extra tasks are queued on the io_channel. The
completion poller later tries to resubmit these tasks
one by one. That is not efficient since every
enqueue_burst call may cause a doorbell update in HW.

Instead, check the qpair capacity and submit only as
many requests as fit. If the qpair is full, tasks are
queued in a dedicated list. This approach should remove
or minimize the need to resubmit individual crypto
operations.

This also handles the case where there are no free
entries in the global pools (crypto_ops or rte_mbuf).

Fixes issue #2756

Signed-off-by: Alexey Marchuk <alexeymar@nvidia.com>
Change-Id: Iab50e623e7a82a4f5bef7a1e4434e593240ab633
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15769
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
Author: Alexey Marchuk, 2022-12-02 16:58:20 +01:00 (committed by Ben Walker)
parent bf8e0656e8
commit 8f36853a84
2 changed files with 131 additions and 10 deletions
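
Before the diff, here is a minimal self-contained sketch of the pattern this change introduces: clamp the burst to the number of free qpair descriptors and park the task on a per-channel list when no descriptors are left, so the poller can retry the whole task instead of individual crypto ops. The types and names below are illustrative stand-ins, not the module's real structures:

#include <sys/queue.h>
#include <stdint.h>
#include <stdio.h>

struct task {
	uint32_t ops_remaining;
	TAILQ_ENTRY(task) link;
};

struct qpair {
	uint32_t desc_nr;          /* ring size the qpair was configured with */
	uint32_t num_enqueued_ops; /* ops currently in flight on this qpair */
};

struct channel {
	struct qpair *qp;
	TAILQ_HEAD(, task) queued_tasks; /* tasks waiting for free qpair slots */
};

/* Returns the number of ops submitted; 0 means the task was parked on
 * queued_tasks for the completion poller to retry later. */
static uint32_t
submit(struct channel *ch, struct task *t)
{
	uint32_t capacity = ch->qp->desc_nr - ch->qp->num_enqueued_ops;
	uint32_t burst = t->ops_remaining < capacity ? t->ops_remaining : capacity;

	if (burst == 0) {
		/* qpair is full - queue the whole task instead of single ops */
		TAILQ_INSERT_TAIL(&ch->queued_tasks, t, link);
		return 0;
	}
	/* the real module builds mbufs/crypto ops here and calls
	 * rte_cryptodev_enqueue_burst() with exactly 'burst' ops */
	ch->qp->num_enqueued_ops += burst;
	t->ops_remaining -= burst;
	return burst;
}

int
main(void)
{
	struct qpair qp = { .desc_nr = 4, .num_enqueued_ops = 3 };
	struct channel ch = { .qp = &qp };
	struct task t = { .ops_remaining = 8 };

	TAILQ_INIT(&ch.queued_tasks);
	/* only one descriptor is free, so only one op is submitted */
	printf("submitted %u, in flight %u\n", submit(&ch, &t), qp.num_enqueued_ops);
	return 0;
}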

@@ -162,8 +162,10 @@ struct accel_dpdk_cryptodev_io_channel {
struct spdk_poller *poller;
/* Array of qpairs for each available device. The specific device will be selected depending on the crypto key */
struct accel_dpdk_cryptodev_qp *device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_LAST];
/* queued for re-submission to CryptoDev */
/* queued for re-submission to CryptoDev. Used when a crypto op was not processed by the driver for some reason */
TAILQ_HEAD(, accel_dpdk_cryptodev_queued_op) queued_cry_ops;
/* Used to queue tasks when qpair is full. No crypto operation was submitted to the driver by the task */
TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks;
};
struct accel_dpdk_cryptodev_task {
@@ -334,6 +336,10 @@ accel_dpdk_cryptodev_poll_qp(struct accel_dpdk_cryptodev_qp *qp,
int rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
if (spdk_unlikely(rc)) {
if (rc == -ENOMEM) {
TAILQ_INSERT_TAIL(&crypto_ch->queued_tasks, task, link);
continue;
}
spdk_accel_task_complete(&task->base, rc);
}
}
@@ -361,11 +367,12 @@ accel_dpdk_cryptodev_poller(void *args)
{
struct accel_dpdk_cryptodev_io_channel *crypto_ch = args;
struct accel_dpdk_cryptodev_qp *qp;
struct accel_dpdk_cryptodev_task *task;
struct accel_dpdk_cryptodev_queued_op *op_to_resubmit;
struct accel_dpdk_cryptodev_task *task, *task_tmp;
struct accel_dpdk_cryptodev_queued_op *op_to_resubmit, *op_to_resubmit_tmp;
TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks_tmp;
uint32_t num_dequeued_ops = 0, num_enqueued_ops = 0;
uint16_t enqueued;
int i;
int i, rc;
for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
qp = crypto_ch->device_qp[i];
@@ -376,10 +383,12 @@ accel_dpdk_cryptodev_poller(void *args)
}
/* Check if there are any queued crypto ops to process */
while (!TAILQ_EMPTY(&crypto_ch->queued_cry_ops)) {
op_to_resubmit = TAILQ_FIRST(&crypto_ch->queued_cry_ops);
TAILQ_FOREACH_SAFE(op_to_resubmit, &crypto_ch->queued_cry_ops, link, op_to_resubmit_tmp) {
task = op_to_resubmit->task;
qp = op_to_resubmit->qp;
if (qp->num_enqueued_ops == qp->device->qp_desc_nr) {
continue;
}
enqueued = rte_cryptodev_enqueue_burst(qp->device->cdev_id,
qp->qp,
&op_to_resubmit->crypto_op,
@@ -409,6 +418,28 @@ accel_dpdk_cryptodev_poller(void *args)
}
}
if (!TAILQ_EMPTY(&crypto_ch->queued_tasks)) {
TAILQ_INIT(&queued_tasks_tmp);
TAILQ_FOREACH_SAFE(task, &crypto_ch->queued_tasks, link, task_tmp) {
TAILQ_REMOVE(&crypto_ch->queued_tasks, task, link);
rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
if (spdk_unlikely(rc)) {
if (rc == -ENOMEM) {
TAILQ_INSERT_TAIL(&queued_tasks_tmp, task, link);
/* Other queued tasks may belong to other qpairs,
* so process the whole list */
continue;
}
spdk_accel_task_complete(&task->base, rc);
} else {
num_enqueued_ops++;
}
}
TAILQ_SWAP(&crypto_ch->queued_tasks, &queued_tasks_tmp, accel_dpdk_cryptodev_task, link);
}
return !!(num_dequeued_ops + num_enqueued_ops);
}
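
The loop above visits each queued task at most once per poll: tasks that still return -ENOMEM are collected on queued_tasks_tmp and swapped back at the end, so a qpair that stays full cannot cause an endless loop inside a single poller run. A rough, self-contained illustration of this one-pass drain follows (illustrative names; it uses only the plain 4.4BSD list macros from glibc's <sys/queue.h> rather than TAILQ_FOREACH_SAFE/TAILQ_SWAP, which the SPDK tree supplies for platforms whose libc lacks them):

#include <sys/queue.h>
#include <stdbool.h>
#include <stdio.h>

struct item {
	bool ready;
	TAILQ_ENTRY(item) link;
};

TAILQ_HEAD(item_list, item);

/* Drain 'pending' once: every item is visited at most once per call, and
 * items that still cannot make progress end up back on 'pending' (via a
 * temporary list, which prevents re-visiting them in the same pass). */
static unsigned int
drain_once(struct item_list *pending)
{
	struct item_list still_pending;
	struct item *it;
	unsigned int done = 0;

	TAILQ_INIT(&still_pending);
	while ((it = TAILQ_FIRST(pending)) != NULL) {
		TAILQ_REMOVE(pending, it, link);
		if (!it->ready) {
			/* cannot be submitted yet, keep it for the next poll */
			TAILQ_INSERT_TAIL(&still_pending, it, link);
			continue;
		}
		done++; /* the real code would submit the task here */
	}
	/* move the leftovers back; the module swaps the list heads instead */
	while ((it = TAILQ_FIRST(&still_pending)) != NULL) {
		TAILQ_REMOVE(&still_pending, it, link);
		TAILQ_INSERT_TAIL(pending, it, link);
	}
	return done;
}

int
main(void)
{
	struct item a = { .ready = true }, b = { .ready = false };
	struct item_list pending;

	TAILQ_INIT(&pending);
	TAILQ_INSERT_TAIL(&pending, &a, link);
	TAILQ_INSERT_TAIL(&pending, &b, link);
	printf("processed %u, %s still pending\n", drain_once(&pending),
	       TAILQ_EMPTY(&pending) ? "none" : "some");
	return 0;
}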
@@ -603,6 +634,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
uint32_t crypto_len = task->base.block_size;
uint64_t dst_length, total_length;
uint32_t sgl_offset;
uint32_t qp_capacity;
uint64_t iv_start;
struct accel_dpdk_cryptodev_queued_op *op_to_queue;
uint32_t i, crypto_index;
@@ -663,6 +695,14 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
assert(qp);
dev = qp->device;
assert(dev);
assert(dev->qp_desc_nr >= qp->num_enqueued_ops);
qp_capacity = dev->qp_desc_nr - qp->num_enqueued_ops;
cryop_cnt = spdk_min(cryop_cnt, qp_capacity);
if (spdk_unlikely(cryop_cnt == 0)) {
/* QP is full */
return -ENOMEM;
}
key_handle = accel_dpdk_find_key_handle_in_channel(crypto_ch, priv);
if (spdk_unlikely(!key_handle)) {
@@ -734,8 +774,7 @@ accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto
/* Enqueue everything we've got but limit by the max number of descriptors we
* configured the crypto device for.
*/
num_enqueued_ops = rte_cryptodev_enqueue_burst(dev->cdev_id, qp->qp, crypto_ops, spdk_min(cryop_cnt,
dev->qp_desc_nr));
num_enqueued_ops = rte_cryptodev_enqueue_burst(dev->cdev_id, qp->qp, crypto_ops, cryop_cnt);
qp->num_enqueued_ops += num_enqueued_ops;
/* We were unable to enqueue everything but did get some, so need to decide what
@@ -897,6 +936,8 @@ _accel_dpdk_cryptodev_create_cb(void *io_device, void *ctx_buf)
/* We use this to queue up crypto ops when the device is busy. */
TAILQ_INIT(&crypto_ch->queued_cry_ops);
/* We use this to queue tasks when qpair is full or no resources in pools */
TAILQ_INIT(&crypto_ch->queued_tasks);
return 0;
}
@@ -931,6 +972,7 @@ accel_dpdk_cryptodev_submit_tasks(struct spdk_io_channel *_ch, struct spdk_accel
struct accel_dpdk_cryptodev_task *task = SPDK_CONTAINEROF(_task, struct accel_dpdk_cryptodev_task,
base);
struct accel_dpdk_cryptodev_io_channel *ch = spdk_io_channel_get_ctx(_ch);
int rc;
task->cryop_completed = 0;
task->cryop_submitted = 0;
@@ -947,7 +989,13 @@ accel_dpdk_cryptodev_submit_tasks(struct spdk_io_channel *_ch, struct spdk_accel
task->inplace = false;
}
return accel_dpdk_cryptodev_process_task(ch, task);
rc = accel_dpdk_cryptodev_process_task(ch, task);
if (spdk_unlikely(rc == -ENOMEM)) {
TAILQ_INSERT_TAIL(&ch->queued_tasks, task, link);
rc = 0;
}
return rc;
}
/* Dummy function used by DPDK to free ext attached buffers to mbufs, we free them ourselves but

@@ -284,6 +284,7 @@ test_setup(void)
g_io_ch = calloc(1, sizeof(*g_io_ch) + sizeof(struct accel_dpdk_cryptodev_io_channel));
g_crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)spdk_io_channel_get_ctx(g_io_ch);
TAILQ_INIT(&g_crypto_ch->queued_cry_ops);
TAILQ_INIT(&g_crypto_ch->queued_tasks);
g_aesni_crypto_dev.type = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
g_aesni_crypto_dev.qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
@@ -406,9 +407,13 @@ test_error_paths(void)
/* case 5 - no entries in g_mbuf_mp */
MOCK_SET(spdk_mempool_get, NULL);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks) == true);
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == -ENOMEM);
CU_ASSERT(rc == 0);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks) == false);
CU_ASSERT(TAILQ_FIRST(&g_crypto_ch->queued_tasks) == &task);
MOCK_CLEAR(spdk_mempool_get);
TAILQ_INIT(&g_crypto_ch->queued_tasks);
/* case 6 - vtophys error in accel_dpdk_cryptodev_mbuf_attach_buf */
MOCK_SET(spdk_vtophys, SPDK_VTOPHYS_ERROR);
@@ -437,6 +442,7 @@ test_simple_encrypt(void)
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = 1;
/* Inplace encryption */
g_aesni_qp.num_enqueued_ops = 0;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_submitted == 1);
@@ -452,6 +458,7 @@ test_simple_encrypt(void)
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
/* out-of-place encryption */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_submitted = 0;
dst_iov.iov_base = (void *)0xFEEDBEEF;
@@ -472,6 +479,7 @@ test_simple_encrypt(void)
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_dst);
/* out-of-place encryption, fragmented payload */
g_aesni_qp.num_enqueued_ops = 0;
task.base.s.iovcnt = 4;
for (i = 0; i < 4; i++) {
src_iov[i].iov_base = (void *)0xDEADBEEF + i * 128;
@@ -555,6 +563,7 @@ test_simple_decrypt(void)
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = 1;
/* Inplace decryption */
g_aesni_qp.num_enqueued_ops = 0;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(task.cryop_submitted == 1);
@@ -570,6 +579,7 @@ test_simple_decrypt(void)
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
/* out-of-place decryption */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_submitted = 0;
dst_iov.iov_base = (void *)0xFEEDBEEF;
@@ -590,6 +600,7 @@ test_simple_decrypt(void)
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_dst);
/* out-of-place decryption, fragmented payload */
g_aesni_qp.num_enqueued_ops = 0;
task.base.s.iovcnt = 4;
for (i = 0; i < 4; i++) {
src_iov[i].iov_base = (void *)0xDEADBEEF + i * 128;
@@ -675,6 +686,7 @@ test_large_enc_dec(void)
task.base.iv = 1;
/* Test 1. Multi block size decryption, multi-element, inplace */
g_aesni_qp.num_enqueued_ops = 0;
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = num_blocks;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
@@ -697,6 +709,7 @@ test_large_enc_dec(void)
}
/* Call accel_dpdk_cryptodev_process_task like it was called by completion poller */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_completed = task.cryop_submitted;
rc = accel_dpdk_cryptodev_process_task(g_crypto_ch, &task);
@@ -718,6 +731,7 @@ test_large_enc_dec(void)
}
/* Test 2. Multi block size decryption, multi-element, out-of-place */
g_aesni_qp.num_enqueued_ops = 0;
dst_iov.iov_base = (void *)0xFEEDBEEF;
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = num_blocks;
@@ -744,6 +758,7 @@ test_large_enc_dec(void)
}
/* Call accel_dpdk_cryptodev_process_task like it was called by completion poller */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_completed = task.cryop_submitted;
rc = accel_dpdk_cryptodev_process_task(g_crypto_ch, &task);
@@ -769,6 +784,7 @@ test_large_enc_dec(void)
}
/* Test 3. Multi block size encryption, multi-element, inplace */
g_aesni_qp.num_enqueued_ops = 0;
dst_iov = src_iov;
task.base.op_code = ACCEL_OPC_ENCRYPT;
task.cryop_submitted = 0;
@@ -794,6 +810,7 @@ test_large_enc_dec(void)
}
/* Call accel_dpdk_cryptodev_process_task like it was called by completion poller */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_completed = task.cryop_submitted;
rc = accel_dpdk_cryptodev_process_task(g_crypto_ch, &task);
@@ -815,6 +832,7 @@ test_large_enc_dec(void)
}
/* Multi block size encryption, multi-element, out-of-place */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_submitted = 0;
dst_iov.iov_base = (void *)0xFEEDBEEF;
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = num_blocks;
@@ -841,6 +859,7 @@ test_large_enc_dec(void)
}
/* Call accel_dpdk_cryptodev_process_task like it was called by completion poller */
g_aesni_qp.num_enqueued_ops = 0;
task.cryop_completed = task.cryop_submitted;
rc = accel_dpdk_cryptodev_process_task(g_crypto_ch, &task);
@@ -886,6 +905,7 @@ test_dev_full(void)
task.base.iv = 1;
/* Two element block size decryption */
g_aesni_qp.num_enqueued_ops = 0;
g_enqueue_mock = g_dequeue_mock = 1;
ut_rte_crypto_op_bulk_alloc = 2;
@@ -924,9 +944,20 @@ test_dev_full(void)
/* Non-busy reason for enqueue failure, all were rejected. */
g_enqueue_mock = 0;
g_aesni_qp.num_enqueued_ops = 0;
g_test_crypto_ops[0]->status = RTE_CRYPTO_OP_STATUS_ERROR;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == -EINVAL);
/* QP is full, task should be queued */
g_aesni_qp.num_enqueued_ops = g_aesni_crypto_dev.qp_desc_nr;
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks) == true);
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
CU_ASSERT(!TAILQ_EMPTY(&g_crypto_ch->queued_tasks));
CU_ASSERT(TAILQ_FIRST(&g_crypto_ch->queued_tasks) == &task);
TAILQ_INIT(&g_crypto_ch->queued_tasks);
}
static void
@@ -953,6 +984,7 @@ test_crazy_rw(void)
/* Multi block size read, single element, strange IOV makeup */
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = num_blocks;
g_aesni_qp.num_enqueued_ops = 0;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
@@ -986,6 +1018,7 @@ test_crazy_rw(void)
task.base.s.iovs[3].iov_base = (void *)0xDEADBEEF + 2048 + 512 + 512;
g_enqueue_mock = g_dequeue_mock = ut_rte_crypto_op_bulk_alloc = num_blocks;
g_aesni_qp.num_enqueued_ops = 0;
rc = accel_dpdk_cryptodev_submit_tasks(g_io_ch, &task.base);
CU_ASSERT(rc == 0);
@@ -1219,6 +1252,7 @@ test_poller(void)
/* test regular 1 op to dequeue and complete */
g_dequeue_mock = g_enqueue_mock = 1;
g_aesni_qp.num_enqueued_ops = 1;
rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, 1);
g_test_crypto_ops[0]->sym->m_src = src_mbufs[0];
*RTE_MBUF_DYNFIELD(g_test_crypto_ops[0]->sym->m_src, g_mbuf_offset,
@@ -1231,6 +1265,7 @@ test_poller(void)
rc = accel_dpdk_cryptodev_poller(g_crypto_ch);
CU_ASSERT(rc == 1);
CU_ASSERT(task.cryop_completed == task.cryop_submitted);
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 0);
/* We have nothing dequeued but have some to resubmit */
g_dequeue_mock = 0;
@@ -1254,9 +1289,11 @@ test_poller(void)
g_resubmit_test = false;
CU_ASSERT(rc == 1);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == true);
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 1);
/* 2 to dequeue but 2nd one failed */
g_dequeue_mock = g_enqueue_mock = 2;
g_aesni_qp.num_enqueued_ops = 2;
task.cryop_submitted = 2;
task.cryop_total = 2;
task.cryop_completed = 0;
@@ -1274,12 +1311,14 @@ test_poller(void)
rc = accel_dpdk_cryptodev_poller(g_crypto_ch);
CU_ASSERT(task.is_failed == true);
CU_ASSERT(rc == 1);
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 0);
/* Dequeue a task which needs to be submitted again */
g_dequeue_mock = g_enqueue_mock = ut_rte_crypto_op_bulk_alloc = 1;
task.cryop_submitted = 1;
task.cryop_total = 2;
task.cryop_completed = 0;
g_aesni_qp.num_enqueued_ops = 1;
rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, 1);
SPDK_CU_ASSERT_FATAL(src_mbufs[0] != NULL);
g_test_crypto_ops[0]->sym->m_src = src_mbufs[0];
@@ -1299,6 +1338,40 @@ test_poller(void)
CU_ASSERT(*RTE_MBUF_DYNFIELD(g_test_crypto_ops[0]->sym->m_src, g_mbuf_offset,
uint64_t *) == (uint64_t)&task);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_dst == NULL);
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 1);
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
/* Process queued tasks, qp is full */
g_dequeue_mock = g_enqueue_mock = 0;
g_aesni_qp.num_enqueued_ops = g_aesni_crypto_dev.qp_desc_nr;
task.cryop_submitted = 1;
task.cryop_total = 2;
task.cryop_completed = 1;
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks));
TAILQ_INSERT_TAIL(&g_crypto_ch->queued_tasks, &task, link);
rc = accel_dpdk_cryptodev_poller(g_crypto_ch);
CU_ASSERT(rc == 0);
CU_ASSERT(TAILQ_FIRST(&g_crypto_ch->queued_tasks) == &task);
/* Try again when queue is empty, task should be submitted */
g_enqueue_mock = 1;
g_aesni_qp.num_enqueued_ops = 0;
rc = accel_dpdk_cryptodev_poller(g_crypto_ch);
CU_ASSERT(rc == 1);
CU_ASSERT(task.cryop_submitted == 2);
CU_ASSERT(task.cryop_total == 2);
CU_ASSERT(task.cryop_completed == 1);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->buf_addr == src_iov.iov_base + task.base.block_size);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->data_len == task.base.block_size);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->next == NULL);
CU_ASSERT(g_test_crypto_ops[0]->sym->cipher.data.length == task.base.block_size);
CU_ASSERT(g_test_crypto_ops[0]->sym->cipher.data.offset == 0);
CU_ASSERT(*RTE_MBUF_DYNFIELD(g_test_crypto_ops[0]->sym->m_src, g_mbuf_offset,
uint64_t *) == (uint64_t)&task);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_dst == NULL);
CU_ASSERT(g_aesni_qp.num_enqueued_ops == 1);
CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_tasks));
rte_pktmbuf_free(g_test_crypto_ops[0]->sym->m_src);
}