From 33eac886b9325b200f3238bdb30336b6bd8e2e45 Mon Sep 17 00:00:00 2001 From: paul luse Date: Mon, 3 Aug 2020 11:54:55 -0400 Subject: [PATCH] lib/idxd: refactor batching for increased performance And to eliminate an artificial constraint on # of user descriptors. The main idea here was to move from a single ring that covered all user descriptors to a pre-allocated ring per pre-allocated batch. In addition, the other major change here is in how we poll for completions. We used to poll the batch rings then the main ring. Now when commands are prepared their completion address is added to a per channel list and the poller simply runs through that list not caring which ring the completion address belongs too. This simplifies the completion logic considerably and will avoid polling locations that can't potentially have a completion. Some minor rework was included as well, mainly getting rid of the ring_ctrl struct as it didn't serve much of a purpose anyway and with how things are setup now its easier to read with all the elements in the channel struct. Also, a change that came in while this was WIP needed a few fixes to function correctly. Addressed those and moved them to a helper function so we have one point of control for xlations. Added support for NOP in cases where a batch is submitted with only 1 descriptor. Signed-off-by: paul luse Change-Id: Ie201b28118823100e908e0d1b08e7c10bb8fa9e7 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3654 Tested-by: SPDK CI Jenkins Community-CI: Mellanox Build Bot Reviewed-by: Jim Harris Reviewed-by: Ben Walker --- lib/idxd/Makefile | 2 +- lib/idxd/idxd.c | 664 +++++++++++++------------- lib/idxd/idxd.h | 84 ++-- module/accel/idxd/accel_engine_idxd.c | 34 +- test/unit/lib/idxd/idxd.c/idxd_ut.c | 18 +- 5 files changed, 417 insertions(+), 385 deletions(-) diff --git a/lib/idxd/Makefile b/lib/idxd/Makefile index ed66aeb15..5d896649b 100644 --- a/lib/idxd/Makefile +++ b/lib/idxd/Makefile @@ -34,7 +34,7 @@ SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..) include $(SPDK_ROOT_DIR)/mk/spdk.common.mk -SO_VER := 2 +SO_VER := 3 SO_MINOR := 0 C_SRCS = idxd.c diff --git a/lib/idxd/idxd.c b/lib/idxd/idxd.c index 743194f5c..6f3e253c8 100644 --- a/lib/idxd/idxd.c +++ b/lib/idxd/idxd.c @@ -115,19 +115,19 @@ spdk_idxd_get_channel(struct spdk_idxd_device *idxd) chan->idxd = idxd; TAILQ_INIT(&chan->batches); - TAILQ_INIT(&chan->batch_pool); - for (i = 0 ; i < NUM_BATCHES ; i++) { - batch = calloc(1, sizeof(struct idxd_batch)); - if (batch == NULL) { - SPDK_ERRLOG("Failed to allocate batch\n"); - while ((batch = TAILQ_FIRST(&chan->batch_pool))) { - TAILQ_REMOVE(&chan->batch_pool, batch, link); - free(batch); - } - return NULL; - } + TAILQ_INIT(&chan->comp_ctx_oustanding); + + chan->batch_base = calloc(NUM_BATCHES_PER_CHANNEL, sizeof(struct idxd_batch)); + if (chan->batch_base == NULL) { + SPDK_ERRLOG("Failed to allocate batch pool\n"); + return NULL; + } + + batch = chan->batch_base; + for (i = 0 ; i < NUM_BATCHES_PER_CHANNEL ; i++) { TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link); + batch++; } return chan; @@ -142,8 +142,8 @@ spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan) int spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan) { - uint32_t num_ring_slots; - int rc; + struct idxd_batch *batch; + int rc, num_ring_slots; /* Round robin the WQ selection for the chan on this IDXD device. */ chan->idxd->wq_id++; @@ -153,8 +153,8 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan) num_ring_slots = chan->idxd->queues[chan->idxd->wq_id].wqcfg.wq_size; - chan->ring_ctrl.ring_slots = spdk_bit_array_create(num_ring_slots); - if (chan->ring_ctrl.ring_slots == NULL) { + chan->ring_slots = spdk_bit_array_create(num_ring_slots); + if (chan->ring_slots == NULL) { SPDK_ERRLOG("Failed to allocate bit array for ring\n"); return -ENOMEM; } @@ -163,69 +163,67 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan) * max ring slots can change as channels come and go but we * start off getting all of the slots for this work queue. */ - chan->ring_ctrl.max_ring_slots = num_ring_slots; + chan->max_ring_slots = num_ring_slots; /* Store the original size of the ring. */ - chan->ring_ctrl.ring_size = num_ring_slots; + chan->ring_size = num_ring_slots; - chan->ring_ctrl.desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc), - 0x40, NULL, - SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); - if (chan->ring_ctrl.desc == NULL) { + chan->desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc), + 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (chan->desc == NULL) { SPDK_ERRLOG("Failed to allocate descriptor memory\n"); rc = -ENOMEM; goto err_desc; } - chan->ring_ctrl.completions = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_comp), - 0x40, NULL, - SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); - if (chan->ring_ctrl.completions == NULL) { + chan->completions = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_comp), + 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (chan->completions == NULL) { SPDK_ERRLOG("Failed to allocate completion memory\n"); rc = -ENOMEM; goto err_comp; } - chan->ring_ctrl.user_desc = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_hw_desc), - 0x40, NULL, - SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); - if (chan->ring_ctrl.user_desc == NULL) { - SPDK_ERRLOG("Failed to allocate batch descriptor memory\n"); - rc = -ENOMEM; - goto err_user_desc; + /* Populate the batches */ + TAILQ_FOREACH(batch, &chan->batch_pool, link) { + batch->user_desc = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_hw_desc), + 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (batch->user_desc == NULL) { + SPDK_ERRLOG("Failed to allocate batch descriptor memory\n"); + rc = -ENOMEM; + goto err_user_desc; + } + + batch->user_completions = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_comp), + 0x40, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (batch->user_completions == NULL) { + SPDK_ERRLOG("Failed to allocate user completion memory\n"); + rc = -ENOMEM; + goto err_user_comp; + } } - /* Each slot on the ring reserves DESC_PER_BATCH elemnts in user_desc. */ - chan->ring_ctrl.user_ring_slots = spdk_bit_array_create(NUM_BATCHES); - if (chan->ring_ctrl.user_ring_slots == NULL) { - SPDK_ERRLOG("Failed to allocate bit array for user ring\n"); - rc = -ENOMEM; - goto err_user_ring; - } - - chan->ring_ctrl.user_completions = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_comp), - 0x40, NULL, - SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); - if (chan->ring_ctrl.user_completions == NULL) { - SPDK_ERRLOG("Failed to allocate user completion memory\n"); - rc = -ENOMEM; - goto err_user_comp; - } - - chan->ring_ctrl.portal = (char *)chan->idxd->portals + chan->idxd->wq_id * PORTAL_SIZE; + /* Assign portal based on work queue chosen earlier. */ + chan->portal = (char *)chan->idxd->portals + chan->idxd->wq_id * PORTAL_SIZE; return 0; err_user_comp: - spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots); -err_user_ring: - spdk_free(chan->ring_ctrl.user_desc); + TAILQ_FOREACH(batch, &chan->batch_pool, link) { + spdk_free(batch->user_desc); + } err_user_desc: - spdk_free(chan->ring_ctrl.completions); + TAILQ_FOREACH(batch, &chan->batch_pool, link) { + spdk_free(chan->completions); + } err_comp: - spdk_free(chan->ring_ctrl.desc); + spdk_free(chan->desc); err_desc: - spdk_bit_array_free(&chan->ring_ctrl.ring_slots); + spdk_bit_array_free(&chan->ring_slots); return rc; } @@ -267,8 +265,8 @@ _idxd_drain(struct spdk_idxd_io_channel *chan) do { spdk_idxd_process_events(chan); set = 0; - for (index = 0; index < chan->ring_ctrl.max_ring_slots; index++) { - set |= spdk_bit_array_get(chan->ring_ctrl.ring_slots, index); + for (index = 0; index < chan->max_ring_slots; index++) { + set |= spdk_bit_array_get(chan->ring_slots, index); } } while (set); } @@ -282,32 +280,31 @@ spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan, uint32_t num_chann _idxd_drain(chan); - assert(spdk_bit_array_count_set(chan->ring_ctrl.ring_slots) == 0); + assert(spdk_bit_array_count_set(chan->ring_slots) == 0); if (num_channels == 0) { - spdk_free(chan->ring_ctrl.completions); - spdk_free(chan->ring_ctrl.desc); - spdk_bit_array_free(&chan->ring_ctrl.ring_slots); - spdk_free(chan->ring_ctrl.user_completions); - spdk_free(chan->ring_ctrl.user_desc); - spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots); + spdk_free(chan->completions); + spdk_free(chan->desc); + spdk_bit_array_free(&chan->ring_slots); while ((batch = TAILQ_FIRST(&chan->batch_pool))) { TAILQ_REMOVE(&chan->batch_pool, batch, link); - free(batch); + spdk_free(batch->user_completions); + spdk_free(batch->user_desc); } + free(chan->batch_base); return 0; } - num_ring_slots = chan->ring_ctrl.ring_size / num_channels; + num_ring_slots = chan->ring_size / num_channels; /* re-allocate our descriptor ring for hw flow control. */ - rc = spdk_bit_array_resize(&chan->ring_ctrl.ring_slots, num_ring_slots); + rc = spdk_bit_array_resize(&chan->ring_slots, num_ring_slots); if (rc < 0) { SPDK_ERRLOG("Unable to resize channel bit array\n"); return -ENOMEM; } - chan->ring_ctrl.max_ring_slots = num_ring_slots; + chan->max_ring_slots = num_ring_slots; /* * Note: The batch descriptor ring does not change with the @@ -706,6 +703,46 @@ spdk_idxd_detach(struct spdk_idxd_device *idxd) idxd_device_destruct(idxd); } +inline static void +_track_comp(struct spdk_idxd_io_channel *chan, bool batch_op, uint32_t index, + struct idxd_comp *comp_ctx, struct idxd_hw_desc *desc, struct idxd_batch *batch) +{ + comp_ctx->desc = desc; + comp_ctx->index = index; + /* Tag this as a batched operation or not so we know which bit array index to clear. */ + comp_ctx->batch_op = batch_op; + + if (batch_op == false) { + TAILQ_INSERT_TAIL(&chan->comp_ctx_oustanding, comp_ctx, link); + } else { + /* Build up completion addresses for batches but don't add them to + * the outstanding list until submission as it simplifies batch + * cancellation. + */ + batch->comp_ctx[index] = comp_ctx; + } +} + +inline static int +_vtophys(const void *buf, uint64_t *buf_addr, uint64_t size) +{ + uint64_t updated_size = size; + + *buf_addr = spdk_vtophys(buf, &updated_size); + + if (*buf_addr == SPDK_VTOPHYS_ERROR) { + SPDK_ERRLOG("Error translating address\n"); + return -EINVAL; + } + + if (updated_size < size) { + SPDK_ERRLOG("Error translating size (0x%lx), return size (0x%lx)\n", size, updated_size); + return -EINVAL; + } + + return 0; +} + static struct idxd_hw_desc * _idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, void *cb_arg, struct idxd_batch *batch) @@ -713,26 +750,39 @@ _idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, uint32_t index; struct idxd_hw_desc *desc; struct idxd_comp *comp; + uint64_t comp_hw_addr, desc_addr; + int rc; - index = spdk_bit_array_find_first_clear(chan->ring_ctrl.ring_slots, 0); + index = spdk_bit_array_find_first_clear(chan->ring_slots, 0); if (index == UINT32_MAX) { /* ran out of ring slots */ return NULL; } - spdk_bit_array_set(chan->ring_ctrl.ring_slots, index); + spdk_bit_array_set(chan->ring_slots, index); - desc = &chan->ring_ctrl.desc[index]; - comp = &chan->ring_ctrl.completions[index]; + desc = &chan->desc[index]; + comp = &chan->completions[index]; + + rc = _vtophys(&comp->hw, &comp_hw_addr, sizeof(struct idxd_hw_comp_record)); + if (rc) { + spdk_bit_array_clear(chan->ring_slots, index); + return NULL; + } + + rc = _vtophys(desc, &desc_addr, sizeof(struct idxd_hw_desc)); + if (rc) { + spdk_bit_array_clear(chan->ring_slots, index); + return NULL; + } + + _track_comp(chan, false, index, comp, desc, NULL); desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION; - desc->completion_addr = (uintptr_t)&comp->hw; + desc->completion_addr = comp_hw_addr; comp->cb_arg = cb_arg; comp->cb_fn = cb_fn; - if (batch) { - comp->batch = batch; - batch->batch_desc_index = index; - } + comp->batch = batch; return desc; } @@ -743,7 +793,7 @@ spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan, void *dst, const void * { struct idxd_hw_desc *desc; uint64_t src_addr, dst_addr; - uint64_t src_nbytes = 0, dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL); @@ -751,15 +801,14 @@ spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan, void *dst, const void * return -EBUSY; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -769,7 +818,7 @@ spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan, void *dst, const void * desc->xfer_size = nbytes; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + movdir64b(chan->portal, desc); return 0; } @@ -781,7 +830,7 @@ spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *d { struct idxd_hw_desc *desc; uint64_t src_addr, dst1_addr, dst2_addr; - uint64_t src_nbytes = 0, dst1_nbytes = 0, dst2_nbytes = 0; + int rc; if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) { SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n"); @@ -794,17 +843,19 @@ spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *d return -EBUSY; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst1_addr = spdk_vtophys(dst1, &dst1_nbytes); - dst2_addr = spdk_vtophys(dst2, &dst2_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst1_addr == SPDK_VTOPHYS_ERROR || - dst2_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst1_nbytes < nbytes || dst2_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst1, &dst1_addr, nbytes); + if (rc) { + return rc; + } + + rc = _vtophys(dst2, &dst2_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -815,19 +866,18 @@ spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *d desc->xfer_size = nbytes; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + movdir64b(chan->portal, desc); return 0; } int spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan, void *src1, const void *src2, - uint64_t nbytes, - spdk_idxd_req_cb cb_fn, void *cb_arg) + uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg) { struct idxd_hw_desc *desc; uint64_t src1_addr, src2_addr; - uint64_t src1_nbytes = 0, src2_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL); @@ -835,15 +885,14 @@ spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan, void *src1, const vo return -EBUSY; } - src1_addr = spdk_vtophys(src1, &src1_nbytes); - src2_addr = spdk_vtophys(src2, &src2_nbytes); - - if (src1_addr == SPDK_VTOPHYS_ERROR || src2_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src1, &src1_addr, nbytes); + if (rc) { + return rc; } - if (src1_nbytes < nbytes || src2_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(src2, &src2_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -853,7 +902,7 @@ spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan, void *src1, const vo desc->xfer_size = nbytes; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + movdir64b(chan->portal, desc); return 0; } @@ -864,7 +913,7 @@ spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan, void *dst, uint64_t fil { struct idxd_hw_desc *desc; uint64_t dst_addr; - uint64_t dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL); @@ -872,14 +921,9 @@ spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan, void *dst, uint64_t fil return -EBUSY; } - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; - } - - if (dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -889,7 +933,7 @@ spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan, void *dst, uint64_t fil desc->xfer_size = nbytes; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + movdir64b(chan->portal, desc); return 0; } @@ -901,7 +945,7 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void * { struct idxd_hw_desc *desc; uint64_t src_addr, dst_addr; - uint64_t src_nbytes = 0, dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL); @@ -909,15 +953,14 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void * return -EBUSY; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -929,7 +972,7 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void * desc->xfer_size = nbytes; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + movdir64b(chan->portal, desc); return 0; } @@ -937,47 +980,31 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void * uint32_t spdk_idxd_batch_get_max(void) { - return DESC_PER_BATCH; /* TODO maybe add startup RPC to set this */ + /* TODO: consider setting this via RPC. */ + return DESC_PER_BATCH; } struct idxd_batch * spdk_idxd_batch_create(struct spdk_idxd_io_channel *chan) { - struct idxd_batch *batch = NULL; + struct idxd_batch *batch; if (!TAILQ_EMPTY(&chan->batch_pool)) { batch = TAILQ_FIRST(&chan->batch_pool); + batch->index = batch->remaining = 0; + batch->submitted = false; TAILQ_REMOVE(&chan->batch_pool, batch, link); + TAILQ_INSERT_TAIL(&chan->batches, batch, link); } else { /* The application needs to handle this. */ return NULL; } - batch->batch_num = spdk_bit_array_find_first_clear(chan->ring_ctrl.user_ring_slots, 0); - if (batch->batch_num == UINT32_MAX) { - /* ran out of ring slots, the application needs to handle this. */ - TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link); - return NULL; - } - - spdk_bit_array_set(chan->ring_ctrl.user_ring_slots, batch->batch_num); - - /* - * Find the first descriptor address for the given batch. The - * descriptor ring used for user desctipors is allocated in - * units of DESC_PER_BATCH. The actual index is in units of - * one descriptor. - */ - batch->start_index = batch->cur_index = batch->batch_num * DESC_PER_BATCH; - - TAILQ_INSERT_TAIL(&chan->batches, batch, link); - SPDK_DEBUGLOG(idxd, "New batch %p num %u\n", batch, batch->batch_num); - return batch; } static bool -_does_batch_exist(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan) +_is_batch_valid(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan) { bool found = false; struct idxd_batch *cur_batch; @@ -992,61 +1019,89 @@ _does_batch_exist(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan) return found; } +static void +_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan) +{ + SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch); + assert(batch->remaining == 0); + TAILQ_REMOVE(&chan->batches, batch, link); + TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link); +} + int spdk_idxd_batch_cancel(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch) { - if (_does_batch_exist(batch, chan) == false) { - SPDK_ERRLOG("Attempt to cancel a batch that doesn't exist\n."); + if (_is_batch_valid(batch, chan) == false) { + SPDK_ERRLOG("Attempt to cancel an invalid batch.\n"); return -EINVAL; } - if (batch->remaining > 0) { - SPDK_ERRLOG("Cannot cancel batch, already submitted to HW\n."); + if (batch->submitted == true) { + SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n"); return -EINVAL; } - TAILQ_REMOVE(&chan->batches, batch, link); - spdk_bit_array_clear(chan->ring_ctrl.user_ring_slots, batch->batch_num); - TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link); + batch->remaining = 0; + _free_batch(batch, chan); return 0; } +static int _idxd_batch_prep_nop(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch); + int spdk_idxd_batch_submit(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch, spdk_idxd_req_cb cb_fn, void *cb_arg) { struct idxd_hw_desc *desc; + uint64_t desc_addr; + int i, rc; - if (_does_batch_exist(batch, chan) == false) { - SPDK_ERRLOG("Attempt to submit a batch that doesn't exist\n."); + if (_is_batch_valid(batch, chan) == false) { + SPDK_ERRLOG("Attempt to submit an invalid batch.\n"); return -EINVAL; } + if (batch->remaining < MIN_USER_DESC_COUNT) { + /* DSA needs at least MIN_USER_DESC_COUNT for a batch, add a NOP to make it so. */ + if (_idxd_batch_prep_nop(chan, batch)) { + return -EINVAL; + } + } + /* Common prep. */ desc = _idxd_prep_command(chan, cb_fn, cb_arg, batch); if (desc == NULL) { - SPDK_DEBUGLOG(idxd, "Can't submit batch %p busy batch num %u\n", batch, batch->batch_num); + SPDK_DEBUGLOG(idxd, "Busy, can't submit batch %p\n", batch); return -EBUSY; } + rc = _vtophys(batch->user_desc, &desc_addr, batch->remaining * sizeof(struct idxd_hw_desc)); + if (rc) { + return -EINVAL; + } + /* Command specific. */ desc->opcode = IDXD_OPCODE_BATCH; - desc->desc_list_addr = (uintptr_t)&chan->ring_ctrl.user_desc[batch->start_index]; - desc->desc_count = batch->cur_index - batch->start_index; - assert(desc->desc_count <= DESC_PER_BATCH); + desc->desc_list_addr = desc_addr; + desc->desc_count = batch->remaining; + assert(batch->remaining <= DESC_PER_BATCH); + assert(batch->remaining == batch->index); - if (desc->desc_count < MIN_USER_DESC_COUNT) { - SPDK_ERRLOG("Attempt to submit a batch without at least %u operations.\n", - MIN_USER_DESC_COUNT); - return -EINVAL; + /* Add the batch elements completion contexts to the outstanding list to be polled. */ + for (i = 0 ; i < batch->index; i++) { + TAILQ_INSERT_TAIL(&chan->comp_ctx_oustanding, (struct idxd_comp *)batch->comp_ctx[i], link); } - /* Total completions for the batch = num desc plus 1 for the batch desc itself. */ - batch->remaining = desc->desc_count + 1; + /* Add one for the batch desc itself, we use this to determine when + * to free the batch. + */ + batch->remaining++; /* Submit operation. */ - movdir64b(chan->ring_ctrl.portal, desc); + batch->submitted = true; + movdir64b(chan->portal, desc); + SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch); return 0; } @@ -1057,23 +1112,31 @@ _idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, { struct idxd_hw_desc *desc; struct idxd_comp *comp; + uint64_t desc_addr; + int rc; - if (_does_batch_exist(batch, chan) == false) { - SPDK_ERRLOG("Attempt to add to a batch that doesn't exist\n."); + if (_is_batch_valid(batch, chan) == false) { + SPDK_ERRLOG("Attempt to add to an invalid batch.\n"); return NULL; } - if ((batch->cur_index - batch->start_index) == DESC_PER_BATCH) { - SPDK_ERRLOG("Attempt to add to a batch that is already full\n."); + if (batch->remaining == DESC_PER_BATCH) { + SPDK_ERRLOG("Attempt to add to a batch that is already full.\n"); return NULL; } - desc = &chan->ring_ctrl.user_desc[batch->cur_index]; - comp = &chan->ring_ctrl.user_completions[batch->cur_index]; - SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->cur_index); + batch->remaining++; + desc = &batch->user_desc[batch->index]; + rc = _vtophys(desc, &desc_addr, sizeof(struct idxd_hw_desc)); + if (rc) { + return NULL; + } - batch->cur_index++; - assert(batch->cur_index > batch->start_index); + comp = &batch->user_completions[batch->index]; + _track_comp(chan, true, batch->index, comp, desc, batch); + SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->index); + + batch->index++; desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION; desc->completion_addr = (uintptr_t)&comp->hw; @@ -1084,13 +1147,32 @@ _idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, return desc; } +static int +_idxd_batch_prep_nop(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch) +{ + struct idxd_hw_desc *desc; + + /* Common prep. */ + desc = _idxd_prep_batch_cmd(chan, NULL, NULL, batch); + if (desc == NULL) { + return -EINVAL; + } + + /* Command specific. */ + desc->opcode = IDXD_OPCODE_NOOP; + /* TODO: temp workaround for simulator. Remove when fixed or w/silicon. */ + desc->xfer_size = 1; + + return 0; +} + int spdk_idxd_batch_prep_copy(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch, void *dst, const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg) { struct idxd_hw_desc *desc; uint64_t src_addr, dst_addr; - uint64_t src_nbytes = 0, dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch); @@ -1098,15 +1180,14 @@ spdk_idxd_batch_prep_copy(struct spdk_idxd_io_channel *chan, struct idxd_batch * return -EINVAL; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -1125,7 +1206,7 @@ spdk_idxd_batch_prep_fill(struct spdk_idxd_io_channel *chan, struct idxd_batch * { struct idxd_hw_desc *desc; uint64_t dst_addr; - uint64_t dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch); @@ -1133,14 +1214,9 @@ spdk_idxd_batch_prep_fill(struct spdk_idxd_io_channel *chan, struct idxd_batch * return -EINVAL; } - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; - } - - if (dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -1158,7 +1234,7 @@ spdk_idxd_batch_prep_dualcast(struct spdk_idxd_io_channel *chan, struct idxd_bat { struct idxd_hw_desc *desc; uint64_t src_addr, dst1_addr, dst2_addr; - uint64_t src_nbytes = 0, dst1_nbytes = 0, dst2_nbytes = 0; + int rc; if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) { SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n"); @@ -1171,17 +1247,19 @@ spdk_idxd_batch_prep_dualcast(struct spdk_idxd_io_channel *chan, struct idxd_bat return -EINVAL; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst1_addr = spdk_vtophys(dst1, &dst1_nbytes); - dst2_addr = spdk_vtophys(dst2, &dst2_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst1_addr == SPDK_VTOPHYS_ERROR || - dst2_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst1_nbytes < nbytes || dst2_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst1, &dst1_addr, nbytes); + if (rc) { + return rc; + } + + rc = _vtophys(dst2, &dst2_addr, nbytes); + if (rc) { + return rc; } desc->opcode = IDXD_OPCODE_DUALCAST; @@ -1200,7 +1278,7 @@ spdk_idxd_batch_prep_crc32c(struct spdk_idxd_io_channel *chan, struct idxd_batch { struct idxd_hw_desc *desc; uint64_t src_addr, dst_addr; - uint64_t src_nbytes = 0, dst_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch); @@ -1208,15 +1286,14 @@ spdk_idxd_batch_prep_crc32c(struct spdk_idxd_io_channel *chan, struct idxd_batch return -EINVAL; } - src_addr = spdk_vtophys(src, &src_nbytes); - dst_addr = spdk_vtophys(dst, &dst_nbytes); - - if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src, &src_addr, nbytes); + if (rc) { + return rc; } - if (src_nbytes < nbytes || dst_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(dst, &dst_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -1237,7 +1314,7 @@ spdk_idxd_batch_prep_compare(struct spdk_idxd_io_channel *chan, struct idxd_batc { struct idxd_hw_desc *desc; uint64_t src1_addr, src2_addr; - uint64_t src1_nbytes = 0, src2_nbytes = 0; + int rc; /* Common prep. */ desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch); @@ -1245,15 +1322,14 @@ spdk_idxd_batch_prep_compare(struct spdk_idxd_io_channel *chan, struct idxd_batc return -EINVAL; } - src1_addr = spdk_vtophys(src1, &src1_nbytes); - src2_addr = spdk_vtophys(src2, &src2_nbytes); - - if (src1_addr == SPDK_VTOPHYS_ERROR || src2_addr == SPDK_VTOPHYS_ERROR) { - return -EINVAL; + rc = _vtophys(src1, &src1_addr, nbytes); + if (rc) { + return rc; } - if (src1_nbytes < nbytes || src2_nbytes < nbytes) { - return -EINVAL; + rc = _vtophys(src2, &src2_addr, nbytes); + if (rc) { + return rc; } /* Command specific. */ @@ -1284,138 +1360,62 @@ _dump_error_reg(struct spdk_idxd_io_channel *chan) SPDK_NOTICELOG("SW Error Operation: %u\n", (uint8_t)(sw_error_0 >> 32)); } -static void -_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan, - struct idxd_comp *comp) +/* TODO: there are multiple ways of getting completions but we can't really pick the best one without + * silicon (from a perf perspective at least). The current solution has a large (> cache line) comp_ctx + * struct but only has one polling loop that covers both batch and regular descriptors based on a list + * of comp_ctx that we know have outstanding commands. Another experiment would be to have a 64 byte comp_ctx + * by relying on the bit_array indicies to get all the context we need. This has been implemented in a prev + * version but has the downside of needing to poll multiple ranges of comp records (because of batches) and + * needs to look at each array bit to know whether it should even check that completion record. That may be + * faster though, need to experiment. + */ +void +spdk_idxd_process_events(struct spdk_idxd_io_channel *chan) { - TAILQ_REMOVE(&chan->batches, batch, link); - TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link); - comp->batch = NULL; - spdk_bit_array_clear(chan->ring_ctrl.user_ring_slots, batch->batch_num); - spdk_bit_array_clear(chan->ring_ctrl.ring_slots, batch->batch_desc_index); -} - -static void -_spdk_idxd_process_batch_events(struct spdk_idxd_io_channel *chan) -{ - uint16_t index; - struct idxd_comp *comp; + struct idxd_comp *comp_ctx, *tmp; uint64_t sw_error_0; int status = 0; - struct idxd_batch *batch; - /* - * We don't check the bit array for user completions as there's only - * one bit per per batch. - */ - for (index = 0; index < TOTAL_USER_DESC; index++) { - comp = &chan->ring_ctrl.user_completions[index]; - if (comp->hw.status == 1) { - struct idxd_hw_desc *desc; + TAILQ_FOREACH_SAFE(comp_ctx, &chan->comp_ctx_oustanding, link, tmp) { + if (comp_ctx->hw.status == 1) { + TAILQ_REMOVE(&chan->comp_ctx_oustanding, comp_ctx, link); sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET); if (sw_error_0 & 0x1) { _dump_error_reg(chan); status = -EINVAL; } - desc = &chan->ring_ctrl.user_desc[index]; - switch (desc->opcode) { + switch (comp_ctx->desc->opcode) { + case IDXD_OPCODE_BATCH: + SPDK_DEBUGLOG(idxd, "Complete batch %p\n", comp_ctx->batch); + break; case IDXD_OPCODE_CRC32C_GEN: - *(uint32_t *)desc->dst_addr = comp->hw.crc32c_val; - *(uint32_t *)desc->dst_addr ^= ~0; + *(uint32_t *)comp_ctx->desc->dst_addr = comp_ctx->hw.crc32c_val; + *(uint32_t *)comp_ctx->desc->dst_addr ^= ~0; break; case IDXD_OPCODE_COMPARE: if (status == 0) { - status = comp->hw.result; + status = comp_ctx->hw.result; } break; - case IDXD_OPCODE_MEMFILL: - case IDXD_OPCODE_DUALCAST: - case IDXD_OPCODE_MEMMOVE: - break; - default: - assert(false); - break; } - /* The hw will complete all user desc first before the batch - * desc (see spec for configuration exceptions) however - * because of the order that we check for comps in the poller - * we may "see" them in a different order than they actually - * completed in. - */ - batch = comp->batch; - assert(batch->remaining > 0); - if (--batch->remaining == 0) { - _free_batch(batch, chan, comp); + if (comp_ctx->cb_fn) { + comp_ctx->cb_fn(comp_ctx->cb_arg, status); } - comp->cb_fn((void *)comp->cb_arg, status); - comp->hw.status = status = 0; - } - } -} + comp_ctx->hw.status = status = 0; -/* - * TODO: Experiment with different methods of reaping completions for performance - * once we have real silicon. - */ -void -spdk_idxd_process_events(struct spdk_idxd_io_channel *chan) -{ - uint16_t index; - struct idxd_comp *comp; - uint64_t sw_error_0; - int status = 0; - struct idxd_batch *batch; + if (comp_ctx->batch_op == false) { + assert(spdk_bit_array_get(chan->ring_slots, comp_ctx->index)); + spdk_bit_array_clear(chan->ring_slots, comp_ctx->index); + } - if (!TAILQ_EMPTY(&chan->batches)) { - _spdk_idxd_process_batch_events(chan); - } - - for (index = 0; index < chan->ring_ctrl.max_ring_slots; index++) { - if (spdk_bit_array_get(chan->ring_ctrl.ring_slots, index)) { - comp = &chan->ring_ctrl.completions[index]; - if (comp->hw.status == 1) { - struct idxd_hw_desc *desc; - - sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET); - if (sw_error_0 & 0x1) { - _dump_error_reg(chan); - status = -EINVAL; - } - - desc = &chan->ring_ctrl.desc[index]; - switch (desc->opcode) { - case IDXD_OPCODE_BATCH: - /* The hw will complete all user desc first before the batch - * desc (see spec for configuration exceptions) however - * because of the order that we check for comps in the poller - * we may "see" them in a different order than they actually - * completed in. - */ - batch = comp->batch; - assert(batch->remaining > 0); - if (--batch->remaining == 0) { - _free_batch(batch, chan, comp); - } - break; - case IDXD_OPCODE_CRC32C_GEN: - *(uint32_t *)desc->dst_addr = comp->hw.crc32c_val; - *(uint32_t *)desc->dst_addr ^= ~0; - break; - case IDXD_OPCODE_COMPARE: - if (status == 0) { - status = comp->hw.result; - } - break; - } - - comp->cb_fn(comp->cb_arg, status); - comp->hw.status = status = 0; - if (desc->opcode != IDXD_OPCODE_BATCH) { - spdk_bit_array_clear(chan->ring_ctrl.ring_slots, index); + if (comp_ctx->batch) { + assert(comp_ctx->batch->remaining > 0); + if (--comp_ctx->batch->remaining == 0) { + _free_batch(comp_ctx->batch, chan); } } } diff --git a/lib/idxd/idxd.h b/lib/idxd/idxd.h index 09d021152..2435d9ee6 100644 --- a/lib/idxd/idxd.h +++ b/lib/idxd/idxd.h @@ -59,25 +59,37 @@ static inline void movdir64b(void *dst, const void *src) #define IDXD_REGISTER_TIMEOUT_US 50 #define IDXD_DRAIN_TIMEOUT_US 500000 -/* TODO: make some of these RPC selectable */ #define WQ_MODE_DEDICATED 1 -#define LOG2_WQ_MAX_BATCH 8 /* 2^8 = 256 */ + +/* TODO: consider setting the max per batch limit via RPC. */ + +/* The following sets up a max desc count per batch of 16 */ +#define LOG2_WQ_MAX_BATCH 4 /* 2^4 = 16 */ +#define DESC_PER_BATCH (1 << LOG2_WQ_MAX_BATCH) +/* We decide how many batches we want to support based on what max queue + * depth makes sense resource wise. There is a small price to pay with + * larger numbers wrt polling for completions. + */ +#define NUM_BATCHES_PER_CHANNEL 0x400 +#define MIN_USER_DESC_COUNT 2 + #define LOG2_WQ_MAX_XFER 30 /* 2^30 = 1073741824 */ #define WQCFG_NUM_DWORDS 8 #define WQ_PRIORITY_1 1 #define IDXD_MAX_QUEUES 64 -#define TOTAL_USER_DESC (1 << LOG2_WQ_MAX_BATCH) -#define DESC_PER_BATCH 16 /* TODO maybe make this a startup RPC */ -#define NUM_BATCHES (TOTAL_USER_DESC / DESC_PER_BATCH) -#define MIN_USER_DESC_COUNT 2 - +/* Each pre-allocated batch structure goes on a per channel list and + * contains the memory for both user descriptors. The array of comp_ctx + * holds the list of completion contexts that we will add to the list + * used by the poller. The list is updated when the batch is submitted. + */ struct idxd_batch { - uint32_t batch_desc_index; - uint32_t batch_num; - uint32_t cur_index; - uint32_t start_index; + struct idxd_hw_desc *user_desc; + struct idxd_comp *user_completions; uint32_t remaining; + bool submitted; + void *comp_ctx[DESC_PER_BATCH]; + uint8_t index; TAILQ_ENTRY(idxd_batch) link; }; @@ -90,40 +102,44 @@ struct device_config { uint16_t total_engines; }; -struct idxd_ring_control { - void *portal; +struct idxd_comp ; +struct spdk_idxd_io_channel { + struct spdk_idxd_device *idxd; + /* The portal is the address that we write descriptors to for submission. */ + void *portal; uint16_t ring_size; /* - * Rings for this channel, one for descriptors and one - * for completions, share the same index. Batch descriptors - * are managed independently from data descriptors. + * Descriptors and completions share the same index. User descriptors + * (those included in a batch) are managed independently from data descriptors + * and are located in the batch structure. */ struct idxd_hw_desc *desc; struct idxd_comp *completions; - struct idxd_hw_desc *user_desc; - struct idxd_comp *user_completions; + + /* Current list of oustanding completion addresses to poll. */ + TAILQ_HEAD(, idxd_comp) comp_ctx_oustanding; /* * We use one bit array to track ring slots for both * desc and completions. + * + * TODO: We can get rid of the bit array and just use a uint + * to manage flow control as the current implementation saves + * enough info in comp_ctx that it doesn't need the index. Keeping + * the bit arrays for now as (a) they provide some extra debug benefit + * until we have silicon and (b) they may still be needed depending on + * polling implementation experiments that we need to run with real silicon. */ struct spdk_bit_array *ring_slots; uint32_t max_ring_slots; - /* - * We use a separate bit array to track ring slots for - * descriptors submitted via the user in a batch. - */ - struct spdk_bit_array *user_ring_slots; -}; + /* Lists of batches, free and in use. */ + TAILQ_HEAD(, idxd_batch) batch_pool; + TAILQ_HEAD(, idxd_batch) batches; -struct spdk_idxd_io_channel { - struct spdk_idxd_device *idxd; - struct idxd_ring_control ring_ctrl; - TAILQ_HEAD(, idxd_batch) batch_pool; /* free batches */ - TAILQ_HEAD(, idxd_batch) batches; /* in use batches */ + void *batch_base; }; struct pci_dev_id { @@ -154,9 +170,13 @@ struct idxd_comp { void *cb_arg; spdk_idxd_req_cb cb_fn; struct idxd_batch *batch; - uint64_t pad2; -} __attribute__((packed)); -SPDK_STATIC_ASSERT(sizeof(struct idxd_comp) == 64, "size mismatch"); + bool batch_op; + struct idxd_hw_desc *desc; + uint32_t index; + char pad[3]; + TAILQ_ENTRY(idxd_comp) link; +}; +SPDK_STATIC_ASSERT(sizeof(struct idxd_comp) == 96, "size mismatch"); struct idxd_wq { struct spdk_idxd_device *idxd; diff --git a/module/accel/idxd/accel_engine_idxd.c b/module/accel/idxd/accel_engine_idxd.c index f6a31d78d..f5d1da7f3 100644 --- a/module/accel/idxd/accel_engine_idxd.c +++ b/module/accel/idxd/accel_engine_idxd.c @@ -113,6 +113,7 @@ _process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task) { struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch); int rc = 0; + uint8_t fill_pattern = (uint8_t)task->fill_pattern; switch (task->op_code) { case ACCEL_OPCODE_MEMMOVE: @@ -126,6 +127,7 @@ _process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task) rc = spdk_idxd_submit_compare(chan->chan, task->src, task->src2, task->nbytes, idxd_done, task); break; case ACCEL_OPCODE_MEMFILL: + memset(&task->fill_pattern, fill_pattern, sizeof(uint64_t)); rc = spdk_idxd_submit_fill(chan->chan, task->dst, task->fill_pattern, task->nbytes, idxd_done, task); break; @@ -148,6 +150,7 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch); struct spdk_accel_task *task, *tmp, *batch_task; struct idxd_batch *idxd_batch; + uint8_t fill_pattern; TAILQ_HEAD(, spdk_accel_task) batch_tasks; int rc = 0; uint32_t task_count = 0; @@ -178,15 +181,26 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task return 0; } + /* TODO: The DSA batch interface has performance tradeoffs that we need to measure with real + * silicon. It's unclear which workloads will benefit from batching and with what sizes. Or + * if there are some cases where batching is more beneficial on the completion side by turning + * off completion notifications for elements within the batch. Need to run some experiments where + * we use the batching interface versus not to help provide guidance on how to use these batching + * API. Here below is one such place, currently those batching using the framework will end up + * also using the DSA batch interface. We could send each of these operations as single commands + * to the low level library. + */ + /* More than one task, create IDXD batch(es). */ do { idxd_batch = spdk_idxd_batch_create(chan->chan); - task_count = 0; if (idxd_batch == NULL) { /* Queue them all and try again later */ goto queue_tasks; } + task_count = 0; + /* Keep track of each batch's tasks in case we need to cancel. */ TAILQ_INIT(&batch_tasks); do { @@ -204,6 +218,8 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task task->nbytes, idxd_done, task); break; case ACCEL_OPCODE_MEMFILL: + fill_pattern = (uint8_t)task->fill_pattern; + memset(&task->fill_pattern, fill_pattern, sizeof(uint64_t)); rc = spdk_idxd_batch_prep_fill(chan->chan, idxd_batch, task->dst, task->fill_pattern, task->nbytes, idxd_done, task); break; @@ -232,25 +248,21 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task if (!TAILQ_EMPTY(&batch_tasks)) { rc = spdk_idxd_batch_submit(chan->chan, idxd_batch, NULL, NULL); - /* If we can't submit the batch, just destroy it and queue up all the operations - * from the latest batch and try again later. If this list was from an accel_fw batch, - * all of the batch info is still associated with the tasks that we're about to - * queue up so nothing is lost. - */ if (rc) { + /* Cancel the batch, requeue the items in the batch adn then + * any tasks that still hadn't been processed yet. + */ spdk_idxd_batch_cancel(chan->chan, idxd_batch); + while (!TAILQ_EMPTY(&batch_tasks)) { batch_task = TAILQ_FIRST(&batch_tasks); TAILQ_REMOVE(&batch_tasks, batch_task, link); TAILQ_INSERT_TAIL(&chan->queued_tasks, batch_task, link); } - rc = 0; + goto queue_tasks; } - } else { - /* the last batch task list was empty so all tasks had their cb_fn called. */ - rc = 0; } - } while (task && rc == 0); + } while (task); return 0; diff --git a/test/unit/lib/idxd/idxd.c/idxd_ut.c b/test/unit/lib/idxd/idxd.c/idxd_ut.c index 0eed4273a..a4e1f390c 100644 --- a/test/unit/lib/idxd/idxd.c/idxd_ut.c +++ b/test/unit/lib/idxd/idxd.c/idxd_ut.c @@ -95,7 +95,7 @@ test_idxd_wq_config(void) { struct spdk_idxd_device idxd = {}; union idxd_wqcfg wqcfg = {}; - uint32_t expected[8] = {0x10, 0, 0x11, 0x11e, 0, 0, 0x40000000, 0}; + uint32_t expected[8] = {0x10, 0, 0x11, 0x9e, 0, 0, 0x40000000, 0}; uint32_t wq_size; int rc, i, j; @@ -260,18 +260,18 @@ test_spdk_idxd_reconfigure_chan(void) uint32_t test_ring_size = 8; uint32_t num_channels = 2; - chan.ring_ctrl.ring_slots = spdk_bit_array_create(test_ring_size); - chan.ring_ctrl.ring_size = test_ring_size; - chan.ring_ctrl.completions = spdk_zmalloc(test_ring_size * sizeof(struct idxd_hw_desc), 0, NULL, - SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); - SPDK_CU_ASSERT_FATAL(chan.ring_ctrl.completions != NULL); + chan.ring_slots = spdk_bit_array_create(test_ring_size); + chan.ring_size = test_ring_size; + chan.completions = spdk_zmalloc(test_ring_size * sizeof(struct idxd_hw_desc), 0, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + SPDK_CU_ASSERT_FATAL(chan.completions != NULL); rc = spdk_idxd_reconfigure_chan(&chan, num_channels); CU_ASSERT(rc == 0); - CU_ASSERT(chan.ring_ctrl.max_ring_slots == test_ring_size / num_channels); + CU_ASSERT(chan.max_ring_slots == test_ring_size / num_channels); - spdk_bit_array_free(&chan.ring_ctrl.ring_slots); - spdk_free(chan.ring_ctrl.completions); + spdk_bit_array_free(&chan.ring_slots); + spdk_free(chan.completions); return 0; }