From 35fdf2dd9ed61b1effe80d20d5e9f3344726b13f Mon Sep 17 00:00:00 2001
From: Konrad Sztyber
Date: Tue, 11 Feb 2020 13:38:16 +0100
Subject: [PATCH] lib/ftl: write buffer batches

This patch adds the ftl_batch structure, which describes a single batch
of data to be written to the disk. It is composed of multiple write
buffer entries (the actual number depends on the write unit size of the
underlying device).

Additionally, a function responsible for filling out a batch was added.
It iterates over available IO channels and dequeues submitted write
buffer entries until a batch is completed. The IO channel queue is
rotated, so that each time the function is called a different IO
channel is used first, which guarantees that all channels are treated
fairly.

Change-Id: Ie61d8c6cb51d5c5175540c975447bc27590c5cb4
Signed-off-by: Konrad Sztyber
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/905
Tested-by: SPDK CI Jenkins
Reviewed-by: Wojciech Malikowski
Reviewed-by: Jim Harris
Reviewed-by: Shuhei Matsumoto
---
 lib/ftl/ftl_core.c                     |  91 +++++++++++++++-
 lib/ftl/ftl_core.h                     |  20 ++++
 lib/ftl/ftl_init.c                     |  21 ++++
 lib/ftl/ftl_io.h                       |   1 +
 test/unit/lib/ftl/ftl_io.c/ftl_io_ut.c | 137 +++++++++++++++++++++++--
 5 files changed, 259 insertions(+), 11 deletions(-)

diff --git a/lib/ftl/ftl_core.c b/lib/ftl/ftl_core.c
index 8930d34f8..9dbb71462 100644
--- a/lib/ftl/ftl_core.c
+++ b/lib/ftl/ftl_core.c
@@ -193,9 +193,7 @@ ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
 	return entry;
 }
 
-void ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry);
-
-void
+static void
 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
 {
 	struct ftl_io_channel *io_channel = entry->ioch;
@@ -207,6 +205,93 @@ ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
 }
 
+struct ftl_batch *ftl_get_next_batch(struct spdk_ftl_dev *dev);
+
+struct ftl_batch *
+ftl_get_next_batch(struct spdk_ftl_dev *dev)
+{
+	struct ftl_batch *batch = dev->current_batch;
+	struct ftl_io_channel *ioch;
+#define FTL_DEQUEUE_ENTRIES 128
+	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
+	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
+	size_t i, num_dequeued, num_remaining;
+
+	if (batch == NULL) {
+		batch = TAILQ_FIRST(&dev->free_batches);
+		if (spdk_unlikely(batch == NULL)) {
+			return NULL;
+		}
+
+		assert(TAILQ_EMPTY(&batch->entries));
+		assert(batch->num_entries == 0);
+		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
+	}
+
+	/*
+	 * Keep shifting the queue to ensure fairness in IO channel selection. Each time
+	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
+	 * different IO channel.
+	 */
+	TAILQ_INIT(&ioch_queue);
+	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
+		ioch = TAILQ_FIRST(&dev->ioch_queue);
+		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
+		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
+
+		num_remaining = dev->xfer_size - batch->num_entries;
+		while (num_remaining > 0) {
+			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
+							 spdk_min(num_remaining,
+								  FTL_DEQUEUE_ENTRIES));
+			if (num_dequeued == 0) {
+				break;
+			}
+
+			for (i = 0; i < num_dequeued; ++i) {
+				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
+				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
+				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
+			}
+
+			batch->num_entries += num_dequeued;
+			num_remaining -= num_dequeued;
+		}
+
+		if (num_remaining == 0) {
+			break;
+		}
+	}
+
+	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
+
+	if (batch->num_entries == dev->xfer_size) {
+		dev->current_batch = NULL;
+	} else {
+		dev->current_batch = batch;
+		batch = NULL;
+	}
+
+	return batch;
+}
+
+void ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch);
+
+void
+ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
+{
+	struct ftl_wbuf_entry *entry;
+
+	while (!TAILQ_EMPTY(&batch->entries)) {
+		entry = TAILQ_FIRST(&batch->entries);
+		TAILQ_REMOVE(&batch->entries, entry, tailq);
+		ftl_release_wbuf_entry(entry);
+	}
+
+	batch->num_entries = 0;
+	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
+}
+
 static void
 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 {
diff --git a/lib/ftl/ftl_core.h b/lib/ftl/ftl_core.h
index cc869e009..5485cd134 100644
--- a/lib/ftl/ftl_core.h
+++ b/lib/ftl/ftl_core.h
@@ -107,6 +107,17 @@ struct ftl_nv_cache {
 	pthread_spinlock_t lock;
 };
 
+struct ftl_batch {
+	/* Queue of write buffer entries, can reach up to xfer_size entries */
+	TAILQ_HEAD(, ftl_wbuf_entry) entries;
+	/* Number of entries in the queue above */
+	uint32_t num_entries;
+	/* Index within spdk_ftl_dev.batch_array */
+	uint32_t index;
+	struct iovec *iov;
+	TAILQ_ENTRY(ftl_batch) tailq;
+};
+
 struct spdk_ftl_dev {
 	/* Device instance */
 	struct spdk_uuid uuid;
@@ -209,6 +220,15 @@ struct spdk_ftl_dev {
 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
 	uint64_t num_io_channels;
 
+	/* Write buffer batches */
+#define FTL_BATCH_COUNT 4096
+	struct ftl_batch batch_array[FTL_BATCH_COUNT];
+	/* Iovec buffer used by batches */
+	struct iovec *iov_buf;
+	/* Batch currently being filled */
+	struct ftl_batch *current_batch;
+	TAILQ_HEAD(, ftl_batch) free_batches;
+
 	/* Devices' list */
 	STAILQ_ENTRY(spdk_ftl_dev) stailq;
 };
diff --git a/lib/ftl/ftl_init.c b/lib/ftl/ftl_init.c
index 8f64fdc96..3d3576a98 100644
--- a/lib/ftl/ftl_init.c
+++ b/lib/ftl/ftl_init.c
@@ -1195,13 +1195,33 @@ ftl_io_channel_destroy_cb(void *io_device, void *ctx)
 static int
 ftl_dev_init_io_channel(struct spdk_ftl_dev *dev)
 {
+	struct ftl_batch *batch;
+	uint32_t i;
+
 	dev->ioch_array = calloc(dev->conf.max_io_channels, sizeof(*dev->ioch_array));
 	if (!dev->ioch_array) {
 		SPDK_ERRLOG("Failed to allocate IO channel array\n");
 		return -1;
 	}
 
+	dev->iov_buf = calloc(FTL_BATCH_COUNT, dev->xfer_size * sizeof(struct iovec));
+	if (!dev->iov_buf) {
+		SPDK_ERRLOG("Failed to allocate iovec buffer\n");
+		return -1;
+	}
+
+	TAILQ_INIT(&dev->free_batches);
 	TAILQ_INIT(&dev->ioch_queue);
+
+	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
+		batch = &dev->batch_array[i];
+		batch->iov = &dev->iov_buf[i * dev->xfer_size];
+		batch->num_entries = 0;
+		batch->index = i;
+		TAILQ_INIT(&batch->entries);
+		TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
+	}
+
 	dev->num_io_channels = 0;
 	spdk_io_device_register(dev, ftl_io_channel_create_cb,
 				ftl_io_channel_destroy_cb,
@@ -1345,6 +1365,7 @@ ftl_dev_free_sync(struct spdk_ftl_dev *dev)
 	assert(dev->num_io_channels == 0);
 
 	free(dev->ioch_array);
+	free(dev->iov_buf);
 	free(dev->name);
 	free(dev->bands);
 	free(dev->l2p);
diff --git a/lib/ftl/ftl_io.h b/lib/ftl/ftl_io.h
index 58d090211..940a4814b 100644
--- a/lib/ftl/ftl_io.h
+++ b/lib/ftl/ftl_io.h
@@ -154,6 +154,7 @@ struct ftl_wbuf_entry {
 	bool valid;
 	/* Lock that protects the entry from being evicted from the L2P */
 	pthread_spinlock_t lock;
+	TAILQ_ENTRY(ftl_wbuf_entry) tailq;
 };
 
 struct ftl_io_channel {
diff --git a/test/unit/lib/ftl/ftl_io.c/ftl_io_ut.c b/test/unit/lib/ftl/ftl_io.c/ftl_io_ut.c
index 47acbb0c8..665f7f12e 100644
--- a/test/unit/lib/ftl/ftl_io.c/ftl_io_ut.c
+++ b/test/unit/lib/ftl/ftl_io.c/ftl_io_ut.c
@@ -94,7 +94,7 @@ channel_destroy_cb(void *io_device, void *ctx)
 {}
 
 static struct spdk_ftl_dev *
-setup_device(uint32_t num_threads)
+setup_device(uint32_t num_threads, uint32_t xfer_size)
 {
 	struct spdk_ftl_dev *dev;
 	struct _ftl_io_channel *_ioch;
@@ -121,6 +121,7 @@ setup_device(uint32_t num_threads)
 	SPDK_CU_ASSERT_FATAL(ioch->io_pool != NULL);
 
 	dev->conf = g_default_conf;
+	dev->xfer_size = xfer_size;
 	dev->base_bdev_desc = (struct spdk_bdev_desc *)0xdeadbeef;
 	spdk_io_device_register(dev->base_bdev_desc, channel_create_cb, channel_destroy_cb, 0, NULL);
 
@@ -144,6 +145,7 @@ free_device(struct spdk_ftl_dev *dev)
 	free_threads();
 
 	free(dev->ioch_array);
+	free(dev->iov_buf);
 	free(dev->ioch);
 	free(dev);
 }
@@ -183,7 +185,7 @@ test_completion(void)
 	int req, status = 0;
 	size_t pool_size;
 
-	dev = setup_device(1);
+	dev = setup_device(1, 16);
 	ioch = ftl_io_channel_get_ctx(dev->ioch);
 	pool_size = spdk_mempool_count(ioch->io_pool);
 
@@ -225,7 +227,7 @@ test_alloc_free(void)
 	int parent_status = -1;
 	size_t pool_size;
 
-	dev = setup_device(1);
+	dev = setup_device(1, 16);
 	ioch = ftl_io_channel_get_ctx(dev->ioch);
 	pool_size = spdk_mempool_count(ioch->io_pool);
 
@@ -271,7 +273,7 @@ test_child_requests(void)
 	int status[MAX_CHILDREN + 1], i;
 	size_t pool_size;
 
-	dev = setup_device(1);
+	dev = setup_device(1, 16);
 	ioch = ftl_io_channel_get_ctx(dev->ioch);
 	pool_size = spdk_mempool_count(ioch->io_pool);
 
@@ -371,7 +373,7 @@ test_child_status(void)
 	int parent_status, child_status[2];
 	size_t pool_size, i;
 
-	dev = setup_device(1);
+	dev = setup_device(1, 16);
 	ioch = ftl_io_channel_get_ctx(dev->ioch);
 	pool_size = spdk_mempool_count(ioch->io_pool);
 
@@ -458,7 +460,7 @@ test_multi_generation(void)
 	size_t pool_size;
 	int i, j;
 
-	dev = setup_device(1);
+	dev = setup_device(1, 16);
 	ioch = ftl_io_channel_get_ctx(dev->ioch);
 	pool_size = spdk_mempool_count(ioch->io_pool);
 
@@ -583,7 +585,7 @@ test_io_channel_create(void)
 	struct ftl_io_channel *ftl_ioch;
 	uint32_t ioch_idx;
 
-	dev = setup_device(g_default_conf.max_io_channels + 1);
+	dev = setup_device(g_default_conf.max_io_channels + 1, 16);
 
 	ioch = spdk_get_io_channel(dev);
 	CU_ASSERT(ioch != NULL);
@@ -656,7 +658,7 @@ test_acquire_entry(void)
 	uint32_t num_entries, num_io_channels = 2;
 	uint32_t ioch_idx, entry_idx, tmp_idx;
 
-	dev = setup_device(num_io_channels);
+	dev = setup_device(num_io_channels, 16);
 	num_entries = dev->conf.rwb_size / FTL_BLOCK_SIZE;
 
 	entries = calloc(num_entries * num_io_channels, sizeof(*entries));
@@ -798,6 +800,123 @@ test_acquire_entry(void)
 	free_device(dev);
 }
 
+static void
+test_submit_batch(void)
+{
+	struct spdk_ftl_dev *dev;
+	struct spdk_io_channel **_ioch_array;
+	struct ftl_io_channel **ioch_array;
+	struct ftl_wbuf_entry *entry;
+	struct ftl_batch *batch;
+	uint32_t num_io_channels = 16;
+	uint32_t ioch_idx, tmp_idx, entry_idx;
+	uint64_t ioch_bitmap;
+	size_t num_entries;
+
+	dev = setup_device(num_io_channels, num_io_channels);
+
+	_ioch_array = calloc(num_io_channels, sizeof(*_ioch_array));
+	SPDK_CU_ASSERT_FATAL(_ioch_array != NULL);
+	ioch_array = calloc(num_io_channels, sizeof(*ioch_array));
+	SPDK_CU_ASSERT_FATAL(ioch_array != NULL);
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		set_thread(ioch_idx);
+		_ioch_array[ioch_idx] = spdk_get_io_channel(dev);
+		SPDK_CU_ASSERT_FATAL(_ioch_array[ioch_idx] != NULL);
+		ioch_array[ioch_idx] = ftl_io_channel_get_ctx(_ioch_array[ioch_idx]);
+		poll_threads();
+	}
+
+	/* Make sure the IO channels are not starved and entries are popped in RR fashion */
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		set_thread(ioch_idx);
+
+		for (entry_idx = 0; entry_idx < dev->xfer_size; ++entry_idx) {
+			entry = ftl_acquire_wbuf_entry(ioch_array[ioch_idx], 0);
+			SPDK_CU_ASSERT_FATAL(entry != NULL);
+
+			num_entries = spdk_ring_enqueue(ioch_array[ioch_idx]->submit_queue,
+							(void **)&entry, 1, NULL);
+			CU_ASSERT(num_entries == 1);
+		}
+	}
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		for (tmp_idx = 0; tmp_idx < ioch_idx; ++tmp_idx) {
+			set_thread(tmp_idx);
+
+			while (spdk_ring_count(ioch_array[tmp_idx]->submit_queue) < dev->xfer_size) {
+				entry = ftl_acquire_wbuf_entry(ioch_array[tmp_idx], 0);
+				SPDK_CU_ASSERT_FATAL(entry != NULL);
+
+				num_entries = spdk_ring_enqueue(ioch_array[tmp_idx]->submit_queue,
+								(void **)&entry, 1, NULL);
+				CU_ASSERT(num_entries == 1);
+			}
+		}
+
+		set_thread(ioch_idx);
+
+		batch = ftl_get_next_batch(dev);
+		SPDK_CU_ASSERT_FATAL(batch != NULL);
+
+		TAILQ_FOREACH(entry, &batch->entries, tailq) {
+			CU_ASSERT(entry->ioch == ioch_array[ioch_idx]);
+		}
+
+		ftl_release_batch(dev, batch);
+
+		CU_ASSERT(spdk_ring_count(ioch_array[ioch_idx]->free_queue) ==
+			  ioch_array[ioch_idx]->num_entries);
+	}
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels - 1; ++ioch_idx) {
+		batch = ftl_get_next_batch(dev);
+		SPDK_CU_ASSERT_FATAL(batch != NULL);
+		ftl_release_batch(dev, batch);
+	}
+
+	/* Make sure the batch can be built from entries from any IO channel */
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		set_thread(ioch_idx);
+		entry = ftl_acquire_wbuf_entry(ioch_array[ioch_idx], 0);
+		SPDK_CU_ASSERT_FATAL(entry != NULL);
+
+		num_entries = spdk_ring_enqueue(ioch_array[ioch_idx]->submit_queue,
+						(void **)&entry, 1, NULL);
+		CU_ASSERT(num_entries == 1);
+	}
+
+	batch = ftl_get_next_batch(dev);
+	SPDK_CU_ASSERT_FATAL(batch != NULL);
+
+	ioch_bitmap = 0;
+	TAILQ_FOREACH(entry, &batch->entries, tailq) {
+		ioch_bitmap |= 1 << entry->ioch->index;
+	}
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		CU_ASSERT((ioch_bitmap & (1 << ioch_array[ioch_idx]->index)) != 0);
+	}
+	ftl_release_batch(dev, batch);
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		CU_ASSERT(spdk_ring_count(ioch_array[ioch_idx]->free_queue) ==
+			  ioch_array[ioch_idx]->num_entries);
+	}
+
+	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
+		set_thread(ioch_idx);
+		spdk_put_io_channel(_ioch_array[ioch_idx]);
+	}
+	poll_threads();
+
+	free(_ioch_array);
+	free(ioch_array);
+	free_device(dev);
+}
+
 int
 main(int argc, char **argv)
 {
@@ -829,6 +948,8 @@ main(int argc, char **argv)
 			test_io_channel_create) == NULL ||
 		CU_add_test(suite, "test_acquire_entry",
 			test_acquire_entry) == NULL
+		|| CU_add_test(suite, "test_submit_batch",
+			test_submit_batch) == NULL
 	) {
 		CU_cleanup_registry();
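
The queue rotation in ftl_get_next_batch() is the subtlest part of this
patch, so a minimal standalone sketch of the same idea follows. It is
illustrative only and not part of the patch: every identifier below
(sketch_channel, sketch_fill_batch, SKETCH_XFER_SIZE) is hypothetical, a
plain integer counter stands in for the spdk_ring submit queue, and the
patch's TAILQ_CONCAT is open-coded so the sketch only depends on the
basic <sys/queue.h> macros.

#include <stdio.h>
#include <sys/queue.h>

#define SKETCH_XFER_SIZE 4	/* entries needed to complete one batch */

struct sketch_channel {
	int id;
	int pending;	/* stand-in for spdk_ring_count(submit_queue) */
	TAILQ_ENTRY(sketch_channel) tailq;
};

TAILQ_HEAD(sketch_channel_list, sketch_channel);

/*
 * Fill one batch by visiting channels from the head of the list and
 * re-appending each visited channel at the tail.  Once the function
 * returns, the list is rotated, so the next call starts dequeuing from
 * a different channel, which is the fairness property the commit
 * message describes.
 */
static int
sketch_fill_batch(struct sketch_channel_list *channels)
{
	struct sketch_channel_list visited;
	struct sketch_channel *ch;
	int filled = 0;

	TAILQ_INIT(&visited);

	while (!TAILQ_EMPTY(channels)) {
		ch = TAILQ_FIRST(channels);
		TAILQ_REMOVE(channels, ch, tailq);
		TAILQ_INSERT_TAIL(&visited, ch, tailq);

		while (filled < SKETCH_XFER_SIZE && ch->pending > 0) {
			ch->pending--;
			printf("batch slot %d <- channel %d\n", filled++, ch->id);
		}

		if (filled == SKETCH_XFER_SIZE) {
			break;
		}
	}

	/* Re-append the visited channels (the patch uses TAILQ_CONCAT here). */
	while (!TAILQ_EMPTY(&visited)) {
		ch = TAILQ_FIRST(&visited);
		TAILQ_REMOVE(&visited, ch, tailq);
		TAILQ_INSERT_TAIL(channels, ch, tailq);
	}

	return filled;
}

int
main(void)
{
	struct sketch_channel_list channels;
	struct sketch_channel chs[3] = {
		{ .id = 0, .pending = 8 },
		{ .id = 1, .pending = 8 },
		{ .id = 2, .pending = 8 },
	};
	int i;

	TAILQ_INIT(&channels);
	for (i = 0; i < 3; i++) {
		TAILQ_INSERT_TAIL(&channels, &chs[i], tailq);
	}

	/* Each batch is satisfied by a different channel: 0, then 1, then 2. */
	for (i = 0; i < 3; i++) {
		sketch_fill_batch(&channels);
		printf("--\n");
	}

	return 0;
}

A likely reason for rotating the list itself instead of keeping a cursor
into it is that the set of IO channels can change at runtime; rotation
needs no index fix-up when channels are created or destroyed.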