diff --git a/CONFIG b/CONFIG index 54517ce56..9bddc1f6a 100644 --- a/CONFIG +++ b/CONFIG @@ -130,6 +130,9 @@ CONFIG_PMDK_DIR= # Build with xNVMe CONFIG_XNVME=n +# Enable the dependencies for building the DPDK accel compress module +CONFIG_DPDK_COMPRESSDEV=n + # Enable the dependencies for building the compress vbdev, includes the reduce library CONFIG_VBDEV_COMPRESS=n diff --git a/configure b/configure index abb08e5d1..989c51da7 100755 --- a/configure +++ b/configure @@ -76,6 +76,8 @@ function usage() { echo " --without-pmdk No path required." echo " --with-vbdev-compress Build vbdev compression module and dependencies." echo " --without-vbdev-compress No path required." + echo " --with-dpdk-compressdev Build accel DPDK compression module and dependencies." + echo " --without-dpdk-compressdev No path required." echo " --with-rbd Build Ceph RBD bdev module." echo " --without-rbd No path required." echo " --with-rdma[=DIR] Build RDMA transport for NVMf target and initiator." @@ -535,6 +537,12 @@ for i in "$@"; do --without-vbdev-compress) CONFIG[VBDEV_COMPRESS]=n ;; + --with-dpdk-compressdev) + CONFIG[DPDK_COMPRESSDEV]=y + ;; + --without-dpdk-compressdev) + CONFIG[DPDK_COMPRESSDEV]=n + ;; --with-xnvme) CONFIG[XNVME]=y ;; @@ -1192,6 +1200,7 @@ else echo "so these features will be disabled." CONFIG[CRYPTO]=n CONFIG[VBDEV_COMPRESS]=n + CONFIG[DPDK_COMPRESSDEV]=n fi # ISA-L-crypto complements ISA-L functionality, it is only enabled together with ISA-L diff --git a/doc/jsonrpc.md b/doc/jsonrpc.md index 46f239431..9fe8948cd 100644 --- a/doc/jsonrpc.md +++ b/doc/jsonrpc.md @@ -1867,6 +1867,43 @@ Example response: } ~~~ +### compressdev_scan_accel_module {#rpc_compressdev_scan_accel_module} + +Set config and enable compressdev accel module offload. +Select the DPDK polled mode driver (pmd) for the accel compress module, +0 = auto-select, 1= QAT only, 2 = mlx5_pci only. 
+ +#### Parameters + +Name | Optional | Type | Description +----------------------- | -------- | ----------- | ----------- +pmd | Required | int | pmd selection + +#### Example + +Example request: + +~~~json +{ + "params": { + "pmd": 1 + }, + "jsonrpc": "2.0", + "method": "compressdev_scan_accel_module", + "id": 1 +} +~~~ + +Example response: + +~~~json +{ + "jsonrpc": "2.0", + "id": 1, + "result": true +} +~~~ + ### dsa_scan_accel_module {#rpc_dsa_scan_accel_module} Set config and enable dsa accel module offload. diff --git a/dpdkbuild/Makefile b/dpdkbuild/Makefile index 103234a82..cefae048c 100644 --- a/dpdkbuild/Makefile +++ b/dpdkbuild/Makefile @@ -40,7 +40,7 @@ DPDK_LIBS += kvargs telemetry DPDK_LIBS += power timer ethdev net # common crypto/compress drivers -ifeq ($(findstring y,$(CONFIG_CRYPTO)$(CONFIG_VBDEV_COMPRESS)),y) +ifeq ($(findstring y,$(CONFIG_DPDK_COMPRESSDEV)$(CONFIG_CRYPTO)$(CONFIG_VBDEV_COMPRESS)),y) DPDK_DRIVERS += crypto/qat compress/qat common/qat endif @@ -63,7 +63,7 @@ DPDK_LDFLAGS += -L$(IPSEC_MB_DIR) endif endif -ifeq ($(CONFIG_VBDEV_COMPRESS),y) +ifeq ($(findstring y,$(CONFIG_DPDK_COMPRESSDEV)$(CONFIG_VBDEV_COMPRESS)),y) DPDK_DRIVERS += compress compress/isal ifeq ($(CONFIG_VBDEV_COMPRESS_MLX5),y) DPDK_DRIVERS += compress/mlx5 diff --git a/lib/env_dpdk/env.mk b/lib/env_dpdk/env.mk index b7522a1e1..e423c970d 100644 --- a/lib/env_dpdk/env.mk +++ b/lib/env_dpdk/env.mk @@ -80,7 +80,7 @@ endif endif endif -ifeq ($(CONFIG_VBDEV_COMPRESS),y) +ifeq ($(findstring y,$(CONFIG_DPDK_COMPRESSDEV)$(CONFIG_VBDEV_COMPRESS)),y) DPDK_FRAMEWORK=y ifneq (, $(wildcard $(DPDK_LIB_DIR)/librte_compress_isal.*)) DPDK_LIB_LIST += rte_compress_isal diff --git a/mk/spdk.lib_deps.mk b/mk/spdk.lib_deps.mk index 4ad25f174..e08d9eb9d 100644 --- a/mk/spdk.lib_deps.mk +++ b/mk/spdk.lib_deps.mk @@ -98,6 +98,7 @@ DEPDIRS-accel_ioat := log ioat thread jsonrpc rpc accel DEPDIRS-accel_dsa := log idxd thread $(JSON_LIBS) accel trace DEPDIRS-accel_iaa := log idxd thread 
$(JSON_LIBS) accel trace DEPDIRS-accel_dpdk_cryptodev := log thread $(JSON_LIBS) accel +DEPDIRS-accel_dpdk_compressdev := log thread $(JSON_LIBS) accel util # module/env_dpdk DEPDIRS-env_dpdk_rpc := log $(JSON_LIBS) diff --git a/mk/spdk.modules.mk b/mk/spdk.modules.mk index 0ecf4fb9c..76c96fcca 100644 --- a/mk/spdk.modules.mk +++ b/mk/spdk.modules.mk @@ -104,6 +104,9 @@ endif ifeq ($(CONFIG_CRYPTO),y) ACCEL_MODULES_LIST += accel_dpdk_cryptodev endif +ifeq ($(CONFIG_DPDK_COMPRESSDEV),y) +ACCEL_MODULES_LIST += accel_dpdk_compressdev +endif SCHEDULER_MODULES_LIST = scheduler_dynamic ifeq (y,$(DPDK_POWER)) diff --git a/module/accel/Makefile b/module/accel/Makefile index 1360c2204..29958a671 100644 --- a/module/accel/Makefile +++ b/module/accel/Makefile @@ -8,7 +8,7 @@ SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..) include $(SPDK_ROOT_DIR)/mk/spdk.common.mk DIRS-y = ioat - +DIRS-$(CONFIG_DPDK_COMPRESSDEV) += dpdk_compressdev DIRS-$(CONFIG_IDXD) += dsa DIRS-$(CONFIG_IDXD) += iaa DIRS-$(CONFIG_CRYPTO) += dpdk_cryptodev diff --git a/module/accel/dpdk_compressdev/Makefile b/module/accel/dpdk_compressdev/Makefile new file mode 100644 index 000000000..b2a70894c --- /dev/null +++ b/module/accel/dpdk_compressdev/Makefile @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (C) 2022 Intel Corporation. +# All rights reserved. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..) 
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 1 +SO_MINOR := 0 + +CFLAGS += $(ENV_CFLAGS) +LIBNAME = accel_dpdk_compressdev +C_SRCS = accel_dpdk_compressdev.c accel_dpdk_compressdev_rpc.c + +SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/module/accel/dpdk_compressdev/accel_dpdk_compressdev.c b/module/accel/dpdk_compressdev/accel_dpdk_compressdev.c index 0c2bb1d9f..4c7ca243a 100644 --- a/module/accel/dpdk_compressdev/accel_dpdk_compressdev.c +++ b/module/accel/dpdk_compressdev/accel_dpdk_compressdev.c @@ -4,9 +4,9 @@ * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. */ -#include "vbdev_compress.h" +#include "accel_dpdk_compressdev.h" +#include "spdk_internal/accel_module.h" -#include "spdk/reduce.h" #include "spdk/stdinc.h" #include "spdk/rpc.h" #include "spdk/env.h" @@ -14,7 +14,6 @@ #include "spdk/string.h" #include "spdk/thread.h" #include "spdk/util.h" -#include "spdk/bdev_module.h" #include "spdk/likely.h" #include "spdk/log.h" @@ -27,31 +26,24 @@ /* Used to store IO context in mbuf */ static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = { - .name = "context_reduce", + .name = "context_accel_comp", .size = sizeof(uint64_t), .align = __alignof__(uint64_t), .flags = 0, }; static int g_mbuf_offset; - -#define NUM_MAX_XFORMS 2 -#define NUM_MAX_INFLIGHT_OPS 128 -#define DEFAULT_WINDOW_SIZE 15 -/* We need extra mbufs per operation to accommodate host buffers that - * span a physical page boundary. 
- */ -#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2) -#define CHUNK_SIZE (1024 * 16) -#define COMP_BDEV_NAME "compress" -#define BACKING_IO_SZ (4 * 1024) - -#define ISAL_PMD "compress_isal" -#define QAT_PMD "compress_qat" -#define MLX5_PMD "mlx5_pci" -#define NUM_MBUFS 8192 -#define POOL_CACHE_SIZE 256 - static enum compress_pmd g_opts; +static bool g_compressdev_enable = false; +static bool g_compressdev_initialized = false; + +#define NUM_MAX_XFORMS 2 +#define NUM_MAX_INFLIGHT_OPS 128 +#define DEFAULT_WINDOW_SIZE 15 +#define MBUF_SPLIT (1UL << DEFAULT_WINDOW_SIZE) +#define QAT_PMD "compress_qat" +#define MLX5_PMD "mlx5_pci" +#define NUM_MBUFS 65536 +#define POOL_CACHE_SIZE 256 /* Global list of available compression devices. */ struct compress_dev { @@ -59,15 +51,12 @@ struct compress_dev { uint8_t cdev_id; /* identifier for the device */ void *comp_xform; /* shared private xform for comp on this PMD */ void *decomp_xform; /* shared private xform for decomp on this PMD */ + bool sgl_in; + bool sgl_out; TAILQ_ENTRY(compress_dev) link; }; static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs); -/* Although ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to - * the length of the internal ring name that it creates, it breaks a limit in the generic - * ring code and fails the qp initialization. 
- * FIXME: Reduce number of qpairs to 48, due to issue #2338 - */ #define MAX_NUM_QP 48 /* Global list and lock for unique device/queue pair combos */ struct comp_device_qp { @@ -79,73 +68,20 @@ struct comp_device_qp { static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp); static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER; -/* For queueing up compression operations that we can't submit for some reason */ -struct vbdev_comp_op { - struct spdk_reduce_backing_dev *backing_dev; - struct iovec *src_iovs; - int src_iovcnt; - struct iovec *dst_iovs; - int dst_iovcnt; - bool compress; - void *cb_arg; - TAILQ_ENTRY(vbdev_comp_op) link; -}; - -struct vbdev_comp_delete_ctx { - spdk_delete_compress_complete cb_fn; - void *cb_arg; - int cb_rc; - struct spdk_thread *orig_thread; -}; - -/* List of virtual bdevs and associated info for each. */ -struct vbdev_compress { - struct spdk_bdev *base_bdev; /* the thing we're attaching to */ - struct spdk_bdev_desc *base_desc; /* its descriptor we get from open */ - struct spdk_io_channel *base_ch; /* IO channel of base device */ - struct spdk_bdev comp_bdev; /* the compression virtual bdev */ - struct comp_io_channel *comp_ch; /* channel associated with this bdev */ +struct compress_io_channel { char *drv_name; /* name of the compression device driver */ struct comp_device_qp *device_qp; - struct spdk_thread *reduce_thread; - pthread_mutex_t reduce_lock; - uint32_t ch_count; - TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios; /* outstanding operations to a comp library */ - struct spdk_poller *poller; /* completion poller */ - struct spdk_reduce_vol_params params; /* params for the reduce volume */ - struct spdk_reduce_backing_dev backing_dev; /* backing device info for the reduce volume */ - struct spdk_reduce_vol *vol; /* the reduce volume */ - struct vbdev_comp_delete_ctx *delete_ctx; - bool orphaned; /* base bdev claimed but comp_bdev not registered */ - int reduce_errno; - 
TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops; - TAILQ_ENTRY(vbdev_compress) link; - struct spdk_thread *thread; /* thread where base device is opened */ -}; -static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp); - -/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. - */ -struct comp_io_channel { - struct spdk_io_channel_iter *iter; /* used with for_each_channel in reset */ -}; - -/* Per I/O context for the compression vbdev. */ -struct comp_bdev_io { - struct comp_io_channel *comp_ch; /* used in completion handling */ - struct vbdev_compress *comp_bdev; /* vbdev associated with this IO */ - struct spdk_bdev_io_wait_entry bdev_io_wait; /* for bdev_io_wait */ - struct spdk_bdev_io *orig_io; /* the original IO */ - struct spdk_io_channel *ch; /* for resubmission */ - int status; /* save for completion on orig thread */ + struct spdk_poller *poller; + struct rte_mbuf **src_mbufs; + struct rte_mbuf **dst_mbufs; + TAILQ_HEAD(, spdk_accel_task) queued_tasks; }; /* Shared mempools between all devices on this system */ -static struct rte_mempool *g_mbuf_mp = NULL; /* mbuf mempool */ -static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rte* mempool */ -static struct rte_mbuf_ext_shared_info g_shinfo = {}; /* used by DPDK mbuf macros */ +static struct rte_mempool *g_mbuf_mp = NULL; /* mbuf mempool */ +static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rte* mempool */ +static struct rte_mbuf_ext_shared_info g_shinfo = {}; /* used by DPDK mbuf macros */ static bool g_qat_available = false; -static bool g_isal_available = false; static bool g_mlx5_pci_available = false; /* Create shared (between all ops per PMD) compress xforms. 
*/ @@ -171,14 +107,6 @@ static struct rte_comp_xform g_decomp_xform = { } }; -static void vbdev_compress_examine(struct spdk_bdev *bdev); -static int vbdev_compress_claim(struct vbdev_compress *comp_bdev); -static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io); -struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size); -static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io); -static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf); -static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno); - /* Dummy function used by DPDK to free ext attached buffers * to mbufs, we free them ourselves but this callback has to * be here. @@ -188,7 +116,7 @@ shinfo_free_cb(void *arg1, void *arg2) { } -/* Called by vbdev_init_compress_drivers() to init each discovered compression device */ +/* Called by accel_init_compress_drivers() to init each discovered compression device */ static int create_compress_dev(uint8_t index) { @@ -299,9 +227,7 @@ create_compress_dev(uint8_t index) if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) { g_qat_available = true; } - if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) { - g_isal_available = true; - } + if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) { g_mlx5_pci_available = true; } @@ -317,26 +243,15 @@ err: return rc; } -/* Called from driver init entry point, vbdev_compress_init() */ +/* Called from driver init entry point, accel_compress_init() */ static int -vbdev_init_compress_drivers(void) +accel_init_compress_drivers(void) { uint8_t cdev_count, i; struct compress_dev *tmp_dev; struct compress_dev *device; int rc; - /* We always init the compress_isal PMD */ - rc = rte_vdev_init(ISAL_PMD, NULL); - if (rc == 0) { - SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD); - } else if (rc == -EEXIST) { - SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD); - } else { - SPDK_ERRLOG("creating 
virtual PMD %s\n", ISAL_PMD); - return -EINVAL; - } - /* If we have no compression devices, there's no reason to continue. */ cdev_count = rte_compressdev_count(); if (cdev_count == 0) { @@ -398,47 +313,21 @@ error_create_mbuf: return rc; } -/* for completing rw requests on the orig IO thread. */ -static void -_reduce_rw_blocks_cb(void *arg) +int +accel_compressdev_enable_probe(enum compress_pmd *opts) { - struct comp_bdev_io *io_ctx = arg; + g_opts = *opts; + g_compressdev_enable = true; - if (spdk_likely(io_ctx->status == 0)) { - spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); - } else if (io_ctx->status == -ENOMEM) { - vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx)); - } else { - SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status); - spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); - } -} - -/* Completion callback for r/w that were issued via reducelib. */ -static void -reduce_rw_blocks_cb(void *arg, int reduce_errno) -{ - struct spdk_bdev_io *bdev_io = arg; - struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; - struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); - struct spdk_thread *orig_thread; - - /* TODO: need to decide which error codes are bdev_io success vs failure; - * example examine calls reading metadata */ - - io_ctx->status = reduce_errno; - - /* Send this request to the orig IO thread. 
*/ - orig_thread = spdk_io_channel_get_thread(ch); - - spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx); + return 0; } static int _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length, - struct iovec *iovs, int iovcnt, void *reduce_cb_arg) + struct iovec *iovs, int iovcnt, struct spdk_accel_task *task) { - uint64_t updated_length, remainder, phys_addr; + uint64_t iovec_length, updated_length, phys_addr; + uint64_t processed, mbuf_length, remainder; uint8_t *current_base = NULL; int iov_index, mbuf_index; int rc = 0; @@ -447,84 +336,126 @@ _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_l iov_index = mbuf_index = 0; while (iov_index < iovcnt) { + processed = 0; + iovec_length = iovs[iov_index].iov_len; + current_base = iovs[iov_index].iov_base; if (total_length) { - *total_length += iovs[iov_index].iov_len; + *total_length += iovec_length; } + assert(mbufs[mbuf_index] != NULL); - *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg; - updated_length = iovs[iov_index].iov_len; - phys_addr = spdk_vtophys((void *)current_base, &updated_length); + *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task; - rte_pktmbuf_attach_extbuf(mbufs[mbuf_index], - current_base, - phys_addr, - updated_length, - &g_shinfo); - rte_pktmbuf_append(mbufs[mbuf_index], updated_length); - remainder = iovs[iov_index].iov_len - updated_length; + do { + /* new length is min of remaining left or max mbuf size of MBUF_SPLIT */ + mbuf_length = updated_length = spdk_min(MBUF_SPLIT, iovec_length - processed); - if (mbuf_index > 0) { - rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]); - } - - /* If we crossed 2 physical pages boundary we need another mbuf for the remainder */ - if (remainder > 0) { - /* allocate an mbuf at the end of the array */ - rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, - (struct rte_mbuf **)&mbufs[*mbuf_total], 1); - if (rc) { - 
SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n"); - return -1; - } - (*mbuf_total)++; - mbuf_index++; - *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg; - current_base += updated_length; - phys_addr = spdk_vtophys((void *)current_base, &remainder); - /* assert we don't cross another */ - assert(remainder == iovs[iov_index].iov_len - updated_length); + phys_addr = spdk_vtophys((void *)current_base, &updated_length); rte_pktmbuf_attach_extbuf(mbufs[mbuf_index], current_base, phys_addr, - remainder, + updated_length, &g_shinfo); - rte_pktmbuf_append(mbufs[mbuf_index], remainder); - rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]); - } + rte_pktmbuf_append(mbufs[mbuf_index], updated_length); + remainder = mbuf_length - updated_length; + + /* although the mbufs were preallocated, we still need to chain them */ + if (mbuf_index > 0) { + rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]); + } + + /* keep track of the total we've put into the mbuf chain */ + processed += updated_length; + /* bump the base by what was previously added */ + current_base += updated_length; + + /* If we crossed 2MB boundary we need another mbuf for the remainder */ + if (remainder > 0) { + + assert(remainder <= MBUF_SPLIT); + + /* allocate an mbuf at the end of the array */ + rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, + (struct rte_mbuf **)&mbufs[*mbuf_total], 1); + if (rc) { + SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n"); + return -1; + } + (*mbuf_total)++; + mbuf_index++; + *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task; + + /* bump the base by what was previously added */ + current_base += updated_length; + + updated_length = remainder; + phys_addr = spdk_vtophys((void *)current_base, &updated_length); + + /* assert we don't cross another */ + assert(remainder == updated_length); + + rte_pktmbuf_attach_extbuf(mbufs[mbuf_index], + current_base, + phys_addr, + remainder, + &g_shinfo); + 
rte_pktmbuf_append(mbufs[mbuf_index], remainder); + rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]); + + /* keep track of the total we've put into the mbuf chain */ + processed += remainder; + } + + mbuf_index++; + + } while (processed < iovec_length); + + assert(processed == iovec_length); iov_index++; - mbuf_index++; } return 0; } static int -_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs, - int src_iovcnt, struct iovec *dst_iovs, - int dst_iovcnt, bool compress, void *cb_arg) +_compress_operation(struct compress_io_channel *chan, struct spdk_accel_task *task) { - void *reduce_cb_arg = cb_arg; - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress, - backing_dev); + int dst_iovcnt = task->d.iovcnt; + struct iovec *dst_iovs = task->d.iovs; + int src_iovcnt = task->s.iovcnt; + struct iovec *src_iovs = task->s.iovs; struct rte_comp_op *comp_op; - struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP]; - struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP]; - uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id; + uint8_t cdev_id; uint64_t total_length = 0; - int rc = 0; - struct vbdev_comp_op *op_to_queue; - int src_mbuf_total = src_iovcnt; - int dst_mbuf_total = dst_iovcnt; + int rc = 0, i; + int src_mbuf_total = 0; + int dst_mbuf_total = 0; bool device_error = false; + bool compress = (task->op_code == ACCEL_OPC_COMPRESS); - assert(src_iovcnt < MAX_MBUFS_PER_OP); + assert(chan->device_qp->device != NULL); + cdev_id = chan->device_qp->device->cdev_id; -#ifdef DEBUG - memset(src_mbufs, 0, sizeof(src_mbufs)); - memset(dst_mbufs, 0, sizeof(dst_mbufs)); -#endif + /* Much of this code was ported from the vbdev compress module where iovecs were + * supported on dst. In the accel_fw they are not however lets preserve the full + * functionality of the code and make a simple iovec out of the dst. 
+ */ + if (task->d.iovcnt == 0) { + assert(task->dst != NULL); + dst_iovcnt = 1; + dst_iovs[0].iov_base = task->dst; + dst_iovs[0].iov_len = task->nbytes_dst; + } + + /* calc our mbuf totals based on max MBUF size allowed so we can pre-alloc mbufs in bulk */ + for (i = 0 ; i < src_iovcnt; i++) { + src_mbuf_total += spdk_divide_round_up(src_iovs[i].iov_len, MBUF_SPLIT); + } + for (i = 0 ; i < dst_iovcnt; i++) { + dst_mbuf_total += spdk_divide_round_up(dst_iovs[i].iov_len, MBUF_SPLIT); + } comp_op = rte_comp_op_alloc(g_comp_op_mp); if (!comp_op) { @@ -534,76 +465,64 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s } /* get an mbuf per iov, src and dst */ - rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt); + rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->src_mbufs, src_mbuf_total); if (rc) { SPDK_ERRLOG("ERROR trying to get src_mbufs!\n"); rc = -ENOMEM; goto error_get_src; } - assert(src_mbufs[0]); + assert(chan->src_mbufs[0]); - rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt); + rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->dst_mbufs, dst_mbuf_total); if (rc) { SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n"); rc = -ENOMEM; goto error_get_dst; } - assert(dst_mbufs[0]); + assert(chan->dst_mbufs[0]); + + rc = _setup_compress_mbuf(chan->src_mbufs, &src_mbuf_total, &total_length, + src_iovs, src_iovcnt, task); - /* There is a 1:1 mapping between a bdev_io and a compression operation - * Some PMDs that SPDK uses don't support chaining, but reduce library should - * provide correct buffers - * Build our mbuf chain and associate it with our single comp_op. 
- */ - rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length, - src_iovs, src_iovcnt, reduce_cb_arg); if (rc < 0) { goto error_src_dst; } - if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) { - if (src_iovcnt == 1) { - SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n", - comp_bdev->drv_name); - } else { - SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name); - } + if (!chan->device_qp->device->sgl_in && src_mbuf_total > 1) { + SPDK_ERRLOG("Src buffer uses chained mbufs but driver %s doesn't support SGL input\n", + chan->drv_name); rc = -EINVAL; goto error_src_dst; } - comp_op->m_src = src_mbufs[0]; + comp_op->m_src = chan->src_mbufs[0]; comp_op->src.offset = 0; comp_op->src.length = total_length; - rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL, - dst_iovs, dst_iovcnt, reduce_cb_arg); + rc = _setup_compress_mbuf(chan->dst_mbufs, &dst_mbuf_total, NULL, + dst_iovs, dst_iovcnt, task); if (rc < 0) { goto error_src_dst; } - if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) { - if (dst_iovcnt == 1) { - SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n", - comp_bdev->drv_name); - } else { - SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name); - } + if (!chan->device_qp->device->sgl_out && dst_mbuf_total > 1) { + SPDK_ERRLOG("Dst buffer uses chained mbufs but driver %s doesn't support SGL output\n", + chan->drv_name); rc = -EINVAL; goto error_src_dst; } - comp_op->m_dst = dst_mbufs[0]; + comp_op->m_dst = chan->dst_mbufs[0]; comp_op->dst.offset = 0; if (compress == true) { - comp_op->private_xform = comp_bdev->device_qp->device->comp_xform; + comp_op->private_xform = chan->device_qp->device->comp_xform; } else { - comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform; + comp_op->private_xform = chan->device_qp->device->decomp_xform; } comp_op->op_type = 
RTE_COMP_OP_STATELESS; comp_op->flush_flag = RTE_COMP_FLUSH_FINAL; - rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1); + rc = rte_compressdev_enqueue_burst(cdev_id, chan->device_qp->qp, &comp_op, 1); assert(rc <= 1); /* We always expect 1 got queued, if 0 then we need to queue it up. */ @@ -617,9 +536,9 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s /* Error cleanup paths. */ error_src_dst: - rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt); + rte_pktmbuf_free_bulk(chan->dst_mbufs, dst_iovcnt); error_get_dst: - rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt); + rte_pktmbuf_free_bulk(chan->src_mbufs, src_iovcnt); error_get_src: rte_comp_op_free(comp_op); error_get_op: @@ -635,21 +554,7 @@ error_get_op: return rc; } - op_to_queue = calloc(1, sizeof(struct vbdev_comp_op)); - if (op_to_queue == NULL) { - SPDK_ERRLOG("unable to allocate operation for queueing.\n"); - return -ENOMEM; - } - op_to_queue->backing_dev = backing_dev; - op_to_queue->src_iovs = src_iovs; - op_to_queue->src_iovcnt = src_iovcnt; - op_to_queue->dst_iovs = dst_iovs; - op_to_queue->dst_iovcnt = dst_iovcnt; - op_to_queue->compress = compress; - op_to_queue->cb_arg = cb_arg; - TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops, - op_to_queue, - link); + TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link); return 0; } @@ -657,31 +562,37 @@ error_get_op: static int comp_dev_poller(void *args) { - struct vbdev_compress *comp_bdev = args; - uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id; + struct compress_io_channel *chan = args; + uint8_t cdev_id; struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS]; uint16_t num_deq; - struct spdk_reduce_vol_cb_args *reduce_args; - struct vbdev_comp_op *op_to_resubmit; - int rc, i; + struct spdk_accel_task *task, *task_to_resubmit; + int rc, i, status; - num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops, + assert(chan->device_qp->device != NULL); + cdev_id = 
chan->device_qp->device->cdev_id; + + num_deq = rte_compressdev_dequeue_burst(cdev_id, chan->device_qp->qp, deq_ops, NUM_MAX_INFLIGHT_OPS); for (i = 0; i < num_deq; i++) { - reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset, + + /* We store this off regardless of success/error so we know how to contruct the + * next task + */ + task = (struct spdk_accel_task *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset, uint64_t *); - if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) { + status = deq_ops[i]->status; - /* tell reduce this is done and what the bytecount was */ - reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced); + if (spdk_likely(status == RTE_COMP_OP_STATUS_SUCCESS)) { + if (task->op_code == ACCEL_OPC_COMPRESS) { + *task->output_size = deq_ops[i]->produced; + } } else { - SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n", - deq_ops[i]->status); - - /* Reduce will simply store uncompressed on neg errno value. */ - reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL); + SPDK_NOTICELOG("Deque status %u\n", status); } + spdk_accel_task_complete(task, status); + /* Now free both mbufs and the compress operation. The rte_pktmbuf_free() * call takes care of freeing all of the mbufs in the chain back to their * original pool. @@ -698,1175 +609,297 @@ comp_dev_poller(void *args) /* Check if there are any pending comp ops to process, only pull one * at a time off as _compress_operation() may re-queue the op. 
*/ - if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) { - op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops); - rc = _compress_operation(op_to_resubmit->backing_dev, - op_to_resubmit->src_iovs, - op_to_resubmit->src_iovcnt, - op_to_resubmit->dst_iovs, - op_to_resubmit->dst_iovcnt, - op_to_resubmit->compress, - op_to_resubmit->cb_arg); + if (!TAILQ_EMPTY(&chan->queued_tasks)) { + task_to_resubmit = TAILQ_FIRST(&chan->queued_tasks); + rc = _compress_operation(chan, task_to_resubmit); if (rc == 0) { - TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link); - free(op_to_resubmit); + TAILQ_REMOVE(&chan->queued_tasks, task_to_resubmit, link); } } } + return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY; } -/* Entry point for reduce lib to issue a compress operation. */ -static void -_comp_reduce_compress(struct spdk_reduce_backing_dev *dev, - struct iovec *src_iovs, int src_iovcnt, - struct iovec *dst_iovs, int dst_iovcnt, - struct spdk_reduce_vol_cb_args *cb_arg) +static int +_process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task) { + struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch); int rc; - rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg); + rc = _compress_operation(chan, task); if (rc) { - SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc)); - cb_arg->cb_fn(cb_arg->cb_arg, rc); - } -} - -/* Entry point for reduce lib to issue a decompress operation. 
*/ -static void -_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev, - struct iovec *src_iovs, int src_iovcnt, - struct iovec *dst_iovs, int dst_iovcnt, - struct spdk_reduce_vol_cb_args *cb_arg) -{ - int rc; - - rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg); - if (rc) { - SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc)); - cb_arg->cb_fn(cb_arg->cb_arg, rc); - } -} - -static void -_comp_submit_write(void *ctx) -{ - struct spdk_bdev_io *bdev_io = ctx; - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, - comp_bdev); - - spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, - bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, - reduce_rw_blocks_cb, bdev_io); -} - -static void -_comp_submit_read(void *ctx) -{ - struct spdk_bdev_io *bdev_io = ctx; - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, - comp_bdev); - - spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, - bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, - reduce_rw_blocks_cb, bdev_io); -} - - -/* Callback for getting a buf from the bdev pool in the event that the caller passed - * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module - * beneath us before we're done with it. - */ -static void -comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) -{ - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, - comp_bdev); - - if (spdk_unlikely(!success)) { - SPDK_ERRLOG("Failed to get data buffer\n"); - reduce_rw_blocks_cb(bdev_io, -ENOMEM); - return; - } - - spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io); -} - -/* Called when someone above submits IO to this vbdev. 
*/ -static void -vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) -{ - struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, - comp_bdev); - struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch); - - memset(io_ctx, 0, sizeof(struct comp_bdev_io)); - io_ctx->comp_bdev = comp_bdev; - io_ctx->comp_ch = comp_ch; - io_ctx->orig_io = bdev_io; - - switch (bdev_io->type) { - case SPDK_BDEV_IO_TYPE_READ: - spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb, - bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); - return; - case SPDK_BDEV_IO_TYPE_WRITE: - spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io); - return; - /* TODO support RESET in future patch in the series */ - case SPDK_BDEV_IO_TYPE_RESET: - case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: - case SPDK_BDEV_IO_TYPE_UNMAP: - case SPDK_BDEV_IO_TYPE_FLUSH: - default: - SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type); - spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); - break; - } -} - -static bool -vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; - - switch (io_type) { - case SPDK_BDEV_IO_TYPE_READ: - case SPDK_BDEV_IO_TYPE_WRITE: - return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type); - case SPDK_BDEV_IO_TYPE_UNMAP: - case SPDK_BDEV_IO_TYPE_RESET: - case SPDK_BDEV_IO_TYPE_FLUSH: - case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: - default: - return false; - } -} - -/* Resubmission function used by the bdev layer when a queued IO is ready to be - * submitted. 
- */ -static void -vbdev_compress_resubmit_io(void *arg) -{ - struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg; - struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; - - vbdev_compress_submit_request(io_ctx->ch, bdev_io); -} - -/* Used to queue an IO in the event of resource issues. */ -static void -vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io) -{ - struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; - int rc; - - io_ctx->bdev_io_wait.bdev = bdev_io->bdev; - io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io; - io_ctx->bdev_io_wait.cb_arg = bdev_io; - - rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait); - if (rc) { - SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc); + SPDK_ERRLOG("Error (%d) in compress operation\n", rc); assert(false); - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); - } -} - -/* Callback for unregistering the IO device. */ -static void -_device_unregister_cb(void *io_device) -{ - struct vbdev_compress *comp_bdev = io_device; - - /* Done with this comp_bdev. */ - pthread_mutex_destroy(&comp_bdev->reduce_lock); - free(comp_bdev->comp_bdev.name); - free(comp_bdev); -} - -static void -_vbdev_compress_destruct_cb(void *ctx) -{ - struct vbdev_compress *comp_bdev = ctx; - - TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link); - spdk_bdev_module_release_bdev(comp_bdev->base_bdev); - /* Close the underlying bdev on its same opened thread. 
*/ - spdk_bdev_close(comp_bdev->base_desc); - comp_bdev->vol = NULL; - if (comp_bdev->orphaned == false) { - spdk_io_device_unregister(comp_bdev, _device_unregister_cb); - } else { - vbdev_compress_delete_done(comp_bdev->delete_ctx, 0); - _device_unregister_cb(comp_bdev); - } -} - -static void -vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; - - if (reduce_errno) { - SPDK_ERRLOG("number %d\n", reduce_errno); - } else { - if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) { - spdk_thread_send_msg(comp_bdev->thread, - _vbdev_compress_destruct_cb, comp_bdev); - } else { - _vbdev_compress_destruct_cb(comp_bdev); - } - } -} - -static void -_reduce_destroy_cb(void *ctx, int reduce_errno) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; - - if (reduce_errno) { - SPDK_ERRLOG("number %d\n", reduce_errno); - } - - comp_bdev->vol = NULL; - spdk_put_io_channel(comp_bdev->base_ch); - if (comp_bdev->orphaned == false) { - spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done, - comp_bdev->delete_ctx); - } else { - vbdev_compress_destruct_cb((void *)comp_bdev, 0); } + return rc; } -static void -_delete_vol_unload_cb(void *ctx) -{ - struct vbdev_compress *comp_bdev = ctx; - - /* FIXME: Assert if these conditions are not satisfied for now. */ - assert(!comp_bdev->reduce_thread || - comp_bdev->reduce_thread == spdk_get_thread()); - - /* reducelib needs a channel to comm with the backing device */ - comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); - - /* Clean the device before we free our resources. 
*/ - spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev); -} - -/* Called by reduceLib after performing unload vol actions */ -static void -delete_vol_unload_cb(void *cb_arg, int reduce_errno) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; - - if (reduce_errno) { - SPDK_ERRLOG("number %d\n", reduce_errno); - /* FIXME: callback should be executed. */ - return; - } - - pthread_mutex_lock(&comp_bdev->reduce_lock); - if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) { - spdk_thread_send_msg(comp_bdev->reduce_thread, - _delete_vol_unload_cb, comp_bdev); - pthread_mutex_unlock(&comp_bdev->reduce_lock); - } else { - pthread_mutex_unlock(&comp_bdev->reduce_lock); - - _delete_vol_unload_cb(comp_bdev); - } -} - -const char * -compress_get_name(const struct vbdev_compress *comp_bdev) -{ - return comp_bdev->comp_bdev.name; -} - -struct vbdev_compress * -compress_bdev_first(void) -{ - struct vbdev_compress *comp_bdev; - - comp_bdev = TAILQ_FIRST(&g_vbdev_comp); - - return comp_bdev; -} - -struct vbdev_compress * -compress_bdev_next(struct vbdev_compress *prev) -{ - struct vbdev_compress *comp_bdev; - - comp_bdev = TAILQ_NEXT(prev, link); - - return comp_bdev; -} - -bool -compress_has_orphan(const char *name) -{ - struct vbdev_compress *comp_bdev; - - TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { - if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) { - return true; - } - } - return false; -} - -/* Called after we've unregistered following a hot remove callback. - * Our finish entry point will be called next. 
- */ static int -vbdev_compress_destruct(void *ctx) +compress_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task) { - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; + struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch); + struct spdk_accel_task *task, *tmp; + int rc = 0; - if (comp_bdev->vol != NULL) { - /* Tell reducelib that we're done with this volume. */ - spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev); - } else { - vbdev_compress_destruct_cb(comp_bdev, 0); + task = first_task; + + if (!TAILQ_EMPTY(&chan->queued_tasks)) { + goto queue_tasks; } - return 0; -} - -/* We supplied this as an entry point for upper layers who want to communicate to this - * bdev. This is how they get a channel. - */ -static struct spdk_io_channel * -vbdev_compress_get_io_channel(void *ctx) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; - - /* The IO channel code will allocate a channel for us which consists of - * the SPDK channel structure plus the size of our comp_io_channel struct - * that we passed in when we registered our IO device. It will then call - * our channel create callback to populate any elements that we need to - * update. + /* The caller will either submit a single task or a group of tasks that are + * linked together but they cannot be on a list. For example, see poller + * where a list of queued tasks is being resubmitted, the list they are on + * is initialized after saving off the first task from the list which is then + * passed in here. Similar thing is done in the accel framework. 
*/ - return spdk_get_io_channel(comp_bdev); -} + while (task) { + tmp = TAILQ_NEXT(task, link); + rc = _process_single_task(ch, task); -/* This is the output for bdev_get_bdevs() for this vbdev */ -static int -vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; - - spdk_json_write_name(w, "compress"); - spdk_json_write_object_begin(w); - spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev)); - spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev)); - spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name); - spdk_json_write_object_end(w); + if (rc == -EBUSY) { + goto queue_tasks; + } else if (rc) { + spdk_accel_task_complete(task, rc); + } + task = tmp; + } return 0; -} -/* This is used to generate JSON that can configure this module to its current state. */ -static int -vbdev_compress_config_json(struct spdk_json_write_ctx *w) -{ - struct vbdev_compress *comp_bdev; - - TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { - spdk_json_write_object_begin(w); - spdk_json_write_named_string(w, "method", "bdev_compress_create"); - spdk_json_write_named_object_begin(w, "params"); - spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev)); - spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev)); - spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name); - spdk_json_write_object_end(w); - spdk_json_write_object_end(w); +queue_tasks: + while (task != NULL) { + tmp = TAILQ_NEXT(task, link); + TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link); + task = tmp; } return 0; } -static void -_vbdev_reduce_init_cb(void *ctx) -{ - struct vbdev_compress *meta_ctx = ctx; - int rc; - - assert(meta_ctx->base_desc != NULL); - - /* We're done with metadata operations */ - spdk_put_io_channel(meta_ctx->base_ch); - - if (meta_ctx->vol) { - rc = 
vbdev_compress_claim(meta_ctx); - if (rc == 0) { - return; - } - } - - /* Close the underlying bdev on its same opened thread. */ - spdk_bdev_close(meta_ctx->base_desc); - free(meta_ctx); -} - -/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct - * used for initial metadata operations to claim where it will be further filled out - * and added to the global list. - */ -static void -vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) -{ - struct vbdev_compress *meta_ctx = cb_arg; - - if (reduce_errno == 0) { - meta_ctx->vol = vol; - } else { - SPDK_ERRLOG("for vol %s, error %u\n", - spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno); - } - - if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) { - spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx); - } else { - _vbdev_reduce_init_cb(meta_ctx); - } -} - -/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just - * call the callback provided by reduceLib when it called the read/write/unmap function and - * free the bdev_io. - */ -static void -comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg) -{ - struct spdk_reduce_vol_cb_args *cb_args = arg; - int reduce_errno; - - if (success) { - reduce_errno = 0; - } else { - reduce_errno = -EIO; - } - spdk_bdev_free_io(bdev_io); - cb_args->cb_fn(cb_args->cb_arg, reduce_errno); -} - -/* This is the function provided to the reduceLib for sending reads directly to - * the backing device. 
- */ -static void -_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, - uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) -{ - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, - backing_dev); - int rc; - - rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch, - iov, iovcnt, lba, lba_count, - comp_reduce_io_cb, - args); - if (rc) { - if (rc == -ENOMEM) { - SPDK_ERRLOG("No memory, start to queue io.\n"); - /* TODO: there's no bdev_io to queue */ - } else { - SPDK_ERRLOG("submitting readv request\n"); - } - args->cb_fn(args->cb_arg, rc); - } -} - -/* This is the function provided to the reduceLib for sending writes directly to - * the backing device. - */ -static void -_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, - uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) -{ - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, - backing_dev); - int rc; - - rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch, - iov, iovcnt, lba, lba_count, - comp_reduce_io_cb, - args); - if (rc) { - if (rc == -ENOMEM) { - SPDK_ERRLOG("No memory, start to queue io.\n"); - /* TODO: there's no bdev_io to queue */ - } else { - SPDK_ERRLOG("error submitting writev request\n"); - } - args->cb_fn(args->cb_arg, rc); - } -} - -/* This is the function provided to the reduceLib for sending unmaps directly to - * the backing device. 
- */ -static void -_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev, - uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) -{ - struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, - backing_dev); - int rc; - - rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch, - lba, lba_count, - comp_reduce_io_cb, - args); - - if (rc) { - if (rc == -ENOMEM) { - SPDK_ERRLOG("No memory, start to queue io.\n"); - /* TODO: there's no bdev_io to queue */ - } else { - SPDK_ERRLOG("submitting unmap request\n"); - } - args->cb_fn(args->cb_arg, rc); - } -} - -/* Called by reduceLib after performing unload vol actions following base bdev hotremove */ -static void -bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno) -{ - struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; - - if (reduce_errno) { - SPDK_ERRLOG("number %d\n", reduce_errno); - } - - comp_bdev->vol = NULL; - spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL); -} - -static void -vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find) -{ - struct vbdev_compress *comp_bdev, *tmp; - - TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) { - if (bdev_find == comp_bdev->base_bdev) { - /* Tell reduceLib that we're done with this volume. */ - spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev); - } - } -} - -/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */ -static void -vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, - void *event_ctx) -{ - switch (type) { - case SPDK_BDEV_EVENT_REMOVE: - vbdev_compress_base_bdev_hotremove_cb(bdev); - break; - default: - SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type); - break; - } -} - -/* TODO: determine which parms we want user configurable, HC for now - * params.vol_size - * params.chunk_size - * compression PMD, algorithm, window size, comp level, etc. 
- * DEV_MD_PATH - */ - -/* Common function for init and load to allocate and populate the minimal - * information for reducelib to init or load. - */ -struct vbdev_compress * -_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size) -{ - struct vbdev_compress *meta_ctx; - struct spdk_bdev *bdev; - - meta_ctx = calloc(1, sizeof(struct vbdev_compress)); - if (meta_ctx == NULL) { - SPDK_ERRLOG("failed to alloc init contexts\n"); - return NULL; - } - - meta_ctx->drv_name = "None"; - meta_ctx->backing_dev.unmap = _comp_reduce_unmap; - meta_ctx->backing_dev.readv = _comp_reduce_readv; - meta_ctx->backing_dev.writev = _comp_reduce_writev; - meta_ctx->backing_dev.compress = _comp_reduce_compress; - meta_ctx->backing_dev.decompress = _comp_reduce_decompress; - - meta_ctx->base_desc = bdev_desc; - bdev = spdk_bdev_desc_get_bdev(bdev_desc); - meta_ctx->base_bdev = bdev; - - meta_ctx->backing_dev.blocklen = bdev->blocklen; - meta_ctx->backing_dev.blockcnt = bdev->blockcnt; - - meta_ctx->params.chunk_size = CHUNK_SIZE; - if (lb_size == 0) { - meta_ctx->params.logical_block_size = bdev->blocklen; - } else { - meta_ctx->params.logical_block_size = lb_size; - } - - meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ; - return meta_ctx; -} - static bool -_set_pmd(struct vbdev_compress *comp_dev) +_set_pmd(struct compress_io_channel *chan) { + + /* Note: the compress_isal PMD is not supported as accel_fw supports native ISAL + * using the accel_sw module */ if (g_opts == COMPRESS_PMD_AUTO) { if (g_qat_available) { - comp_dev->drv_name = QAT_PMD; + chan->drv_name = QAT_PMD; } else if (g_mlx5_pci_available) { - comp_dev->drv_name = MLX5_PMD; - } else { - comp_dev->drv_name = ISAL_PMD; + chan->drv_name = MLX5_PMD; } } else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) { - comp_dev->drv_name = QAT_PMD; - } else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) { - comp_dev->drv_name = ISAL_PMD; + chan->drv_name = QAT_PMD; } else if (g_opts == 
COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) { - comp_dev->drv_name = MLX5_PMD; + chan->drv_name = MLX5_PMD; } else { SPDK_ERRLOG("Requested PMD is not available.\n"); return false; } - SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name); + SPDK_NOTICELOG("Channel %p PMD being used: %s\n", chan, chan->drv_name); return true; } -/* Call reducelib to initialize a new volume */ +static int compress_create_cb(void *io_device, void *ctx_buf); +static void compress_destroy_cb(void *io_device, void *ctx_buf); +static struct spdk_accel_module_if g_compress_module; static int -vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size) +accel_compress_init(void) { - struct spdk_bdev_desc *bdev_desc = NULL; - struct vbdev_compress *meta_ctx; int rc; - rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb, - NULL, &bdev_desc); + if (!g_compressdev_enable) { + return -EINVAL; + } + + rc = accel_init_compress_drivers(); if (rc) { - SPDK_ERRLOG("could not open bdev %s\n", bdev_name); - return rc; - } - - meta_ctx = _prepare_for_load_init(bdev_desc, lb_size); - if (meta_ctx == NULL) { - spdk_bdev_close(bdev_desc); + assert(TAILQ_EMPTY(&g_compress_devs)); + SPDK_NOTICELOG("no available compression devices\n"); return -EINVAL; } - if (_set_pmd(meta_ctx) == false) { - SPDK_ERRLOG("could not find required pmd\n"); - free(meta_ctx); - spdk_bdev_close(bdev_desc); - return -EINVAL; - } - - /* Save the thread where the base device is opened */ - meta_ctx->thread = spdk_get_thread(); - - meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); - - spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev, - pm_path, - vbdev_reduce_init_cb, - meta_ctx); + g_compressdev_initialized = true; + SPDK_NOTICELOG("Accel framework compressdev module initialized.\n"); + spdk_io_device_register(&g_compress_module, compress_create_cb, compress_destroy_cb, + sizeof(struct compress_io_channel), "compressdev_accel_module"); return 0; + 
} -/* We provide this callback for the SPDK channel code to create a channel using - * the channel struct we provided in our module get_io_channel() entry point. Here - * we get and save off an underlying base channel of the device below us so that - * we can communicate with the base bdev on a per channel basis. If we needed - * our own poller for this vbdev, we'd register it here. - */ static int -comp_bdev_ch_create_cb(void *io_device, void *ctx_buf) +compress_create_cb(void *io_device, void *ctx_buf) { - struct vbdev_compress *comp_bdev = io_device; + struct compress_io_channel *chan = ctx_buf; + const struct rte_compressdev_capabilities *capab; struct comp_device_qp *device_qp; + size_t length; - /* Now set the reduce channel if it's not already set. */ - pthread_mutex_lock(&comp_bdev->reduce_lock); - if (comp_bdev->ch_count == 0) { - /* We use this queue to track outstanding IO in our layer. */ - TAILQ_INIT(&comp_bdev->pending_comp_ios); - - /* We use this to queue up compression operations as needed. 
*/ - TAILQ_INIT(&comp_bdev->queued_comp_ops); - - comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); - comp_bdev->reduce_thread = spdk_get_thread(); - comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0); - /* Now assign a q pair */ - pthread_mutex_lock(&g_comp_device_qp_lock); - TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) { - if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) { - if (device_qp->thread == spdk_get_thread()) { - comp_bdev->device_qp = device_qp; - break; - } - if (device_qp->thread == NULL) { - comp_bdev->device_qp = device_qp; - device_qp->thread = spdk_get_thread(); - break; - } - } - } - pthread_mutex_unlock(&g_comp_device_qp_lock); - } - comp_bdev->ch_count++; - pthread_mutex_unlock(&comp_bdev->reduce_lock); - - if (comp_bdev->device_qp != NULL) { - uint64_t comp_feature_flags = - comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags; - - if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) { - comp_bdev->backing_dev.sgl_in = true; - } - if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) { - comp_bdev->backing_dev.sgl_out = true; - } - return 0; - } else { - SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev); + if (_set_pmd(chan) == false) { assert(false); + return -ENODEV; + } + + /* The following variable length arrays of mbuf pointers are required to submit to compressdev */ + length = NUM_MBUFS * sizeof(void *); + chan->src_mbufs = spdk_zmalloc(length, 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (chan->src_mbufs == NULL) { + return -ENOMEM; + } + chan->dst_mbufs = spdk_zmalloc(length, 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (chan->dst_mbufs == NULL) { + spdk_free(chan->src_mbufs); return -ENOMEM; } -} -static void -_channel_cleanup(struct vbdev_compress *comp_bdev) -{ - /* Note: comp_bdevs can 
share a device_qp if they are - * on the same thread so we leave the device_qp element - * alone for this comp_bdev and just clear the reduce thread. - */ - spdk_put_io_channel(comp_bdev->base_ch); - comp_bdev->reduce_thread = NULL; - spdk_poller_unregister(&comp_bdev->poller); -} + chan->poller = SPDK_POLLER_REGISTER(comp_dev_poller, chan, 0); + TAILQ_INIT(&chan->queued_tasks); -/* Used to reroute destroy_ch to the correct thread */ -static void -_comp_bdev_ch_destroy_cb(void *arg) -{ - struct vbdev_compress *comp_bdev = arg; - - pthread_mutex_lock(&comp_bdev->reduce_lock); - _channel_cleanup(comp_bdev); - pthread_mutex_unlock(&comp_bdev->reduce_lock); -} - -/* We provide this callback for the SPDK channel code to destroy a channel - * created with our create callback. We just need to undo anything we did - * when we created. If this bdev used its own poller, we'd unregister it here. - */ -static void -comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf) -{ - struct vbdev_compress *comp_bdev = io_device; - - pthread_mutex_lock(&comp_bdev->reduce_lock); - comp_bdev->ch_count--; - if (comp_bdev->ch_count == 0) { - /* Send this request to the thread where the channel was created. */ - if (comp_bdev->reduce_thread != spdk_get_thread()) { - spdk_thread_send_msg(comp_bdev->reduce_thread, - _comp_bdev_ch_destroy_cb, comp_bdev); - } else { - _channel_cleanup(comp_bdev); + /* TODO: consider associating device_qp with chan as opposed to thread */ + pthread_mutex_lock(&g_comp_device_qp_lock); + TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) { + if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) { + if (device_qp->thread == NULL) { + chan->device_qp = device_qp; + device_qp->thread = spdk_get_thread(); + break; + } } } - pthread_mutex_unlock(&comp_bdev->reduce_lock); -} + pthread_mutex_unlock(&g_comp_device_qp_lock); -/* RPC entry point for compression vbdev creation. 
*/ -int -create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size) -{ - struct vbdev_compress *comp_bdev = NULL; + if (chan->device_qp == NULL) { + SPDK_ERRLOG("out of qpairs, cannot assign one\n"); + assert(false); + return -ENOMEM; + } else { + capab = rte_compressdev_capability_get(0, RTE_COMP_ALGO_DEFLATE); - if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) { - SPDK_ERRLOG("Logical block size must be 512 or 4096\n"); - return -EINVAL; - } - - TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { - if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) { - SPDK_ERRLOG("Bass bdev %s already being used for a compress bdev\n", bdev_name); - return -EBUSY; + if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) { + chan->device_qp->device->sgl_in = true; } - } - return vbdev_init_reduce(bdev_name, pm_path, lb_size); -} -/* On init, just init the compress drivers. All metadata is stored on disk. */ -static int -vbdev_compress_init(void) -{ - if (vbdev_init_compress_drivers()) { - SPDK_ERRLOG("Error setting up compression devices\n"); - return -EINVAL; + if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) { + chan->device_qp->device->sgl_out = true; + } } return 0; } -/* Called when the entire module is being torn down. 
*/ static void -vbdev_compress_finish(void) +accel_compress_write_config_json(struct spdk_json_write_ctx *w) +{ + if (g_compressdev_enable) { + spdk_json_write_object_begin(w); + spdk_json_write_named_string(w, "method", "compressdev_scan_accel_module"); + spdk_json_write_named_object_begin(w, "params"); + spdk_json_write_named_uint32(w, "pmd", g_opts); + spdk_json_write_object_end(w); + spdk_json_write_object_end(w); + } +} + +static void +compress_destroy_cb(void *io_device, void *ctx_buf) +{ + struct compress_io_channel *chan = ctx_buf; + struct comp_device_qp *device_qp; + struct spdk_thread *thread = spdk_get_thread(); + + spdk_free(chan->src_mbufs); + spdk_free(chan->dst_mbufs); + + spdk_poller_unregister(&chan->poller); + + pthread_mutex_lock(&g_comp_device_qp_lock); + TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) { + if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) { + if (device_qp->thread == thread) { + chan->device_qp = NULL; + device_qp->thread = NULL; + break; + } + } + } + pthread_mutex_unlock(&g_comp_device_qp_lock); +} + +static size_t +accel_compress_get_ctx_size(void) +{ + return 0; +} + +static bool +compress_supports_opcode(enum accel_opcode opc) +{ + if (g_mlx5_pci_available || g_qat_available) { + switch (opc) { + case ACCEL_OPC_COMPRESS: + case ACCEL_OPC_DECOMPRESS: + return true; + default: + break; + } + } + + return false; +} + +static struct spdk_io_channel * +compress_get_io_channel(void) +{ + return spdk_get_io_channel(&g_compress_module); +} + +static void accel_compress_exit(void *ctx); +static struct spdk_accel_module_if g_compress_module = { + .module_init = accel_compress_init, + .module_fini = accel_compress_exit, + .write_config_json = accel_compress_write_config_json, + .get_ctx_size = accel_compress_get_ctx_size, + .name = "dpdk_compressdev", + .supports_opcode = compress_supports_opcode, + .get_io_channel = compress_get_io_channel, + .submit_tasks = compress_submit_tasks +}; + +void 
+accel_dpdk_compressdev_enable(void) +{ + spdk_accel_module_list_add(&g_compress_module); +} + +static void +accel_compress_exit(void *ctx) { struct comp_device_qp *dev_qp; - /* TODO: unload vol in a future patch */ + struct compress_dev *device; + + if (g_compressdev_initialized) { + spdk_io_device_unregister(&g_compress_module, NULL); + g_compressdev_initialized = false; + } + + while ((device = TAILQ_FIRST(&g_compress_devs))) { + TAILQ_REMOVE(&g_compress_devs, device, link); + free(device); + } while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) { TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link); free(dev_qp); } + pthread_mutex_destroy(&g_comp_device_qp_lock); rte_mempool_free(g_comp_op_mp); rte_mempool_free(g_mbuf_mp); + + spdk_accel_module_finish(); } - -/* During init we'll be asked how much memory we'd like passed to us - * in bev_io structures as context. Here's where we specify how - * much context we want per IO. - */ -static int -vbdev_compress_get_ctx_size(void) -{ - return sizeof(struct comp_bdev_io); -} - -/* When we register our bdev this is how we specify our entry points. 
*/ -static const struct spdk_bdev_fn_table vbdev_compress_fn_table = { - .destruct = vbdev_compress_destruct, - .submit_request = vbdev_compress_submit_request, - .io_type_supported = vbdev_compress_io_type_supported, - .get_io_channel = vbdev_compress_get_io_channel, - .dump_info_json = vbdev_compress_dump_info_json, - .write_config_json = NULL, -}; - -static struct spdk_bdev_module compress_if = { - .name = "compress", - .module_init = vbdev_compress_init, - .get_ctx_size = vbdev_compress_get_ctx_size, - .examine_disk = vbdev_compress_examine, - .module_fini = vbdev_compress_finish, - .config_json = vbdev_compress_config_json -}; - -SPDK_BDEV_MODULE_REGISTER(compress, &compress_if) - -static int _set_compbdev_name(struct vbdev_compress *comp_bdev) -{ - struct spdk_bdev_alias *aliases; - - if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) { - aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev)); - comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name); - if (!comp_bdev->comp_bdev.name) { - SPDK_ERRLOG("could not allocate comp_bdev name for alias\n"); - return -ENOMEM; - } - } else { - comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name); - if (!comp_bdev->comp_bdev.name) { - SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n"); - return -ENOMEM; - } - } - return 0; -} - -static int -vbdev_compress_claim(struct vbdev_compress *comp_bdev) -{ - int rc; - - if (_set_compbdev_name(comp_bdev)) { - return -EINVAL; - } - - /* Note: some of the fields below will change in the future - for example, - * blockcnt specifically will not match (the compressed volume size will - * be slightly less than the base bdev size) - */ - comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME; - comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache; - - if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) { - comp_bdev->comp_bdev.required_alignment = - 
spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen), - comp_bdev->base_bdev->required_alignment); - SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n", - comp_bdev->comp_bdev.required_alignment); - } else { - comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment; - } - comp_bdev->comp_bdev.optimal_io_boundary = - comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size; - - comp_bdev->comp_bdev.split_on_optimal_io_boundary = true; - - comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size; - comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen; - assert(comp_bdev->comp_bdev.blockcnt > 0); - - /* This is the context that is passed to us when the bdev - * layer calls in so we'll save our comp_bdev node here. - */ - comp_bdev->comp_bdev.ctxt = comp_bdev; - comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table; - comp_bdev->comp_bdev.module = &compress_if; - - pthread_mutex_init(&comp_bdev->reduce_lock, NULL); - - /* Save the thread where the base device is opened */ - comp_bdev->thread = spdk_get_thread(); - - spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb, - sizeof(struct comp_io_channel), - comp_bdev->comp_bdev.name); - - rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc, - comp_bdev->comp_bdev.module); - if (rc) { - SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev)); - goto error_claim; - } - - rc = spdk_bdev_register(&comp_bdev->comp_bdev); - if (rc < 0) { - SPDK_ERRLOG("trying to register bdev\n"); - goto error_bdev_register; - } - - TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link); - - SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name); - - return 0; - - /* Error cleanup paths. 
*/ -error_bdev_register: - spdk_bdev_module_release_bdev(comp_bdev->base_bdev); -error_claim: - spdk_io_device_unregister(comp_bdev, NULL); - free(comp_bdev->comp_bdev.name); - return rc; -} - -static void -_vbdev_compress_delete_done(void *_ctx) -{ - struct vbdev_comp_delete_ctx *ctx = _ctx; - - ctx->cb_fn(ctx->cb_arg, ctx->cb_rc); - - free(ctx); -} - -static void -vbdev_compress_delete_done(void *cb_arg, int bdeverrno) -{ - struct vbdev_comp_delete_ctx *ctx = cb_arg; - - ctx->cb_rc = bdeverrno; - - if (ctx->orig_thread != spdk_get_thread()) { - spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx); - } else { - _vbdev_compress_delete_done(ctx); - } -} - -void -bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg) -{ - struct vbdev_compress *comp_bdev = NULL; - struct vbdev_comp_delete_ctx *ctx; - - TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { - if (strcmp(name, comp_bdev->comp_bdev.name) == 0) { - break; - } - } - - if (comp_bdev == NULL) { - cb_fn(cb_arg, -ENODEV); - return; - } - - ctx = calloc(1, sizeof(*ctx)); - if (ctx == NULL) { - SPDK_ERRLOG("Failed to allocate delete context\n"); - cb_fn(cb_arg, -ENOMEM); - return; - } - - /* Save these for after the vol is destroyed. */ - ctx->cb_fn = cb_fn; - ctx->cb_arg = cb_arg; - ctx->orig_thread = spdk_get_thread(); - - comp_bdev->delete_ctx = ctx; - - /* Tell reducelib that we're done with this volume. 
*/ - if (comp_bdev->orphaned == false) { - spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev); - } else { - delete_vol_unload_cb(comp_bdev, 0); - } -} - -static void -_vbdev_reduce_load_cb(void *ctx) -{ - struct vbdev_compress *meta_ctx = ctx; - int rc; - - assert(meta_ctx->base_desc != NULL); - - /* Done with metadata operations */ - spdk_put_io_channel(meta_ctx->base_ch); - - if (meta_ctx->reduce_errno == 0) { - if (_set_pmd(meta_ctx) == false) { - SPDK_ERRLOG("could not find required pmd\n"); - goto err; - } - - rc = vbdev_compress_claim(meta_ctx); - if (rc != 0) { - goto err; - } - } else if (meta_ctx->reduce_errno == -ENOENT) { - if (_set_compbdev_name(meta_ctx)) { - goto err; - } - - /* Save the thread where the base device is opened */ - meta_ctx->thread = spdk_get_thread(); - - meta_ctx->comp_bdev.module = &compress_if; - pthread_mutex_init(&meta_ctx->reduce_lock, NULL); - rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc, - meta_ctx->comp_bdev.module); - if (rc) { - SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); - free(meta_ctx->comp_bdev.name); - goto err; - } - - meta_ctx->orphaned = true; - TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link); - } else { - if (meta_ctx->reduce_errno != -EILSEQ) { - SPDK_ERRLOG("for vol %s, error %u\n", - spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno); - } - goto err; - } - - spdk_bdev_module_examine_done(&compress_if); - return; - -err: - /* Close the underlying bdev on its same opened thread. */ - spdk_bdev_close(meta_ctx->base_desc); - free(meta_ctx); - spdk_bdev_module_examine_done(&compress_if); -} - -/* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct - * used for initial metadata operations to claim where it will be further filled out - * and added to the global list. 
- */ -static void -vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) -{ - struct vbdev_compress *meta_ctx = cb_arg; - - if (reduce_errno == 0) { - /* Update information following volume load. */ - meta_ctx->vol = vol; - memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol), - sizeof(struct spdk_reduce_vol_params)); - } - - meta_ctx->reduce_errno = reduce_errno; - - if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) { - spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx); - } else { - _vbdev_reduce_load_cb(meta_ctx); - } - -} - -/* Examine_disk entry point: will do a metadata load to see if this is ours, - * and if so will go ahead and claim it. - */ -static void -vbdev_compress_examine(struct spdk_bdev *bdev) -{ - struct spdk_bdev_desc *bdev_desc = NULL; - struct vbdev_compress *meta_ctx; - int rc; - - if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) { - spdk_bdev_module_examine_done(&compress_if); - return; - } - - rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, - vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc); - if (rc) { - SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev)); - spdk_bdev_module_examine_done(&compress_if); - return; - } - - meta_ctx = _prepare_for_load_init(bdev_desc, 0); - if (meta_ctx == NULL) { - spdk_bdev_close(bdev_desc); - spdk_bdev_module_examine_done(&compress_if); - return; - } - - /* Save the thread where the base device is opened */ - meta_ctx->thread = spdk_get_thread(); - - meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); - spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx); -} - -int -compress_set_pmd(enum compress_pmd *opts) -{ - g_opts = *opts; - - return 0; -} - -SPDK_LOG_REGISTER_COMPONENT(vbdev_compress) diff --git a/module/accel/dpdk_compressdev/accel_dpdk_compressdev.h b/module/accel/dpdk_compressdev/accel_dpdk_compressdev.h new file mode 100644 index 000000000..0ad54f6d8 --- 
/dev/null +++ b/module/accel/dpdk_compressdev/accel_dpdk_compressdev.h @@ -0,0 +1,16 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (C) 2022 Intel Corporation. + * All rights reserved. + */ + +#include "spdk/stdinc.h" + +enum compress_pmd { + COMPRESS_PMD_AUTO = 0, + COMPRESS_PMD_QAT_ONLY, + COMPRESS_PMD_MLX5_PCI_ONLY, + COMPRESS_PMD_MAX, +}; + +void accel_dpdk_compressdev_enable(void); +int accel_compressdev_enable_probe(enum compress_pmd *opts); diff --git a/module/accel/dpdk_compressdev/accel_dpdk_compressdev_rpc.c b/module/accel/dpdk_compressdev/accel_dpdk_compressdev_rpc.c new file mode 100644 index 000000000..7d9a09d55 --- /dev/null +++ b/module/accel/dpdk_compressdev/accel_dpdk_compressdev_rpc.c @@ -0,0 +1,52 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (C) 2022 Intel Corporation. + * All rights reserved. + */ + +#include "accel_dpdk_compressdev.h" +#include "spdk/rpc.h" +#include "spdk/util.h" +#include "spdk/string.h" +#include "spdk/log.h" + +struct rpc_compressdev_scan_accel_module { + uint32_t pmd; +}; + +static const struct spdk_json_object_decoder rpc_compressdev_scan_accel_module_decoder[] = { + {"pmd", offsetof(struct rpc_compressdev_scan_accel_module, pmd), spdk_json_decode_uint32}, +}; + +static void +rpc_compressdev_scan_accel_module(struct spdk_jsonrpc_request *request, + const struct spdk_json_val *params) +{ + struct rpc_compressdev_scan_accel_module req; + int rc = 0; + + if (spdk_json_decode_object(params, rpc_compressdev_scan_accel_module_decoder, + SPDK_COUNTOF(rpc_compressdev_scan_accel_module_decoder), + &req)) { + SPDK_ERRLOG("spdk_json_decode_object failed\n"); + spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR, + "spdk_json_decode_object failed"); + return; + } + + if (req.pmd >= COMPRESS_PMD_MAX) { + spdk_jsonrpc_send_error_response_fmt(request, -EINVAL, + "PMD value %d should be less than %d", req.pmd, COMPRESS_PMD_MAX); + return; + } + + rc = 
accel_compressdev_enable_probe(&req.pmd); + if (rc) { + spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc)); + return; + } + + accel_dpdk_compressdev_enable(); + spdk_jsonrpc_send_bool_response(request, true); +} +SPDK_RPC_REGISTER("compressdev_scan_accel_module", rpc_compressdev_scan_accel_module, + SPDK_RPC_STARTUP) diff --git a/module/bdev/compress/vbdev_compress.c b/module/bdev/compress/vbdev_compress.c index 0c2bb1d9f..e78c8395d 100644 --- a/module/bdev/compress/vbdev_compress.c +++ b/module/bdev/compress/vbdev_compress.c @@ -353,6 +353,8 @@ vbdev_init_compress_drivers(void) return -EINVAL; } + /* TODO: make these global pools per thread but do in a follow-up patch to make + * it easier to review against the old compressdev code */ g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE, sizeof(struct rte_mbuf), 0, rte_socket_id()); if (g_mbuf_mp == NULL) { @@ -1401,6 +1403,10 @@ comp_bdev_ch_create_cb(void *io_device, void *ctx_buf) struct vbdev_compress *comp_bdev = io_device; struct comp_device_qp *device_qp; + /* TODO look into associating the device_qp with the channel vs the thread, + * doing in next patch to make this one easier to review against code taken + * from the vbdev module */ + /* Now set the reduce channel if it's not already set. */ pthread_mutex_lock(&comp_bdev->reduce_lock); if (comp_bdev->ch_count == 0) { diff --git a/python/spdk/rpc/__init__.py b/python/spdk/rpc/__init__.py index d64b9eeef..471ffe292 100644 --- a/python/spdk/rpc/__init__.py +++ b/python/spdk/rpc/__init__.py @@ -12,6 +12,7 @@ from . import accel from . import app from . import bdev from . import blobfs +from . import compressdev from . import env_dpdk from . import dsa from . 
import iaa diff --git a/python/spdk/rpc/accel.py b/python/spdk/rpc/accel.py index 3d320f296..00035b9f5 100644 --- a/python/spdk/rpc/accel.py +++ b/python/spdk/rpc/accel.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (C) 2022 Intel Corporation. # All rights reserved. +# from spdk.rpc.helpers import deprecated_alias diff --git a/python/spdk/rpc/compressdev.py b/python/spdk/rpc/compressdev.py new file mode 100644 index 000000000..91076ae0d --- /dev/null +++ b/python/spdk/rpc/compressdev.py @@ -0,0 +1,14 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (C) 2022 Intel Corporation. +# All rights reserved. +# + +def compressdev_scan_accel_module(client, pmd): + """Scan and enable compressdev module and set pmd option. + + Args: + pmd: 0 = auto-select, 1 = QAT, 2 = mlx5_pci + """ + params = {'pmd': pmd} + + return client.call('compressdev_scan_accel_module', params) diff --git a/scripts/rpc.py b/scripts/rpc.py index e45a92b19..b84fc6881 100755 --- a/scripts/rpc.py +++ b/scripts/rpc.py @@ -2840,6 +2840,14 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse p = subparsers.add_parser('ioat_scan_accel_module', help='Enable IOAT accel module offload.') p.set_defaults(func=ioat_scan_accel_module) + # dpdk compressdev + def compressdev_scan_accel_module(args): + rpc.compressdev.compressdev_scan_accel_module(args.client, pmd=args.pmd) + + p = subparsers.add_parser('compressdev_scan_accel_module', help='Scan and enable compressdev module and set pmd option.') + p.add_argument('-p', '--pmd', type=int, help='0 = auto-select, 1 = QAT only, 2 = mlx5_pci only') + p.set_defaults(func=compressdev_scan_accel_module) + # dsa def dsa_scan_accel_module(args): rpc.dsa.dsa_scan_accel_module(args.client, config_kernel_mode=args.config_kernel_mode) diff --git a/test/accel/accel.sh b/test/accel/accel.sh index ee985cf13..fd4ff2949 100755 --- a/test/accel/accel.sh +++ b/test/accel/accel.sh @@ -28,6 +28,15 @@ if [[ $CONFIG_ISAL == y ]]; 
then run_test "accel_decomp_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -T 2 run_test "accel_deomp_full_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -T 2 fi +if [[ $CONFIG_DPDK_COMPRESSDEV == y ]]; then + run_test "accel_cdev_comp" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w compress -l $testdir/bib -c $testdir/dpdk.json + run_test "accel_cdev_decomp" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -c $testdir/dpdk.json + run_test "accel_cdev_decomp_full" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -c $testdir/dpdk.json + run_test "accel_cdev_decomp_mcore" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -m 0xf -c $testdir/dpdk.json + run_test "accel_cdev_decomp_full_mcore" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -m 0xf -c $testdir/dpdk.json + run_test "accel_cdev_decomp_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -T 2 -c $testdir/dpdk.json + run_test "accel_cdev_decomp_full_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -T 2 -c $testdir/dpdk.json +fi trap 'killprocess $spdk_tgt_pid; exit 1' ERR diff --git a/test/accel/dpdk.json b/test/accel/dpdk.json new file mode 100644 index 000000000..43d984321 --- /dev/null +++ b/test/accel/dpdk.json @@ -0,0 +1,15 @@ +{ + "subsystems": [ + { + "subsystem": "accel", + "config": [ + { + "method": "compressdev_scan_accel_module", + "params": { + "pmd": 0 + } + } + ] + } + ] +} diff --git a/test/common/autotest_common.sh b/test/common/autotest_common.sh index d5de077df..f5ae033ef 100755 --- a/test/common/autotest_common.sh +++ b/test/common/autotest_common.sh @@ -444,7 +444,7 @@ function get_config_params() { if [ -f /usr/include/libpmem.h ] && [ $SPDK_TEST_VBDEV_COMPRESS -eq 1 ]; then if ge "$(nasm --version | awk '{print $3}')" 2.14 && [[ $SPDK_TEST_ISAL -eq 1 ]]; then - config_params+=' --with-vbdev-compress' 
+ config_params+=' --with-vbdev-compress --with-dpdk-compressdev' fi fi diff --git a/test/common/skipped_build_files.txt b/test/common/skipped_build_files.txt index 871c3d73b..08842fad5 100644 --- a/test/common/skipped_build_files.txt +++ b/test/common/skipped_build_files.txt @@ -56,7 +56,3 @@ module/bdev/daos/bdev_daos_rpc # Not configured to test xNVMe bdev module/bdev/xnvme/bdev_xnvme module/bdev/xnvme/bdev_xnvme_rpc - -# Temporarily include this in the skipped files. Next patches will remove -# this and start building it. -module/accel/dpdk_compressdev/accel_dpdk_compressdev