diff --git a/lib/bdev/compress/vbdev_compress.c b/lib/bdev/compress/vbdev_compress.c index e1162ac51..603bafc80 100644 --- a/lib/bdev/compress/vbdev_compress.c +++ b/lib/bdev/compress/vbdev_compress.c @@ -539,15 +539,16 @@ comp_dev_poller(void *args) for (i = 0; i < num_deq; i++) { reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata; - if (deq_ops[i]->status != RTE_COMP_OP_STATUS_SUCCESS) { + if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) { + + /* tell reduce this is done and what the bytecount was */ + reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced); + } else { SPDK_ERRLOG("deque status %u\n", deq_ops[i]->status); - /* Reduce will simply store uncompressed on any negative value - * passed in to deq_ops[i]->produced so set that now. - */ - deq_ops[i]->produced = -EINVAL; + /* Reduce will simply store uncompressed on neg errno value. */ + reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL); } - reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced); /* Now free both mbufs and the compress operation. The rte_pktmbuf_free() * call takes care of freeing all of the mbufs in the chain back to their diff --git a/test/unit/lib/bdev/compress.c/compress_ut.c b/test/unit/lib/bdev/compress.c/compress_ut.c index 36c65a083..c42fb17eb 100644 --- a/test/unit/lib/bdev/compress.c/compress_ut.c +++ b/test/unit/lib/bdev/compress.c/compress_ut.c @@ -42,7 +42,7 @@ struct spdk_bdev_io *g_bdev_io; struct spdk_io_channel *g_io_ch; -struct rte_comp_op g_comp_op; +struct rte_comp_op g_comp_op[2]; struct vbdev_compress g_comp_bdev; struct comp_device_qp g_device_qp; struct compress_dev g_device; @@ -266,8 +266,6 @@ DEFINE_STUB(spdk_reduce_vol_get_params, const struct spdk_reduce_vol_params *, DEFINE_STUB(rte_socket_id, unsigned, (void), 0); DEFINE_STUB(rte_eal_get_configuration, struct rte_config *, (void), NULL); DEFINE_STUB(rte_vdev_init, int, (const char *name, const char *args), 0); -DEFINE_STUB(rte_compressdev_dequeue_burst, uint16_t, - (uint8_t dev_id, uint16_t qp_id, struct rte_comp_op **ops, uint16_t nb_ops), 0); DEFINE_STUB_V(rte_comp_op_free, (struct rte_comp_op *op)); DEFINE_STUB(rte_comp_op_alloc, struct rte_comp_op *, (struct rte_mempool *mempool), NULL); @@ -341,6 +339,38 @@ spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status sta g_completion_called = true; } +static uint16_t ut_rte_compressdev_dequeue_burst = 0; +uint16_t +rte_compressdev_dequeue_burst(uint8_t dev_id, uint16_t qp_id, struct rte_comp_op **ops, + uint16_t nb_op) +{ + if (ut_rte_compressdev_dequeue_burst == 0) { + return 0; + } + + ops[0] = &g_comp_op[0]; + ops[1] = &g_comp_op[1]; + + return ut_rte_compressdev_dequeue_burst; +} + +static int ut_compress_done[2]; +/* done_count and done_idx together control which expected assertion + * value to use when dequeuing 2 operations. + */ +static uint16_t done_count = 1; +static uint16_t done_idx = 0; +static void +_compress_done(void *_req, int reduce_errno) +{ + if (done_count == 1) { + CU_ASSERT(reduce_errno == ut_compress_done[0]); + } else if (done_count == 2) { + CU_ASSERT(reduce_errno == ut_compress_done[done_idx++]); + } +} + +#define FAKE_ENQUEUE_SUCCESS 255 static uint16_t ut_enqueue_value = 0; static struct rte_comp_op ut_expected_op; uint16_t @@ -353,6 +383,9 @@ rte_compressdev_enqueue_burst(uint8_t dev_id, uint16_t qp_id, struct rte_comp_op return 0; } + if (ut_enqueue_value == FAKE_ENQUEUE_SUCCESS) { + return 1; + } /* by design the compress module will never send more than 1 op at a time */ CU_ASSERT(op->private_xform == ut_expected_op.private_xform); @@ -497,7 +530,7 @@ test_compress_operation(void) } CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); CU_ASSERT(rc == 0); - MOCK_SET(rte_comp_op_alloc, &g_comp_op); + MOCK_SET(rte_comp_op_alloc, &g_comp_op[0]); /* test mempool get failure */ ut_rte_pktmbuf_alloc_bulk = -1; @@ -557,6 +590,88 @@ test_compress_operation(void) CU_ASSERT(rc == 0); } +static void +test_poller(void) +{ + int rc; + struct spdk_reduce_vol_cb_args *cb_args; + struct rte_mbuf mbuf[2]; + struct vbdev_comp_op *op_to_queue; + struct iovec src_iovs[2] = {}; + struct iovec dst_iovs[2] = {}; + + cb_args = calloc(1, sizeof(*cb_args)); + SPDK_CU_ASSERT_FATAL(cb_args != NULL); + cb_args->cb_fn = _compress_done; + memset(&g_comp_op[0], 0, sizeof(struct rte_comp_op)); + g_comp_op[0].m_src = &mbuf[0]; + g_comp_op[1].m_src = &mbuf[1]; + + /* Error from dequeue, nothing needing to be resubmitted. + */ + ut_rte_compressdev_dequeue_burst = 1; + /* setup what we want dequeue to return for the op */ + g_comp_op[0].m_src->userdata = (void *)cb_args; + g_comp_op[0].produced = 1; + g_comp_op[0].status = 1; + /* value asserted in the reduce callback */ + ut_compress_done[0] = -EINVAL; + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); + rc = comp_dev_poller((void *)&g_comp_bdev); + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); + CU_ASSERT(rc == 0); + + /* Success from dequeue, 2 ops. nothing needing to be resubmitted. + */ + ut_rte_compressdev_dequeue_burst = 2; + /* setup what we want dequeue to return for the op */ + g_comp_op[0].m_src->userdata = (void *)cb_args; + g_comp_op[0].produced = 16; + g_comp_op[0].status = 0; + g_comp_op[1].m_src->userdata = (void *)cb_args; + g_comp_op[1].produced = 32; + g_comp_op[1].status = 0; + /* value asserted in the reduce callback */ + ut_compress_done[0] = 16; + ut_compress_done[1] = 32; + done_count = 2; + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); + rc = comp_dev_poller((void *)&g_comp_bdev); + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); + CU_ASSERT(rc == 0); + + /* Success from dequeue, one op to be resubmitted. + */ + ut_rte_compressdev_dequeue_burst = 1; + /* setup what we want dequeue to return for the op */ + g_comp_op[0].m_src->userdata = (void *)cb_args; + g_comp_op[0].produced = 16; + g_comp_op[0].status = 0; + /* value asserted in the reduce callback */ + ut_compress_done[0] = 16; + done_count = 1; + op_to_queue = calloc(1, sizeof(struct vbdev_comp_op)); + SPDK_CU_ASSERT_FATAL(op_to_queue != NULL); + op_to_queue->backing_dev = &g_comp_bdev.backing_dev; + op_to_queue->src_iovs = &src_iovs[0]; + op_to_queue->src_iovcnt = 2; + op_to_queue->dst_iovs = &dst_iovs[0]; + op_to_queue->dst_iovcnt = 2; + op_to_queue->compress = true; + op_to_queue->cb_arg = cb_args; + ut_enqueue_value = FAKE_ENQUEUE_SUCCESS; + TAILQ_INSERT_TAIL(&g_comp_bdev.queued_comp_ops, + op_to_queue, + link); + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == false); + rc = comp_dev_poller((void *)&g_comp_bdev); + CU_ASSERT(TAILQ_EMPTY(&g_comp_bdev.queued_comp_ops) == true); + CU_ASSERT(rc == 0); + + /* op_to_queue is freed in code under test */ + free(cb_args); +} + static void test_vbdev_compress_submit_request(void) { @@ -742,6 +857,8 @@ main(int argc, char **argv) test_initdrivers) == NULL || CU_add_test(suite, "test_supported_io", test_supported_io) == NULL || + CU_add_test(suite, "test_poller", + test_poller) == NULL || CU_add_test(suite, "test_reset", test_reset) == NULL ) {