env: Add free_space parameter to spdk_ring_enqueue

DPDK rte_ring_enqueue_bulk() has free_space parameter to return
the amount of space in the ring after enqueue operation has finished.
This parameter can be used to wait when the ring is almost full and
wake up when there is enough space available in the ring.

Hence we add free_space to spdk_ring_enqueue() and spdk_ring_enqueue()
passes it to rte_ring_enqueue_bulk() simply.

Signed-off-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Change-Id: I9b9d6a5a097cf6dc4b97dfda7442f2c4b0aed4d3
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/456734
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com>
This commit is contained in:
Shuhei Matsumoto 2019-06-04 13:22:00 +09:00 committed by Darek Stojaczyk
parent 5f28443821
commit 1554a344c9
10 changed files with 25 additions and 14 deletions

View File

@ -39,6 +39,12 @@ taken into account if trying to achieve an artificial latency on top of an nvme
Added spdk_nvme_ctrlr_get_transport_id() to get the transport ID from a
previously attached controller.
### env
The parameter `free_space` has been added to spdk_ring_enqueue() to wait when
the ring is almost full and resume when there is enough space available in
the ring.
## v19.04:
### nvme

View File

@ -582,10 +582,12 @@ size_t spdk_ring_count(struct spdk_ring *ring);
* \param ring A pointer to the ring.
* \param objs A pointer to the array to be queued.
* \param count Length count of the array of objects.
* \param free_space If non-NULL, amount of free space after the enqueue has finished.
*
* \return the number of objects enqueued.
*/
size_t spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count);
size_t spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count,
size_t *free_space);
/**
* Dequeue count objects from the ring into the array objs.

View File

@ -247,7 +247,7 @@ bdev_ftl_cb(void *arg, int status)
io->status = status;
cnt = spdk_ring_enqueue(io->ring, (void **)&io, 1);
cnt = spdk_ring_enqueue(io->ring, (void **)&io, 1, NULL);
assert(cnt == 1);
}

View File

@ -570,7 +570,7 @@ bdev_virtio_reset(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
tmf_req->type = VIRTIO_SCSI_T_TMF;
tmf_req->subtype = VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET;
enqueued_count = spdk_ring_enqueue(svdev->ctrlq_ring, (void **)&bdev_io, 1);
enqueued_count = spdk_ring_enqueue(svdev->ctrlq_ring, (void **)&bdev_io, 1, NULL);
if (spdk_likely(enqueued_count == 1)) {
return;
} else {

View File

@ -393,9 +393,11 @@ spdk_ring_count(struct spdk_ring *ring)
}
size_t
spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count)
spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count,
size_t *free_space)
{
return rte_ring_enqueue_bulk((struct rte_ring *)ring, objs, count, NULL);
return rte_ring_enqueue_bulk((struct rte_ring *)ring, objs, count,
(unsigned int *)free_space);
}
size_t

View File

@ -135,7 +135,7 @@ spdk_event_call(struct spdk_event *event)
reactor = spdk_reactor_get(event->lcore);
assert(reactor->events != NULL);
rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1);
rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
if (rc != 1) {
assert(false);
}

View File

@ -158,7 +158,7 @@ _ftl_reloc_prep(struct ftl_band_reloc *breloc)
for (i = 0; i < reloc->max_qdepth; ++i) {
io = ftl_io_alloc(dev->ioch);
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1);
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1, NULL);
}
}
@ -215,7 +215,7 @@ ftl_reloc_free_io(struct ftl_band_reloc *breloc, struct ftl_io *io)
{
spdk_dma_free(io->iov[0].iov_base);
free(io->lba.vector);
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1);
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1, NULL);
}
static void
@ -257,7 +257,7 @@ ftl_reloc_read_cb(void *arg, int status)
}
io->flags &= ~FTL_IO_INITIALIZED;
spdk_ring_enqueue(breloc->write_queue, (void **)&io, 1);
spdk_ring_enqueue(breloc->write_queue, (void **)&io, 1, NULL);
}
static void
@ -465,7 +465,7 @@ ftl_reloc_read(struct ftl_band_reloc *breloc, struct ftl_io *io)
num_lbks = ftl_reloc_next_lbks(breloc, &ppa);
if (!num_lbks) {
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1);
spdk_ring_enqueue(breloc->free_queue, (void **)&io, 1, NULL);
return 0;
}

View File

@ -368,7 +368,7 @@ ftl_rwb_batch_revert(struct ftl_rwb_batch *batch)
{
struct ftl_rwb *rwb = batch->rwb;
if (spdk_ring_enqueue(rwb->prio_queue, (void **)&batch, 1) != 1) {
if (spdk_ring_enqueue(rwb->prio_queue, (void **)&batch, 1, NULL) != 1) {
assert(0 && "Should never happen");
}
}
@ -385,7 +385,7 @@ ftl_rwb_push(struct ftl_rwb_entry *entry)
/* Once all of the entries are put back, push the batch on the */
/* submission queue */
if (ftl_rwb_batch_full(batch, batch_size)) {
if (spdk_ring_enqueue(rwb->submit_queue, (void **)&batch, 1) != 1) {
if (spdk_ring_enqueue(rwb->submit_queue, (void **)&batch, 1, NULL) != 1) {
assert(0 && "Should never happen");
}
}

View File

@ -692,7 +692,7 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx
msg->fn = fn;
msg->arg = ctx;
rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
if (rc != 1) {
assert(false);
spdk_mempool_put(g_spdk_msg_mempool, msg);

View File

@ -358,7 +358,8 @@ spdk_ring_free(struct spdk_ring *ring)
DEFINE_RETURN_MOCK(spdk_ring_enqueue, size_t);
size_t
spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count)
spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count,
size_t *free_space)
{
struct spdk_ring_ele *ele;
size_t i;