bdev/rbd: support readv/writev with multiple iovs

Previously, the RBD bdev only supported a single iovec entry, which is
not sufficient for the bdev API.

Change-Id: Ic18257bae0363b9c01e091547e5b41bae0a21e9a
Signed-off-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-on: https://review.gerrithub.io/401259
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Daniel Verkamp 2017-10-12 17:20:10 -07:00 committed by Jim Harris
parent afe51a1556
commit bd069288d3

View File

@ -45,6 +45,7 @@
#include "spdk/io_channel.h" #include "spdk/io_channel.h"
#include "spdk/json.h" #include "spdk/json.h"
#include "spdk/string.h" #include "spdk/string.h"
#include "spdk/util.h"
#include "spdk_internal/bdev.h" #include "spdk_internal/bdev.h"
#include "spdk_internal/log.h" #include "spdk_internal/log.h"
@ -70,6 +71,12 @@ struct bdev_rbd_io_channel {
struct spdk_poller *poller; struct spdk_poller *poller;
}; };
/*
 * Per-I/O bookkeeping stored in the bdev_io driver context.  One bdev_io may
 * be split into several RBD AIO requests (one per iovec segment); this
 * structure tracks them until the last one completes.
 */
struct bdev_rbd_io {
/* Sum of the submitted segment lengths; read completions subtract the byte count they return. */
uint64_t remaining_len;
/* Number of submitted AIO segments that have not completed yet. */
int num_segments;
/* Set when a segment fails to submit or complete; selects the final bdev_io status. */
bool failed;
};
static void static void
bdev_rbd_free(struct bdev_rbd *rbd) bdev_rbd_free(struct bdev_rbd *rbd)
{ {
@ -199,21 +206,56 @@ bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io,
static int bdev_rbd_library_init(void); static int bdev_rbd_library_init(void);
/*
 * Module callback reporting the size of the per-I/O context this module
 * needs.  The bdev_io driver_ctx area is interpreted as a struct bdev_rbd_io
 * by the read/write submission and completion paths.
 */
static int
bdev_rbd_get_ctx_size(void)
{
	int ctx_size = sizeof(struct bdev_rbd_io);

	return ctx_size;
}
SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, NULL, NULL, SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, NULL, NULL,
NULL, NULL) bdev_rbd_get_ctx_size, NULL)
static int64_t static int64_t
bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch, bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch,
struct spdk_bdev_io *bdev_io, struct iovec *iov, struct spdk_bdev_io *bdev_io, struct iovec *iov,
int iovcnt, size_t len, uint64_t offset) int iovcnt, size_t len, uint64_t offset)
{ {
struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
size_t remaining = len;
int i, rc;
if (iovcnt != 1 || iov->iov_len != len) { rbd_io->remaining_len = 0;
return -1; rbd_io->num_segments = 0;
rbd_io->failed = false;
for (i = 0; i < iovcnt && remaining > 0; i++) {
size_t seg_len = spdk_min(remaining, iov[i].iov_len);
rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len);
if (rc) {
/*
* This bdev_rbd_start_aio() call failed, but if any previous ones were
* submitted, we need to wait for them to finish.
*/
if (rbd_io->num_segments == 0) {
/* No previous I/O submitted - return error code immediately. */
return rc;
}
/* Return and wait for outstanding I/O to complete. */
rbd_io->failed = true;
return 0;
}
rbd_io->num_segments++;
rbd_io->remaining_len += seg_len;
offset += seg_len;
remaining -= seg_len;
} }
return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov->iov_base, offset, len); return 0;
} }
static int64_t static int64_t
@ -308,7 +350,7 @@ bdev_rbd_io_poll(void *arg)
int i, io_status, rc; int i, io_status, rc;
rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
struct spdk_bdev_io *bdev_io; struct spdk_bdev_io *bdev_io;
enum spdk_bdev_io_status status; struct bdev_rbd_io *rbd_io;
rc = poll(&ch->pfd, 1, 0); rc = poll(&ch->pfd, 1, 0);
@ -320,23 +362,34 @@ bdev_rbd_io_poll(void *arg)
rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
for (i = 0; i < rc; i++) { for (i = 0; i < rc; i++) {
bdev_io = rbd_aio_get_arg(comps[i]); bdev_io = rbd_aio_get_arg(comps[i]);
rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
io_status = rbd_aio_get_return_value(comps[i]); io_status = rbd_aio_get_return_value(comps[i]);
assert(rbd_io->num_segments > 0);
rbd_io->num_segments--;
if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
if ((int)(bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) == io_status) { if (io_status > 0) {
status = SPDK_BDEV_IO_STATUS_SUCCESS; /* For reads, io_status is the length */
} else { rbd_io->remaining_len -= io_status;
status = SPDK_BDEV_IO_STATUS_FAILED; }
if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) {
rbd_io->failed = true;
} }
} else { } else {
/* For others, 0 means success */ /* For others, 0 means success */
if (!io_status) { if (io_status != 0) {
status = SPDK_BDEV_IO_STATUS_SUCCESS; rbd_io->failed = true;
} else {
status = SPDK_BDEV_IO_STATUS_FAILED;
} }
} }
rbd_aio_release(comps[i]); rbd_aio_release(comps[i]);
spdk_bdev_io_complete(bdev_io, status);
if (rbd_io->num_segments == 0) {
spdk_bdev_io_complete(bdev_io,
rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS);
}
} }
} }