diff --git a/include/spdk_internal/bdev.h b/include/spdk_internal/bdev.h index 1695114de..accdbbe2b 100644 --- a/include/spdk_internal/bdev.h +++ b/include/spdk_internal/bdev.h @@ -218,6 +218,21 @@ struct spdk_bdev { /** Number of blocks */ uint64_t blockcnt; + /** QoS rate limit in I/Os per second */ + uint64_t ios_per_sec; + + /** Number of active channels on this bdev except the QoS bdev channel */ + uint32_t channel_count; + + /** QoS bdev channel for this bdev */ + struct spdk_bdev_channel *qos_channel; + + /** QoS thread for this bdev */ + struct spdk_thread *qos_thread; + + /** Indicates an asynchronous destroy event is ongoing */ + bool qos_channel_destroying; + /** write cache enabled, not used at the moment */ int write_cache; @@ -290,9 +305,12 @@ struct spdk_bdev_io { /** The block device that this I/O belongs to. */ struct spdk_bdev *bdev; - /** The bdev I/O channel that this was submitted on. */ + /** The bdev I/O channel that this was handled on. */ struct spdk_bdev_channel *ch; + /** The bdev I/O channel that this was submitted on. */ + struct spdk_bdev_channel *io_submit_ch; + /** The mgmt channel that this I/O was allocated from. 
*/ struct spdk_bdev_mgmt_channel *mgmt_ch; diff --git a/lib/bdev/bdev.c b/lib/bdev/bdev.c index 118ea1cea..60795fbc7 100644 --- a/lib/bdev/bdev.c +++ b/lib/bdev/bdev.c @@ -55,13 +55,15 @@ int __itt_init_ittlib(const char *, __itt_group_id); #endif -#define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) -#define SPDK_BDEV_IO_CACHE_SIZE 256 -#define BUF_SMALL_POOL_SIZE 8192 -#define BUF_LARGE_POOL_SIZE 1024 -#define NOMEM_THRESHOLD_COUNT 8 -#define ZERO_BUFFER_SIZE 0x100000 -#define SPDK_BDEV_QOS_TIMESLICE_IN_US 1000 +#define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) +#define SPDK_BDEV_IO_CACHE_SIZE 256 +#define BUF_SMALL_POOL_SIZE 8192 +#define BUF_LARGE_POOL_SIZE 1024 +#define NOMEM_THRESHOLD_COUNT 8 +#define ZERO_BUFFER_SIZE 0x100000 +#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 +#define SPDK_BDEV_SEC_TO_USEC 1000000ULL +#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t; typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t; @@ -127,6 +129,7 @@ struct spdk_bdev_desc { }; #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) +#define BDEV_CH_QOS_ENABLED (1 << 1) struct spdk_bdev_channel { struct spdk_bdev *bdev; @@ -828,15 +831,35 @@ spdk_bdev_put_io(struct spdk_bdev_io *bdev_io) } static void -spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) +_spdk_bdev_qos_io_submit(void *ctx) { + struct spdk_bdev_channel *ch = ctx; + struct spdk_bdev_io *bdev_io = NULL; + struct spdk_bdev *bdev = ch->bdev; + struct spdk_bdev_module_channel *shared_ch = ch->module_ch; + + while (!TAILQ_EMPTY(&ch->qos_io)) { + if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) { + bdev_io = TAILQ_FIRST(&ch->qos_io); + TAILQ_REMOVE(&ch->qos_io, bdev_io, link); + ch->io_submitted_this_timeslice++; + shared_ch->io_outstanding++; + bdev->fn_table->submit_request(ch->channel, bdev_io); + } else { + break; + } + } +} + +static void +_spdk_bdev_io_submit(void *ctx) +{ + struct spdk_bdev_io *bdev_io = ctx; struct spdk_bdev *bdev = bdev_io->bdev; struct spdk_bdev_channel 
*bdev_ch = bdev_io->ch; struct spdk_io_channel *ch = bdev_ch->channel; struct spdk_bdev_module_channel *shared_ch = bdev_ch->module_ch; - assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING); - bdev_io->submit_tsc = spdk_get_ticks(); shared_ch->io_outstanding++; bdev_io->in_submit_request = true; @@ -849,6 +872,10 @@ spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) } } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); + } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { + shared_ch->io_outstanding--; + TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link); + _spdk_bdev_qos_io_submit(bdev_ch); } else { SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); @@ -856,6 +883,23 @@ spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) bdev_io->in_submit_request = false; } +static void +spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) +{ + struct spdk_bdev *bdev = bdev_io->bdev; + + assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING); + + /* QoS channel and thread have been properly configured */ + if (bdev->ios_per_sec > 0 && bdev->qos_channel && bdev->qos_thread) { + bdev_io->io_submit_ch = bdev_io->ch; + bdev_io->ch = bdev->qos_channel; + spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io); + } else { + _spdk_bdev_io_submit(bdev_io); + } +} + static void spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) { @@ -881,6 +925,7 @@ spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING; bdev_io->in_submit_request = false; bdev_io->buf = NULL; + bdev_io->io_submit_ch = NULL; } bool @@ -899,6 +944,47 @@ spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) return 0; } +static void +spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev *bdev) +{ + uint64_t qos_max_ios_per_timeslice = 0; + + qos_max_ios_per_timeslice = bdev->ios_per_sec * 
SPDK_BDEV_QOS_TIMESLICE_IN_USEC / + SPDK_BDEV_SEC_TO_USEC; + bdev->qos_channel->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice, + SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); +} + +static void +spdk_bdev_channel_poll_qos(void *arg) +{ + struct spdk_bdev_channel *ch = arg; + struct spdk_bdev *bdev = ch->bdev; + + /* Reset for next round of rate limiting */ + ch->io_submitted_this_timeslice = 0; + spdk_bdev_qos_get_max_ios_per_timeslice(bdev); + + _spdk_bdev_qos_io_submit(ch); +} + +static void +spdk_bdev_qos_register_poller(void *ctx) +{ + struct spdk_bdev_channel *ch = ctx; + + ch->qos_poller = spdk_poller_register(spdk_bdev_channel_poll_qos, ch, + SPDK_BDEV_QOS_TIMESLICE_IN_USEC); +} + +static void +spdk_bdev_qos_unregister_poller(void *ctx) +{ + struct spdk_poller *poller = ctx; + + spdk_poller_unregister(&poller); +} + static int _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device) { @@ -955,13 +1041,18 @@ static void _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) { struct spdk_bdev_mgmt_channel *mgmt_channel; - struct spdk_bdev_module_channel *shared_ch = ch->module_ch; + struct spdk_bdev_module_channel *shared_ch = NULL; + + if (!ch) { + return; + } if (ch->channel) { spdk_put_io_channel(ch->channel); } if (ch->mgmt_channel) { + shared_ch = ch->module_ch; if (shared_ch) { assert(shared_ch->ref > 0); shared_ch->ref--; @@ -976,9 +1067,102 @@ _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) } } +static int +_spdk_bdev_qos_channel_create(struct spdk_bdev *bdev) +{ + bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel)); + if (!bdev->qos_channel) { + return -1; + } + + bdev->qos_thread = spdk_get_thread(); + if (!bdev->qos_thread) { + return -1; + } + + if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) { + return -1; + } + + bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED; + spdk_bdev_qos_get_max_ios_per_timeslice(bdev); + 
spdk_bdev_qos_register_poller(bdev->qos_channel); + + return 0; +} + +static void +_spdk_bdev_qos_channel_destroy(void *ctx) +{ + struct spdk_bdev_channel *qos_channel = ctx; + struct spdk_bdev *bdev = NULL; + struct spdk_poller *poller = NULL; + + if (!qos_channel) { + SPDK_DEBUGLOG(SPDK_LOG_BDEV, "QoS channel already NULL\n"); + return; + } + + bdev = qos_channel->bdev; + poller = qos_channel->qos_poller; + + assert(bdev->qos_thread == spdk_get_thread()); + assert(bdev->qos_channel == qos_channel); + + free(bdev->qos_channel); + bdev->qos_channel = NULL; + bdev->qos_thread = NULL; + + if (!poller) { + SPDK_DEBUGLOG(SPDK_LOG_BDEV, "QoS poller already NULL\n"); + } else { + spdk_bdev_qos_unregister_poller(poller); + } +} + +static void +spdk_bdev_qos_channel_create_async(void *ctx) +{ + struct spdk_bdev *bdev = ctx; + + if (!bdev->qos_channel) { + if (_spdk_bdev_qos_channel_create(bdev) != 0) { + SPDK_ERRLOG("QoS channel failed to create\n"); + _spdk_bdev_channel_destroy_resource(bdev->qos_channel); + _spdk_bdev_qos_channel_destroy(bdev->qos_channel); + } + } +} + +static int +spdk_bdev_qos_channel_create(void *ctx) +{ + struct spdk_bdev *bdev = ctx; + struct spdk_thread *qos_thread = bdev->qos_thread; + + /* + * There is an asynchronous destroy ongoing. + * Send a message to that thread to defer the creation. 
+ */ + if (bdev->qos_channel_destroying == true) { + if (qos_thread) { + spdk_thread_send_msg(qos_thread, + spdk_bdev_qos_channel_create_async, bdev); + return 0; + } + } + + if (!bdev->qos_channel) { + return _spdk_bdev_qos_channel_create(bdev); + } else { + return 0; + } +} + static int spdk_bdev_channel_create(void *io_device, void *ctx_buf) { + struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); struct spdk_bdev_channel *ch = ctx_buf; if (_spdk_bdev_channel_create(ch, io_device) != 0) { @@ -986,6 +1170,20 @@ spdk_bdev_channel_create(void *io_device, void *ctx_buf) return -1; } + /* Rate limiting on this bdev enabled */ + if (bdev->ios_per_sec > 0) { + if (spdk_bdev_qos_channel_create(bdev) != 0) { + _spdk_bdev_channel_destroy_resource(ch); + _spdk_bdev_channel_destroy_resource(bdev->qos_channel); + _spdk_bdev_qos_channel_destroy(bdev->qos_channel); + return -1; + } + } + + pthread_mutex_lock(&bdev->mutex); + bdev->channel_count++; + pthread_mutex_unlock(&bdev->mutex); + #ifdef SPDK_CONFIG_VTUNE { char *name; @@ -993,6 +1191,8 @@ spdk_bdev_channel_create(void *io_device, void *ctx_buf) name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); if (!name) { _spdk_bdev_channel_destroy_resource(ch); + _spdk_bdev_channel_destroy_resource(bdev->qos_channel); + _spdk_bdev_qos_channel_destroy(bdev->qos_channel); return -1; } ch->handle = __itt_string_handle_create(name); @@ -1073,12 +1273,41 @@ _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch) _spdk_bdev_channel_destroy_resource(ch); } +static void +spdk_bdev_qos_channel_destroy(void *ctx) +{ + struct spdk_bdev *bdev = ctx; + + bdev->qos_channel_destroying = false; + + _spdk_bdev_channel_destroy(bdev->qos_channel); + _spdk_bdev_qos_channel_destroy(bdev->qos_channel); +} + static void spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) { struct spdk_bdev_channel *ch = ctx_buf; + struct spdk_bdev *bdev = ch->bdev; + uint32_t channel_count = 0; _spdk_bdev_channel_destroy(ch); + + 
pthread_mutex_lock(&bdev->mutex); + bdev->channel_count--; + channel_count = bdev->channel_count; + pthread_mutex_unlock(&bdev->mutex); + + /* Destroy QoS channel as no active bdev channels there */ + if (channel_count == 0 && bdev->ios_per_sec > 0 && bdev->qos_thread) { + if (bdev->qos_thread == spdk_get_thread()) { + spdk_bdev_qos_channel_destroy(bdev); + } else { + bdev->qos_channel_destroying = true; + spdk_thread_send_msg(bdev->qos_thread, + spdk_bdev_qos_channel_destroy, bdev); + } + } } int @@ -1903,13 +2132,30 @@ _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) } } +static void +_spdk_bdev_qos_io_complete(void *ctx) +{ + struct spdk_bdev_io *bdev_io = ctx; + + bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx); +} + static void _spdk_bdev_io_complete(void *ctx) { struct spdk_bdev_io *bdev_io = ctx; assert(bdev_io->cb != NULL); - bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx); + + if (bdev_io->io_submit_ch) { + bdev_io->ch = bdev_io->io_submit_ch; + bdev_io->io_submit_ch = NULL; + spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel), + _spdk_bdev_qos_io_complete, bdev_io); + } else { + bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, + bdev_io->caller_ctx); + } } static void diff --git a/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c b/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c index 4b04f4824..5b715ee7a 100644 --- a/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c +++ b/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c @@ -250,6 +250,19 @@ teardown_test(void) free_threads(); } +static uint32_t +bdev_io_tailq_cnt(bdev_io_tailq_t *tailq) +{ + struct spdk_bdev_io *io; + uint32_t cnt = 0; + + TAILQ_FOREACH(io, tailq, link) { + cnt++; + } + + return cnt; +} + static void basic(void) { @@ -459,7 +472,7 @@ aborted_reset(void) } static void -io_during_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) +io_during_io_done(struct spdk_bdev_io *bdev_io, bool 
success, void *cb_arg) { enum spdk_bdev_io_status *status = cb_arg; @@ -486,7 +499,7 @@ io_during_reset(void) bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]); CU_ASSERT(bdev_ch[0]->flags == 0); status0 = SPDK_BDEV_IO_STATUS_PENDING; - rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_reset_done, &status0); + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); CU_ASSERT(rc == 0); set_thread(1); @@ -494,7 +507,7 @@ io_during_reset(void) bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]); CU_ASSERT(bdev_ch[1]->flags == 0); status1 = SPDK_BDEV_IO_STATUS_PENDING; - rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_reset_done, &status1); + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); CU_ASSERT(rc == 0); poll_threads(); @@ -516,7 +529,7 @@ io_during_reset(void) */ set_thread(0); status_reset = SPDK_BDEV_IO_STATUS_PENDING; - rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_reset_done, &status_reset); + rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset); CU_ASSERT(rc == 0); CU_ASSERT(bdev_ch[0]->flags == 0); @@ -527,12 +540,12 @@ io_during_reset(void) set_thread(0); status0 = SPDK_BDEV_IO_STATUS_PENDING; - rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_reset_done, &status0); + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); CU_ASSERT(rc == 0); set_thread(1); status1 = SPDK_BDEV_IO_STATUS_PENDING; - rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_reset_done, &status1); + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); CU_ASSERT(rc == 0); /* @@ -571,6 +584,336 @@ io_during_reset(void) teardown_test(); } +static void +basic_qos(void) +{ + struct spdk_io_channel *io_ch[3]; + struct spdk_bdev_channel *bdev_ch[3], *qos_bdev_ch; + struct spdk_bdev *bdev; + enum spdk_bdev_io_status status; + struct spdk_bdev_module_channel *module_ch; + int 
rc; + + setup_test(); + + /* + * First test normal case - submit an I/O on the channel (QoS not enabled) + * and verify it completes successfully. + */ + set_thread(0); + g_get_io_channel = false; + io_ch[0] = spdk_bdev_get_io_channel(g_desc); + CU_ASSERT(io_ch[0] == NULL); + g_get_io_channel = true; + io_ch[0] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]); + status = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status); + CU_ASSERT(rc == 0); + CU_ASSERT(bdev_ch[0]->flags == 0); + + CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING); + + set_thread(0); + stub_complete_io(g_bdev.io_target, 0); + CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS); + + poll_threads(); + + set_thread(1); + bdev = &g_bdev.bdev; + bdev->ios_per_sec = 2000; + g_get_io_channel = false; + io_ch[1] = spdk_bdev_get_io_channel(g_desc); + CU_ASSERT(io_ch[1] == NULL); + bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]); + qos_bdev_ch = bdev->qos_channel; + CU_ASSERT(qos_bdev_ch == NULL); + g_get_io_channel = true; + io_ch[1] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]); + qos_bdev_ch = bdev->qos_channel; + CU_ASSERT(bdev->qos_channel->flags == BDEV_CH_QOS_ENABLED); + CU_ASSERT(qos_bdev_ch != NULL); + module_ch = qos_bdev_ch->module_ch; + CU_ASSERT(module_ch->io_outstanding == 0); + CU_ASSERT(g_ut_threads[1].thread == bdev->qos_thread); + + /* + * Now sending one I/O on first channel + */ + set_thread(0); + status = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status); + CU_ASSERT(rc == 0); + + poll_threads(); + CU_ASSERT(module_ch->io_outstanding == 1); + CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING); + + /* + * IO is operated on thread_id(1) via the QoS thread + */ + set_thread(1); + stub_complete_io(g_bdev.io_target, 1); + + poll_threads(); + CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS); + + 
/* + * QoS thread is on thread 1. Put I/O channel on thread 1 first + * to trigger an async destruction of QoS bdev channel. + */ + set_thread(1); + spdk_put_io_channel(io_ch[0]); + set_thread(0); + spdk_put_io_channel(io_ch[1]); + + /* + * Handle the messages on thread 1 first so that the QoS bdev + * channel destroy message from thread 0 handling will be active + * there. + */ + poll_thread(1); + poll_thread(0); + + /* + * Create a new I/O channel when the async destruction of QoS + * bdev channel is on going. The expected result is the QoS bdev + * channel will be properly setup again. + */ + set_thread(2); + io_ch[2] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]); + + poll_threads(); + + qos_bdev_ch = bdev->qos_channel; + CU_ASSERT(qos_bdev_ch->flags == BDEV_CH_QOS_ENABLED); + CU_ASSERT(qos_bdev_ch != NULL); + module_ch = qos_bdev_ch->module_ch; + CU_ASSERT(module_ch->io_outstanding == 0); + CU_ASSERT(g_ut_threads[1].thread == bdev->qos_thread); + + /* + * Destroy the last I/O channel so that the QoS bdev channel + * will be destroyed. + */ + set_thread(2); + spdk_put_io_channel(io_ch[2]); + + poll_threads(); + + teardown_test(); +} + +static void +io_during_qos(void) +{ + struct spdk_io_channel *io_ch[3]; + struct spdk_bdev_channel *bdev_ch[3], *qos_bdev_ch; + struct spdk_bdev *bdev; + enum spdk_bdev_io_status status0, status1; + struct spdk_bdev_module_channel *module_ch; + int rc; + + setup_test(); + + /* + * First test normal case - submit an I/O on each of two channels (QoS not enabled) + * and verify they complete successfully. 
+ */ + set_thread(0); + io_ch[0] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]); + status0 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); + CU_ASSERT(rc == 0); + CU_ASSERT(bdev_ch[0]->flags == 0); + + set_thread(1); + io_ch[1] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]); + status1 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); + CU_ASSERT(rc == 0); + CU_ASSERT(bdev_ch[1]->flags == 0); + + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING); + + set_thread(0); + stub_complete_io(g_bdev.io_target, 0); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS); + + set_thread(1); + stub_complete_io(g_bdev.io_target, 0); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS); + + poll_threads(); + + set_thread(2); + bdev = &g_bdev.bdev; + /* + * 10 IOs allowed per millisecond + */ + bdev->ios_per_sec = 10000; + io_ch[2] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]); + qos_bdev_ch = bdev->qos_channel; + CU_ASSERT(bdev->qos_channel->flags == BDEV_CH_QOS_ENABLED); + CU_ASSERT(qos_bdev_ch != NULL); + module_ch = qos_bdev_ch->module_ch; + CU_ASSERT(module_ch->io_outstanding == 0); + + /* + * Now sending some I/Os on different channels when QoS has been enabled + */ + set_thread(0); + status0 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); + CU_ASSERT(rc == 0); + + set_thread(1); + status1 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); + CU_ASSERT(rc == 0); + + poll_threads(); + CU_ASSERT(module_ch->io_outstanding == 2); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING); + + /* + 
* IOs are operated on thread_id(2) via the QoS thread + */ + set_thread(2); + stub_complete_io(g_bdev.io_target, 2); + + poll_threads(); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS); + + set_thread(0); + spdk_put_io_channel(io_ch[0]); + set_thread(1); + spdk_put_io_channel(io_ch[1]); + set_thread(2); + spdk_put_io_channel(io_ch[2]); + + poll_threads(); + + teardown_test(); +} + +static void +io_during_qos_queue(void) +{ + struct spdk_io_channel *io_ch[3]; + struct spdk_bdev_channel *bdev_ch[3], *qos_bdev_ch; + struct spdk_bdev *bdev; + enum spdk_bdev_io_status status0, status1; + struct spdk_bdev_module_channel *module_ch; + int rc; + + setup_test(); + reset_time(); + + /* + * First test normal case - submit an I/O on each of two channels (QoS not enabled) + * and verify they complete successfully. + */ + set_thread(0); + io_ch[0] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]); + status0 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); + CU_ASSERT(rc == 0); + CU_ASSERT(bdev_ch[0]->flags == 0); + + set_thread(1); + io_ch[1] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]); + status1 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); + CU_ASSERT(rc == 0); + CU_ASSERT(bdev_ch[1]->flags == 0); + + poll_threads(); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING); + + set_thread(0); + stub_complete_io(g_bdev.io_target, 0); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS); + + set_thread(1); + stub_complete_io(g_bdev.io_target, 0); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS); + + poll_threads(); + + set_thread(2); + bdev = bdev_ch[0]->bdev; + /* + * Only 1 IO allowed per millisecond. More IOs will be queued. 
+ */ + bdev->ios_per_sec = 1000; + io_ch[2] = spdk_bdev_get_io_channel(g_desc); + bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]); + qos_bdev_ch = bdev->qos_channel; + CU_ASSERT(bdev->qos_channel->flags == BDEV_CH_QOS_ENABLED); + CU_ASSERT(qos_bdev_ch != NULL); + module_ch = qos_bdev_ch->module_ch; + CU_ASSERT(module_ch->io_outstanding == 0); + + /* + * Now sending some I/Os on different channels when QoS has been enabled + */ + set_thread(0); + status0 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0); + CU_ASSERT(rc == 0); + + set_thread(1); + status1 = SPDK_BDEV_IO_STATUS_PENDING; + rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1); + CU_ASSERT(rc == 0); + + /* + * Poll the QoS thread to send the allowed I/O down + */ + poll_threads(); + CU_ASSERT(module_ch->io_outstanding == 1); + CU_ASSERT(bdev_io_tailq_cnt(&qos_bdev_ch->qos_io) == 1); + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING); + + /* + * Increase the time and poll the QoS thread to run the periodical poller + */ + increment_time(1000); + poll_threads(); + CU_ASSERT(module_ch->io_outstanding == 2); + CU_ASSERT(bdev_io_tailq_cnt(&qos_bdev_ch->qos_io) == 0); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING); + + /* + * IOs are handled on the thread(2) as the master thread + */ + set_thread(2); + stub_complete_io(g_bdev.io_target, 0); + spdk_put_io_channel(io_ch[0]); + spdk_put_io_channel(io_ch[1]); + spdk_put_io_channel(io_ch[2]); + + poll_threads(); + + CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS); + CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS); + + teardown_test(); +} + static void enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) { @@ -580,19 +923,6 @@ enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) spdk_bdev_free_io(bdev_io); } -static uint32_t -bdev_io_tailq_cnt(bdev_io_tailq_t *tailq) -{ - struct spdk_bdev_io *io; - uint32_t cnt = 0; - - 
TAILQ_FOREACH(io, tailq, link) { - cnt++; - } - - return cnt; -} - static void enomem(void) { @@ -780,9 +1110,12 @@ main(int argc, char **argv) if ( CU_add_test(suite, "basic", basic) == NULL || CU_add_test(suite, "basic_poller", basic_poller) == NULL || + CU_add_test(suite, "basic_qos", basic_qos) == NULL || CU_add_test(suite, "put_channel_during_reset", put_channel_during_reset) == NULL || CU_add_test(suite, "aborted_reset", aborted_reset) == NULL || CU_add_test(suite, "io_during_reset", io_during_reset) == NULL || + CU_add_test(suite, "io_during_qos", io_during_qos) == NULL || + CU_add_test(suite, "io_during_qos_queue", io_during_qos_queue) == NULL || CU_add_test(suite, "enomem", enomem) == NULL || CU_add_test(suite, "enomem_multi_bdev", enomem_multi_bdev) == NULL ) {