From 3d6305b9fd7248f60a8f2fc6c6b99170f7093672 Mon Sep 17 00:00:00 2001 From: paul luse Date: Sun, 19 May 2019 10:04:11 -0700 Subject: [PATCH] bdev/compress: fix issues with multi-thread Submission logic was incorrect and completion logic was not present yet. This passes now with multi-thread bdevio test. Signed-off-by: paul luse Change-Id: Ia8ed1c123be511240d93503a2c5e501ccad445bf Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/455018 Tested-by: SPDK CI Jenkins Reviewed-by: Ben Walker Reviewed-by: Shuhei Matsumoto --- lib/bdev/compress/vbdev_compress.c | 109 ++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/lib/bdev/compress/vbdev_compress.c b/lib/bdev/compress/vbdev_compress.c index 603bafc80..ec238ad8c 100644 --- a/lib/bdev/compress/vbdev_compress.c +++ b/lib/bdev/compress/vbdev_compress.c @@ -145,6 +145,7 @@ struct comp_bdev_io { struct spdk_bdev_io_wait_entry bdev_io_wait; /* for bdev_io_wait */ struct spdk_bdev_io *orig_io; /* the original IO */ struct spdk_io_channel *ch; /* for resubmission */ + int status; /* save for completion on orig thread */ }; /* Shared mempools between all devices on this system */ @@ -375,19 +376,38 @@ error_create_mbuf: return rc; } +/* for completing rw requests on the orig IO thread. */ +static void +_spdk_reduce_rw_blocks_cb(void *arg) +{ + struct comp_bdev_io *io_ctx = arg; + + if (io_ctx->status == 0) { + spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); + } else { + SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status); + spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); + } +} + /* Completion callback for r/w that were issued via reducelib. */ static void spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno) { struct spdk_bdev_io *bdev_io = arg; + struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; + struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); /* TODO: need to decide which error codes are bdev_io success vs failure; * example examine calls reading metadata */ - if (reduce_errno == 0) { - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); + + io_ctx->status = reduce_errno; + + /* Send this request to the orig IO thread. */ + if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) { + spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx); } else { - SPDK_ERRLOG("ERROR %d on operation from reduce API\n", reduce_errno); - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); + _spdk_reduce_rw_blocks_cb(io_ctx); } } @@ -630,55 +650,48 @@ comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, b spdk_reduce_rw_blocks_cb, bdev_io); } +/* scheduled for completion on IO thread */ +static void +_complete_other_io(void *arg) +{ + struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg; + if (io_ctx->status == 0) { + spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); + } else { + spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); + } +} + +/* scheduled for submission on reduce thread */ static void _spdk_bdev_io_submit(void *arg) { - struct comp_bdev_io *io_ctx = arg; - - vbdev_compress_submit_request(spdk_io_channel_from_ctx(io_ctx->comp_ch), io_ctx->orig_io); -} - -/* Called when someone above submits IO to this vbdev. */ -static void -vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) -{ + struct spdk_bdev_io *bdev_io = arg; + struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; + struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, comp_bdev); - struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch); - struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; int rc = 0; - memset(io_ctx, 0, sizeof(struct comp_bdev_io)); - io_ctx->comp_bdev = comp_bdev; - io_ctx->comp_ch = comp_ch; - io_ctx->orig_io = bdev_io; - - /* Send this request to the reduce_thread. */ - if (spdk_io_channel_get_thread(spdk_bdev_io_get_io_channel(bdev_io)) - != comp_bdev->reduce_thread) { - spdk_thread_send_msg(comp_bdev->reduce_thread, _spdk_bdev_io_submit, io_ctx); - return; - } - switch (bdev_io->type) { case SPDK_BDEV_IO_TYPE_READ: spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb, bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); - break; + return; case SPDK_BDEV_IO_TYPE_WRITE: spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, spdk_reduce_rw_blocks_cb, bdev_io); - break; + return; /* TODO in future patch in the series */ + case SPDK_BDEV_IO_TYPE_RESET: + break; case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: case SPDK_BDEV_IO_TYPE_UNMAP: case SPDK_BDEV_IO_TYPE_FLUSH: - case SPDK_BDEV_IO_TYPE_RESET: default: SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type); - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); - return; + rc = -EINVAL; } if (rc) { @@ -686,11 +699,41 @@ vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *b SPDK_ERRLOG("No memory, start to queue io for compress.\n"); io_ctx->ch = ch; vbdev_compress_queue_io(bdev_io); + return; } else { SPDK_ERRLOG("on bdev_io submission!\n"); - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); + io_ctx->status = rc; } } + + /* Complete this on the orig IO thread. */ + if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) { + spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx); + } else { + _complete_other_io(io_ctx); + } +} + +/* Called when someone above submits IO to this vbdev. */ +static void +vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) +{ + struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; + struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, + comp_bdev); + struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch); + + memset(io_ctx, 0, sizeof(struct comp_bdev_io)); + io_ctx->comp_bdev = comp_bdev; + io_ctx->comp_ch = comp_ch; + io_ctx->orig_io = bdev_io; + + /* Send this request to the reduce_thread if that's not what we're on. */ + if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) { + spdk_thread_send_msg(comp_bdev->reduce_thread, _spdk_bdev_io_submit, bdev_io); + } else { + _spdk_bdev_io_submit(bdev_io); + } } static bool