diff --git a/lib/bdev/rbd/bdev_rbd.c b/lib/bdev/rbd/bdev_rbd.c index 1329afeee..76a83bf16 100644 --- a/lib/bdev/rbd/bdev_rbd.c +++ b/lib/bdev/rbd/bdev_rbd.c @@ -45,6 +45,7 @@ #include "spdk/io_channel.h" #include "spdk/json.h" #include "spdk/string.h" +#include "spdk/util.h" #include "spdk_internal/bdev.h" #include "spdk_internal/log.h" @@ -70,6 +71,12 @@ struct bdev_rbd_io_channel { struct spdk_poller *poller; }; +struct bdev_rbd_io { + uint64_t remaining_len; + int num_segments; + bool failed; +}; + static void bdev_rbd_free(struct bdev_rbd *rbd) { @@ -199,21 +206,56 @@ bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io, static int bdev_rbd_library_init(void); +static int +bdev_rbd_get_ctx_size(void) +{ + return sizeof(struct bdev_rbd_io); +} + SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, NULL, NULL, - NULL, NULL) + bdev_rbd_get_ctx_size, NULL) static int64_t bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, struct iovec *iov, int iovcnt, size_t len, uint64_t offset) { + struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); + size_t remaining = len; + int i, rc; - if (iovcnt != 1 || iov->iov_len != len) { - return -1; + rbd_io->remaining_len = 0; + rbd_io->num_segments = 0; + rbd_io->failed = false; + + for (i = 0; i < iovcnt && remaining > 0; i++) { + size_t seg_len = spdk_min(remaining, iov[i].iov_len); + + rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len); + if (rc) { + /* + * This bdev_rbd_start_aio() call failed, but if any previous ones were + * submitted, we need to wait for them to finish. + */ + if (rbd_io->num_segments == 0) { + /* No previous I/O submitted - return error code immediately. */ + return rc; + } + + /* Return and wait for outstanding I/O to complete. */ + rbd_io->failed = true; + return 0; + } + + rbd_io->num_segments++; + rbd_io->remaining_len += seg_len; + + offset += seg_len; + remaining -= seg_len; } - return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov->iov_base, offset, len); + return 0; } static int64_t @@ -308,7 +350,7 @@ bdev_rbd_io_poll(void *arg) int i, io_status, rc; rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; struct spdk_bdev_io *bdev_io; - enum spdk_bdev_io_status status; + struct bdev_rbd_io *rbd_io; rc = poll(&ch->pfd, 1, 0); @@ -320,23 +362,34 @@ bdev_rbd_io_poll(void *arg) rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); for (i = 0; i < rc; i++) { bdev_io = rbd_aio_get_arg(comps[i]); + rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; io_status = rbd_aio_get_return_value(comps[i]); + + assert(rbd_io->num_segments > 0); + rbd_io->num_segments--; + if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { - if ((int)(bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) == io_status) { - status = SPDK_BDEV_IO_STATUS_SUCCESS; - } else { - status = SPDK_BDEV_IO_STATUS_FAILED; + if (io_status > 0) { + /* For reads, io_status is the length */ + rbd_io->remaining_len -= io_status; + } + + if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) { + rbd_io->failed = true; } } else { /* For others, 0 means success */ - if (!io_status) { - status = SPDK_BDEV_IO_STATUS_SUCCESS; - } else { - status = SPDK_BDEV_IO_STATUS_FAILED; + if (io_status != 0) { + rbd_io->failed = true; } } + rbd_aio_release(comps[i]); - spdk_bdev_io_complete(bdev_io, status); + + if (rbd_io->num_segments == 0) { + spdk_bdev_io_complete(bdev_io, + rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS); + } } }