lib/ftl: pending batches queue

Added queue responsible for keeping track of full, ready to be written
batches.  A batch might be put on this queue in case it'd already been
filled, but could not be written out due to lack of resources or it was
written out, but failed and needs to be resent.

Change-Id: Iba49cd359425300d21b8100f13f17189e99d4c7c
Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/908
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Maciej Szczepaniak <maciej.szczepaniak@intel.com>
Reviewed-by: Wojciech Malikowski <wojciech.malikowski@intel.com>
This commit is contained in:
Konrad Sztyber 2020-02-13 13:36:46 +01:00 committed by Tomasz Zawadzki
parent 17ad5c8ea3
commit 925cc3b8a9
4 changed files with 47 additions and 1 deletions

View File

@ -219,6 +219,12 @@ ftl_get_next_batch(struct spdk_ftl_dev *dev)
uint64_t *metadata;
if (batch == NULL) {
batch = TAILQ_FIRST(&dev->pending_batches);
if (batch != NULL) {
TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
return batch;
}
batch = TAILQ_FIRST(&dev->free_batches);
if (spdk_unlikely(batch == NULL)) {
return NULL;

View File

@ -234,6 +234,10 @@ struct spdk_ftl_dev {
struct iovec *iov_buf;
/* Batch currently being filled */
struct ftl_batch *current_batch;
/* Full and ready to be sent batches. A batch is put on this queue in
* case it's already filled, but cannot be sent.
*/
TAILQ_HEAD(, ftl_batch) pending_batches;
TAILQ_HEAD(, ftl_batch) free_batches;
/* Devices' list */

View File

@ -1225,6 +1225,7 @@ ftl_dev_init_io_channel(struct spdk_ftl_dev *dev)
}
TAILQ_INIT(&dev->free_batches);
TAILQ_INIT(&dev->pending_batches);
TAILQ_INIT(&dev->ioch_queue);
for (i = 0; i < FTL_BATCH_COUNT; ++i) {

View File

@ -807,7 +807,7 @@ test_submit_batch(void)
struct spdk_io_channel **_ioch_array;
struct ftl_io_channel **ioch_array;
struct ftl_wbuf_entry *entry;
struct ftl_batch *batch;
struct ftl_batch *batch, *batch2;
uint32_t num_io_channels = 16;
uint32_t ioch_idx, tmp_idx, entry_idx;
uint64_t ioch_bitmap;
@ -906,6 +906,41 @@ test_submit_batch(void)
ioch_array[ioch_idx]->num_entries);
}
/* Make sure pending batches are prioritized */
for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
set_thread(ioch_idx);
while (spdk_ring_count(ioch_array[ioch_idx]->submit_queue) < dev->xfer_size) {
entry = ftl_acquire_wbuf_entry(ioch_array[ioch_idx], 0);
SPDK_CU_ASSERT_FATAL(entry != NULL);
num_entries = spdk_ring_enqueue(ioch_array[ioch_idx]->submit_queue,
(void **)&entry, 1, NULL);
CU_ASSERT(num_entries == 1);
}
}
batch = ftl_get_next_batch(dev);
SPDK_CU_ASSERT_FATAL(batch != NULL);
TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
batch2 = ftl_get_next_batch(dev);
SPDK_CU_ASSERT_FATAL(batch2 != NULL);
CU_ASSERT(TAILQ_EMPTY(&dev->pending_batches));
CU_ASSERT(batch == batch2);
batch = ftl_get_next_batch(dev);
SPDK_CU_ASSERT_FATAL(batch != NULL);
ftl_release_batch(dev, batch);
ftl_release_batch(dev, batch2);
for (ioch_idx = 2; ioch_idx < num_io_channels; ++ioch_idx) {
batch = ftl_get_next_batch(dev);
SPDK_CU_ASSERT_FATAL(batch != NULL);
ftl_release_batch(dev, batch);
}
for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
set_thread(ioch_idx);
spdk_put_io_channel(_ioch_array[ioch_idx]);