From 309f7791b9e9036f2c5795861dda407f3b1e2ac5 Mon Sep 17 00:00:00 2001 From: Jim Harris Date: Fri, 22 Mar 2019 06:35:25 -0700 Subject: [PATCH] reduce: plumb basic compress/decompress callbacks The unit tests don't really try to compress anything yet, but this at least gets the pipeline in place. Signed-off-by: Jim Harris Change-Id: Ic413850b30e4d9631f3ece2bab40d9026225e5b2 Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/449097 Tested-by: SPDK CI Jenkins Reviewed-by: Changpeng Liu Reviewed-by: Paul Luse Reviewed-by: Shuhei Matsumoto --- include/spdk/reduce.h | 22 ++++ lib/reduce/reduce.c | 133 ++++++++++++++++++---- test/unit/lib/reduce/reduce.c/reduce_ut.c | 28 +++++ 3 files changed, 161 insertions(+), 22 deletions(-) diff --git a/include/spdk/reduce.h b/include/spdk/reduce.h index 0021f3454..a35c5b998 100644 --- a/include/spdk/reduce.h +++ b/include/spdk/reduce.h @@ -88,6 +88,18 @@ typedef void (*spdk_reduce_vol_op_with_handle_complete)(void *ctx, struct spdk_reduce_vol *vol, int reduce_errno); +/** + * Defines function type for callback functions called when backing_dev + * operations are complete. + * + * \param cb_arg Callback argument + * \param reduce_errno Completion status of backing_dev operation + * Negative values indicate negated errno value + * 0 indicates successful readv/writev/unmap operation + * Positive value indicates successful compress/decompress + * operations; number indicates number of bytes written to + * destination iovs + */ typedef void (*spdk_reduce_dev_cpl)(void *cb_arg, int reduce_errno); struct spdk_reduce_vol_cb_args { @@ -105,6 +117,16 @@ struct spdk_reduce_backing_dev { void (*unmap)(struct spdk_reduce_backing_dev *dev, uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args); + void (*compress)(struct spdk_reduce_backing_dev *dev, + struct iovec *src_iov, int src_iovcnt, + struct iovec *dst_iov, int dst_iovcnt, + struct spdk_reduce_vol_cb_args *args); + + void (*decompress)(struct spdk_reduce_backing_dev *dev, + struct iovec *src_iov, int src_iovcnt, + struct iovec *dst_iov, int dst_iovcnt, + struct spdk_reduce_vol_cb_args *args); + uint64_t blockcnt; uint32_t blocklen; }; diff --git a/lib/reduce/reduce.c b/lib/reduce/reduce.c index 51985526e..40ba2426a 100644 --- a/lib/reduce/reduce.c +++ b/lib/reduce/reduce.c @@ -950,14 +950,14 @@ _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol * req->backing_cb_args.cb_fn = next_fn; req->backing_cb_args.cb_arg = req; for (i = 0; i < vol->backing_io_units_per_chunk; i++) { - req->decomp_buf_iov[i].iov_base = req->decomp_buf + i * vol->params.backing_io_unit_size; - req->decomp_buf_iov[i].iov_len = vol->params.backing_io_unit_size; + req->comp_buf_iov[i].iov_base = req->comp_buf + i * vol->params.backing_io_unit_size; + req->comp_buf_iov[i].iov_len = vol->params.backing_io_unit_size; if (is_write) { - vol->backing_dev->writev(vol->backing_dev, &req->decomp_buf_iov[i], 1, + vol->backing_dev->writev(vol->backing_dev, &req->comp_buf_iov[i], 1, req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, vol->backing_lba_per_io_unit, &req->backing_cb_args); } else { - vol->backing_dev->readv(vol->backing_dev, &req->decomp_buf_iov[i], 1, + vol->backing_dev->readv(vol->backing_dev, &req->comp_buf_iov[i], 1, req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, vol->backing_lba_per_io_unit, &req->backing_cb_args); } @@ -965,7 +965,8 @@ _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol * } static void -_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, + uint32_t compressed_size) { struct spdk_reduce_vol *vol = req->vol; uint32_t i; @@ -979,7 +980,7 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); - req->chunk->compressed_size = vol->params.chunk_size; + req->chunk->compressed_size = compressed_size; for (i = 0; i < vol->backing_io_units_per_chunk; i++) { req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); @@ -993,13 +994,92 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n _issue_backing_ops(req, vol, next_fn, true /* write */); } + +static void +_write_compress_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + + /* Negative reduce_errno indicates failure for compression operations. */ + if (reduce_errno < 0) { + _reduce_vol_complete_req(req, reduce_errno); + return; + } + + /* Positive reduce_errno indicates number of bytes in compressed buffer. */ + _reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno); +} + +static void +_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + req->comp_buf_iov[0].iov_base = req->comp_buf; + req->comp_buf_iov[0].iov_len = vol->params.chunk_size; + req->decomp_buf_iov[0].iov_base = req->decomp_buf; + req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; + vol->backing_dev->compress(vol->backing_dev, + req->decomp_buf_iov, 1, req->comp_buf_iov, 1, + &req->backing_cb_args); +} + +static void +_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + req->comp_buf_iov[0].iov_base = req->comp_buf; + req->comp_buf_iov[0].iov_len = vol->params.chunk_size; + req->decomp_buf_iov[0].iov_base = req->decomp_buf; + req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; + vol->backing_dev->decompress(vol->backing_dev, + req->comp_buf_iov, 1, req->decomp_buf_iov, 1, + &req->backing_cb_args); +} + +static void +_write_decompress_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + struct spdk_reduce_vol *vol = req->vol; + uint64_t chunk_offset; + uint8_t *buf; + int i; + + /* Negative reduce_errno indicates failure for compression operations. */ + if (reduce_errno < 0) { + _reduce_vol_complete_req(req, reduce_errno); + return; + } + + /* Positive reduce_errno indicates number of bytes in decompressed + * buffer. This should equal the chunk size - otherwise that's another + * type of failure. + */ + if ((uint32_t)reduce_errno != vol->params.chunk_size) { + _reduce_vol_complete_req(req, -EIO); + return; + } + + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size; + for (i = 0; i < req->iovcnt; i++) { + memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len); + buf += req->iov[i].iov_len; + } + + _reduce_vol_compress_chunk(req, _write_compress_done); +} + static void _write_read_done(void *_req, int reduce_errno) { struct spdk_reduce_vol_request *req = _req; - uint64_t chunk_offset; - uint8_t *buf; - int i; if (reduce_errno != 0) { req->reduce_errno = reduce_errno; @@ -1015,26 +1095,35 @@ _write_read_done(void *_req, int reduce_errno) return; } - chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; - buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; - for (i = 0; i < req->iovcnt; i++) { - memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len); - buf += req->iov[i].iov_len; - } - - _reduce_vol_write_chunk(req, _write_write_done); + _reduce_vol_decompress_chunk(req, _write_decompress_done); } static void -_read_complete_req(void *_req, int reduce_errno) +_read_decompress_done(void *_req, int reduce_errno) { struct spdk_reduce_vol_request *req = _req; + struct spdk_reduce_vol *vol = req->vol; uint64_t chunk_offset; uint8_t *buf; int i; - chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; - buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; + /* Negative reduce_errno indicates failure for compression operations. */ + if (reduce_errno < 0) { + _reduce_vol_complete_req(req, reduce_errno); + return; + } + + /* Positive reduce_errno indicates number of bytes in decompressed + * buffer. This should equal the chunk size - otherwise that's another + * type of failure. + */ + if ((uint32_t)reduce_errno != vol->params.chunk_size) { + _reduce_vol_complete_req(req, -EIO); + return; + } + + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size; for (i = 0; i < req->iovcnt; i++) { memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); buf += req->iov[i].iov_len; @@ -1061,7 +1150,7 @@ _read_read_done(void *_req, int reduce_errno) return; } - _read_complete_req(req, 0); + _reduce_vol_decompress_chunk(req, _read_decompress_done); } static void @@ -1213,7 +1302,7 @@ _start_writev_request(struct spdk_reduce_vol_request *req) if (chunk_offset != lb_per_chunk) { memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize); } - _reduce_vol_write_chunk(req, _write_write_done); + _reduce_vol_compress_chunk(req, _write_compress_done); } void diff --git a/test/unit/lib/reduce/reduce.c/reduce_ut.c b/test/unit/lib/reduce/reduce.c/reduce_ut.c index 8d74fd36b..bb87f0d49 100644 --- a/test/unit/lib/reduce/reduce.c/reduce_ut.c +++ b/test/unit/lib/reduce/reduce.c/reduce_ut.c @@ -389,6 +389,32 @@ backing_dev_io_execute(uint32_t count) } } +static void +backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev, + struct iovec *src_iov, int src_iovcnt, + struct iovec *dst_iov, int dst_iovcnt, + struct spdk_reduce_vol_cb_args *args) +{ + CU_ASSERT(src_iovcnt == 1); + CU_ASSERT(dst_iovcnt == 1); + CU_ASSERT(src_iov[0].iov_len == dst_iov[0].iov_len); + memcpy(dst_iov[0].iov_base, src_iov[0].iov_base, src_iov[0].iov_len); + args->cb_fn(args->cb_arg, src_iov[0].iov_len); +} + +static void +backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev, + struct iovec *src_iov, int src_iovcnt, + struct iovec *dst_iov, int dst_iovcnt, + struct spdk_reduce_vol_cb_args *args) +{ + CU_ASSERT(src_iovcnt == 1); + CU_ASSERT(dst_iovcnt == 1); + CU_ASSERT(src_iov[0].iov_len == dst_iov[0].iov_len); + memcpy(dst_iov[0].iov_base, src_iov[0].iov_base, src_iov[0].iov_len); + args->cb_fn(args->cb_arg, src_iov[0].iov_len); +} + static void backing_dev_destroy(struct spdk_reduce_backing_dev *backing_dev) { @@ -411,6 +437,8 @@ backing_dev_init(struct spdk_reduce_backing_dev *backing_dev, struct spdk_reduce backing_dev->readv = backing_dev_readv; backing_dev->writev = backing_dev_writev; backing_dev->unmap = backing_dev_unmap; + backing_dev->compress = backing_dev_compress; + backing_dev->decompress = backing_dev_decompress; g_backing_dev_buf = calloc(1, size); SPDK_CU_ASSERT_FATAL(g_backing_dev_buf != NULL);