From 1717b5d580c93393ce18862d34fd57f123a05c5a Mon Sep 17 00:00:00 2001 From: Paul Luse Date: Sat, 1 Dec 2018 15:04:21 -0700 Subject: [PATCH] bdev/compress: add reduce integration Basic integration, init and load sequences Change-Id: I12595a251f38168aad15e95275651cb4ce40ea7d Signed-off-by: paul luse Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/435764 Tested-by: SPDK CI Jenkins Reviewed-by: Shuhei Matsumoto Reviewed-by: Jim Harris --- lib/bdev/compress/vbdev_compress.c | 286 +++++++++++++++++++++++++++-- mk/spdk.modules.mk | 3 +- 2 files changed, 268 insertions(+), 21 deletions(-) diff --git a/lib/bdev/compress/vbdev_compress.c b/lib/bdev/compress/vbdev_compress.c index 382c79e22..268e045db 100644 --- a/lib/bdev/compress/vbdev_compress.c +++ b/lib/bdev/compress/vbdev_compress.c @@ -33,6 +33,7 @@ #include "vbdev_compress.h" +#include "spdk/reduce.h" #include "spdk/stdinc.h" #include "spdk/rpc.h" #include "spdk/env.h" @@ -54,9 +55,14 @@ #define NUM_MAX_INFLIGHT_OPS 512 #define DEFAULT_WINDOW_SIZE 15 #define MAX_MBUFS_PER_OP 64 +#define BACKING_IO_UNIT_SZ (1024 * 4) +#define CHUNK_SIZE (1024 * 16) #define COMP_BDEV_NAME "compress" +/* TODO: need to get this from RPC on create or reduce metadata on load */ +#define TEST_MD_PATH "/tmp" + /* To add support for new device types, follow the examples of the following... * Note that the string names are defined by the DPDK PMD in question so be * sure to use the exact names. @@ -102,6 +108,9 @@ struct vbdev_compress { pthread_mutex_t reduce_lock; uint32_t ch_count; TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios; /* outstanding operations to a comp library */ + struct spdk_reduce_vol_params params; /* params for the reduce volume */ + struct spdk_reduce_backing_dev backing_dev; /* backing device info for the reduce volume */ + struct spdk_reduce_vol *vol; /* the reduce volume */ TAILQ_ENTRY(vbdev_compress) link; }; static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp); @@ -134,6 +143,7 @@ static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rt static void vbdev_compress_examine(struct spdk_bdev *bdev); static void vbdev_compress_claim(struct vbdev_compress *comp_bdev); static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io); +struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev); static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io); /* Called by vbdev_init_compress_drivers() to init each discovered compression device */ @@ -557,6 +567,205 @@ vbdev_compress_config_json(struct spdk_json_write_ctx *w) return 0; } +/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct + * used for initial metadata operations to claim where it will be further filled out + * and added to the global list. + */ +static void +vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) +{ + struct vbdev_compress *meta_ctx = cb_arg; + + if (reduce_errno == 0) { + meta_ctx->vol = vol; + vbdev_compress_claim(meta_ctx); + } else { + SPDK_ERRLOG("for vol %s, error %u\n", + spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno); + spdk_put_io_channel(meta_ctx->base_ch); + spdk_bdev_close(meta_ctx->base_desc); + free(meta_ctx); + } +} + +/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just + * call the callback provided by reduceLib when it called the read/write/unmap function and + * free the bdev_io. + */ +static void +comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg) +{ + struct spdk_reduce_vol_cb_args *cb_args = arg; + int reduce_errno; + + if (success) { + reduce_errno = 0; + } else { + reduce_errno = -EIO; + } + cb_args->cb_fn(cb_args->cb_arg, reduce_errno); + spdk_bdev_free_io(bdev_io); +} + +/* This is the function provided to the reduceLib for sending reads directly to + * the backing device. + */ +static void +_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, + uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) +{ + struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, + backing_dev); + int rc; + + rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch, + iov, iovcnt, lba, lba_count, + comp_reduce_io_cb, + args); + if (rc) { + if (rc == -ENOMEM) { + SPDK_ERRLOG("No memory, start to queue io.\n"); + /* TODO: there's no bdev_io to queue */ + } else { + SPDK_ERRLOG("error submitting readv request\n"); + } + args->cb_fn(args->cb_arg, rc); + } +} + +/* This is the function provided to the reduceLib for sending writes directly to + * the backing device. + */ +static void +_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, + uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) +{ + struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, + backing_dev); + int rc; + + rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch, + iov, iovcnt, lba, lba_count, + comp_reduce_io_cb, + args); + if (rc) { + if (rc == -ENOMEM) { + SPDK_ERRLOG("No memory, start to queue io.\n"); + /* TODO: there's no bdev_io to queue */ + } else { + SPDK_ERRLOG("error submitting writev request\n"); + } + args->cb_fn(args->cb_arg, rc); + } +} + +/* This is the function provided to the reduceLib for sending umaps directly to + * the backing device. + */ +static void +_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev, + uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) +{ + struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, + backing_dev); + int rc; + + rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch, + lba, lba_count, + comp_reduce_io_cb, + args); + + if (rc) { + if (rc == -ENOMEM) { + SPDK_ERRLOG("No memory, start to queue io.\n"); + /* TODO: there's no bdev_io to queue */ + } else { + SPDK_ERRLOG("error submitting unmap request\n"); + } + args->cb_fn(args->cb_arg, rc); + } +} + +/* Called when the underlying base bdev goes away. */ +static void +vbdev_compress_base_bdev_hotremove_cb(void *ctx) +{ + struct vbdev_compress *comp_bdev, *tmp; + struct spdk_bdev *bdev_find = ctx; + + TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) { + if (bdev_find == comp_bdev->base_bdev) { + spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL); + } + } +} + +/* TODO: determine which parms we want user configurable, HC for now + * params.vol_size + * params.chunk_size + * compression PMD, algorithm, window size, comp level, etc. + * TEST_MD_PATH + */ + +/* Common function for init and load to allocate and populate the minimal + * information for reducelib to init or load. + */ +struct vbdev_compress * +_prepare_for_load_init(struct spdk_bdev *bdev) +{ + struct vbdev_compress *meta_ctx; + + meta_ctx = calloc(1, sizeof(struct vbdev_compress)); + if (meta_ctx == NULL) { + SPDK_ERRLOG("failed to alloc init contexs\n"); + return NULL; + } + + meta_ctx->base_bdev = bdev; + meta_ctx->backing_dev.unmap = _comp_reduce_unmap; + meta_ctx->backing_dev.readv = _comp_reduce_readv; + meta_ctx->backing_dev.writev = _comp_reduce_writev; + meta_ctx->backing_dev.blocklen = bdev->blocklen; + meta_ctx->backing_dev.blockcnt = bdev->blockcnt; + + meta_ctx->params.chunk_size = CHUNK_SIZE; + meta_ctx->params.backing_io_unit_size = BACKING_IO_UNIT_SZ; + meta_ctx->params.logical_block_size = meta_ctx->params.backing_io_unit_size; + + return meta_ctx; +} + +/* Call reducelib to initialize a new volume */ +static void +vbdev_init_reduce(struct spdk_bdev *bdev, const char *vbdev_name, const char *comp_pmd) +{ + struct vbdev_compress *meta_ctx; + int rc; + + meta_ctx = _prepare_for_load_init(bdev); + if (meta_ctx == NULL) { + return; + } + + rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb, + meta_ctx->base_bdev, &meta_ctx->base_desc); + if (rc) { + SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); + free(meta_ctx); + return; + } + meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); + + /* TODO: we'll want to pass name and compression parms to this + * function so they can be persisted. We'll need to retrieve them + * in load. + */ + spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev, + TEST_MD_PATH, + vbdev_reduce_init_cb, + meta_ctx); +} + /* We provide this callback for the SPDK channel code to create a channel using * the channel struct we provided in our module get_io_channel() entry point. Here * we get and save off an underlying base channel of the device below us so that @@ -657,7 +866,7 @@ create_compress_disk(const char *bdev_name, const char *vbdev_name, const char * return -ENODEV; } - /* TODO: vbdev_init_reduce(bdev); goes here */ + vbdev_init_reduce(bdev, vbdev_name, comp_pmd); return 0; } @@ -670,6 +879,7 @@ vbdev_compress_init(void) SPDK_ERRLOG("Error setting up compression devices\n"); return -EINVAL; } + return 0; } @@ -715,28 +925,13 @@ static struct spdk_bdev_module compress_if = { .module_init = vbdev_compress_init, .config_text = NULL, .get_ctx_size = vbdev_compress_get_ctx_size, - .examine_config = vbdev_compress_examine, + .examine_disk = vbdev_compress_examine, .module_fini = vbdev_compress_finish, .config_json = vbdev_compress_config_json }; SPDK_BDEV_MODULE_REGISTER(compress, &compress_if) -/* Called when the underlying base bdev goes away. */ -static void -vbdev_compress_examine_hotremove_cb(void *ctx) -{ - /* TODO */ -} - -/* Claim isn't called until a future series but removing this function - * alone generates many other warnings about basic vbdev stuff (function - * tables, etc.. I think it's better to have all the base functionality - * in this first patch, I'll remove the pragmas as soon as claim is called - * which happens after reduce is integrated. - */ -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-function" static void vbdev_compress_claim(struct vbdev_compress *comp_bdev) { @@ -779,7 +974,7 @@ vbdev_compress_claim(struct vbdev_compress *comp_bdev) TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link); - rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_examine_hotremove_cb, + rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb, comp_bdev->base_bdev, &comp_bdev->base_desc); if (rc) { SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev)); @@ -804,6 +999,7 @@ vbdev_compress_claim(struct vbdev_compress *comp_bdev) } SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name); + spdk_bdev_module_examine_done(&compress_if); return; /* Error cleanup paths. */ @@ -817,9 +1013,11 @@ error_open: error_drv_name: free(comp_bdev->comp_bdev.name); error_bdev_name: + spdk_put_io_channel(comp_bdev->base_ch); + spdk_bdev_close(comp_bdev->base_desc); free(comp_bdev); + spdk_bdev_module_examine_done(&compress_if); } -#pragma GCC diagnostic pop void delete_compress_disk(struct spdk_bdev *bdev, spdk_delete_compress_complete cb_fn, void *cb_arg) @@ -832,10 +1030,58 @@ delete_compress_disk(struct spdk_bdev *bdev, spdk_delete_compress_complete cb_fn spdk_bdev_unregister(bdev, cb_fn, cb_arg); } +/* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct + * used for initial metadata operations to claim where it will be further filled out + * and added to the global list. + */ +static void +vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) +{ + struct vbdev_compress *meta_ctx = cb_arg; + + if (reduce_errno != 0) { + /* This error means it is not a compress disk. */ + if (reduce_errno != -EILSEQ) { + SPDK_ERRLOG("for vol %s, error %u\n", + spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno); + } + spdk_put_io_channel(meta_ctx->base_ch); + spdk_bdev_close(meta_ctx->base_desc); + free(meta_ctx); + spdk_bdev_module_examine_done(&compress_if); + return; + } + + meta_ctx->vol = vol; + vbdev_compress_claim(meta_ctx); +} + +/* Examine_disk entry point: will do a metadata load to see if this is ours, + * and if so will go ahead and claim it. + */ static void vbdev_compress_examine(struct spdk_bdev *bdev) { - spdk_bdev_module_examine_done(&compress_if); + struct vbdev_compress *meta_ctx; + int rc; + + meta_ctx = _prepare_for_load_init(bdev); + if (meta_ctx == NULL) { + spdk_bdev_module_examine_done(&compress_if); + return; + } + + rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb, + meta_ctx->base_bdev, &meta_ctx->base_desc); + if (rc) { + SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); + free(meta_ctx); + spdk_bdev_module_examine_done(&compress_if); + return; + } + + meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); + spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx); } SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS) diff --git a/mk/spdk.modules.mk b/mk/spdk.modules.mk index db5b5e071..7c0815993 100644 --- a/mk/spdk.modules.mk +++ b/mk/spdk.modules.mk @@ -45,7 +45,8 @@ BLOCKDEV_MODULES_LIST += ocfenv endif ifeq ($(CONFIG_REDUCE),y) -BLOCKDEV_MODULES_LIST += bdev_compress +BLOCKDEV_MODULES_LIST += bdev_compress reduce +SYS_LIBS += -lpmem endif ifeq ($(CONFIG_RDMA),y)