From 32d7c91cbc48079dcd60309b073f1aa929ee8110 Mon Sep 17 00:00:00 2001 From: Jim Harris Date: Tue, 12 Jun 2018 08:11:31 -0700 Subject: [PATCH] bdev: add spdk_bdev_queue_io_wait() This function is intended to be used when an spdk_bdev I/O operation (such as spdk_bdev_write or spdk_bdev_write_blocks) fails due to spdk_bdev_io buffer exhaustion. The caller can queue an spdk_bdev_io_wait structure on the calling thread which will be invoked when an spdk_bdev_io buffer is available. While here, turn off error messages in bdev.c related to spdk_bdev_io pool exhaustion, since we now have an API designed to gracefully recover from it. Also modify bdevperf as an example of how to use this new API. Signed-off-by: Jim Harris Change-Id: Ia55f6582dc5a8d6d8bcc74689fd846d742324510 Reviewed-on: https://review.gerrithub.io/415074 Tested-by: SPDK Automated Test System Reviewed-by: Daniel Verkamp Reviewed-by: Ziye Yang Reviewed-by: Ben Walker --- include/spdk/bdev.h | 48 +++++++ lib/bdev/bdev.c | 55 +++++--- test/bdev/bdevperf/bdevperf.c | 29 +++- test/unit/lib/bdev/bdev.c/bdev_ut.c | 202 +++++++++++++++++++++++++++- 4 files changed, 311 insertions(+), 23 deletions(-) diff --git a/include/spdk/bdev.h b/include/spdk/bdev.h index 7c17fb9fd..8c87882dc 100644 --- a/include/spdk/bdev.h +++ b/include/spdk/bdev.h @@ -44,6 +44,7 @@ #include "spdk/scsi_spec.h" #include "spdk/nvme_spec.h" #include "spdk/json.h" +#include "spdk/queue.h" #ifdef __cplusplus extern "C" { @@ -889,6 +890,53 @@ int spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *bdev_desc, */ void spdk_bdev_free_io(struct spdk_bdev_io *bdev_io); +/** + * Block device I/O wait callback + * + * Callback function to notify when an spdk_bdev_io structure is available + * to satisfy a call to one of the @ref bdev_io_submit_functions. + */ +typedef void (*spdk_bdev_io_wait_cb)(void *cb_arg); + +/** + * Structure to register a callback when an spdk_bdev_io becomes available. + */ +struct spdk_bdev_io_wait_entry { + struct spdk_bdev *bdev; + spdk_bdev_io_wait_cb cb_fn; + void *cb_arg; + TAILQ_ENTRY(spdk_bdev_io_wait_entry) link; +}; + +/** + * Add an entry into the calling thread's queue to be notified when an + * spdk_bdev_io becomes available. + * + * When one of the @ref bdev_io_submit_functions returns -ENOMEM, it means + * the spdk_bdev_io buffer pool has no available buffers. This function may + * be called to register a callback to be notified when a buffer becomes + * available on the calling thread. + * + * The callback function will always be called on the same thread as this + * function was called. + * + * This function must only be called immediately after one of the + * @ref bdev_io_submit_functions returns -ENOMEM. + * + * \param bdev Block device. The block device that the caller will submit + * an I/O to when the callback is invoked. Must match the bdev + * member in the entry parameter. + * \param ch I/O channel. Obtained by calling spdk_bdev_get_io_channel(). + * \param entry Data structure allocated by the caller specifying the callback + * function and argument. + * + * \return 0 on success. + * -EINVAL if bdev parameter does not match bdev member in entry + * -EINVAL if an spdk_bdev_io structure was available on this thread. + */ +int spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, + struct spdk_bdev_io_wait_entry *entry); + /** * Return I/O statistics for this channel. * diff --git a/lib/bdev/bdev.c b/lib/bdev/bdev.c index f009c2151..a1de0ea0a 100644 --- a/lib/bdev/bdev.c +++ b/lib/bdev/bdev.c @@ -165,7 +165,8 @@ struct spdk_bdev_mgmt_channel { uint32_t per_thread_cache_count; uint32_t bdev_io_cache_size; - TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; + TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; + TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; }; /* @@ -541,6 +542,7 @@ spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) } TAILQ_INIT(&ch->shared_resources); + TAILQ_INIT(&ch->io_wait_queue); return 0; } @@ -924,12 +926,14 @@ spdk_bdev_get_io(struct spdk_bdev_channel *channel) bdev_io = STAILQ_FIRST(&ch->per_thread_cache); STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); ch->per_thread_cache_count--; + } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { + /* + * Don't try to look for bdev_ios in the global pool if there are + * waiters on bdev_ios - we don't want this caller to jump the line. + */ + bdev_io = NULL; } else { bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); - if (!bdev_io) { - SPDK_ERRLOG("Unable to get spdk_bdev_io\n"); - return NULL; - } } return bdev_io; @@ -950,7 +954,16 @@ spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { ch->per_thread_cache_count++; STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); + while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { + struct spdk_bdev_io_wait_entry *entry; + + entry = TAILQ_FIRST(&ch->io_wait_queue); + TAILQ_REMOVE(&ch->io_wait_queue, entry, link); + entry->cb_fn(entry->cb_arg); + } } else { + /* We should never have a full cache with entries on the io wait queue. */ + assert(TAILQ_EMPTY(&ch->io_wait_queue)); spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); } } @@ -1650,7 +1663,6 @@ spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n"); return -ENOMEM; } @@ -1698,7 +1710,6 @@ int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel * bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n"); return -ENOMEM; } @@ -1747,7 +1758,6 @@ spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing write\n"); return -ENOMEM; } @@ -1800,7 +1810,6 @@ spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n"); return -ENOMEM; } @@ -1852,7 +1861,6 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing write_zeroes\n"); return -ENOMEM; } @@ -1933,7 +1941,6 @@ spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n"); return -ENOMEM; } @@ -1984,7 +1991,6 @@ spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n"); return -ENOMEM; } @@ -2091,7 +2097,6 @@ spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n"); return -ENOMEM; } @@ -2186,7 +2191,6 @@ spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channe bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n"); return -ENOMEM; } @@ -2224,7 +2228,6 @@ spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel * bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n"); return -ENOMEM; } @@ -2262,7 +2265,6 @@ spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channe bdev_io = spdk_bdev_get_io(channel); if (!bdev_io) { - SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n"); return -ENOMEM; } @@ -2280,6 +2282,27 @@ spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channe return 0; } +int +spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, + struct spdk_bdev_io_wait_entry *entry) +{ + struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); + struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; + + if (bdev != entry->bdev) { + SPDK_ERRLOG("bdevs do not match\n"); + return -EINVAL; + } + + if (mgmt_ch->per_thread_cache_count > 0) { + SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); + return -EINVAL; + } + + TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); + return 0; +} + static void _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) { diff --git a/test/bdev/bdevperf/bdevperf.c b/test/bdev/bdevperf/bdevperf.c index a816e9b10..ff925eef9 100644 --- a/test/bdev/bdevperf/bdevperf.c +++ b/test/bdev/bdevperf/bdevperf.c @@ -51,6 +51,7 @@ struct bdevperf_task { uint64_t offset_blocks; enum spdk_bdev_io_type io_type; TAILQ_ENTRY(bdevperf_task) link; + struct spdk_bdev_io_wait_entry bdev_io_wait; }; static int g_io_size = 0; @@ -330,8 +331,7 @@ bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) } static void -bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success, - void *cb_arg) +bdevperf_verify_submit_read(void *cb_arg) { struct io_target *target; struct bdevperf_task *task = cb_arg; @@ -342,14 +342,24 @@ bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success, /* Read the data back in */ rc = spdk_bdev_read_blocks(target->bdev_desc, target->ch, NULL, task->offset_blocks, target->io_size_blocks, bdevperf_complete, task); - if (rc) { + if (rc == -ENOMEM) { + task->bdev_io_wait.bdev = target->bdev; + task->bdev_io_wait.cb_fn = bdevperf_verify_submit_read; + task->bdev_io_wait.cb_arg = task; + spdk_bdev_queue_io_wait(target->bdev, target->ch, &task->bdev_io_wait); + } else if (rc != 0) { printf("Failed to submit read: %d\n", rc); target->is_draining = true; g_run_failed = true; - return; } +} +static void +bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success, + void *cb_arg) +{ spdk_bdev_free_io(bdev_io); + bdevperf_verify_submit_read(cb_arg); } static __thread unsigned int seed = 0; @@ -390,8 +400,9 @@ bdevperf_prep_task(struct bdevperf_task *task) } static void -bdevperf_submit_task(struct bdevperf_task *task) +bdevperf_submit_task(void *arg) { + struct bdevperf_task *task = arg; struct io_target *target = task->target; struct spdk_bdev_desc *desc; struct spdk_io_channel *ch; @@ -427,7 +438,13 @@ bdevperf_submit_task(struct bdevperf_task *task) break; } - if (rc) { + if (rc == -ENOMEM) { + task->bdev_io_wait.bdev = target->bdev; + task->bdev_io_wait.cb_fn = bdevperf_submit_task; + task->bdev_io_wait.cb_arg = task; + spdk_bdev_queue_io_wait(target->bdev, ch, &task->bdev_io_wait); + return; + } else if (rc != 0) { printf("Failed to submit bdev_io: %d\n", rc); target->is_draining = true; g_run_failed = true; diff --git a/test/unit/lib/bdev/bdev.c/bdev_ut.c b/test/unit/lib/bdev/bdev.c/bdev_ut.c index d7706b0a8..9f3075521 100644 --- a/test/unit/lib/bdev/bdev.c/bdev_ut.c +++ b/test/unit/lib/bdev/bdev.c/bdev_ut.c @@ -77,19 +77,113 @@ stub_destruct(void *ctx) return 0; } +struct bdev_ut_channel { + TAILQ_HEAD(, spdk_bdev_io) outstanding_io; + uint32_t outstanding_io_count; +}; + +static uint32_t g_bdev_ut_io_device; +static struct bdev_ut_channel *g_bdev_ut_channel; + +static void +stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io) +{ + struct bdev_ut_channel *ch = spdk_io_channel_get_ctx(_ch); + + TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link); + ch->outstanding_io_count++; +} + +static uint32_t +stub_complete_io(uint32_t num_to_complete) +{ + struct bdev_ut_channel *ch = g_bdev_ut_channel; + struct spdk_bdev_io *bdev_io; + uint32_t num_completed = 0; + + while (num_completed < num_to_complete) { + if (TAILQ_EMPTY(&ch->outstanding_io)) { + break; + } + bdev_io = TAILQ_FIRST(&ch->outstanding_io); + TAILQ_REMOVE(&ch->outstanding_io, bdev_io, module_link); + ch->outstanding_io_count--; + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); + num_completed++; + } + + return num_completed; +} + +static struct spdk_io_channel * +bdev_ut_get_io_channel(void *ctx) +{ + return spdk_get_io_channel(&g_bdev_ut_io_device); +} + static struct spdk_bdev_fn_table fn_table = { .destruct = stub_destruct, + .submit_request = stub_submit_request, + .get_io_channel = bdev_ut_get_io_channel, }; +static int +bdev_ut_create_ch(void *io_device, void *ctx_buf) +{ + struct bdev_ut_channel *ch = ctx_buf; + + CU_ASSERT(g_bdev_ut_channel == NULL); + g_bdev_ut_channel = ch; + + TAILQ_INIT(&ch->outstanding_io); + ch->outstanding_io_count = 0; + return 0; +} + +static void +bdev_ut_destroy_ch(void *io_device, void *ctx_buf) +{ + CU_ASSERT(g_bdev_ut_channel != NULL); + g_bdev_ut_channel = NULL; +} + +static int +bdev_ut_module_init(void) +{ + spdk_io_device_register(&g_bdev_ut_io_device, bdev_ut_create_ch, bdev_ut_destroy_ch, + sizeof(struct bdev_ut_channel)); + return 0; +} + +static void +bdev_ut_module_fini(void) +{ +} + struct spdk_bdev_module bdev_ut_if = { .name = "bdev_ut", + .module_init = bdev_ut_module_init, + .module_fini = bdev_ut_module_fini, }; static void vbdev_ut_examine(struct spdk_bdev *bdev); +static int +vbdev_ut_module_init(void) +{ + return 0; +} + +static void +vbdev_ut_module_fini(void) +{ +} + struct spdk_bdev_module vbdev_ut_if = { .name = "vbdev_ut", .examine = vbdev_ut_examine, + .module_init = vbdev_ut_module_init, + .module_fini = vbdev_ut_module_fini, }; SPDK_BDEV_MODULE_REGISTER(&bdev_ut_if) @@ -152,6 +246,7 @@ allocate_bdev(char *name) bdev->name = name; bdev->fn_table = &fn_table; bdev->module = &bdev_ut_if; + bdev->blockcnt = 1; rc = spdk_bdev_register(bdev); CU_ASSERT(rc == 0); @@ -543,6 +638,110 @@ alias_add_del_test(void) free(bdev[1]); } +static void +io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) +{ + spdk_bdev_free_io(bdev_io); +} + +static void +bdev_init_cb(void *arg, int rc) +{ + CU_ASSERT(rc == 0); +} + +struct bdev_ut_io_wait_entry { + struct spdk_bdev_io_wait_entry entry; + struct spdk_io_channel *io_ch; + struct spdk_bdev_desc *desc; + bool submitted; +}; + +static void +io_wait_cb(void *arg) +{ + struct bdev_ut_io_wait_entry *entry = arg; + int rc; + + rc = spdk_bdev_read_blocks(entry->desc, entry->io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == 0); + entry->submitted = true; +} + +static void +bdev_io_wait_test(void) +{ + struct spdk_bdev *bdev; + struct spdk_bdev_desc *desc; + struct spdk_io_channel *io_ch; + struct spdk_bdev_opts bdev_opts = { + .bdev_io_pool_size = 4, + .bdev_io_cache_size = 2, + }; + struct bdev_ut_io_wait_entry io_wait_entry; + struct bdev_ut_io_wait_entry io_wait_entry2; + int rc; + + rc = spdk_bdev_set_opts(&bdev_opts); + CU_ASSERT(rc == 0); + spdk_bdev_initialize(bdev_init_cb, NULL); + + bdev = allocate_bdev("bdev0"); + + rc = spdk_bdev_open(bdev, true, NULL, NULL, &desc); + CU_ASSERT(rc == 0); + CU_ASSERT(desc != NULL); + io_ch = spdk_bdev_get_io_channel(desc); + CU_ASSERT(io_ch != NULL); + + rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == 0); + rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == 0); + rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == 0); + rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == 0); + CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4); + + rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL); + CU_ASSERT(rc == -ENOMEM); + + io_wait_entry.entry.bdev = bdev; + io_wait_entry.entry.cb_fn = io_wait_cb; + io_wait_entry.entry.cb_arg = &io_wait_entry; + io_wait_entry.io_ch = io_ch; + io_wait_entry.desc = desc; + io_wait_entry.submitted = false; + /* Cannot use the same io_wait_entry for two different calls. */ + memcpy(&io_wait_entry2, &io_wait_entry, sizeof(io_wait_entry)); + io_wait_entry2.entry.cb_arg = &io_wait_entry2; + + /* Queue two I/O waits. */ + rc = spdk_bdev_queue_io_wait(bdev, io_ch, &io_wait_entry.entry); + CU_ASSERT(rc == 0); + CU_ASSERT(io_wait_entry.submitted == false); + rc = spdk_bdev_queue_io_wait(bdev, io_ch, &io_wait_entry2.entry); + CU_ASSERT(rc == 0); + CU_ASSERT(io_wait_entry2.submitted == false); + + stub_complete_io(1); + CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4); + CU_ASSERT(io_wait_entry.submitted == true); + CU_ASSERT(io_wait_entry2.submitted == false); + + stub_complete_io(1); + CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4); + CU_ASSERT(io_wait_entry2.submitted == true); + + stub_complete_io(4); + CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 0); + + spdk_put_io_channel(io_ch); + spdk_bdev_close(desc); + free_bdev(bdev); +} + int main(int argc, char **argv) { @@ -565,7 +764,8 @@ main(int argc, char **argv) CU_add_test(suite, "io_valid", io_valid_test) == NULL || CU_add_test(suite, "open_write", open_write_test) == NULL || CU_add_test(suite, "alias_add_del", alias_add_del_test) == NULL || - CU_add_test(suite, "get_device_stat", get_device_stat_test) == NULL + CU_add_test(suite, "get_device_stat", get_device_stat_test) == NULL || + CU_add_test(suite, "bdev_io_wait", bdev_io_wait_test) == NULL ) { CU_cleanup_registry(); return CU_get_error();