bdev: queue new IO that overlap a locked region

Add an io_locked TAILQ to each channel, which holds
IO that will write to a currently locked region.

Also add a new per-channel step to the locking process:
each channel must wait until all of its outstanding
writes to the newly locked range have completed.

Only the channel that locked an LBA range may submit
write I/O to it.  It must use the same cb_arg for the
write I/O as the cb_arg used when locking the LBA range.
This ensures that only the specific I/O operations associated
with the lock are allowed to bypass it.
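
As an illustration only, here is a rough sketch of that rule, modeled on the
new unit tests further down.  It assumes the unit-test environment (which
includes bdev.c, so the internal bdev_lock_lba_range() helper is visible);
locked_write_example(), lock_done(), write_done(), ctx, other_ctx and buf are
hypothetical placeholders:

static void
lock_done(void *ctx, int status)
{
        /* Fires only after every channel has drained its outstanding writes
         * that overlap the newly locked range.
         */
}

static void
write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
        spdk_bdev_free_io(bdev_io);
}

static void
locked_write_example(struct spdk_bdev_desc *desc, struct spdk_io_channel *io_ch,
                     void *buf, int *ctx, int *other_ctx)
{
        int rc;

        /* Lock 10 blocks starting at block 20; cb_arg (ctx) is recorded as the
         * range's locked_ctx.  A real caller would wait for lock_done() before
         * issuing writes that depend on the lock.
         */
        rc = bdev_lock_lba_range(desc, io_ch, 20, 10, lock_done, ctx);
        CU_ASSERT(rc == 0);

        /* Allowed to execute: submitted on the owning channel with the same
         * cb_arg (ctx) that was used to lock the range.
         */
        rc = spdk_bdev_write_blocks(desc, io_ch, buf, 20, 1, write_done, ctx);
        CU_ASSERT(rc == 0);

        /* Queued on the channel's io_locked tailq: it overlaps the locked
         * range but uses a different cb_arg.
         */
        rc = spdk_bdev_write_blocks(desc, io_ch, buf, 25, 1, write_done, other_ctx);
        CU_ASSERT(rc == 0);
}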

When a range is unlocked, we just blindly try to
resubmit all IO in the io_locked tailq.  This could be
made more efficient in the future, but we don't expect
this path to occur very often, so we go for simplicity
in the first pass.
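
A matching sketch of the unlock side, with the same assumptions and
placeholder names as above (unlock_done() and unlock_example() are also
hypothetical):

static void
unlock_done(void *ctx, int status)
{
        /* By the time this fires, everything that was queued on io_locked has
         * been resubmitted (though not necessarily completed).
         */
}

static void
unlock_example(struct spdk_bdev_desc *desc, struct spdk_io_channel *io_ch, int *ctx)
{
        int rc;

        /* Must match the lock: same channel, same offset (20), length (10) and
         * cb_arg (ctx).  Anything else fails with -EINVAL, as the
         * lock_lba_range_then_submit_io test below checks.
         */
        rc = bdev_unlock_lba_range(desc, io_ch, 20, 10, unlock_done, ctx);
        CU_ASSERT(rc == 0);

        /* While the unlock sweeps the channels, each channel swaps its
         * io_locked list aside and pushes every queued bdev_io through
         * bdev_io_submit() again.
         */
}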

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: Ibdc992144dfaffe7c05471a5b3c020cedd8cdfc3

Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/478226
Community-CI: Broadcom SPDK FC-NVMe CI <spdk-ci.pdl@broadcom.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
Reviewed-by: Maciej Szwed <maciej.szwed@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>

Author: Jim Harris, 2019-12-17 04:27:11 -07:00 (committed by Tomasz Zawadzki)
Parent: d84a88c1a0
Commit: b90b7ce477
3 changed files with 349 additions and 4 deletions


@@ -129,6 +129,7 @@ typedef void (*lock_range_cb)(void *ctx, int status);
struct lba_range {
uint64_t offset;
uint64_t length;
void *locked_ctx;
struct spdk_bdev_channel *owner_ch;
TAILQ_ENTRY(lba_range) tailq;
};
@@ -274,6 +275,12 @@ struct spdk_bdev_channel {
*/
bdev_io_tailq_t io_submitted;
/*
* List of spdk_bdev_io that are currently queued because they write to a locked
* LBA range.
*/
bdev_io_tailq_t io_locked;
uint32_t flags;
struct spdk_histogram_data *histogram;
@@ -1927,6 +1934,42 @@ bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2)
return true;
}
static bool
bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range)
{
struct spdk_bdev_channel *ch = bdev_io->internal.ch;
struct lba_range r;
switch (bdev_io->type) {
case SPDK_BDEV_IO_TYPE_NVME_IO:
case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
/* Don't try to decode the NVMe command - just assume worst-case and that
* it overlaps a locked range.
*/
return true;
case SPDK_BDEV_IO_TYPE_WRITE:
case SPDK_BDEV_IO_TYPE_UNMAP:
case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
case SPDK_BDEV_IO_TYPE_ZCOPY:
r.offset = bdev_io->u.bdev.offset_blocks;
r.length = bdev_io->u.bdev.num_blocks;
if (!bdev_lba_range_overlapped(range, &r)) {
/* This I/O doesn't overlap the specified LBA range. */
return false;
} else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) {
/* This I/O overlaps, but the I/O is on the same channel that locked this
* range, and the caller_ctx is the same as the locked_ctx. This means
* that this I/O is associated with the lock, and is allowed to execute.
*/
return false;
} else {
return true;
}
default:
return false;
}
}
void
bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
@@ -1937,6 +1980,17 @@ bdev_io_submit(struct spdk_bdev_io *bdev_io)
assert(thread != NULL);
assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
if (!TAILQ_EMPTY(&ch->locked_ranges)) {
struct lba_range *range;
TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
if (bdev_io_range_is_locked(bdev_io, range)) {
TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link);
return;
}
}
}
/* Add the bdev_io to io_submitted only if it is the original
* submission from the bdev user. When a bdev_io is split,
* it comes back through this code path, so we need to make sure
@@ -2120,6 +2174,7 @@ bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
shared_resource = ch->shared_resource;
assert(TAILQ_EMPTY(&ch->io_locked));
assert(TAILQ_EMPTY(&ch->io_submitted));
assert(ch->io_outstanding == 0);
assert(shared_resource->ref > 0);
@@ -2376,6 +2431,7 @@ bdev_channel_create(void *io_device, void *ctx_buf)
ch->shared_resource = shared_resource;
TAILQ_INIT(&ch->io_submitted);
TAILQ_INIT(&ch->io_locked);
#ifdef SPDK_CONFIG_VTUNE
{
@@ -5668,6 +5724,7 @@ spdk_bdev_notify_media_management(struct spdk_bdev *bdev)
struct locked_lba_range_ctx {
struct lba_range range;
struct spdk_bdev *bdev;
struct lba_range *current_range;
struct lba_range *owner_range;
struct spdk_poller *poller;
lock_range_cb cb_fn;
@@ -5715,6 +5772,33 @@ bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
free(ctx);
}
static int
bdev_lock_lba_range_check_io(void *_i)
{
struct spdk_io_channel_iter *i = _i;
struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
struct lba_range *range = ctx->current_range;
struct spdk_bdev_io *bdev_io;
spdk_poller_unregister(&ctx->poller);
/* The range is now in the locked_ranges, so no new IO can be submitted to this
* range. But we need to wait until any outstanding IO overlapping with this range
* are completed.
*/
TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
if (bdev_io_range_is_locked(bdev_io, range)) {
ctx->poller = spdk_poller_register(bdev_lock_lba_range_check_io, i, 100);
return 1;
}
}
spdk_for_each_channel_continue(i, 0);
return 1;
}
static void
bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
@@ -5731,6 +5815,8 @@ bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
range->length = ctx->range.length;
range->offset = ctx->range.offset;
range->locked_ctx = ctx->range.locked_ctx;
ctx->current_range = range;
if (ctx->range.owner_ch == ch) {
/* This is the range object for the channel that will hold
* the lock. Store it in the ctx object so that we can easily
@@ -5739,7 +5825,7 @@ bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
ctx->owner_range = range;
}
TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq);
spdk_for_each_channel_continue(i, 0);
bdev_lock_lba_range_check_io(i);
}
static void
@@ -5764,6 +5850,11 @@ bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
struct locked_lba_range_ctx *ctx;
if (cb_arg == NULL) {
SPDK_ERRLOG("cb_arg must not be NULL\n");
return -EINVAL;
}
ctx = calloc(1, sizeof(*ctx));
if (ctx == NULL) {
return -ENOMEM;
@@ -5772,6 +5863,7 @@ bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
ctx->range.offset = offset;
ctx->range.length = length;
ctx->range.owner_ch = ch;
ctx->range.locked_ctx = cb_arg;
ctx->bdev = bdev;
ctx->cb_fn = cb_fn;
ctx->cb_arg = cb_arg;
@@ -5795,6 +5887,8 @@ bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i)
struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
TAILQ_HEAD(, spdk_bdev_io) io_locked;
struct spdk_bdev_io *bdev_io;
struct lba_range *range;
TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
@@ -5815,6 +5909,19 @@ bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i)
* So we can't actually assert() here.
*/
/* Swap the locked IO into a temporary list, and then try to submit them again.
* We could hyper-optimize this to only resubmit locked I/O that overlap
* with the range that was just unlocked, but this isn't a performance path so
* we go for simplicity here.
*/
TAILQ_INIT(&io_locked);
TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link);
while (!TAILQ_EMPTY(&io_locked)) {
bdev_io = TAILQ_FIRST(&io_locked);
TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link);
bdev_io_submit(bdev_io);
}
spdk_for_each_channel_continue(i, 0);
}
@@ -5839,7 +5946,7 @@ bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
*/
TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
if (range->offset == offset && range->length == length &&
range->owner_ch == ch) {
range->owner_ch == ch && range->locked_ctx == cb_arg) {
range_found = true;
break;
}
@@ -5857,6 +5964,7 @@ bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
ctx->range.offset = offset;
ctx->range.length = length;
ctx->range.owner_ch = ch;
ctx->range.locked_ctx = cb_arg;
ctx->cb_fn = cb_fn;
ctx->cb_arg = cb_arg;


@@ -2656,6 +2656,101 @@ lock_lba_range_check_ranges(void)
poll_threads();
}
static void
lock_lba_range_with_io_outstanding(void)
{
struct spdk_bdev *bdev;
struct spdk_bdev_desc *desc = NULL;
struct spdk_io_channel *io_ch;
struct spdk_bdev_channel *channel;
struct lba_range *range;
char buf[4096];
int ctx1;
int rc;
spdk_bdev_initialize(bdev_init_cb, NULL);
bdev = allocate_bdev("bdev0");
rc = spdk_bdev_open(bdev, true, NULL, NULL, &desc);
CU_ASSERT(rc == 0);
CU_ASSERT(desc != NULL);
io_ch = spdk_bdev_get_io_channel(desc);
CU_ASSERT(io_ch != NULL);
channel = spdk_io_channel_get_ctx(io_ch);
g_io_done = false;
rc = spdk_bdev_read_blocks(desc, io_ch, buf, 20, 1, io_done, &ctx1);
CU_ASSERT(rc == 0);
g_lock_lba_range_done = false;
rc = bdev_lock_lba_range(desc, io_ch, 20, 10, lock_lba_range_done, &ctx1);
CU_ASSERT(rc == 0);
poll_threads();
/* The lock should immediately become valid, since there are no outstanding
* write I/O.
*/
CU_ASSERT(g_io_done == false);
CU_ASSERT(g_lock_lba_range_done == true);
range = TAILQ_FIRST(&channel->locked_ranges);
SPDK_CU_ASSERT_FATAL(range != NULL);
CU_ASSERT(range->offset == 20);
CU_ASSERT(range->length == 10);
CU_ASSERT(range->owner_ch == channel);
CU_ASSERT(range->locked_ctx == &ctx1);
rc = bdev_unlock_lba_range(desc, io_ch, 20, 10, lock_lba_range_done, &ctx1);
CU_ASSERT(rc == 0);
stub_complete_io(1);
spdk_delay_us(100);
poll_threads();
CU_ASSERT(TAILQ_EMPTY(&channel->locked_ranges));
/* Now try again, but with a write I/O. */
g_io_done = false;
rc = spdk_bdev_write_blocks(desc, io_ch, buf, 20, 1, io_done, &ctx1);
CU_ASSERT(rc == 0);
g_lock_lba_range_done = false;
rc = bdev_lock_lba_range(desc, io_ch, 20, 10, lock_lba_range_done, &ctx1);
CU_ASSERT(rc == 0);
poll_threads();
/* The lock should not be fully valid yet, since a write I/O is outstanding.
* But note that the range should be on the channel's locked_list, to make sure no
* new write I/O are started.
*/
CU_ASSERT(g_io_done == false);
CU_ASSERT(g_lock_lba_range_done == false);
range = TAILQ_FIRST(&channel->locked_ranges);
SPDK_CU_ASSERT_FATAL(range != NULL);
CU_ASSERT(range->offset == 20);
CU_ASSERT(range->length == 10);
/* Complete the write I/O. This should make the lock valid (checked by confirming
* our callback was invoked).
*/
stub_complete_io(1);
spdk_delay_us(100);
poll_threads();
CU_ASSERT(g_io_done == true);
CU_ASSERT(g_lock_lba_range_done == true);
rc = bdev_unlock_lba_range(desc, io_ch, 20, 10, unlock_lba_range_done, &ctx1);
CU_ASSERT(rc == 0);
poll_threads();
CU_ASSERT(TAILQ_EMPTY(&channel->locked_ranges));
spdk_put_io_channel(io_ch);
spdk_bdev_close(desc);
free_bdev(bdev);
spdk_bdev_finish(bdev_fini_cb, NULL);
poll_threads();
}
int
main(int argc, char **argv)
{
@@ -2693,7 +2788,8 @@ main(int argc, char **argv)
CU_add_test(suite, "bdev_open_ext", bdev_open_ext) == NULL ||
CU_add_test(suite, "bdev_set_io_timeout", bdev_set_io_timeout) == NULL ||
CU_add_test(suite, "lba_range_overlap", lba_range_overlap) == NULL ||
CU_add_test(suite, "lock_lba_range_check_ranges", lock_lba_range_check_ranges) == NULL
CU_add_test(suite, "lock_lba_range_check_ranges", lock_lba_range_check_ranges) == NULL ||
CU_add_test(suite, "lock_lba_range_with_io_outstanding", lock_lba_range_with_io_outstanding) == NULL
) {
CU_cleanup_registry();
return CU_get_error();


@@ -1544,9 +1544,12 @@ bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
ctx->iov.iov_len = bdev_io->iov.iov_len;
}
static bool g_io_done;
static void
io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
g_io_done = true;
spdk_bdev_free_io(bdev_io);
}
@@ -1721,6 +1724,143 @@ bdev_set_io_timeout_mt(void)
free_threads();
}
static bool g_lock_lba_range_done;
static bool g_unlock_lba_range_done;
static void
lock_lba_range_done(void *ctx, int status)
{
g_lock_lba_range_done = true;
}
static void
unlock_lba_range_done(void *ctx, int status)
{
g_unlock_lba_range_done = true;
}
static uint32_t
stub_channel_outstanding_cnt(void *io_target)
{
struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
uint32_t outstanding_cnt;
outstanding_cnt = ch->outstanding_cnt;
spdk_put_io_channel(_ch);
return outstanding_cnt;
}
static void
lock_lba_range_then_submit_io(void)
{
struct spdk_bdev_desc *desc = NULL;
void *io_target;
struct spdk_io_channel *io_ch[3];
struct spdk_bdev_channel *bdev_ch[3];
struct lba_range *range;
char buf[4096];
int ctx0, ctx1;
int rc;
setup_test();
io_target = g_bdev.io_target;
desc = g_desc;
set_thread(0);
io_ch[0] = spdk_bdev_get_io_channel(desc);
bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
CU_ASSERT(io_ch[0] != NULL);
set_thread(1);
io_ch[1] = spdk_bdev_get_io_channel(desc);
bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
CU_ASSERT(io_ch[1] != NULL);
set_thread(0);
g_lock_lba_range_done = false;
rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
CU_ASSERT(rc == 0);
poll_threads();
/* The lock should immediately become valid, since there are no outstanding
* write I/O.
*/
CU_ASSERT(g_lock_lba_range_done == true);
range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
SPDK_CU_ASSERT_FATAL(range != NULL);
CU_ASSERT(range->offset == 20);
CU_ASSERT(range->length == 10);
CU_ASSERT(range->owner_ch == bdev_ch[0]);
g_io_done = false;
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
CU_ASSERT(rc == 0);
CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
stub_complete_io(io_target, 1);
poll_threads();
CU_ASSERT(g_io_done == true);
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
/* Try a write I/O. This should actually be allowed to execute, since the channel
* holding the lock is submitting the write I/O.
*/
g_io_done = false;
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
CU_ASSERT(rc == 0);
CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
stub_complete_io(io_target, 1);
poll_threads();
CU_ASSERT(g_io_done == true);
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
/* Try a write I/O. This should get queued in the io_locked tailq. */
set_thread(1);
g_io_done = false;
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
CU_ASSERT(rc == 0);
poll_threads();
CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
CU_ASSERT(g_io_done == false);
/* Try to unlock the lba range using thread 1's io_ch. This should fail. */
rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
CU_ASSERT(rc == -EINVAL);
set_thread(0);
rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
CU_ASSERT(rc == 0);
poll_threads();
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
/* The LBA range is unlocked, so the write IOs should now have started execution. */
CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
set_thread(1);
CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
stub_complete_io(io_target, 1);
poll_threads();
CU_ASSERT(g_io_done == true);
/* Tear down the channels */
set_thread(0);
spdk_put_io_channel(io_ch[0]);
set_thread(1);
spdk_put_io_channel(io_ch[1]);
poll_threads();
set_thread(0);
teardown_test();
}
int
main(int argc, char **argv)
{
@@ -1751,7 +1891,8 @@ main(int argc, char **argv)
CU_add_test(suite, "enomem_multi_io_target", enomem_multi_io_target) == NULL ||
CU_add_test(suite, "qos_dynamic_enable", qos_dynamic_enable) == NULL ||
CU_add_test(suite, "bdev_histograms_mt", bdev_histograms_mt) == NULL ||
CU_add_test(suite, "bdev_set_io_timeout_mt", bdev_set_io_timeout_mt) == NULL
CU_add_test(suite, "bdev_set_io_timeout_mt", bdev_set_io_timeout_mt) == NULL ||
CU_add_test(suite, "lock_lba_range_then_submit_io", lock_lba_range_then_submit_io) == NULL
) {
CU_cleanup_registry();
return CU_get_error();