diff --git a/module/bdev/crypto/vbdev_crypto.c b/module/bdev/crypto/vbdev_crypto.c
index d0d390c50..31135b964 100644
--- a/module/bdev/crypto/vbdev_crypto.c
+++ b/module/bdev/crypto/vbdev_crypto.c
@@ -121,6 +121,7 @@ static pthread_mutex_t g_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
 /* Common for suported devices. */
 #define IV_OFFSET (sizeof(struct rte_crypto_op) + \
 		   sizeof(struct rte_crypto_sym_op))
+#define QUEUED_OP_OFFSET (IV_OFFSET + AES_CBC_IV_LENGTH)
 
 static void _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
 static void _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
@@ -165,6 +166,16 @@ static struct rte_mempool *g_session_mp_priv = NULL;
 static struct spdk_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
 
+/* For queueing up crypto operations that we can't submit for some reason */
+struct vbdev_crypto_op {
+	uint8_t cdev_id;
+	uint8_t qp;
+	struct rte_crypto_op *crypto_op;
+	struct spdk_bdev_io *bdev_io;
+	TAILQ_ENTRY(vbdev_crypto_op) link;
+};
+#define QUEUED_OP_LENGTH (sizeof(struct vbdev_crypto_op))
+
 /* The crypto vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
  * We store things in here that are needed on per thread basis like the base_channel for this thread,
  * and the poller for this thread.
@@ -175,6 +186,7 @@ struct crypto_io_channel {
 	struct device_qp *device_qp;			/* unique device/qp combination for this channel */
 	TAILQ_HEAD(, spdk_bdev_io) pending_cry_ios;	/* outstanding operations to the crypto device */
 	struct spdk_io_channel_iter *iter;		/* used with for_each_channel in reset */
+	TAILQ_HEAD(, vbdev_crypto_op) queued_cry_ops;	/* queued for re-submission to CryptoDev */
 };
 
 /* This is the crypto per IO context that the bdev layer allocates for us opaquely and attaches to
@@ -187,7 +199,7 @@ struct crypto_bdev_io {
 	struct spdk_bdev_io *orig_io;		/* the original IO */
 	struct spdk_bdev_io *read_io;		/* the read IO we issued */
 	int8_t bdev_io_status;			/* the status we'll report back on the bdev IO */
-
+	bool on_pending_list;
 	/* Used for the single contiguous buffer that serves as the crypto destination target for writes */
 	uint64_t cry_num_blocks;		/* num of blocks for the contiguous buffer */
 	uint64_t cry_offset_blocks;		/* block offset on media */
@@ -390,12 +402,16 @@ vbdev_crypto_init_crypto_drivers(void)
 		goto error_create_mbuf;
 	}
 
+	/* We use per op private data to store the IV and our own struct
+	 * for queueing ops.
+	 */
 	g_crypto_op_mp = rte_crypto_op_pool_create("op_mp",
 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC,
 			 NUM_MBUFS,
 			 POOL_CACHE_SIZE,
-			 AES_CBC_IV_LENGTH,
+			 AES_CBC_IV_LENGTH + QUEUED_OP_LENGTH,
 			 rte_socket_id());
+
 	if (g_crypto_op_mp == NULL) {
 		SPDK_ERRLOG("Cannot create op pool\n");
 		rc = -ENOMEM;
@@ -485,6 +501,9 @@ _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
 	}
 }
 
+static int _crypto_operation(struct spdk_bdev_io *bdev_io,
+			     enum rte_crypto_cipher_operation crypto_op);
+
 /* This is the poller for the crypto device. It uses a single API to dequeue whatever is ready at
  * the device. Then we need to decide if what we've got so far (including previous poller
  * runs) totals up to one or more complete bdev_ios and if so continue with the bdev_io
@@ -495,12 +514,13 @@ crypto_dev_poller(void *args)
 {
 	struct crypto_io_channel *crypto_ch = args;
 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
-	int i, num_dequeued_ops;
+	int i, num_dequeued_ops, num_enqueued_ops;
 	struct spdk_bdev_io *bdev_io = NULL;
 	struct crypto_bdev_io *io_ctx = NULL;
 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
 	struct rte_crypto_op *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
 	int num_mbufs = 0;
+	struct vbdev_crypto_op *op_to_resubmit;
 
 	/* Each run of the poller will get just what the device has available
 	 * at the moment we call it, we don't check again after draining the
@@ -567,6 +587,29 @@ crypto_dev_poller(void *args)
 				     num_mbufs);
 	}
 
+	/* Check if there are any pending crypto ops to process */
+	while (!TAILQ_EMPTY(&crypto_ch->queued_cry_ops)) {
+		op_to_resubmit = TAILQ_FIRST(&crypto_ch->queued_cry_ops);
+		io_ctx = (struct crypto_bdev_io *)op_to_resubmit->bdev_io->driver_ctx;
+		num_enqueued_ops = rte_cryptodev_enqueue_burst(op_to_resubmit->cdev_id,
+				   op_to_resubmit->qp,
+				   &op_to_resubmit->crypto_op,
+				   1);
+		if (num_enqueued_ops == 1) {
+			/* Make sure we don't put this on twice as one bdev_io is made up
+			 * of many crypto ops.
+			 */
+			if (io_ctx->on_pending_list == false) {
+				TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, op_to_resubmit->bdev_io, module_link);
+				io_ctx->on_pending_list = true;
+			}
+			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_resubmit, link);
+		} else {
+			/* if we couldn't get one, just break and try again later. */
+			break;
+		}
+	}
+
 	/* If the channel iter is not NULL, we need to continue to poll
 	 * until the pending list is empty, then we can move on to the
 	 * next channel.
@@ -592,19 +635,18 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
 	uint32_t crypto_len = io_ctx->crypto_bdev->crypto_bdev.blocklen;
 	uint64_t total_length = bdev_io->u.bdev.num_blocks * crypto_len;
 	int rc;
-	uint32_t enqueued = 0;
 	uint32_t iov_index = 0;
 	uint32_t allocated = 0;
 	uint8_t *current_iov = NULL;
 	uint64_t total_remaining = 0;
 	uint64_t updated_length, current_iov_remaining = 0;
-	int completed = 0;
-	int crypto_index = 0;
+	uint32_t crypto_index = 0;
 	uint32_t en_offset = 0;
 	struct rte_crypto_op *crypto_ops[MAX_ENQUEUE_ARRAY_SIZE];
 	struct rte_mbuf *src_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
 	struct rte_mbuf *dst_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
 	int burst;
+	struct vbdev_crypto_op *op_to_queue;
 
 	assert((bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) <= CRYPTO_MAX_IO);
 
@@ -771,30 +813,33 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
 	/* Enqueue everything we've got but limit by the max number of descriptors we
 	 * configured the crypto device for.
 	 */
-	do {
-		burst = spdk_min((cryop_cnt - enqueued), CRYPTO_QP_DESCRIPTORS);
-		num_enqueued_ops = rte_cryptodev_enqueue_burst(cdev_id, crypto_ch->device_qp->qp,
-				   &crypto_ops[enqueued],
-				   burst);
-		enqueued += num_enqueued_ops;
+	burst = spdk_min(cryop_cnt, CRYPTO_QP_DESCRIPTORS);
+	num_enqueued_ops = rte_cryptodev_enqueue_burst(cdev_id, crypto_ch->device_qp->qp,
+			   &crypto_ops[0],
+			   burst);
 
-		/* Dequeue all inline if the device is full. We don't defer anything simply
-		 * because of the complexity involved as we're building 1 or more crypto
-		 * ops per IO. Dequeue will free up space for more enqueue.
-		 */
-		if (enqueued < cryop_cnt) {
-
-			/* Dequeue everything, this may include ops that were already
-			 * in the device before this submission....
-			 */
-			do {
-				completed = crypto_dev_poller(crypto_ch);
-			} while (completed > 0);
+	/* If any didn't get submitted, queue them for the poller to de-queue later. */
+	if (num_enqueued_ops < cryop_cnt) {
+		for (crypto_index = num_enqueued_ops; crypto_index < cryop_cnt; crypto_index++) {
+			op_to_queue = (struct vbdev_crypto_op *)rte_crypto_op_ctod_offset(crypto_ops[crypto_index],
+					uint8_t *,
+					QUEUED_OP_OFFSET);
+			op_to_queue->cdev_id = cdev_id;
+			op_to_queue->qp = crypto_ch->device_qp->qp;
+			op_to_queue->crypto_op = crypto_ops[crypto_index];
+			op_to_queue->bdev_io = bdev_io;
+			TAILQ_INSERT_TAIL(&crypto_ch->queued_cry_ops,
+					  op_to_queue,
+					  link);
 		}
-	} while (enqueued < cryop_cnt);
-
-	/* Add this bdev_io to our outstanding list. */
-	TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
+	}
+
+	/* Add this bdev_io to our outstanding list if any of its crypto ops made it. */
+	if (num_enqueued_ops > 0) {
+		TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
+		io_ctx->on_pending_list = true;
+	}
 
 	return rc;
 
@@ -1188,6 +1233,9 @@ crypto_bdev_ch_create_cb(void *io_device, void *ctx_buf)
 	/* We use this queue to track outstanding IO in our layer. */
 	TAILQ_INIT(&crypto_ch->pending_cry_ios);
 
+	/* We use this to queue up crypto ops when the device is busy. */
+	TAILQ_INIT(&crypto_ch->queued_cry_ops);
+
 	return 0;
 }
 
diff --git a/test/unit/lib/bdev/crypto.c/crypto_ut.c b/test/unit/lib/bdev/crypto.c/crypto_ut.c
index e57a8b3ed..38a0599c1 100644
--- a/test/unit/lib/bdev/crypto.c/crypto_ut.c
+++ b/test/unit/lib/bdev/crypto.c/crypto_ut.c
@@ -55,6 +55,7 @@ bool ut_rte_cryptodev_info_get_mocked = false;
  * mock them straight away. We use defines to redirect them into
  * our custom functions.
  */
+static bool g_resubmit_test = false;
 #define rte_cryptodev_enqueue_burst mock_rte_cryptodev_enqueue_burst
 static inline uint16_t
 mock_rte_cryptodev_enqueue_burst(uint8_t dev_id, uint16_t qp_id,
@@ -69,38 +70,29 @@ mock_rte_cryptodev_enqueue_burst(uint8_t dev_id, uint16_t qp_id,
 		 * enqueued operations for assertion in dev_full test.
 		 */
 		g_test_dev_full_ops[i] = *ops++;
+		if (g_resubmit_test == true) {
+			CU_ASSERT(g_test_dev_full_ops[i] == (void *)0xDEADBEEF);
+		}
 	}
 
 	return g_enqueue_mock;
 }
 
-/* This is pretty ugly but in order to complete an IO via the
- * poller in the submit path, we need to first call to this func
- * to return the dequeued value and also decrement it. On the subsequent
- * call it needs to return 0 to indicate to the caller that there are
- * no more IOs to drain.
- */
-int g_test_overflow = 0;
 #define rte_cryptodev_dequeue_burst mock_rte_cryptodev_dequeue_burst
 static inline uint16_t
 mock_rte_cryptodev_dequeue_burst(uint8_t dev_id, uint16_t qp_id,
 		struct rte_crypto_op **ops, uint16_t nb_ops)
 {
+	int i;
+
 	CU_ASSERT(nb_ops > 0);
 
-	/* A crypto device can be full on enqueue, the driver is designed to drain
-	 * the device at the time by calling the poller until it's empty, then
-	 * submitting the remaining crypto ops.
-	 */
-	if (g_test_overflow) {
-		if (g_dequeue_mock == 0) {
-			return 0;
-		}
-		*ops = g_test_crypto_ops[g_enqueue_mock];
+	for (i = 0; i < g_dequeue_mock; i++) {
+		*ops = g_test_crypto_ops[i];
 		(*ops)->status = RTE_CRYPTO_OP_STATUS_SUCCESS;
-		g_dequeue_mock -= 1;
 	}
-	return (g_dequeue_mock + 1);
+
+	return g_dequeue_mock;
 }
 
 /* Instead of allocating real memory, assign the allocations to our
@@ -322,6 +314,7 @@ test_setup(void)
 	g_test_config = calloc(1, sizeof(struct rte_config));
 	g_test_config->lcore_count = 1;
 	TAILQ_INIT(&g_crypto_ch->pending_cry_ios);
+	TAILQ_INIT(&g_crypto_ch->queued_cry_ops);
 
 	/* Allocate a real mbuf pool so we can test error paths */
 	g_mbuf_mp = spdk_mempool_create("mbuf_mp", NUM_MBUFS, sizeof(struct rte_mbuf),
@@ -334,12 +327,12 @@ test_setup(void)
 	for (i = 0; i < MAX_TEST_BLOCKS; i++) {
 		rc = posix_memalign((void **)&g_test_crypto_ops[i], 64,
 				    sizeof(struct rte_crypto_op) + sizeof(struct rte_crypto_sym_op) +
-				    AES_CBC_IV_LENGTH);
+				    AES_CBC_IV_LENGTH + QUEUED_OP_LENGTH);
 		if (rc != 0) {
 			assert(false);
 		}
 		memset(g_test_crypto_ops[i], 0, sizeof(struct rte_crypto_op) +
-		       sizeof(struct rte_crypto_sym_op));
+		       sizeof(struct rte_crypto_sym_op) + QUEUED_OP_LENGTH);
 	}
 	return 0;
 }
@@ -544,45 +537,54 @@ test_large_rw(void)
 static void
 test_dev_full(void)
 {
-	unsigned block_len = 512;
-	unsigned num_blocks = 2;
-	unsigned io_len = block_len * num_blocks;
-	unsigned i;
+	struct vbdev_crypto_op *queued_op;
+	struct rte_crypto_sym_op *sym_op;
 
-	g_test_overflow = 1;
-
-	/* Multi block size read, multi-element */
+	/* Two element block size read */
 	g_bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
 	g_bdev_io->u.bdev.iovcnt = 1;
-	g_bdev_io->u.bdev.num_blocks = num_blocks;
-	g_bdev_io->u.bdev.iovs[0].iov_len = io_len;
-	g_bdev_io->u.bdev.iovs[0].iov_base = &test_dev_full;
-	g_crypto_bdev.crypto_bdev.blocklen = block_len;
+	g_bdev_io->u.bdev.num_blocks = 2;
+	g_bdev_io->u.bdev.iovs[0].iov_len = 512;
+	g_bdev_io->u.bdev.iovs[0].iov_base = (void *)0xDEADBEEF;
+	g_bdev_io->u.bdev.iovs[1].iov_len = 512;
+	g_bdev_io->u.bdev.iovs[1].iov_base = (void *)0xFEEDBEEF;
+	g_crypto_bdev.crypto_bdev.blocklen = 512;
 	g_bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
 	g_enqueue_mock = g_dequeue_mock = 1;
-	ut_rte_crypto_op_bulk_alloc = num_blocks;
+	ut_rte_crypto_op_bulk_alloc = 2;
+
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == true);
 
 	vbdev_crypto_submit_request(g_io_ch, g_bdev_io);
 	CU_ASSERT(g_bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS);
+	CU_ASSERT(g_io_ctx->cryop_cnt_remaining == 2);
+	sym_op = g_test_crypto_ops[0]->sym;
+	CU_ASSERT(sym_op->m_src->buf_addr == (void *)0xDEADBEEF);
+	CU_ASSERT(sym_op->m_src->data_len == 512);
+	CU_ASSERT(sym_op->m_src->next == NULL);
+	CU_ASSERT(sym_op->cipher.data.length == 512);
+	CU_ASSERT(sym_op->cipher.data.offset == 0);
+	CU_ASSERT(sym_op->m_src->userdata == g_bdev_io);
+	CU_ASSERT(sym_op->m_dst == NULL);
 
-	/* this test only completes one of the 2 IOs (in the drain path) */
-	CU_ASSERT(g_io_ctx->cryop_cnt_remaining == 1);
+	/* make sure one got queued and confirm its values */
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == false);
+	queued_op = TAILQ_FIRST(&g_crypto_ch->queued_cry_ops);
+	sym_op = queued_op->crypto_op->sym;
+	TAILQ_REMOVE(&g_crypto_ch->queued_cry_ops, queued_op, link);
+	CU_ASSERT(queued_op->bdev_io == g_bdev_io);
+	CU_ASSERT(queued_op->crypto_op == g_test_crypto_ops[1]);
+	CU_ASSERT(sym_op->m_src->buf_addr == (void *)0xFEEDBEEF);
+	CU_ASSERT(sym_op->m_src->data_len == 512);
+	CU_ASSERT(sym_op->m_src->next == NULL);
+	CU_ASSERT(sym_op->cipher.data.length == 512);
+	CU_ASSERT(sym_op->cipher.data.offset == 0);
+	CU_ASSERT(sym_op->m_src->userdata == g_bdev_io);
+	CU_ASSERT(sym_op->m_dst == NULL);
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == true);
 
-	for (i = 0; i < num_blocks; i++) {
-		/* One of the src_mbufs was freed because of the device full condition so
-		 * we can't assert its value here.
-		 */
-		CU_ASSERT(g_test_dev_full_ops[i]->sym->cipher.data.length == block_len);
-		CU_ASSERT(g_test_dev_full_ops[i]->sym->cipher.data.offset == 0);
-		CU_ASSERT(g_test_dev_full_ops[i]->sym->m_src == g_test_dev_full_ops[i]->sym->m_src);
-		CU_ASSERT(g_test_dev_full_ops[i]->sym->m_dst == NULL);
-	}
-
-	/* Only one of the 2 blocks in the test was freed on completion by design, so
-	 * we need to free th other one here.
-	 */
 	spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[0]->sym->m_src);
-	g_test_overflow = 0;
+	spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[1]->sym->m_src);
 }
 
 static void
@@ -886,6 +888,43 @@ test_supported_io(void)
 	CU_ASSERT(rc == false);
 }
 
+static void
+test_poller(void)
+{
+	int rc;
+	struct rte_mbuf *src_mbufs[1];
+	struct vbdev_crypto_op *op_to_resubmit;
+
+	/* test regular 1 op to dequeue and complete */
+	g_dequeue_mock = g_enqueue_mock = 1;
+	spdk_mempool_get_bulk(g_mbuf_mp, (void **)&src_mbufs[0], 1);
+	g_test_crypto_ops[0]->sym->m_src = src_mbufs[0];
+	g_test_crypto_ops[0]->sym->m_src->userdata = g_bdev_io;
+	g_test_crypto_ops[0]->sym->m_dst = NULL;
+	g_io_ctx->cryop_cnt_remaining = 1;
+	g_bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
+	rc = crypto_dev_poller(g_crypto_ch);
+	CU_ASSERT(rc == 1);
+
+	/* We have nothing dequeued but have some to resubmit */
+	g_dequeue_mock = 0;
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == true);
+
+	/* add an op to the queued list. */
+	g_resubmit_test = true;
+	op_to_resubmit = (struct vbdev_crypto_op *)((uint8_t *)g_test_crypto_ops[0] + QUEUED_OP_OFFSET);
+	op_to_resubmit->crypto_op = (void *)0xDEADBEEF;
+	op_to_resubmit->bdev_io = g_bdev_io;
+	TAILQ_INSERT_TAIL(&g_crypto_ch->queued_cry_ops,
+			  op_to_resubmit,
+			  link);
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == false);
+	rc = crypto_dev_poller(g_crypto_ch);
+	g_resubmit_test = false;
+	CU_ASSERT(rc == 0);
+	CU_ASSERT(TAILQ_EMPTY(&g_crypto_ch->queued_cry_ops) == true);
+}
+
 int
 main(int argc, char **argv)
 {
@@ -923,7 +962,9 @@ main(int argc, char **argv)
 		CU_add_test(suite, "test_supported_io",
 			    test_supported_io) == NULL ||
 		CU_add_test(suite, "test_reset",
-			    test_reset) == NULL ||
+			    test_reset) == NULL ||
+		CU_add_test(suite, "test_poller",
+			    test_poller) == NULL
 	   ) {
 		CU_cleanup_registry();
 		return CU_get_error();
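Note on the pattern (not part of the patch): the sketch below is a minimal, self-contained illustration of the queue-and-resubmit idea the diff implements — operations the device refuses at submit time are parked on a per-channel TAILQ and retried from the poller on a later pass. All names here (struct queued_op, fake_device_enqueue, g_room) are hypothetical stand-ins invented for this example; the real patch avoids a separate allocation by carving struct vbdev_crypto_op out of each rte_crypto_op's per-op private data area at QUEUED_OP_OFFSET.

/* Minimal sketch of queue-and-resubmit with hypothetical names; not SPDK/DPDK API. */
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>

struct queued_op {
	int op_id;				/* stand-in for the rte_crypto_op pointer */
	TAILQ_ENTRY(queued_op) link;		/* linkage on the per-channel retry list */
};

struct channel {
	TAILQ_HEAD(, queued_op) queued_ops;	/* ops the device refused at submit time */
};

/* Pretend device: accepts at most g_room ops per call, returns how many it took. */
static int g_room = 1;

static int
fake_device_enqueue(int *ops, int nb_ops)
{
	int accepted = nb_ops < g_room ? nb_ops : g_room;
	int i;

	for (i = 0; i < accepted; i++) {
		printf("device accepted op %d\n", ops[i]);
	}
	return accepted;
}

/* Submit path: anything the device won't take right now is parked for later. */
static void
submit(struct channel *ch, int *ops, int nb_ops)
{
	int done = fake_device_enqueue(ops, nb_ops);
	int i;

	for (i = done; i < nb_ops; i++) {
		struct queued_op *qop = calloc(1, sizeof(*qop));

		qop->op_id = ops[i];
		TAILQ_INSERT_TAIL(&ch->queued_ops, qop, link);
	}
}

/* Poller: after draining completions (omitted here), retry queued ops one at a
 * time and stop at the first failure so ordering is preserved.
 */
static void
poll_channel(struct channel *ch)
{
	while (!TAILQ_EMPTY(&ch->queued_ops)) {
		struct queued_op *qop = TAILQ_FIRST(&ch->queued_ops);

		if (fake_device_enqueue(&qop->op_id, 1) != 1) {
			break;		/* still busy, try again on the next poll */
		}
		TAILQ_REMOVE(&ch->queued_ops, qop, link);
		free(qop);
	}
}

int
main(void)
{
	struct channel ch;
	int ops[3] = {1, 2, 3};

	TAILQ_INIT(&ch.queued_ops);
	submit(&ch, ops, 3);	/* device takes one op, ops 2 and 3 get queued */
	g_room = 4;		/* device frees up */
	poll_channel(&ch);	/* queued ops are resubmitted in order */
	return 0;
}

As in the patch's poller loop, resubmission breaks out on the first enqueue failure rather than spinning, so queued ops are retried in order on a subsequent poll.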