From b1617bf80f05336a0bee0eead35404eb2052c210 Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Thu, 20 Jul 2017 07:47:34 -0700 Subject: [PATCH] bdev/rbd: Poller can now handle channel deletion inline An I/O completion callback may delete the channel, so restructure the poller to avoid touching channel memory while triggering completions. Change-Id: I612f10ff172481084386c9a3056fdd5e2f19e854 Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/370526 Tested-by: SPDK Automated Test System Reviewed-by: Daniel Verkamp --- lib/bdev/rbd/bdev_rbd.c | 69 ++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/lib/bdev/rbd/bdev_rbd.c b/lib/bdev/rbd/bdev_rbd.c index 7d70afd31..60d62a7ba 100644 --- a/lib/bdev/rbd/bdev_rbd.c +++ b/lib/bdev/rbd/bdev_rbd.c @@ -48,13 +48,11 @@ #include "spdk_internal/bdev.h" +#define SPDK_RBD_QUEUE_DEPTH 128 + static TAILQ_HEAD(, bdev_rbd) g_rbds = TAILQ_HEAD_INITIALIZER(g_rbds); static int bdev_rbd_count = 0; -struct bdev_rbd_io { - rbd_completion_t completion; -}; - struct bdev_rbd { struct spdk_bdev disk; char *rbd_name; @@ -68,8 +66,6 @@ struct bdev_rbd_io_channel { rados_t cluster; struct pollfd pfd; rbd_image_t image; - rbd_completion_t *comps; - uint32_t queue_depth; struct bdev_rbd *disk; struct spdk_bdev_poller *poller; }; @@ -171,30 +167,30 @@ bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) } static int -bdev_rbd_start_aio(rbd_image_t image, struct bdev_rbd_io *cmd, +bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io, void *buf, uint64_t offset, size_t len) { int ret; - struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(cmd); + rbd_completion_t comp; - ret = rbd_aio_create_completion((void *)cmd, bdev_rbd_finish_aiocb, - &cmd->completion); + ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, + &comp); if (ret < 0) { return -1; } if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { ret = rbd_aio_read(image, offset, len, - buf, cmd->completion); + buf, comp); } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { ret = rbd_aio_write(image, offset, len, - buf, cmd->completion); + buf, comp); } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { - ret = rbd_aio_flush(image, cmd->completion); + ret = rbd_aio_flush(image, comp); } if (ret < 0) { - rbd_aio_release(cmd->completion); + rbd_aio_release(comp); return -1; } @@ -207,7 +203,7 @@ static void bdev_rbd_library_fini(void); static int bdev_rbd_get_ctx_size(void) { - return sizeof(struct bdev_rbd_io); + return 0; } SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, bdev_rbd_library_fini, NULL, @@ -215,7 +211,7 @@ SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, bdev_rbd_library_fini, NUL static int64_t bdev_rbd_readv(struct bdev_rbd *disk, struct spdk_io_channel *ch, - struct bdev_rbd_io *cmd, struct iovec *iov, + struct spdk_bdev_io *bdev_io, struct iovec *iov, int iovcnt, size_t len, uint64_t offset) { struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); @@ -223,12 +219,12 @@ bdev_rbd_readv(struct bdev_rbd *disk, struct spdk_io_channel *ch, if (iovcnt != 1 || iov->iov_len != len) return -1; - return bdev_rbd_start_aio(rbdio_ch->image, cmd, iov->iov_base, offset, len); + return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov->iov_base, offset, len); } static int64_t bdev_rbd_writev(struct bdev_rbd *disk, struct spdk_io_channel *ch, - struct bdev_rbd_io *cmd, struct iovec *iov, + struct spdk_bdev_io *bdev_io, struct iovec *iov, int iovcnt, size_t len, uint64_t offset) { struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); @@ -236,16 +232,16 @@ bdev_rbd_writev(struct bdev_rbd *disk, struct spdk_io_channel *ch, if ((iovcnt != 1) || (iov->iov_len != len)) return -1; - return bdev_rbd_start_aio(rbdio_ch->image, cmd, (void *)iov->iov_base, offset, len); + return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, (void *)iov->iov_base, offset, len); } static int64_t bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch, - struct bdev_rbd_io *cmd, uint64_t offset, uint64_t nbytes) + struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes) { struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); - return bdev_rbd_start_aio(rbdio_ch->image, cmd, NULL, offset, nbytes); + return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes); } static int @@ -260,7 +256,7 @@ static void bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io ret = bdev_rbd_readv(bdev_io->bdev->ctxt, ch, - (struct bdev_rbd_io *)bdev_io->driver_ctx, + bdev_io, bdev_io->u.read.iovs, bdev_io->u.read.iovcnt, bdev_io->u.read.len, @@ -281,7 +277,7 @@ static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev case SPDK_BDEV_IO_TYPE_WRITE: return bdev_rbd_writev((struct bdev_rbd *)bdev_io->bdev->ctxt, ch, - (struct bdev_rbd_io *)bdev_io->driver_ctx, + bdev_io, bdev_io->u.write.iovs, bdev_io->u.write.iovcnt, bdev_io->u.write.len, @@ -289,7 +285,7 @@ static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev case SPDK_BDEV_IO_TYPE_FLUSH: return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt, ch, - (struct bdev_rbd_io *)bdev_io->driver_ctx, + bdev_io, bdev_io->u.flush.offset, bdev_io->u.flush.length); default: @@ -323,9 +319,10 @@ static void bdev_rbd_io_poll(void *arg) { struct bdev_rbd_io_channel *ch = arg; - struct bdev_rbd_io *req; + int i, io_status, rc; + rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; struct spdk_bdev_io *bdev_io; - int i, io_status, status, rc; + enum spdk_bdev_io_status status; rc = poll(&ch->pfd, 1, 0); @@ -334,11 +331,10 @@ bdev_rbd_io_poll(void *arg) return; } - rc = rbd_poll_io_events(ch->image, ch->comps, ch->queue_depth); + rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); for (i = 0; i < rc; i++) { - req = (struct bdev_rbd_io *)rbd_aio_get_arg(ch->comps[i]); - bdev_io = spdk_bdev_io_from_ctx(req); - io_status = rbd_aio_get_return_value(ch->comps[i]); + bdev_io = rbd_aio_get_arg(comps[i]); + io_status = rbd_aio_get_return_value(comps[i]); if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { if ((int)bdev_io->u.read.len == io_status) { status = SPDK_BDEV_IO_STATUS_SUCCESS; @@ -353,8 +349,8 @@ bdev_rbd_io_poll(void *arg) status = SPDK_BDEV_IO_STATUS_FAILED; } } + rbd_aio_release(comps[i]); spdk_bdev_io_complete(bdev_io, status); - rbd_aio_release(req->completion); } } @@ -377,10 +373,6 @@ bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) rados_shutdown(ch->cluster); } - if (ch->comps) { - free(ch->comps); - } - if (ch->pfd.fd >= 0) { close(ch->pfd.fd); } @@ -435,13 +427,6 @@ bdev_rbd_create_cb(void *io_device, void *ctx_buf) goto err; } - ch->queue_depth = 128; - ch->comps = calloc(sizeof(rbd_completion_t), ch->queue_depth); - if (!ch->comps) { - SPDK_ERRLOG("Failed to allocate rbd completion array\n"); - goto err; - } - spdk_bdev_poller_start(&ch->poller, bdev_rbd_io_poll, ch, spdk_env_get_current_core(), 0);