diff --git a/module/bdev/compress/vbdev_compress.c b/module/bdev/compress/vbdev_compress.c index 970fe6d66..77e6df48f 100644 --- a/module/bdev/compress/vbdev_compress.c +++ b/module/bdev/compress/vbdev_compress.c @@ -54,7 +54,10 @@ #define NUM_MAX_XFORMS 2 #define NUM_MAX_INFLIGHT_OPS 128 #define DEFAULT_WINDOW_SIZE 15 -#define MAX_MBUFS_PER_OP REDUCE_MAX_IOVECS +/* We need extra mbufs per operation to accomodate host buffers that + * span a 2MB boundary. + */ +#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2) #define CHUNK_SIZE (1024 * 16) #define COMP_BDEV_NAME "compress" #define BACKING_IO_SZ (4 * 1024) @@ -437,13 +440,15 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP]; struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP]; uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id; - uint64_t total_length = 0; + uint64_t updated_length, remainder, phys_addr, total_length = 0; uint8_t *current_src_base = NULL; uint8_t *current_dst_base = NULL; - int iov_index; + int iov_index, mbuf_index; int rc = 0; struct vbdev_comp_op *op_to_queue; int i; + int src_mbuf_total = src_iovcnt; + int dst_mbuf_total = dst_iovcnt; assert(src_iovcnt < MAX_MBUFS_PER_OP); @@ -477,23 +482,53 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s */ /* Setup src mbufs */ - for (iov_index = 0; iov_index < src_iovcnt; iov_index++) { + iov_index = mbuf_index = 0; + while (iov_index < src_iovcnt) { current_src_base = src_iovs[iov_index].iov_base; total_length += src_iovs[iov_index].iov_len; - assert(src_mbufs[iov_index] != NULL); + assert(src_mbufs[mbuf_index] != NULL); src_mbufs[iov_index]->userdata = reduce_cb_arg; + updated_length = src_iovs[iov_index].iov_len; + phys_addr = spdk_vtophys((void *)current_src_base, &updated_length); - rte_pktmbuf_attach_extbuf(src_mbufs[iov_index], + rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index], current_src_base, - spdk_vtophys((void *)current_src_base, NULL), - src_iovs[iov_index].iov_len, + phys_addr, + updated_length, &g_shinfo); - rte_pktmbuf_append(src_mbufs[iov_index], src_iovs[iov_index].iov_len); + rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length); + remainder = src_iovs[iov_index].iov_len - updated_length; - if (iov_index > 0) { - rte_pktmbuf_chain(src_mbufs[0], src_mbufs[iov_index]); + if (mbuf_index > 0) { + rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]); } + + /* If we crossed 2 2MB boundary we need another mbuf for the remainder */ + if (remainder > 0) { + /* allocate an mbuf at the end of the array */ + rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1); + if (rc) { + SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n"); + goto error_src_dst; + } + src_mbuf_total++; + mbuf_index++; + current_src_base += updated_length; + phys_addr = spdk_vtophys((void *)current_src_base, &remainder); + /* assert we don't cross another */ + assert(remainder == src_iovs[iov_index].iov_len - updated_length); + + rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index], + current_src_base, + phys_addr, + remainder, + &g_shinfo); + rte_pktmbuf_append(src_mbufs[mbuf_index], remainder); + rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]); + } + iov_index++; + mbuf_index++; } comp_op->m_src = src_mbufs[0]; @@ -501,21 +536,51 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s comp_op->src.length = total_length; /* setup dst mbufs, for the current test being used with this code there's only one vector */ - for (iov_index = 0; iov_index < dst_iovcnt; iov_index++) { + iov_index = mbuf_index = 0; + while (iov_index < dst_iovcnt) { current_dst_base = dst_iovs[iov_index].iov_base; + updated_length = dst_iovs[iov_index].iov_len; + phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length); - rte_pktmbuf_attach_extbuf(dst_mbufs[iov_index], + rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index], current_dst_base, - spdk_vtophys((void *)current_dst_base, NULL), - dst_iovs[iov_index].iov_len, + phys_addr, + updated_length, &g_shinfo); - rte_pktmbuf_append(dst_mbufs[iov_index], dst_iovs[iov_index].iov_len); + rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length); + remainder = dst_iovs[iov_index].iov_len - updated_length; - if (iov_index > 0) { - rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[iov_index]); + if (mbuf_index > 0) { + rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]); } + + /* If we crossed 2 2MB boundary we need another mbuf for the remainder */ + if (remainder > 0) { + rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1); + if (rc) { + SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n"); + goto error_src_dst; + } + dst_mbuf_total++; + mbuf_index++; + current_dst_base += updated_length; + phys_addr = spdk_vtophys((void *)current_dst_base, &remainder); + /* assert we don't cross another */ + assert(remainder == dst_iovs[iov_index].iov_len - updated_length); + + rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index], + current_dst_base, + phys_addr, + remainder, + &g_shinfo); + rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder); + rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]); + } + iov_index++; + mbuf_index++; } + comp_op->m_dst = dst_mbufs[0]; comp_op->dst.offset = 0; @@ -542,8 +607,12 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s } /* Error cleanup paths. */ +error_src_dst: + for (i = 0; i < dst_mbuf_total; i++) { + rte_pktmbuf_free((struct rte_mbuf *)&dst_mbufs[i]); + } error_get_dst: - for (i = 0; i < src_iovcnt; i++) { + for (i = 0; i < src_mbuf_total; i++) { rte_pktmbuf_free((struct rte_mbuf *)&src_mbufs[i]); } error_get_src: