bdev/compress: Port to use accel_fw instead of compressdev

directly

This patch removes hardcoded compressdev code from the
vbdev module and instead uses the accel_fw. The port required
a few changes based on how things are plumbed and accessed,
nothing that isn't be too obscure.  CI tests were updated to
run ISAL accel_fw module as well as DPDK compressdev with QAT.

Unit tests for the new module will follow in a separate patch.

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I769cbc888658fb846d89f6f0bfeeb1a2a820767e
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/13610
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
This commit is contained in:
paul luse 2022-07-07 12:46:45 -07:00 committed by Tomasz Zawadzki
parent 90a6407df6
commit bb5083a85d
15 changed files with 86 additions and 1706 deletions

View File

@ -325,7 +325,7 @@ if [ $SPDK_RUN_FUNCTIONAL_TEST -eq 1 ]; then
fi
if [ $SPDK_TEST_VBDEV_COMPRESS -eq 1 ]; then
run_test "compress_qat" ./test/compress/compress.sh "qat"
run_test "compress_compdev" ./test/compress/compress.sh "compdev"
run_test "compress_isal" ./test/compress/compress.sh "isal"
fi

View File

@ -104,15 +104,10 @@ unless the data stored on disk is placed appropriately. The compression vbdev mo
relies on an internal SPDK library called `reduce` to accomplish this, see @ref reduce
for detailed information.
The vbdev module relies on the DPDK CompressDev Framework to provide all compression
functionality. The framework provides support for many different software only
compression modules as well as hardware assisted support for Intel QAT. At this
time the vbdev module supports the DPDK drivers for ISAL, QAT and mlx5_pci.
mlx5_pci driver works with BlueField 2 SmartNIC and requires additional configuration of DPDK
environment to enable compression function. It can be done via SPDK event library by configuring
`env_context` member of `spdk_app_opts` structure or by passing corresponding CLI arguments in the
following form: `--allow=BDF,class=compress`, e.g. `--allow=0000:01:00.0,class=compress`.
The compression bdev module leverages the [Acceleration Framework](https://spdk.io/doc/accel_fw.html) to
carry out the actual compression and decompression. The acceleration framework can be configured to use
ISA-L software optimized compression or the DPDK Compressdev module for hardware acceleration. To configure
the Compressdev module please see the `compressdev_scan_accel_module` documentation [here](https://spdk.io/doc/jsonrpc.html)
Persistent memory is used to store metadata associated with the layout of the data on the
backing device. SPDK relies on [PMDK](http://pmem.io/pmdk/) to interface persistent memory so any hardware
@ -135,18 +130,6 @@ created it cannot be separated from the persistent memory file that will be crea
the specified directory. If the persistent memory file is not available, the compression
vbdev will also not be available.
By default the vbdev module will choose the QAT driver if the hardware and drivers are
available and loaded. If not, it will revert to the software-only ISAL driver. By using
the following command, the driver may be specified however this is not persistent so it
must be done either upon creation or before the underlying logical volume is loaded to
be honored. In the example below, `0` is telling the vbdev module to use QAT if available
otherwise use ISAL, this is the default and if sufficient the command is not required. Passing
a value of 1 tells the driver to use QAT and if not available then the creation or loading
the vbdev should fail to create or load. A value of '2' as shown below tells the module
to use ISAL and if for some reason it is not available, the vbdev should fail to create or load.
`rpc.py bdev_compress_set_pmd -p 2`
To remove a compression vbdev, use the following command which will also delete the PMEM
file. If the logical volume is deleted the PMEM file will not be removed and the
compression vbdev will not be available.

View File

@ -2724,42 +2724,6 @@ Example response:
}
~~~
### bdev_compress_set_pmd {#rpc_bdev_compress_set_pmd}
Select the DPDK polled mode driver (pmd) for a compressed bdev,
0 = auto-select, 1= QAT only, 2 = ISAL only, 3 = mlx5_pci only.
#### Parameters
Name | Optional | Type | Description
----------------------- | -------- | ----------- | -----------
pmd | Required | int | pmd selection
#### Example
Example request:
~~~json
{
"params": {
"pmd": 1
},
"jsonrpc": "2.0",
"method": "bdev_compress_set_pmd",
"id": 1
}
~~~
Example response:
~~~json
{
"jsonrpc": "2.0",
"id": 1,
"result": true
}
~~~
### bdev_crypto_create {#rpc_bdev_crypto_create}
Create a new crypto bdev on a given base bdev.

View File

@ -78,6 +78,7 @@ typedef void (*spdk_reduce_vol_op_with_handle_complete)(void *ctx,
typedef void (*spdk_reduce_dev_cpl)(void *cb_arg, int reduce_errno);
struct spdk_reduce_vol_cb_args {
uint32_t output_size;
spdk_reduce_dev_cpl cb_fn;
void *cb_arg;
};

View File

@ -1179,11 +1179,10 @@ _write_compress_done(void *_req, int reduce_errno)
* the uncompressed buffer to disk.
*/
if (reduce_errno < 0) {
reduce_errno = req->vol->params.chunk_size;
req->backing_cb_args.output_size = req->vol->params.chunk_size;
}
/* Positive reduce_errno indicates number of bytes in compressed buffer. */
_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
}
static void
@ -1392,11 +1391,10 @@ _write_decompress_done(void *_req, int reduce_errno)
return;
}
/* Positive reduce_errno indicates number of bytes in decompressed
* buffer. This should equal the chunk size - otherwise that's another
* type of failure.
/* Positive reduce_errno indicates that the output size field in the backing_cb_args
* represents the output_size.
*/
if ((uint32_t)reduce_errno != req->vol->params.chunk_size) {
if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
_reduce_vol_complete_req(req, -EIO);
return;
}
@ -1443,11 +1441,10 @@ _read_decompress_done(void *_req, int reduce_errno)
return;
}
/* Positive reduce_errno indicates number of bytes in decompressed
* buffer. This should equal the chunk size - otherwise that's another
* type of failure.
/* Positive reduce_errno indicates that the output size field in the backing_cb_args
* represents the output_size.
*/
if ((uint32_t)reduce_errno != vol->params.chunk_size) {
if (req->backing_cb_args.output_size != vol->params.chunk_size) {
_reduce_vol_complete_req(req, -EIO);
return;
}

View File

@ -136,7 +136,7 @@ DEPDIRS-bdev_rpc := $(BDEV_DEPS)
DEPDIRS-bdev_split := $(BDEV_DEPS)
DEPDIRS-bdev_aio := $(BDEV_DEPS_THREAD)
DEPDIRS-bdev_compress := $(BDEV_DEPS_THREAD) reduce
DEPDIRS-bdev_compress := $(BDEV_DEPS_THREAD) reduce accel
DEPDIRS-bdev_crypto := $(BDEV_DEPS_THREAD) accel
DEPDIRS-bdev_delay := $(BDEV_DEPS_THREAD)
DEPDIRS-bdev_iscsi := $(BDEV_DEPS_THREAD)

View File

@ -16,81 +16,16 @@
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"
#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>
#include "spdk_internal/accel_module.h"
/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
.name = "context_reduce",
.size = sizeof(uint64_t),
.align = __alignof__(uint64_t),
.flags = 0,
};
static int g_mbuf_offset;
#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
* span a physical page boundary.
*/
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)
#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256
static enum compress_pmd g_opts;
/* Global list of available compression devices. */
struct compress_dev {
struct rte_compressdev_info cdev_info; /* includes device friendly name */
uint8_t cdev_id; /* identifier for the device */
void *comp_xform; /* shared private xform for comp on this PMD */
void *decomp_xform; /* shared private xform for decomp on this PMD */
TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
/* Although ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to
* the length of the internal ring name that it creates, it breaks a limit in the generic
* ring code and fails the qp initialization.
* FIXME: Reduce number of qpairs to 48, due to issue #2338
*/
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
struct compress_dev *device; /* ptr to compression device */
uint8_t qp; /* queue pair for this node */
struct spdk_thread *thread; /* thread that this qp is assigned to */
TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
struct spdk_reduce_backing_dev *backing_dev;
struct iovec *src_iovs;
int src_iovcnt;
struct iovec *dst_iovs;
int dst_iovcnt;
bool compress;
void *cb_arg;
TAILQ_ENTRY(vbdev_comp_op) link;
};
struct vbdev_comp_delete_ctx {
spdk_delete_compress_complete cb_fn;
void *cb_arg;
@ -105,8 +40,7 @@ struct vbdev_compress {
struct spdk_io_channel *base_ch; /* IO channel of base device */
struct spdk_bdev comp_bdev; /* the compression virtual bdev */
struct comp_io_channel *comp_ch; /* channel associated with this bdev */
char *drv_name; /* name of the compression device driver */
struct comp_device_qp *device_qp;
struct spdk_io_channel *accel_channel; /* to communicate with the accel framework */
struct spdk_thread *reduce_thread;
pthread_mutex_t reduce_lock;
uint32_t ch_count;
@ -140,37 +74,6 @@ struct comp_bdev_io {
int status; /* save for completion on orig thread */
};
/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL; /* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {}; /* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;
/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
.type = RTE_COMP_COMPRESS,
.compress = {
.algo = RTE_COMP_ALGO_DEFLATE,
.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
.level = RTE_COMP_LEVEL_MAX,
.window_size = DEFAULT_WINDOW_SIZE,
.chksum = RTE_COMP_CHECKSUM_NONE,
.hash_algo = RTE_COMP_HASH_ALGO_NONE
}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
.type = RTE_COMP_DECOMPRESS,
.decompress = {
.algo = RTE_COMP_ALGO_DEFLATE,
.chksum = RTE_COMP_CHECKSUM_NONE,
.window_size = DEFAULT_WINDOW_SIZE,
.hash_algo = RTE_COMP_HASH_ALGO_NONE
}
};
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
@ -179,227 +82,6 @@ static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spd
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
/* Dummy function used by DPDK to free ext attached buffers
* to mbufs, we free them ourselves but this callback has to
* be here.
*/
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}
/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
struct compress_dev *device;
uint16_t q_pairs;
uint8_t cdev_id;
int rc, i;
struct comp_device_qp *dev_qp;
struct comp_device_qp *tmp_qp;
device = calloc(1, sizeof(struct compress_dev));
if (!device) {
return -ENOMEM;
}
/* Get details about this device. */
rte_compressdev_info_get(index, &device->cdev_info);
cdev_id = device->cdev_id = index;
/* Zero means no limit so choose number of lcores. */
if (device->cdev_info.max_nb_queue_pairs == 0) {
q_pairs = MAX_NUM_QP;
} else {
q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
}
/* Configure the compression device. */
struct rte_compressdev_config config = {
.socket_id = rte_socket_id(),
.nb_queue_pairs = q_pairs,
.max_nb_priv_xforms = NUM_MAX_XFORMS,
.max_nb_streams = 0
};
rc = rte_compressdev_configure(cdev_id, &config);
if (rc < 0) {
SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
goto err;
}
/* Pre-setup all potential qpairs now and assign them in the channel
* callback.
*/
for (i = 0; i < q_pairs; i++) {
rc = rte_compressdev_queue_pair_setup(cdev_id, i,
NUM_MAX_INFLIGHT_OPS,
rte_socket_id());
if (rc) {
if (i > 0) {
q_pairs = i;
SPDK_NOTICELOG("FYI failed to setup a queue pair on "
"compressdev %u with error %u "
"so limiting to %u qpairs\n",
cdev_id, rc, q_pairs);
break;
} else {
SPDK_ERRLOG("Failed to setup queue pair on "
"compressdev %u with error %u\n", cdev_id, rc);
rc = -EINVAL;
goto err;
}
}
}
rc = rte_compressdev_start(cdev_id);
if (rc < 0) {
SPDK_ERRLOG("Failed to start device %u: error %d\n",
cdev_id, rc);
goto err;
}
if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
&device->comp_xform);
if (rc < 0) {
SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
cdev_id, rc);
goto err;
}
rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
&device->decomp_xform);
if (rc) {
SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
cdev_id, rc);
goto err;
}
} else {
SPDK_ERRLOG("PMD does not support shared transforms\n");
goto err;
}
/* Build up list of device/qp combinations */
for (i = 0; i < q_pairs; i++) {
dev_qp = calloc(1, sizeof(struct comp_device_qp));
if (!dev_qp) {
rc = -ENOMEM;
goto err;
}
dev_qp->device = device;
dev_qp->qp = i;
dev_qp->thread = NULL;
TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
}
TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
g_qat_available = true;
}
if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
g_isal_available = true;
}
if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
g_mlx5_pci_available = true;
}
return 0;
err:
TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
free(dev_qp);
}
free(device);
return rc;
}
/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
uint8_t cdev_count, i;
struct compress_dev *tmp_dev;
struct compress_dev *device;
int rc;
/* We always init the compress_isal PMD */
rc = rte_vdev_init(ISAL_PMD, NULL);
if (rc == 0) {
SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
} else if (rc == -EEXIST) {
SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
} else {
SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
return -EINVAL;
}
/* If we have no compression devices, there's no reason to continue. */
cdev_count = rte_compressdev_count();
if (cdev_count == 0) {
return 0;
}
if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
return -EINVAL;
}
g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
if (g_mbuf_offset < 0) {
SPDK_ERRLOG("error registering dynamic field with DPDK\n");
return -EINVAL;
}
/* TODO: make these global pools per thread but do in a follow-up patch to make
* it easier to review against the old compressdev code */
g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
sizeof(struct rte_mbuf), 0, rte_socket_id());
if (g_mbuf_mp == NULL) {
SPDK_ERRLOG("Cannot create mbuf pool\n");
rc = -ENOMEM;
goto error_create_mbuf;
}
g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
0, rte_socket_id());
if (g_comp_op_mp == NULL) {
SPDK_ERRLOG("Cannot create comp op pool\n");
rc = -ENOMEM;
goto error_create_op;
}
/* Init all devices */
for (i = 0; i < cdev_count; i++) {
rc = create_compress_dev(i);
if (rc != 0) {
goto error_create_compress_devs;
}
}
if (g_qat_available == true) {
SPDK_NOTICELOG("initialized QAT PMD\n");
}
g_shinfo.free_cb = shinfo_free_cb;
return 0;
/* Error cleanup paths. */
error_create_compress_devs:
TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
TAILQ_REMOVE(&g_compress_devs, device, link);
free(device);
}
error_create_op:
error_create_mbuf:
rte_mempool_free(g_mbuf_mp);
return rc;
}
/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
@ -436,286 +118,28 @@ reduce_rw_blocks_cb(void *arg, int reduce_errno)
spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
uint64_t updated_length, remainder, phys_addr;
uint8_t *current_base = NULL;
int iov_index, mbuf_index;
int rc = 0;
/* Setup mbufs */
iov_index = mbuf_index = 0;
while (iov_index < iovcnt) {
current_base = iovs[iov_index].iov_base;
if (total_length) {
*total_length += iovs[iov_index].iov_len;
}
assert(mbufs[mbuf_index] != NULL);
*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
updated_length = iovs[iov_index].iov_len;
phys_addr = spdk_vtophys((void *)current_base, &updated_length);
rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
current_base,
phys_addr,
updated_length,
&g_shinfo);
rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
remainder = iovs[iov_index].iov_len - updated_length;
if (mbuf_index > 0) {
rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
}
/* If we crossed 2 physical pages boundary we need another mbuf for the remainder */
if (remainder > 0) {
/* allocate an mbuf at the end of the array */
rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
(struct rte_mbuf **)&mbufs[*mbuf_total], 1);
if (rc) {
SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
return -1;
}
(*mbuf_total)++;
mbuf_index++;
*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
current_base += updated_length;
phys_addr = spdk_vtophys((void *)current_base, &remainder);
/* assert we don't cross another */
assert(remainder == iovs[iov_index].iov_len - updated_length);
rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
current_base,
phys_addr,
remainder,
&g_shinfo);
rte_pktmbuf_append(mbufs[mbuf_index], remainder);
rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
}
iov_index++;
mbuf_index++;
}
return 0;
}
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
int src_iovcnt, struct iovec *dst_iovs,
int dst_iovcnt, bool compress, void *cb_arg)
{
void *reduce_cb_arg = cb_arg;
struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
backing_dev);
struct rte_comp_op *comp_op;
struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
uint64_t total_length = 0;
int rc = 0;
struct vbdev_comp_op *op_to_queue;
int src_mbuf_total = src_iovcnt;
int dst_mbuf_total = dst_iovcnt;
bool device_error = false;
int rc;
assert(src_iovcnt < MAX_MBUFS_PER_OP);
#ifdef DEBUG
memset(src_mbufs, 0, sizeof(src_mbufs));
memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif
comp_op = rte_comp_op_alloc(g_comp_op_mp);
if (!comp_op) {
SPDK_ERRLOG("trying to get a comp op!\n");
rc = -ENOMEM;
goto error_get_op;
}
/* get an mbuf per iov, src and dst */
rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
if (rc) {
SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
rc = -ENOMEM;
goto error_get_src;
}
assert(src_mbufs[0]);
rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
if (rc) {
SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
rc = -ENOMEM;
goto error_get_dst;
}
assert(dst_mbufs[0]);
/* There is a 1:1 mapping between a bdev_io and a compression operation
* Some PMDs that SPDK uses don't support chaining, but reduce library should
* provide correct buffers
* Build our mbuf chain and associate it with our single comp_op.
*/
rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
src_iovs, src_iovcnt, reduce_cb_arg);
if (rc < 0) {
goto error_src_dst;
}
if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
if (src_iovcnt == 1) {
SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
comp_bdev->drv_name);
} else {
SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
}
rc = -EINVAL;
goto error_src_dst;
}
comp_op->m_src = src_mbufs[0];
comp_op->src.offset = 0;
comp_op->src.length = total_length;
rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
dst_iovs, dst_iovcnt, reduce_cb_arg);
if (rc < 0) {
goto error_src_dst;
}
if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
if (dst_iovcnt == 1) {
SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
comp_bdev->drv_name);
} else {
SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
}
rc = -EINVAL;
goto error_src_dst;
}
comp_op->m_dst = dst_mbufs[0];
comp_op->dst.offset = 0;
if (compress == true) {
comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
if (compress) {
assert(dst_iovcnt == 1);
rc = spdk_accel_submit_compress(comp_bdev->accel_channel, dst_iovs[0].iov_base, dst_iovs[0].iov_len,
src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
0, reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
} else {
comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
rc = spdk_accel_submit_decompress(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
0, reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
}
comp_op->op_type = RTE_COMP_OP_STATELESS;
comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
assert(rc <= 1);
/* We always expect 1 got queued, if 0 then we need to queue it up. */
if (rc == 1) {
return 0;
} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
rc = -EAGAIN;
} else {
device_error = true;
}
/* Error cleanup paths. */
error_src_dst:
rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
error_get_dst:
rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
error_get_src:
rte_comp_op_free(comp_op);
error_get_op:
if (device_error == true) {
/* There was an error sending the op to the device, most
* likely with the parameters.
*/
SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
return -EINVAL;
}
if (rc != -ENOMEM && rc != -EAGAIN) {
return rc;
}
op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
if (op_to_queue == NULL) {
SPDK_ERRLOG("unable to allocate operation for queueing.\n");
return -ENOMEM;
}
op_to_queue->backing_dev = backing_dev;
op_to_queue->src_iovs = src_iovs;
op_to_queue->src_iovcnt = src_iovcnt;
op_to_queue->dst_iovs = dst_iovs;
op_to_queue->dst_iovcnt = dst_iovcnt;
op_to_queue->compress = compress;
op_to_queue->cb_arg = cb_arg;
TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
op_to_queue,
link);
return 0;
}
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
struct vbdev_compress *comp_bdev = args;
uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
uint16_t num_deq;
struct spdk_reduce_vol_cb_args *reduce_args;
struct vbdev_comp_op *op_to_resubmit;
int rc, i;
num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
NUM_MAX_INFLIGHT_OPS);
for (i = 0; i < num_deq; i++) {
reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
uint64_t *);
if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
/* tell reduce this is done and what the bytecount was */
reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
} else {
SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n",
deq_ops[i]->status);
/* Reduce will simply store uncompressed on neg errno value. */
reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
}
/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
* call takes care of freeing all of the mbufs in the chain back to their
* original pool.
*/
rte_pktmbuf_free(deq_ops[i]->m_src);
rte_pktmbuf_free(deq_ops[i]->m_dst);
/* There is no bulk free for com ops so we have to free them one at a time
* here however it would be rare that we'd ever have more than 1 at a time
* anyways.
*/
rte_comp_op_free(deq_ops[i]);
/* Check if there are any pending comp ops to process, only pull one
* at a time off as _compress_operation() may re-queue the op.
*/
if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
rc = _compress_operation(op_to_resubmit->backing_dev,
op_to_resubmit->src_iovs,
op_to_resubmit->src_iovcnt,
op_to_resubmit->dst_iovs,
op_to_resubmit->dst_iovcnt,
op_to_resubmit->compress,
op_to_resubmit->cb_arg);
if (rc == 0) {
TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
free(op_to_resubmit);
}
}
}
return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
return rc;
}
/* Entry point for reduce lib to issue a compress operation. */
@ -1068,7 +492,6 @@ vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
spdk_json_write_object_begin(w);
spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
spdk_json_write_object_end(w);
return 0;
@ -1079,6 +502,13 @@ static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
struct vbdev_compress *comp_bdev;
const char *module_name = NULL;
int rc;
rc = spdk_accel_get_opc_module_name(ACCEL_OPC_COMPRESS, &module_name);
if (rc) {
SPDK_ERRLOG("error getting module name (%d)\n", rc);
}
TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
spdk_json_write_object_begin(w);
@ -1086,7 +516,6 @@ vbdev_compress_config_json(struct spdk_json_write_ctx *w)
spdk_json_write_named_object_begin(w, "params");
spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
spdk_json_write_object_end(w);
spdk_json_write_object_end(w);
}
@ -1301,7 +730,6 @@ _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
return NULL;
}
meta_ctx->drv_name = "None";
meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
meta_ctx->backing_dev.readv = _comp_reduce_readv;
meta_ctx->backing_dev.writev = _comp_reduce_writev;
@ -1326,31 +754,6 @@ _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
return meta_ctx;
}
static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
if (g_opts == COMPRESS_PMD_AUTO) {
if (g_qat_available) {
comp_dev->drv_name = QAT_PMD;
} else if (g_mlx5_pci_available) {
comp_dev->drv_name = MLX5_PMD;
} else {
comp_dev->drv_name = ISAL_PMD;
}
} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
comp_dev->drv_name = QAT_PMD;
} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
comp_dev->drv_name = ISAL_PMD;
} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
comp_dev->drv_name = MLX5_PMD;
} else {
SPDK_ERRLOG("Requested PMD is not available.\n");
return false;
}
SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
return true;
}
/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
@ -1372,13 +775,6 @@ vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
return -EINVAL;
}
if (_set_pmd(meta_ctx) == false) {
SPDK_ERRLOG("could not find required pmd\n");
free(meta_ctx);
spdk_bdev_close(bdev_desc);
return -EINVAL;
}
/* Save the thread where the base device is opened */
meta_ctx->thread = spdk_get_thread();
@ -1401,11 +797,6 @@ static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
struct vbdev_compress *comp_bdev = io_device;
struct comp_device_qp *device_qp;
/* TODO look into associating the device_qp with the channel vs the thread,
* doing in next patch to make this one easier to review against code taken
* from the vbdev module */
/* Now set the reduce channel if it's not already set. */
pthread_mutex_lock(&comp_bdev->reduce_lock);
@ -1418,55 +809,20 @@ comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
comp_bdev->reduce_thread = spdk_get_thread();
comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
/* Now assign a q pair */
pthread_mutex_lock(&g_comp_device_qp_lock);
TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
if (device_qp->thread == spdk_get_thread()) {
comp_bdev->device_qp = device_qp;
break;
}
if (device_qp->thread == NULL) {
comp_bdev->device_qp = device_qp;
device_qp->thread = spdk_get_thread();
break;
}
}
}
pthread_mutex_unlock(&g_comp_device_qp_lock);
comp_bdev->accel_channel = spdk_accel_get_io_channel();
}
comp_bdev->ch_count++;
pthread_mutex_unlock(&comp_bdev->reduce_lock);
if (comp_bdev->device_qp != NULL) {
uint64_t comp_feature_flags =
comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;
if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
comp_bdev->backing_dev.sgl_in = true;
}
if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
comp_bdev->backing_dev.sgl_out = true;
}
return 0;
} else {
SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
assert(false);
return -ENOMEM;
}
return 0;
}
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
/* Note: comp_bdevs can share a device_qp if they are
* on the same thread so we leave the device_qp element
* alone for this comp_bdev and just clear the reduce thread.
*/
spdk_put_io_channel(comp_bdev->base_ch);
spdk_put_io_channel(comp_bdev->accel_channel);
comp_bdev->reduce_thread = NULL;
spdk_poller_unregister(&comp_bdev->poller);
}
/* Used to reroute destroy_ch to the correct thread */
@ -1523,15 +879,9 @@ create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_siz
return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
if (vbdev_init_compress_drivers()) {
SPDK_ERRLOG("Error setting up compression devices\n");
return -EINVAL;
}
return 0;
}
@ -1539,17 +889,7 @@ vbdev_compress_init(void)
static void
vbdev_compress_finish(void)
{
struct comp_device_qp *dev_qp;
/* TODO: unload vol in a future patch */
while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
free(dev_qp);
}
pthread_mutex_destroy(&g_comp_device_qp_lock);
rte_mempool_free(g_comp_op_mp);
rte_mempool_free(g_mbuf_mp);
}
/* During init we'll be asked how much memory we'd like passed to us
@ -1620,15 +960,6 @@ vbdev_compress_claim(struct vbdev_compress *comp_bdev)
comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
comp_bdev->comp_bdev.required_alignment =
spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
comp_bdev->base_bdev->required_alignment);
SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
comp_bdev->comp_bdev.required_alignment);
} else {
comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
}
comp_bdev->comp_bdev.optimal_io_boundary =
comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
@ -1757,11 +1088,6 @@ _vbdev_reduce_load_cb(void *ctx)
spdk_put_io_channel(meta_ctx->base_ch);
if (meta_ctx->reduce_errno == 0) {
if (_set_pmd(meta_ctx) == false) {
SPDK_ERRLOG("could not find required pmd\n");
goto err;
}
rc = vbdev_compress_claim(meta_ctx);
if (rc != 0) {
goto err;
@ -1867,12 +1193,4 @@ vbdev_compress_examine(struct spdk_bdev *bdev)
spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}
int
compress_set_pmd(enum compress_pmd *opts)
{
g_opts = *opts;
return 0;
}
SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)

View File

@ -45,16 +45,6 @@ bool compress_has_orphan(const char *name);
*/
const char *compress_get_name(const struct vbdev_compress *comp_bdev);
enum compress_pmd {
COMPRESS_PMD_AUTO = 0,
COMPRESS_PMD_QAT_ONLY,
COMPRESS_PMD_ISAL_ONLY,
COMPRESS_PMD_MLX5_PCI_ONLY,
COMPRESS_PMD_MAX
};
int compress_set_pmd(enum compress_pmd *opts);
typedef void (*spdk_delete_compress_complete)(void *cb_arg, int bdeverrno);
/**

View File

@ -70,47 +70,6 @@ rpc_bdev_compress_get_orphans(struct spdk_jsonrpc_request *request,
}
SPDK_RPC_REGISTER("bdev_compress_get_orphans", rpc_bdev_compress_get_orphans, SPDK_RPC_RUNTIME)
struct rpc_compress_set_pmd {
enum compress_pmd pmd;
};
static const struct spdk_json_object_decoder rpc_compress_pmd_decoder[] = {
{"pmd", offsetof(struct rpc_compress_set_pmd, pmd), spdk_json_decode_int32},
};
static void
rpc_bdev_compress_set_pmd(struct spdk_jsonrpc_request *request,
const struct spdk_json_val *params)
{
struct rpc_compress_set_pmd req;
int rc = 0;
if (spdk_json_decode_object(params, rpc_compress_pmd_decoder,
SPDK_COUNTOF(rpc_compress_pmd_decoder),
&req)) {
SPDK_ERRLOG("spdk_json_decode_object failed\n");
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
"spdk_json_decode_object failed");
return;
}
if (req.pmd >= COMPRESS_PMD_MAX) {
spdk_jsonrpc_send_error_response_fmt(request, -EINVAL,
"PMD value %d should be less than %d", req.pmd, COMPRESS_PMD_MAX);
return;
}
rc = compress_set_pmd(&req.pmd);
if (rc) {
spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
return;
}
spdk_jsonrpc_send_bool_response(request, true);
}
SPDK_RPC_REGISTER("bdev_compress_set_pmd", rpc_bdev_compress_set_pmd,
SPDK_RPC_STARTUP | SPDK_RPC_RUNTIME)
/* Structure to hold the parameters for this RPC method. */
struct rpc_construct_compress {
char *base_bdev_name;

View File

@ -78,17 +78,6 @@ def bdev_compress_delete(client, name):
return client.call('bdev_compress_delete', params)
def bdev_compress_set_pmd(client, pmd):
"""Set pmd options for the bdev compress.
Args:
pmd: 0 = auto-select, 1 = QAT, 2 = ISAL, 3 = mlx5_pci
"""
params = {'pmd': pmd}
return client.call('bdev_compress_set_pmd', params)
def bdev_compress_get_orphans(client, name=None):
"""Get a list of comp bdevs that do not have a pmem file (aka orphaned).

View File

@ -255,13 +255,6 @@ if __name__ == "__main__":
p.add_argument('name', help='compress bdev name')
p.set_defaults(func=bdev_compress_delete)
def bdev_compress_set_pmd(args):
rpc.bdev.bdev_compress_set_pmd(args.client,
pmd=args.pmd)
p = subparsers.add_parser('bdev_compress_set_pmd', help='Set pmd option for a compress disk')
p.add_argument('-p', '--pmd', type=int, help='0 = auto-select, 1= QAT only, 2 = ISAL only, 3 = mlx5_pci only')
p.set_defaults(func=bdev_compress_set_pmd)
def bdev_compress_get_orphans(args):
print_dict(rpc.bdev.bdev_compress_get_orphans(args.client,
name=args.name))

View File

@ -38,7 +38,6 @@ function create_vols() {
$rpc_py bdev_lvol_create -t -l lvs0 lv0 100
waitforbdev lvs0/lv0
$rpc_py bdev_compress_set_pmd -p "$pmd"
if [ -z "$1" ]; then
$rpc_py bdev_compress_create -b lvs0/lv0 -p /tmp/pmem
else
@ -48,7 +47,11 @@ function create_vols() {
}
function run_bdevio() {
$rootdir/test/bdev/bdevio/bdevio -w &
if [[ $test_type == "compdev" ]]; then
$rootdir/test/bdev/bdevio/bdevio -c $rootdir/test/compress/dpdk.json -w &
else
$rootdir/test/bdev/bdevio/bdevio -w &
fi
bdevio_pid=$!
trap 'killprocess $bdevio_pid; error_cleanup; exit 1' SIGINT SIGTERM EXIT
waitforlisten $bdevio_pid
@ -60,7 +63,11 @@ function run_bdevio() {
}
function run_bdevperf() {
$rootdir/build/examples/bdevperf -z -q $1 -o $2 -w verify -t $3 -C -m 0x6 &
if [[ $test_type == "compdev" ]]; then
$rootdir/build/examples/bdevperf -z -q $1 -o $2 -w verify -t $3 -C -m 0x6 -c $rootdir/test/compress/dpdk.json &
else
$rootdir/build/examples/bdevperf -z -q $1 -o $2 -w verify -t $3 -C -m 0x6 &
fi
bdevperf_pid=$!
trap 'killprocess $bdevperf_pid; error_cleanup; exit 1' SIGINT SIGTERM EXIT
waitforlisten $bdevperf_pid
@ -71,21 +78,8 @@ function run_bdevperf() {
killprocess $bdevperf_pid
}
test_type=$1
case "$test_type" in
qat)
pmd=1
;;
isal)
pmd=2
;;
*)
echo "invalid pmd name"
exit 1
;;
esac
mkdir -p /tmp/pmem
test_type=$1
# per patch bdevperf uses slightly different params than nightly
# logical block size same as underlying device, then 512 then 4096

15
test/compress/dpdk.json Normal file
View File

@ -0,0 +1,15 @@
{
"subsystems": [
{
"subsystem": "accel",
"config": [
{
"method": "compressdev_scan_accel_module",
"params": {
"pmd": 1
}
}
]
}
]
}

View File

@ -13,7 +13,6 @@
#include "unit/lib/json_mock.c"
#include "spdk/reduce.h"
#include <rte_compressdev.h>
/* There will be one if the data perfectly matches the chunk size,
* or there could be an offset into the data and a remainder after
@ -27,207 +26,10 @@
struct spdk_bdev_io *g_bdev_io;
struct spdk_io_channel *g_io_ch;
struct rte_comp_op g_comp_op[2];
struct vbdev_compress g_comp_bdev;
struct comp_device_qp g_device_qp;
struct compress_dev g_device;
struct rte_compressdev_capabilities g_cdev_cap;
static struct rte_mbuf *g_src_mbufs[UT_MBUFS_PER_OP_BOUND_TEST];
static struct rte_mbuf *g_dst_mbufs[UT_MBUFS_PER_OP];
static struct rte_mbuf g_expected_src_mbufs[UT_MBUFS_PER_OP_BOUND_TEST];
static struct rte_mbuf g_expected_dst_mbufs[UT_MBUFS_PER_OP];
struct comp_bdev_io *g_io_ctx;
struct comp_io_channel *g_comp_ch;
/* Those functions are defined as static inline in DPDK, so we can't
* mock them straight away. We use defines to redirect them into
* our custom functions.
*/
static void mock_rte_pktmbuf_attach_extbuf(struct rte_mbuf *m, void *buf_addr, rte_iova_t buf_iova,
uint16_t buf_len, struct rte_mbuf_ext_shared_info *shinfo);
#define rte_pktmbuf_attach_extbuf mock_rte_pktmbuf_attach_extbuf
static void
mock_rte_pktmbuf_attach_extbuf(struct rte_mbuf *m, void *buf_addr, rte_iova_t buf_iova,
uint16_t buf_len, struct rte_mbuf_ext_shared_info *shinfo)
{
assert(m != NULL);
m->buf_addr = buf_addr;
m->buf_iova = buf_iova;
m->buf_len = buf_len;
m->data_len = m->pkt_len = 0;
}
static char *mock_rte_pktmbuf_append(struct rte_mbuf *m, uint16_t len);
#define rte_pktmbuf_append mock_rte_pktmbuf_append
static char *
mock_rte_pktmbuf_append(struct rte_mbuf *m, uint16_t len)
{
m->pkt_len = m->pkt_len + len;
return NULL;
}
static inline int mock_rte_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *tail);
#define rte_pktmbuf_chain mock_rte_pktmbuf_chain
static inline int
mock_rte_pktmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *tail)
{
struct rte_mbuf *cur_tail;
cur_tail = rte_pktmbuf_lastseg(head);
cur_tail->next = tail;
return 0;
}
uint16_t ut_max_nb_queue_pairs = 0;
void __rte_experimental mock_rte_compressdev_info_get(uint8_t dev_id,
struct rte_compressdev_info *dev_info);
#define rte_compressdev_info_get mock_rte_compressdev_info_get
void __rte_experimental
mock_rte_compressdev_info_get(uint8_t dev_id, struct rte_compressdev_info *dev_info)
{
dev_info->max_nb_queue_pairs = ut_max_nb_queue_pairs;
dev_info->capabilities = &g_cdev_cap;
dev_info->driver_name = "compress_isal";
}
int ut_rte_compressdev_configure = 0;
int __rte_experimental mock_rte_compressdev_configure(uint8_t dev_id,
struct rte_compressdev_config *config);
#define rte_compressdev_configure mock_rte_compressdev_configure
int __rte_experimental
mock_rte_compressdev_configure(uint8_t dev_id, struct rte_compressdev_config *config)
{
return ut_rte_compressdev_configure;
}
int ut_rte_compressdev_queue_pair_setup = 0;
int __rte_experimental mock_rte_compressdev_queue_pair_setup(uint8_t dev_id, uint16_t queue_pair_id,
uint32_t max_inflight_ops, int socket_id);
#define rte_compressdev_queue_pair_setup mock_rte_compressdev_queue_pair_setup
int __rte_experimental
mock_rte_compressdev_queue_pair_setup(uint8_t dev_id, uint16_t queue_pair_id,
uint32_t max_inflight_ops, int socket_id)
{
return ut_rte_compressdev_queue_pair_setup;
}
int ut_rte_compressdev_start = 0;
int __rte_experimental mock_rte_compressdev_start(uint8_t dev_id);
#define rte_compressdev_start mock_rte_compressdev_start
int __rte_experimental
mock_rte_compressdev_start(uint8_t dev_id)
{
return ut_rte_compressdev_start;
}
int ut_rte_compressdev_private_xform_create = 0;
int __rte_experimental mock_rte_compressdev_private_xform_create(uint8_t dev_id,
const struct rte_comp_xform *xform, void **private_xform);
#define rte_compressdev_private_xform_create mock_rte_compressdev_private_xform_create
int __rte_experimental
mock_rte_compressdev_private_xform_create(uint8_t dev_id,
const struct rte_comp_xform *xform, void **private_xform)
{
return ut_rte_compressdev_private_xform_create;
}
uint8_t ut_rte_compressdev_count = 0;
uint8_t __rte_experimental mock_rte_compressdev_count(void);
#define rte_compressdev_count mock_rte_compressdev_count
uint8_t __rte_experimental
mock_rte_compressdev_count(void)
{
return ut_rte_compressdev_count;
}
struct rte_mempool *ut_rte_comp_op_pool_create = NULL;
struct rte_mempool *__rte_experimental mock_rte_comp_op_pool_create(const char *name,
unsigned int nb_elts, unsigned int cache_size, uint16_t user_size,
int socket_id);
#define rte_comp_op_pool_create mock_rte_comp_op_pool_create
struct rte_mempool *__rte_experimental
mock_rte_comp_op_pool_create(const char *name, unsigned int nb_elts,
unsigned int cache_size, uint16_t user_size, int socket_id)
{
return ut_rte_comp_op_pool_create;
}
void mock_rte_pktmbuf_free(struct rte_mbuf *m);
#define rte_pktmbuf_free mock_rte_pktmbuf_free
void
mock_rte_pktmbuf_free(struct rte_mbuf *m)
{
}
void mock_rte_pktmbuf_free_bulk(struct rte_mbuf **m, unsigned int cnt);
#define rte_pktmbuf_free_bulk mock_rte_pktmbuf_free_bulk
void
mock_rte_pktmbuf_free_bulk(struct rte_mbuf **m, unsigned int cnt)
{
}
static bool ut_boundary_alloc = false;
static int ut_rte_pktmbuf_alloc_bulk = 0;
int mock_rte_pktmbuf_alloc_bulk(struct rte_mempool *pool, struct rte_mbuf **mbufs,
unsigned count);
#define rte_pktmbuf_alloc_bulk mock_rte_pktmbuf_alloc_bulk
int
mock_rte_pktmbuf_alloc_bulk(struct rte_mempool *pool, struct rte_mbuf **mbufs,
unsigned count)
{
int i;
/* This mocked function only supports the alloc of up to 3 src and 3 dst. */
ut_rte_pktmbuf_alloc_bulk += count;
if (ut_rte_pktmbuf_alloc_bulk == 1) {
/* allocation of an extra mbuf for boundary cross test */
ut_boundary_alloc = true;
g_src_mbufs[UT_MBUFS_PER_OP_BOUND_TEST - 1]->next = NULL;
*mbufs = g_src_mbufs[UT_MBUFS_PER_OP_BOUND_TEST - 1];
ut_rte_pktmbuf_alloc_bulk = 0;
} else if (ut_rte_pktmbuf_alloc_bulk == UT_MBUFS_PER_OP) {
/* first test allocation, src mbufs */
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
g_src_mbufs[i]->next = NULL;
*mbufs++ = g_src_mbufs[i];
}
} else if (ut_rte_pktmbuf_alloc_bulk == UT_MBUFS_PER_OP * 2) {
/* second test allocation, dst mbufs */
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
g_dst_mbufs[i]->next = NULL;
*mbufs++ = g_dst_mbufs[i];
}
ut_rte_pktmbuf_alloc_bulk = 0;
} else {
return -1;
}
return 0;
}
struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n, unsigned cache_size,
uint16_t priv_size, uint16_t data_room_size, int socket_id)
{
struct spdk_mempool *tmp;
tmp = spdk_mempool_create("mbuf_mp", 1024, sizeof(struct rte_mbuf),
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
SPDK_ENV_SOCKET_ID_ANY);
return (struct rte_mempool *)tmp;
}
void
rte_mempool_free(struct rte_mempool *mp)
{
if (mp) {
spdk_mempool_free((struct spdk_mempool *)mp);
}
}
static int ut_spdk_reduce_vol_op_complete_err = 0;
void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
@ -248,6 +50,9 @@ spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt
#include "bdev/compress/vbdev_compress.c"
/* SPDK stubs */
DEFINE_STUB(spdk_accel_get_opc_module_name, int, (enum accel_opcode opcode,
const char **module_name), 0);
DEFINE_STUB(spdk_accel_get_io_channel, struct spdk_io_channel *, (void), (void *)0xfeedbeef);
DEFINE_STUB(spdk_bdev_get_aliases, const struct spdk_bdev_aliases_list *,
(const struct spdk_bdev *bdev), NULL);
DEFINE_STUB_V(spdk_bdev_module_list_add, (struct spdk_bdev_module *bdev_module));
@ -286,15 +91,6 @@ DEFINE_STUB_V(spdk_reduce_vol_init, (struct spdk_reduce_vol_params *params,
DEFINE_STUB_V(spdk_reduce_vol_destroy, (struct spdk_reduce_backing_dev *backing_dev,
spdk_reduce_vol_op_complete cb_fn, void *cb_arg));
/* DPDK stubs */
#define DPDK_DYNFIELD_OFFSET offsetof(struct rte_mbuf, dynfield1[1])
DEFINE_STUB(rte_mbuf_dynfield_register, int, (const struct rte_mbuf_dynfield *params),
DPDK_DYNFIELD_OFFSET);
DEFINE_STUB(rte_socket_id, unsigned, (void), 0);
DEFINE_STUB(rte_vdev_init, int, (const char *name, const char *args), 0);
DEFINE_STUB_V(rte_comp_op_free, (struct rte_comp_op *op));
DEFINE_STUB(rte_comp_op_alloc, struct rte_comp_op *, (struct rte_mempool *mempool), NULL);
int g_small_size_counter = 0;
int g_small_size_modify = 0;
uint64_t g_small_size = 0;
@ -380,132 +176,22 @@ spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status sta
g_completion_called = true;
}
static uint16_t ut_rte_compressdev_dequeue_burst = 0;
uint16_t
rte_compressdev_dequeue_burst(uint8_t dev_id, uint16_t qp_id, struct rte_comp_op **ops,
uint16_t nb_op)
int
spdk_accel_submit_compress(struct spdk_io_channel *ch, void *dst, uint64_t nbytes,
struct iovec *src_iovs, size_t src_iovcnt, uint32_t *output_size, int flags,
spdk_accel_completion_cb cb_fn, void *cb_arg)
{
if (ut_rte_compressdev_dequeue_burst == 0) {
return 0;
}
ops[0] = &g_comp_op[0];
ops[1] = &g_comp_op[1];
return ut_rte_compressdev_dequeue_burst;
return 0;
}
static int ut_compress_done[2];
/* done_count and done_idx together control which expected assertion
* value to use when dequeuing 2 operations.
*/
static uint16_t done_count = 1;
static uint16_t done_idx = 0;
static void
_compress_done(void *_req, int reduce_errno)
int
spdk_accel_submit_decompress(struct spdk_io_channel *ch, struct iovec *dst_iovs, size_t dst_iovcnt,
struct iovec *src_iovs, size_t src_iovcnt, uint32_t *output_size, int flags,
spdk_accel_completion_cb cb_fn, void *cb_arg)
{
if (done_count == 1) {
CU_ASSERT(reduce_errno == ut_compress_done[0]);
} else if (done_count == 2) {
CU_ASSERT(reduce_errno == ut_compress_done[done_idx++]);
}
}
static void
_get_mbuf_array(struct rte_mbuf **mbuf_array, struct rte_mbuf *mbuf_head,
int mbuf_count, bool null_final)
{
int i;
for (i = 0; i < mbuf_count; i++) {
mbuf_array[i] = mbuf_head;
if (mbuf_head) {
mbuf_head = mbuf_head->next;
}
}
if (null_final) {
mbuf_array[i - 1] = NULL;
}
}
#define FAKE_ENQUEUE_SUCCESS 255
#define FAKE_ENQUEUE_ERROR 128
#define FAKE_ENQUEUE_BUSY 64
static uint16_t ut_enqueue_value = FAKE_ENQUEUE_SUCCESS;
static struct rte_comp_op ut_expected_op;
uint16_t
rte_compressdev_enqueue_burst(uint8_t dev_id, uint16_t qp_id, struct rte_comp_op **ops,
uint16_t nb_ops)
{
struct rte_comp_op *op = *ops;
struct rte_mbuf *op_mbuf[UT_MBUFS_PER_OP_BOUND_TEST];
struct rte_mbuf *exp_mbuf[UT_MBUFS_PER_OP_BOUND_TEST];
int i, num_src_mbufs = UT_MBUFS_PER_OP;
switch (ut_enqueue_value) {
case FAKE_ENQUEUE_BUSY:
op->status = RTE_COMP_OP_STATUS_NOT_PROCESSED;
return 0;
case FAKE_ENQUEUE_SUCCESS:
op->status = RTE_COMP_OP_STATUS_SUCCESS;
return 1;
case FAKE_ENQUEUE_ERROR:
op->status = RTE_COMP_OP_STATUS_ERROR;
return 0;
default:
break;
}
/* by design the compress module will never send more than 1 op at a time */
CU_ASSERT(op->private_xform == ut_expected_op.private_xform);
/* setup our local pointers to the chained mbufs, those pointed to in the
* operation struct and the expected values.
*/
_get_mbuf_array(op_mbuf, op->m_src, SPDK_COUNTOF(op_mbuf), true);
_get_mbuf_array(exp_mbuf, ut_expected_op.m_src, SPDK_COUNTOF(exp_mbuf), true);
if (ut_boundary_alloc == true) {
/* if we crossed a boundary, we need to check the 4th src mbuf and
* reset the global that is used to identify whether we crossed
* or not
*/
num_src_mbufs = UT_MBUFS_PER_OP_BOUND_TEST;
exp_mbuf[UT_MBUFS_PER_OP_BOUND_TEST - 1] = ut_expected_op.m_src->next->next->next;
op_mbuf[UT_MBUFS_PER_OP_BOUND_TEST - 1] = op->m_src->next->next->next;
ut_boundary_alloc = false;
}
for (i = 0; i < num_src_mbufs; i++) {
CU_ASSERT(op_mbuf[i]->buf_addr == exp_mbuf[i]->buf_addr);
CU_ASSERT(op_mbuf[i]->buf_iova == exp_mbuf[i]->buf_iova);
CU_ASSERT(op_mbuf[i]->buf_len == exp_mbuf[i]->buf_len);
CU_ASSERT(op_mbuf[i]->pkt_len == exp_mbuf[i]->pkt_len);
}
/* if only 3 mbufs were used in the test, the 4th should be zeroed */
if (num_src_mbufs == UT_MBUFS_PER_OP) {
CU_ASSERT(op_mbuf[UT_MBUFS_PER_OP_BOUND_TEST - 1] == NULL);
CU_ASSERT(exp_mbuf[UT_MBUFS_PER_OP_BOUND_TEST - 1] == NULL);
}
CU_ASSERT(*RTE_MBUF_DYNFIELD(op->m_src, g_mbuf_offset, uint64_t *) ==
*RTE_MBUF_DYNFIELD(ut_expected_op.m_src, g_mbuf_offset, uint64_t *));
CU_ASSERT(op->src.offset == ut_expected_op.src.offset);
CU_ASSERT(op->src.length == ut_expected_op.src.length);
/* check dst mbuf values */
_get_mbuf_array(op_mbuf, op->m_dst, SPDK_COUNTOF(op_mbuf), true);
_get_mbuf_array(exp_mbuf, ut_expected_op.m_dst, SPDK_COUNTOF(exp_mbuf), true);
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
CU_ASSERT(op_mbuf[i]->buf_addr == exp_mbuf[i]->buf_addr);
CU_ASSERT(op_mbuf[i]->buf_iova == exp_mbuf[i]->buf_iova);
CU_ASSERT(op_mbuf[i]->buf_len == exp_mbuf[i]->buf_len);
CU_ASSERT(op_mbuf[i]->pkt_len == exp_mbuf[i]->pkt_len);
}
CU_ASSERT(op->dst.offset == ut_expected_op.dst.offset);
return ut_enqueue_value;
return 0;
}
/* Global setup for all tests that share a bunch of preparation... */
@ -513,14 +199,12 @@ static int
test_setup(void)
{
struct spdk_thread *thread;
int i;
spdk_thread_lib_init(NULL, 0);
thread = spdk_thread_create(NULL, NULL);
spdk_set_thread(thread);
g_comp_bdev.drv_name = "test";
g_comp_bdev.reduce_thread = thread;
g_comp_bdev.backing_dev.unmap = _comp_reduce_unmap;
g_comp_bdev.backing_dev.readv = _comp_reduce_readv;
@ -532,44 +216,8 @@ test_setup(void)
g_comp_bdev.backing_dev.sgl_in = true;
g_comp_bdev.backing_dev.sgl_out = true;
g_comp_bdev.device_qp = &g_device_qp;
g_comp_bdev.device_qp->device = &g_device;
TAILQ_INIT(&g_comp_bdev.queued_comp_ops);
g_comp_xform = (struct rte_comp_xform) {
.type = RTE_COMP_COMPRESS,
.compress = {
.algo = RTE_COMP_ALGO_DEFLATE,
.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
.level = RTE_COMP_LEVEL_MAX,
.window_size = DEFAULT_WINDOW_SIZE,
.chksum = RTE_COMP_CHECKSUM_NONE,
.hash_algo = RTE_COMP_HASH_ALGO_NONE
}
};
g_decomp_xform = (struct rte_comp_xform) {
.type = RTE_COMP_DECOMPRESS,
.decompress = {
.algo = RTE_COMP_ALGO_DEFLATE,
.chksum = RTE_COMP_CHECKSUM_NONE,
.window_size = DEFAULT_WINDOW_SIZE,
.hash_algo = RTE_COMP_HASH_ALGO_NONE
}
};
g_device.comp_xform = &g_comp_xform;
g_device.decomp_xform = &g_decomp_xform;
g_cdev_cap.comp_feature_flags = RTE_COMP_FF_SHAREABLE_PRIV_XFORM;
g_device.cdev_info.driver_name = "compress_isal";
g_device.cdev_info.capabilities = &g_cdev_cap;
for (i = 0; i < UT_MBUFS_PER_OP_BOUND_TEST; i++) {
g_src_mbufs[i] = calloc(1, sizeof(struct rte_mbuf));
}
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
g_dst_mbufs[i] = calloc(1, sizeof(struct rte_mbuf));
}
g_bdev_io = calloc(1, sizeof(struct spdk_bdev_io) + sizeof(struct comp_bdev_io));
g_bdev_io->u.bdev.iovs = calloc(128, sizeof(struct iovec));
g_bdev_io->bdev = &g_comp_bdev.comp_bdev;
@ -580,19 +228,6 @@ test_setup(void)
g_io_ctx->comp_ch = g_comp_ch;
g_io_ctx->comp_bdev = &g_comp_bdev;
g_comp_bdev.device_qp = &g_device_qp;
for (i = 0; i < UT_MBUFS_PER_OP_BOUND_TEST - 1; i++) {
g_expected_src_mbufs[i].next = &g_expected_src_mbufs[i + 1];
}
g_expected_src_mbufs[UT_MBUFS_PER_OP_BOUND_TEST - 1].next = NULL;
/* we only test w/4 mbufs on src side */
for (i = 0; i < UT_MBUFS_PER_OP - 1; i++) {
g_expected_dst_mbufs[i].next = &g_expected_dst_mbufs[i + 1];
}
g_expected_dst_mbufs[UT_MBUFS_PER_OP - 1].next = NULL;
g_mbuf_offset = DPDK_DYNFIELD_OFFSET;
return 0;
}
@ -602,14 +237,7 @@ static int
test_cleanup(void)
{
struct spdk_thread *thread;
int i;
for (i = 0; i < UT_MBUFS_PER_OP_BOUND_TEST; i++) {
free(g_src_mbufs[i]);
}
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
free(g_dst_mbufs[i]);
}
free(g_bdev_io->u.bdev.iovs);
free(g_bdev_io);
free(g_io_ch);
@ -629,386 +257,11 @@ test_cleanup(void)
static void
test_compress_operation(void)
{
struct iovec src_iovs[3] = {};
int src_iovcnt;
struct iovec dst_iovs[3] = {};
int dst_iovcnt;
struct spdk_reduce_vol_cb_args cb_arg;
int rc, i;
struct vbdev_comp_op *op;
struct rte_mbuf *exp_src_mbuf[UT_MBUFS_PER_OP];
struct rte_mbuf *exp_dst_mbuf[UT_MBUFS_PER_OP];
src_iovcnt = dst_iovcnt = 3;
for (i = 0; i < dst_iovcnt; i++) {
src_iovs[i].iov_len = 0x1000;
dst_iovs[i].iov_len = 0x1000;
src_iovs[i].iov_base = (void *)0x10000000 + 0x1000 * i;
dst_iovs[i].iov_base = (void *)0x20000000 + 0x1000 * i;
}
/* test rte_comp_op_alloc failure */
MOCK_SET(rte_comp_op_alloc, NULL);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, true, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == false);
while (!TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops)) {
op = TAILQ_FIRST(&g_comp_bdev.queued_comp_ops);
TAILQ_REMOVE(&g_comp_bdev.queued_comp_ops, op, link);
free(op);
}
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
MOCK_SET(rte_comp_op_alloc, &g_comp_op[0]);
/* test mempool get failure */
ut_rte_pktmbuf_alloc_bulk = -1;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, true, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == false);
while (!TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops)) {
op = TAILQ_FIRST(&g_comp_bdev.queued_comp_ops);
TAILQ_REMOVE(&g_comp_bdev.queued_comp_ops, op, link);
free(op);
}
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
ut_rte_pktmbuf_alloc_bulk = 0;
/* test enqueue failure busy */
ut_enqueue_value = FAKE_ENQUEUE_BUSY;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, true, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == false);
while (!TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops)) {
op = TAILQ_FIRST(&g_comp_bdev.queued_comp_ops);
TAILQ_REMOVE(&g_comp_bdev.queued_comp_ops, op, link);
free(op);
}
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
ut_enqueue_value = 1;
/* test enqueue failure error */
ut_enqueue_value = FAKE_ENQUEUE_ERROR;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, true, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == -EINVAL);
ut_enqueue_value = FAKE_ENQUEUE_SUCCESS;
/* test success with 3 vector iovec */
ut_expected_op.private_xform = &g_decomp_xform;
ut_expected_op.src.offset = 0;
ut_expected_op.src.length = src_iovs[0].iov_len + src_iovs[1].iov_len + src_iovs[2].iov_len;
/* setup the src expected values */
_get_mbuf_array(exp_src_mbuf, &g_expected_src_mbufs[0], SPDK_COUNTOF(exp_src_mbuf), false);
ut_expected_op.m_src = exp_src_mbuf[0];
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
*RTE_MBUF_DYNFIELD(exp_src_mbuf[i], g_mbuf_offset, uint64_t *) = (uint64_t)&cb_arg;
exp_src_mbuf[i]->buf_addr = src_iovs[i].iov_base;
exp_src_mbuf[i]->buf_iova = spdk_vtophys(src_iovs[i].iov_base, &src_iovs[i].iov_len);
exp_src_mbuf[i]->buf_len = src_iovs[i].iov_len;
exp_src_mbuf[i]->pkt_len = src_iovs[i].iov_len;
}
/* setup the dst expected values */
_get_mbuf_array(exp_dst_mbuf, &g_expected_dst_mbufs[0], SPDK_COUNTOF(exp_dst_mbuf), false);
ut_expected_op.dst.offset = 0;
ut_expected_op.m_dst = exp_dst_mbuf[0];
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
exp_dst_mbuf[i]->buf_addr = dst_iovs[i].iov_base;
exp_dst_mbuf[i]->buf_iova = spdk_vtophys(dst_iovs[i].iov_base, &dst_iovs[i].iov_len);
exp_dst_mbuf[i]->buf_len = dst_iovs[i].iov_len;
exp_dst_mbuf[i]->pkt_len = dst_iovs[i].iov_len;
}
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, false, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
/* test sgl out failure */
g_comp_bdev.backing_dev.sgl_out = false;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], 1,
&dst_iovs[0], dst_iovcnt, true, &cb_arg);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
g_comp_bdev.backing_dev.sgl_out = true;
/* test sgl in failure */
g_comp_bdev.backing_dev.sgl_in = false;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], 1, true, &cb_arg);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
g_comp_bdev.backing_dev.sgl_in = true;
}
static void
test_compress_operation_cross_boundary(void)
{
struct iovec src_iovs[3] = {};
int src_iovcnt;
struct iovec dst_iovs[3] = {};
int dst_iovcnt;
struct spdk_reduce_vol_cb_args cb_arg;
int rc, i;
struct rte_mbuf *exp_src_mbuf[UT_MBUFS_PER_OP_BOUND_TEST];
struct rte_mbuf *exp_dst_mbuf[UT_MBUFS_PER_OP_BOUND_TEST];
/* Setup the same basic 3 IOV test as used in the simple success case
* but then we'll start testing a vtophy boundary crossing at each
* position.
*/
src_iovcnt = dst_iovcnt = 3;
for (i = 0; i < dst_iovcnt; i++) {
src_iovs[i].iov_len = 0x1000;
dst_iovs[i].iov_len = 0x1000;
src_iovs[i].iov_base = (void *)0x10000000 + 0x1000 * i;
dst_iovs[i].iov_base = (void *)0x20000000 + 0x1000 * i;
}
ut_expected_op.private_xform = &g_decomp_xform;
ut_expected_op.src.offset = 0;
ut_expected_op.src.length = src_iovs[0].iov_len + src_iovs[1].iov_len + src_iovs[2].iov_len;
/* setup the src expected values */
_get_mbuf_array(exp_src_mbuf, &g_expected_src_mbufs[0], SPDK_COUNTOF(exp_src_mbuf), false);
ut_expected_op.m_src = exp_src_mbuf[0];
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
*RTE_MBUF_DYNFIELD(exp_src_mbuf[i], g_mbuf_offset, uint64_t *) = (uint64_t)&cb_arg;
exp_src_mbuf[i]->buf_addr = src_iovs[i].iov_base;
exp_src_mbuf[i]->buf_iova = spdk_vtophys(src_iovs[i].iov_base, &src_iovs[i].iov_len);
exp_src_mbuf[i]->buf_len = src_iovs[i].iov_len;
exp_src_mbuf[i]->pkt_len = src_iovs[i].iov_len;
}
/* setup the dst expected values, we don't test needing a 4th dst mbuf */
_get_mbuf_array(exp_dst_mbuf, &g_expected_dst_mbufs[0], SPDK_COUNTOF(exp_dst_mbuf), false);
ut_expected_op.dst.offset = 0;
ut_expected_op.m_dst = exp_dst_mbuf[0];
for (i = 0; i < UT_MBUFS_PER_OP; i++) {
exp_dst_mbuf[i]->buf_addr = dst_iovs[i].iov_base;
exp_dst_mbuf[i]->buf_iova = spdk_vtophys(dst_iovs[i].iov_base, &dst_iovs[i].iov_len);
exp_dst_mbuf[i]->buf_len = dst_iovs[i].iov_len;
exp_dst_mbuf[i]->pkt_len = dst_iovs[i].iov_len;
}
/* force the 1st IOV to get partial length from spdk_vtophys */
g_small_size_counter = 0;
g_small_size_modify = 1;
g_small_size = 0x800;
*RTE_MBUF_DYNFIELD(exp_src_mbuf[3], g_mbuf_offset, uint64_t *) = (uint64_t)&cb_arg;
/* first only has shorter length */
exp_src_mbuf[0]->pkt_len = exp_src_mbuf[0]->buf_len = 0x800;
/* 2nd was inserted by the boundary crossing condition and finishes off
* the length from the first */
exp_src_mbuf[1]->buf_addr = (void *)0x10000800;
exp_src_mbuf[1]->buf_iova = 0x10000800;
exp_src_mbuf[1]->pkt_len = exp_src_mbuf[1]->buf_len = 0x800;
/* 3rd looks like that the 2nd would have */
exp_src_mbuf[2]->buf_addr = (void *)0x10001000;
exp_src_mbuf[2]->buf_iova = 0x10001000;
exp_src_mbuf[2]->pkt_len = exp_src_mbuf[2]->buf_len = 0x1000;
/* a new 4th looks like what the 3rd would have */
exp_src_mbuf[3]->buf_addr = (void *)0x10002000;
exp_src_mbuf[3]->buf_iova = 0x10002000;
exp_src_mbuf[3]->pkt_len = exp_src_mbuf[3]->buf_len = 0x1000;
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, false, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
/* Now force the 2nd IOV to get partial length from spdk_vtophys */
g_small_size_counter = 0;
g_small_size_modify = 2;
g_small_size = 0x800;
/* first is normal */
exp_src_mbuf[0]->buf_addr = (void *)0x10000000;
exp_src_mbuf[0]->buf_iova = 0x10000000;
exp_src_mbuf[0]->pkt_len = exp_src_mbuf[0]->buf_len = 0x1000;
/* second only has shorter length */
exp_src_mbuf[1]->buf_addr = (void *)0x10001000;
exp_src_mbuf[1]->buf_iova = 0x10001000;
exp_src_mbuf[1]->pkt_len = exp_src_mbuf[1]->buf_len = 0x800;
/* 3rd was inserted by the boundary crossing condition and finishes off
* the length from the first */
exp_src_mbuf[2]->buf_addr = (void *)0x10001800;
exp_src_mbuf[2]->buf_iova = 0x10001800;
exp_src_mbuf[2]->pkt_len = exp_src_mbuf[2]->buf_len = 0x800;
/* a new 4th looks like what the 3rd would have */
exp_src_mbuf[3]->buf_addr = (void *)0x10002000;
exp_src_mbuf[3]->buf_iova = 0x10002000;
exp_src_mbuf[3]->pkt_len = exp_src_mbuf[3]->buf_len = 0x1000;
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, false, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
/* Finally force the 3rd IOV to get partial length from spdk_vtophys */
g_small_size_counter = 0;
g_small_size_modify = 3;
g_small_size = 0x800;
/* first is normal */
exp_src_mbuf[0]->buf_addr = (void *)0x10000000;
exp_src_mbuf[0]->buf_iova = 0x10000000;
exp_src_mbuf[0]->pkt_len = exp_src_mbuf[0]->buf_len = 0x1000;
/* second is normal */
exp_src_mbuf[1]->buf_addr = (void *)0x10001000;
exp_src_mbuf[1]->buf_iova = 0x10001000;
exp_src_mbuf[1]->pkt_len = exp_src_mbuf[1]->buf_len = 0x1000;
/* 3rd has shorter length */
exp_src_mbuf[2]->buf_addr = (void *)0x10002000;
exp_src_mbuf[2]->buf_iova = 0x10002000;
exp_src_mbuf[2]->pkt_len = exp_src_mbuf[2]->buf_len = 0x800;
/* a new 4th handles the remainder from the 3rd */
exp_src_mbuf[3]->buf_addr = (void *)0x10002800;
exp_src_mbuf[3]->buf_iova = 0x10002800;
exp_src_mbuf[3]->pkt_len = exp_src_mbuf[3]->buf_len = 0x800;
rc = _compress_operation(&g_comp_bdev.backing_dev, &src_iovs[0], src_iovcnt,
&dst_iovs[0], dst_iovcnt, false, &cb_arg);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == 0);
/* Single input iov is split on page boundary, sgl_in is not supported */
g_comp_bdev.backing_dev.sgl_in = false;
g_small_size_counter = 0;
g_small_size_modify = 1;
g_small_size = 0x800;
rc = _compress_operation(&g_comp_bdev.backing_dev, src_iovs, 1,
dst_iovs, 1, false, &cb_arg);
CU_ASSERT(rc == -EINVAL);
g_comp_bdev.backing_dev.sgl_in = true;
/* Single output iov is split on page boundary, sgl_out is not supported */
g_comp_bdev.backing_dev.sgl_out = false;
g_small_size_counter = 0;
g_small_size_modify = 2;
g_small_size = 0x800;
rc = _compress_operation(&g_comp_bdev.backing_dev, src_iovs, 1,
dst_iovs, 1, false, &cb_arg);
CU_ASSERT(rc == -EINVAL);
g_comp_bdev.backing_dev.sgl_out = true;
}
static void
test_poller(void)
{
int rc;
struct spdk_reduce_vol_cb_args *cb_args;
struct rte_mbuf mbuf[4]; /* one src, one dst, 2 ops */
struct vbdev_comp_op *op_to_queue;
struct iovec src_iovs[3] = {};
struct iovec dst_iovs[3] = {};
int i;
cb_args = calloc(1, sizeof(*cb_args));
SPDK_CU_ASSERT_FATAL(cb_args != NULL);
cb_args->cb_fn = _compress_done;
memset(&g_comp_op[0], 0, sizeof(struct rte_comp_op));
g_comp_op[0].m_src = &mbuf[0];
g_comp_op[1].m_src = &mbuf[1];
g_comp_op[0].m_dst = &mbuf[2];
g_comp_op[1].m_dst = &mbuf[3];
for (i = 0; i < 3; i++) {
src_iovs[i].iov_len = 0x1000;
dst_iovs[i].iov_len = 0x1000;
src_iovs[i].iov_base = (void *)0x10000000 + 0x1000 * i;
dst_iovs[i].iov_base = (void *)0x20000000 + 0x1000 * i;
}
/* Error from dequeue, nothing needing to be resubmitted.
*/
ut_rte_compressdev_dequeue_burst = 1;
/* setup what we want dequeue to return for the op */
*RTE_MBUF_DYNFIELD(g_comp_op[0].m_src, g_mbuf_offset, uint64_t *) = (uint64_t)cb_args;
g_comp_op[0].produced = 1;
g_comp_op[0].status = 1;
/* value asserted in the reduce callback */
ut_compress_done[0] = -EINVAL;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = comp_dev_poller((void *)&g_comp_bdev);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == SPDK_POLLER_BUSY);
/* Success from dequeue, 2 ops. nothing needing to be resubmitted.
*/
ut_rte_compressdev_dequeue_burst = 2;
/* setup what we want dequeue to return for the op */
*RTE_MBUF_DYNFIELD(g_comp_op[0].m_src, g_mbuf_offset, uint64_t *) = (uint64_t)cb_args;
g_comp_op[0].produced = 16;
g_comp_op[0].status = 0;
*RTE_MBUF_DYNFIELD(g_comp_op[1].m_src, g_mbuf_offset, uint64_t *) = (uint64_t)cb_args;
g_comp_op[1].produced = 32;
g_comp_op[1].status = 0;
/* value asserted in the reduce callback */
ut_compress_done[0] = 16;
ut_compress_done[1] = 32;
done_count = 2;
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
rc = comp_dev_poller((void *)&g_comp_bdev);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == SPDK_POLLER_BUSY);
/* Success from dequeue, one op to be resubmitted.
*/
ut_rte_compressdev_dequeue_burst = 1;
/* setup what we want dequeue to return for the op */
*RTE_MBUF_DYNFIELD(g_comp_op[0].m_src, g_mbuf_offset, uint64_t *) = (uint64_t)cb_args;
g_comp_op[0].produced = 16;
g_comp_op[0].status = 0;
/* value asserted in the reduce callback */
ut_compress_done[0] = 16;
done_count = 1;
op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
SPDK_CU_ASSERT_FATAL(op_to_queue != NULL);
op_to_queue->backing_dev = &g_comp_bdev.backing_dev;
op_to_queue->src_iovs = &src_iovs[0];
op_to_queue->src_iovcnt = 3;
op_to_queue->dst_iovs = &dst_iovs[0];
op_to_queue->dst_iovcnt = 3;
op_to_queue->compress = true;
op_to_queue->cb_arg = cb_args;
ut_enqueue_value = FAKE_ENQUEUE_SUCCESS;
TAILQ_INSERT_TAIL(&g_comp_bdev.queued_comp_ops,
op_to_queue,
link);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == false);
rc = comp_dev_poller((void *)&g_comp_bdev);
CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true);
CU_ASSERT(rc == SPDK_POLLER_BUSY);
/* op_to_queue is freed in code under test */
free(cb_args);
}
static void
@ -1064,85 +317,6 @@ test_reset(void)
*/
}
static void
test_initdrivers(void)
{
int rc;
/* test return values from rte_vdev_init() */
MOCK_SET(rte_vdev_init, -EEXIST);
rc = vbdev_init_compress_drivers();
/* This is not an error condition, we already have one */
CU_ASSERT(rc == 0);
/* error */
MOCK_SET(rte_vdev_init, -2);
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(g_mbuf_mp == NULL);
CU_ASSERT(g_comp_op_mp == NULL);
/* compressdev count 0 */
ut_rte_compressdev_count = 0;
MOCK_SET(rte_vdev_init, 0);
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == 0);
/* bogus count */
ut_rte_compressdev_count = RTE_COMPRESS_MAX_DEVS + 1;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -EINVAL);
/* can't get mbuf pool */
ut_rte_compressdev_count = 1;
MOCK_SET(spdk_mempool_create, NULL);
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -ENOMEM);
MOCK_CLEAR(spdk_mempool_create);
/* can't get comp op pool */
ut_rte_comp_op_pool_create = NULL;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -ENOMEM);
/* error on create_compress_dev() */
ut_rte_comp_op_pool_create = (struct rte_mempool *)&test_initdrivers;
ut_rte_compressdev_configure = -1;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -1);
/* error on create_compress_dev() but coverage for large num queues */
ut_max_nb_queue_pairs = 99;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -1);
/* qpair setup fails */
ut_rte_compressdev_configure = 0;
ut_max_nb_queue_pairs = 0;
ut_rte_compressdev_queue_pair_setup = -1;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -EINVAL);
/* rte_compressdev_start fails */
ut_rte_compressdev_queue_pair_setup = 0;
ut_rte_compressdev_start = -1;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -1);
/* rte_compressdev_private_xform_create() fails */
ut_rte_compressdev_start = 0;
ut_rte_compressdev_private_xform_create = -2;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == -2);
/* success */
ut_rte_compressdev_private_xform_create = 0;
rc = vbdev_init_compress_drivers();
CU_ASSERT(rc == 0);
CU_ASSERT(g_mbuf_offset == DPDK_DYNFIELD_OFFSET);
spdk_mempool_free((struct spdk_mempool *)g_mbuf_mp);
}
static void
test_supported_io(void)
{
@ -1163,9 +337,7 @@ main(int argc, char **argv)
CU_ADD_TEST(suite, test_compress_operation_cross_boundary);
CU_ADD_TEST(suite, test_vbdev_compress_submit_request);
CU_ADD_TEST(suite, test_passthru);
CU_ADD_TEST(suite, test_initdrivers);
CU_ADD_TEST(suite, test_supported_io);
CU_ADD_TEST(suite, test_poller);
CU_ADD_TEST(suite, test_reset);
CU_basic_set_mode(CU_BRM_VERBOSE);

View File

@ -479,7 +479,9 @@ backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev,
rc = ut_compress(dst_iov[0].iov_base, &compressed_len,
g_decomp_buf, total_length);
args->cb_fn(args->cb_arg, rc ? rc : (int)compressed_len);
args->output_size = compressed_len;
args->cb_fn(args->cb_arg, rc);
}
static void
@ -506,7 +508,9 @@ backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
buf += dst_iov[i].iov_len;
}
args->cb_fn(args->cb_arg, rc ? rc : (int)decompressed_len);
args->output_size = decompressed_len;
args->cb_fn(args->cb_arg, rc);
}
static void
@ -1547,7 +1551,8 @@ dummy_backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
struct iovec *dst_iov, int dst_iovcnt,
struct spdk_reduce_vol_cb_args *args)
{
args->cb_fn(args->cb_arg, g_decompressed_len);
args->output_size = g_decompressed_len;
args->cb_fn(args->cb_arg, 0);
}
static void
test_reduce_decompress_chunk(void)