module/crypto: remove need to allocate a buffer for writes

Use the new bdev aux buf feature so the write path no longer has to spdk_malloc() a bounce buffer per I/O. Removing that per-write allocation is a big performance win for writes.
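
For reference, a minimal sketch of the new write flow (not the module's exact code): the submit path asks the bdev layer for an aux buffer via spdk_bdev_io_get_aux_buf(), the callback aligns it, encrypts into it and writes from it, and the write completion hands it back with spdk_bdev_io_put_aux_buf(). The write_ctx struct, encrypt_into() helper, and base_desc/base_ch externs are placeholders for the module's real io ctx, cryptodev enqueue, and channel context; encryption is shown synchronously for brevity, whereas the real module issues the write from the crypto poller's completion path, and return-code checking is omitted.

#include "spdk/bdev.h"
#include "spdk/bdev_module.h"

/* Per-IO context carried in bdev_io->driver_ctx (mirrors the module's crypto_bdev_io). */
struct write_ctx {
	void		*aux_buf_raw;	/* raw pointer handed out by the bdev layer */
	struct iovec	aux_buf_iov;	/* aligned view used as crypto dst / write src */
};

/* Assumed to come from the module's device/channel context. */
extern struct spdk_bdev_desc *base_desc;
extern struct spdk_io_channel *base_ch;

static void write_complete_cb(struct spdk_bdev_io *base_io, bool success, void *cb_arg);
/* Hypothetical stand-in for the cryptodev enqueue done by _crypto_operation(). */
static int encrypt_into(struct spdk_bdev_io *bdev_io, struct iovec *dst);

static void
write_get_aux_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, void *aux_buf)
{
	struct write_ctx *ctx = (struct write_ctx *)bdev_io->driver_ctx;
	uint64_t align = spdk_bdev_get_buf_align(bdev_io->bdev);

	/* The raw aux buffer is not guaranteed to be aligned; round the base up. */
	ctx->aux_buf_raw = aux_buf;
	ctx->aux_buf_iov.iov_base = (void *)(((uintptr_t)aux_buf + (align - 1)) & ~(align - 1));
	ctx->aux_buf_iov.iov_len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	if (encrypt_into(bdev_io, &ctx->aux_buf_iov) != 0) {
		/* On failure the buffer must be handed back before completing the IO. */
		spdk_bdev_io_put_aux_buf(bdev_io, aux_buf);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	/* Write the ciphertext from the aux buffer; cb_arg carries the original IO. */
	spdk_bdev_writev_blocks(base_desc, base_ch, &ctx->aux_buf_iov, 1,
				bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				write_complete_cb, bdev_io);
}

static void
write_complete_cb(struct spdk_bdev_io *base_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *orig_io = cb_arg;
	struct write_ctx *ctx = (struct write_ctx *)orig_io->driver_ctx;

	/* Return the aux buffer to the bdev layer instead of spdk_free()ing a
	 * privately allocated bounce buffer; the raw (unaligned) pointer goes back. */
	spdk_bdev_io_put_aux_buf(orig_io, ctx->aux_buf_raw);
	spdk_bdev_io_complete(orig_io, success ? SPDK_BDEV_IO_STATUS_SUCCESS
					       : SPDK_BDEV_IO_STATUS_FAILED);
	spdk_bdev_free_io(base_io);
}

/* Submit path: instead of spdk_malloc()ing a bounce buffer, request one and
 * continue in the callback:
 *
 *	spdk_bdev_io_get_aux_buf(bdev_io, write_get_aux_buf_cb);
 */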

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I5a27460a369ef5f13bf490a287603e566071be41
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/478384
Community-CI: SPDK CI Jenkins <sys_sgci@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Shuhei Matsumoto 2020-01-20 19:07:51 -05:00 committed by Jim Harris
parent 82a4c84fec
commit 2c8ddd08bd
2 changed files with 65 additions and 58 deletions


@@ -215,9 +215,10 @@ struct crypto_bdev_io {
int8_t bdev_io_status; /* the status we'll report back on the bdev IO */
bool on_pending_list;
/* Used for the single contiguous buffer that serves as the crypto destination target for writes */
uint64_t cry_num_blocks; /* num of blocks for the contiguous buffer */
uint64_t cry_offset_blocks; /* block offset on media */
struct iovec cry_iov; /* iov representing contig write buffer */
uint64_t aux_num_blocks; /* num of blocks for the contiguous buffer */
uint64_t aux_offset_blocks; /* block offset on media */
void *aux_buf_raw; /* raw buffer that the bdev layer gave us for write buffer */
struct iovec aux_buf_iov; /* iov representing aligned contig write buffer */
/* for bdev_io_wait */
struct spdk_bdev_io_wait_entry bdev_io_wait;
@@ -522,8 +523,8 @@ _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
/* Write the encrypted data. */
rc = spdk_bdev_writev_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
&io_ctx->cry_iov, 1, io_ctx->cry_offset_blocks,
io_ctx->cry_num_blocks, _complete_internal_write,
&io_ctx->aux_buf_iov, 1, io_ctx->aux_offset_blocks,
io_ctx->aux_num_blocks, _complete_internal_write,
bdev_io);
} else {
SPDK_ERRLOG("Issue with encryption on bdev_io %p\n", bdev_io);
@@ -542,7 +543,8 @@ _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
}
static int _crypto_operation(struct spdk_bdev_io *bdev_io,
enum rte_crypto_cipher_operation crypto_op);
enum rte_crypto_cipher_operation crypto_op,
void *aux_buf);
/* This is the poller for the crypto device. It uses a single API to dequeue whatever is ready at
* the device. Then we need to decide if what we've got so far (including previous poller
@@ -665,7 +667,8 @@ crypto_dev_poller(void *args)
/* We're either encrypting on the way down or decrypting on the way back. */
static int
_crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation crypto_op)
_crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation crypto_op,
void *aux_buf)
{
uint16_t num_enqueued_ops = 0;
uint32_t cryop_cnt = bdev_io->u.bdev.num_blocks;
@@ -687,6 +690,7 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
struct rte_mbuf *dst_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
int burst;
struct vbdev_crypto_op *op_to_queue;
uint64_t alignment = spdk_bdev_get_buf_align(&io_ctx->crypto_bdev->crypto_bdev);
assert((bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) <= CRYPTO_MAX_IO);
@@ -731,22 +735,11 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
* undesirable in some use cases.
*/
if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
io_ctx->cry_iov.iov_len = total_length;
/* For now just allocate in the I/O path, not optimal but the current bdev API
* for getting a buffer from the pool won't work if the bdev_io passed in
* has a buffer, which ours always will. So, until we modify that API
* or better yet the current ZCOPY work lands, this is the best we can do.
*/
io_ctx->cry_iov.iov_base = spdk_malloc(total_length,
spdk_bdev_get_buf_align(bdev_io->bdev), NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (!io_ctx->cry_iov.iov_base) {
SPDK_ERRLOG("ERROR trying to allocate write buffer for encryption!\n");
rc = -ENOMEM;
goto error_get_write_buffer;
}
io_ctx->cry_offset_blocks = bdev_io->u.bdev.offset_blocks;
io_ctx->cry_num_blocks = bdev_io->u.bdev.num_blocks;
io_ctx->aux_buf_iov.iov_len = total_length;
io_ctx->aux_buf_raw = aux_buf;
io_ctx->aux_buf_iov.iov_base = (void *)(((uintptr_t)aux_buf + (alignment - 1)) & ~(alignment - 1));
io_ctx->aux_offset_blocks = bdev_io->u.bdev.offset_blocks;
io_ctx->aux_num_blocks = bdev_io->u.bdev.num_blocks;
}
/* This value is used in the completion callback to determine when the bdev_io is
@@ -803,7 +796,7 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
/* Set the relevant destination en_mbuf elements. */
dst_mbufs[crypto_index]->buf_addr = io_ctx->cry_iov.iov_base + en_offset;
dst_mbufs[crypto_index]->buf_addr = io_ctx->aux_buf_iov.iov_base + en_offset;
dst_mbufs[crypto_index]->data_len = updated_length = crypto_len;
/* TODO: Make this assignment conditional on QAT usage and add an assert. */
dst_mbufs[crypto_index]->buf_iova = spdk_vtophys(dst_mbufs[crypto_index]->buf_addr,
@@ -903,9 +896,6 @@ _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation
/* Error cleanup paths. */
error_attach_session:
error_get_write_buffer:
rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops, cryop_cnt);
allocated = 0;
error_get_ops:
if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
spdk_mempool_put_bulk(g_mbuf_mp, (void **)&dst_mbufs[0],
@@ -985,7 +975,8 @@ _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_ar
int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
spdk_free(orig_ctx->cry_iov.iov_base);
spdk_bdev_io_put_aux_buf(orig_io, orig_ctx->aux_buf_raw);
spdk_bdev_io_complete(orig_io, status);
spdk_bdev_free_io(bdev_io);
}
@@ -1002,7 +993,7 @@ _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg
/* Save off this bdev_io so it can be freed after decryption. */
orig_ctx->read_io = bdev_io;
if (!_crypto_operation(orig_io, RTE_CRYPTO_CIPHER_OP_DECRYPT)) {
if (!_crypto_operation(orig_io, RTE_CRYPTO_CIPHER_OP_DECRYPT, NULL)) {
return;
} else {
SPDK_ERRLOG("ERROR decrypting\n");
@@ -1076,6 +1067,31 @@ crypto_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
}
}
/* For encryption we don't want to encrypt the data in place as the host isn't
* expecting us to mangle its data buffers so we need to encrypt into the bdev
* aux buffer, then we can use that as the source for the disk data transfer.
*/
static void
crypto_write_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
void *aux_buf)
{
struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
int rc = 0;
rc = _crypto_operation(bdev_io, RTE_CRYPTO_CIPHER_OP_ENCRYPT, aux_buf);
if (rc != 0) {
spdk_bdev_io_put_aux_buf(bdev_io, aux_buf);
if (rc == -ENOMEM) {
SPDK_DEBUGLOG(SPDK_LOG_CRYPTO, "No memory, queue the IO.\n");
io_ctx->ch = ch;
vbdev_crypto_queue_io(bdev_io);
} else {
SPDK_ERRLOG("ERROR on bdev_io submission!\n");
spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}
}
}
/* Called when someone submits IO to this crypto vbdev. For IO's not relevant to crypto,
* we're simply passing it on here via SPDK IO calls which in turn allocate another bdev IO
* and call our cpl callback provided below along with the original bdev_io so that we can
@@ -1104,7 +1120,10 @@ vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bde
bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
break;
case SPDK_BDEV_IO_TYPE_WRITE:
rc = _crypto_operation(bdev_io, RTE_CRYPTO_CIPHER_OP_ENCRYPT);
/* Tell the bdev layer that we need an aux buf in addition to the data
* buf already associated with the bdev.
*/
spdk_bdev_io_get_aux_buf(bdev_io, crypto_write_get_buf_cb);
break;
case SPDK_BDEV_IO_TYPE_UNMAP:
rc = spdk_bdev_unmap_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,


@@ -150,12 +150,13 @@ DEFINE_STUB(spdk_conf_section_get_nmval, char *,
(struct spdk_conf_section *sp, const char *key, int idx1, int idx2), NULL);
DEFINE_STUB_V(spdk_bdev_module_list_add, (struct spdk_bdev_module *bdev_module));
DEFINE_STUB_V(spdk_bdev_free_io, (struct spdk_bdev_io *g_bdev_io));
DEFINE_STUB_V(spdk_bdev_io_put_aux_buf, (struct spdk_bdev_io *bdev_io, void *aux_buf));
DEFINE_STUB(spdk_bdev_io_type_supported, bool, (struct spdk_bdev *bdev,
enum spdk_bdev_io_type io_type), 0);
DEFINE_STUB_V(spdk_bdev_module_release_bdev, (struct spdk_bdev *bdev));
DEFINE_STUB_V(spdk_bdev_close, (struct spdk_bdev_desc *desc));
DEFINE_STUB(spdk_bdev_get_name, const char *, (const struct spdk_bdev *bdev), 0);
DEFINE_STUB(spdk_bdev_get_buf_align, size_t, (const struct spdk_bdev *bdev), 0);
DEFINE_STUB(spdk_bdev_get_buf_align, size_t, (const struct spdk_bdev *bdev), 64);
DEFINE_STUB(spdk_bdev_get_io_channel, struct spdk_io_channel *, (struct spdk_bdev_desc *desc), 0);
DEFINE_STUB_V(spdk_bdev_unregister, (struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn,
void *cb_arg));
@@ -235,6 +236,12 @@ rte_cryptodev_sym_get_private_session_size(uint8_t dev_id)
return (unsigned int)dev_id;
}
void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
cb(g_io_ch, g_bdev_io, (void *)0xDEADBEEF);
}
void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
@@ -434,10 +441,10 @@ test_simple_write(void)
vbdev_crypto_submit_request(g_io_ch, g_bdev_io);
CU_ASSERT(g_bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS);
CU_ASSERT(g_io_ctx->cryop_cnt_remaining == 1);
CU_ASSERT(g_io_ctx->cry_iov.iov_len == 512);
CU_ASSERT(g_io_ctx->cry_iov.iov_base != NULL);
CU_ASSERT(g_io_ctx->cry_offset_blocks == 0);
CU_ASSERT(g_io_ctx->cry_num_blocks == 1);
CU_ASSERT(g_io_ctx->aux_buf_iov.iov_len == 512);
CU_ASSERT(g_io_ctx->aux_buf_iov.iov_base != NULL);
CU_ASSERT(g_io_ctx->aux_offset_blocks == 0);
CU_ASSERT(g_io_ctx->aux_num_blocks == 1);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->buf_addr == &test_simple_write);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->data_len == 512);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_src->next == NULL);
@@ -447,7 +454,6 @@ test_simple_write(void)
CU_ASSERT(g_test_crypto_ops[0]->sym->m_dst->buf_addr != NULL);
CU_ASSERT(g_test_crypto_ops[0]->sym->m_dst->data_len == 512);
spdk_free(g_io_ctx->cry_iov.iov_base);
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[0]->sym->m_src);
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[0]->sym->m_dst);
}
@@ -533,16 +539,15 @@ test_large_rw(void)
CU_ASSERT(g_test_crypto_ops[i]->sym->cipher.data.length == block_len);
CU_ASSERT(g_test_crypto_ops[i]->sym->cipher.data.offset == 0);
CU_ASSERT(g_test_crypto_ops[i]->sym->m_src->userdata == g_bdev_io);
CU_ASSERT(g_io_ctx->cry_iov.iov_len == io_len);
CU_ASSERT(g_io_ctx->cry_iov.iov_base != NULL);
CU_ASSERT(g_io_ctx->cry_offset_blocks == 0);
CU_ASSERT(g_io_ctx->cry_num_blocks == num_blocks);
CU_ASSERT(g_io_ctx->aux_buf_iov.iov_len == io_len);
CU_ASSERT(g_io_ctx->aux_buf_iov.iov_base != NULL);
CU_ASSERT(g_io_ctx->aux_offset_blocks == 0);
CU_ASSERT(g_io_ctx->aux_num_blocks == num_blocks);
CU_ASSERT(g_test_crypto_ops[i]->sym->m_dst->buf_addr != NULL);
CU_ASSERT(g_test_crypto_ops[i]->sym->m_dst->data_len == block_len);
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[i]->sym->m_src);
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[i]->sym->m_dst);
}
spdk_free(g_io_ctx->cry_iov.iov_base);
}
static void
@@ -678,7 +683,6 @@ test_crazy_rw(void)
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[i]->sym->m_src);
spdk_mempool_put(g_mbuf_mp, g_test_crypto_ops[i]->sym->m_dst);
}
spdk_free(g_io_ctx->cry_iov.iov_base);
}
static void
@@ -857,10 +861,6 @@ test_initdrivers(void)
static void
test_crypto_op_complete(void)
{
/* Need to prove to scan-build that we are setting iov_bases properly. */
void *old_iov_base;
struct crypto_bdev_io *orig_ctx;
/* Make sure completion code respects failure. */
g_bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
g_completion_called = false;
@@ -881,11 +881,6 @@ test_crypto_op_complete(void)
g_bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
g_completion_called = false;
MOCK_SET(spdk_bdev_writev_blocks, 0);
/* Code under test will free this, if not ASAN will complain. */
g_io_ctx->cry_iov.iov_base = spdk_malloc(16, 0x10, NULL, SPDK_ENV_LCORE_ID_ANY,
SPDK_MALLOC_DMA);
orig_ctx = (struct crypto_bdev_io *)g_bdev_io->driver_ctx;
old_iov_base = orig_ctx->cry_iov.iov_base;
_crypto_operation_complete(g_bdev_io);
CU_ASSERT(g_bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS);
CU_ASSERT(g_completion_called == true);
@@ -895,13 +890,6 @@ test_crypto_op_complete(void)
g_bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
g_completion_called = false;
MOCK_SET(spdk_bdev_writev_blocks, -1);
/* Code under test will free this, if not ASAN will complain. */
g_io_ctx->cry_iov.iov_base = spdk_malloc(16, 0x40, NULL, SPDK_ENV_LCORE_ID_ANY,
SPDK_MALLOC_DMA);
/* To Do: remove this garbage assert as soon as scan-build stops throwing a
* heap use after free error.
*/
SPDK_CU_ASSERT_FATAL(old_iov_base != orig_ctx->cry_iov.iov_base);
_crypto_operation_complete(g_bdev_io);
CU_ASSERT(g_bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FAILED);
CU_ASSERT(g_completion_called == true);