blobstore: freeze I/O during snapshotting.
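
When a snapshot is taken, the original blob's cluster map is copied into
the new snapshot blob and both blobs' metadata are synced. I/O to the
original blob must not proceed while that copy is in flight. Introduce a
per-blob frozen_refcnt: while it is non-zero, new read/write requests are
placed on the channel's queued_io list instead of being submitted. The
snapshot path freezes I/O before copying the cluster map and unfreezes it
afterwards, at which point the queued operations are executed on every
channel.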

Signed-off-by: Piotr Pelplinski <piotr.pelplinski@intel.com>
Change-Id: I6182eb3a77d23db7088703492d71349e3a4b6460
Reviewed-on: https://review.gerrithub.io/399366
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Piotr Pelplinski 2018-05-16 16:46:52 +02:00 committed by Jim Harris
parent 4c4c8ca269
commit 8c45ed3822
3 changed files with 293 additions and 14 deletions

lib/blob/blobstore.c

@@ -222,6 +222,105 @@ _spdk_blob_free(struct spdk_blob *blob)
free(blob);
}
struct freeze_io_ctx {
struct spdk_bs_cpl cpl;
struct spdk_blob *blob;
};
static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
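/* No per-channel work is needed here: iterating over all channels just
* acts as a barrier, guaranteeing that every thread has observed the
* updated frozen_refcnt before the freeze completion callback runs. */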
spdk_for_each_channel_continue(i, 0);
}
static void
_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
struct spdk_bs_request_set *set;
struct spdk_bs_user_op_args *args;
spdk_bs_user_op_t *op, *tmp;
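/* Re-submit every operation that was queued on this channel against the
* blob while it was frozen. */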
TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
set = (struct spdk_bs_request_set *)op;
args = &set->u.user_op;
if (args->blob == ctx->blob) {
TAILQ_REMOVE(&ch->queued_io, op, link);
spdk_bs_user_op_execute(op);
}
}
spdk_for_each_channel_continue(i, 0);
}
static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);
free(ctx);
}
static void
_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
struct freeze_io_ctx *ctx;
ctx = calloc(1, sizeof(*ctx));
if (!ctx) {
cb_fn(cb_arg, -ENOMEM);
return;
}
ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
ctx->cpl.u.blob_basic.cb_fn = cb_fn;
ctx->cpl.u.blob_basic.cb_arg = cb_arg;
ctx->blob = blob;
/* Freeze I/O on blob */
blob->frozen_refcnt++;
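/* Only the first freeze has to sweep the channels; nested freezes just
* hold an extra reference and complete immediately. */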
if (blob->frozen_refcnt == 1) {
spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
} else {
cb_fn(cb_arg, 0);
free(ctx);
}
}
static void
_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
struct freeze_io_ctx *ctx;
ctx = calloc(1, sizeof(*ctx));
if (!ctx) {
cb_fn(cb_arg, -ENOMEM);
return;
}
ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
ctx->cpl.u.blob_basic.cb_fn = cb_fn;
ctx->cpl.u.blob_basic.cb_arg = cb_arg;
ctx->blob = blob;
assert(blob->frozen_refcnt > 0);
blob->frozen_refcnt--;
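/* Only the last unfreeze executes the queued I/O; earlier unfreezes just
* drop a reference and complete immediately. */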
if (blob->frozen_refcnt == 0) {
spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
} else {
cb_fn(cb_arg, 0);
free(ctx);
}
}
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
@@ -1716,6 +1815,22 @@ _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blo
_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
if (blob->frozen_refcnt) {
/* I/O to this blob is frozen: queue the operation for later replay */
spdk_bs_user_op_t *op;
struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);
op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
if (!op) {
cb_fn(cb_arg, -ENOMEM);
return;
}
TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
return;
}
switch (op_type) {
case SPDK_BLOB_READ: {
spdk_bs_batch_t *batch;
@@ -1963,6 +2078,21 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel
cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
cpl.u.blob_basic.cb_fn = cb_fn;
cpl.u.blob_basic.cb_arg = cb_arg;
if (blob->frozen_refcnt) {
/* I/O to this blob is frozen: queue the operation for later replay */
spdk_bs_user_op_t *op;
struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);
op = spdk_bs_user_op_alloc(_channel, &cpl, read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV,
blob, iov, iovcnt, offset, length);
if (!op) {
cb_fn(cb_arg, -ENOMEM);
return;
}
TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
return;
}
if (read) {
spdk_bs_sequence_t *seq;
@@ -2074,6 +2204,7 @@ _spdk_bs_channel_create(void *io_device, void *ctx_buf)
}
TAILQ_INIT(&channel->need_cluster_alloc);
TAILQ_INIT(&channel->queued_io);
return 0;
}
@@ -2090,6 +2221,12 @@ _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
spdk_bs_user_op_abort(op);
}
while (!TAILQ_EMPTY(&channel->queued_io)) {
op = TAILQ_FIRST(&channel->queued_io);
TAILQ_REMOVE(&channel->queued_io, op, link);
spdk_bs_user_op_abort(op);
}
free(channel->req_mem);
channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}
@@ -3757,6 +3894,7 @@ void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_
struct spdk_clone_snapshot_ctx {
struct spdk_bs_cpl cpl;
int bserrno;
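/* Set once I/O on the original blob has been frozen */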
bool frozen;
struct spdk_io_channel *channel;
@@ -3806,6 +3944,24 @@ _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
free(ctx);
}
static void
_spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
{
struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
struct spdk_blob *origblob = ctx->original.blob;
if (bserrno != 0) {
if (ctx->bserrno != 0) {
SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
} else {
ctx->bserrno = bserrno;
}
}
ctx->original.id = origblob->id;
spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
}
static void
_spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
{
@@ -3820,8 +3976,13 @@ _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
}
}
ctx->original.id = origblob->id;
spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
if (ctx->frozen) {
/* Unfreeze any outstanding I/O */
_spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
} else {
_spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
}
}
static void
@@ -3912,6 +4073,28 @@ _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
}
static void
_spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
{
struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
struct spdk_blob *origblob = ctx->original.blob;
struct spdk_blob *newblob = ctx->new.blob;
ctx->frozen = true;
/* set new back_bs_dev for snapshot */
newblob->back_bs_dev = origblob->back_bs_dev;
/* Set invalid flags from origblob */
newblob->invalid_flags = origblob->invalid_flags;
/* Copy cluster map to snapshot */
memcpy(newblob->active.clusters, origblob->active.clusters,
origblob->active.num_clusters * sizeof(*origblob->active.clusters));
/* sync snapshot metadata */
spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
}
static void
_spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
@@ -3926,17 +4109,7 @@ _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bs
ctx->new.blob = newblob;
/* set new back_bs_dev for snapshot */
newblob->back_bs_dev = origblob->back_bs_dev;
/* Set invalid flags from origblob */
newblob->invalid_flags = origblob->invalid_flags;
/* Copy cluster map to snapshot */
memcpy(newblob->active.clusters, origblob->active.clusters,
origblob->active.num_clusters * sizeof(*origblob->active.clusters));
/* sync snapshot metadata */
spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
_spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
}
static void
@@ -4027,6 +4200,7 @@ void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
ctx->cpl.u.blobid.cb_arg = cb_arg;
ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
ctx->bserrno = 0;
ctx->frozen = false;
ctx->original.id = blobid;
ctx->xattrs = snapshot_xattrs;

lib/blob/blobstore.h

@@ -147,6 +147,8 @@ struct spdk_blob {
struct spdk_xattr_tailq xattrs_internal;
TAILQ_ENTRY(spdk_blob) link;
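/* Number of outstanding freeze requests; new I/O is queued while this is non-zero */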
uint32_t frozen_refcnt;
};
struct spdk_blob_store {
@@ -194,6 +196,7 @@ struct spdk_bs_channel {
struct spdk_io_channel *dev_channel;
TAILQ_HEAD(, spdk_bs_request_set) need_cluster_alloc;
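/* User operations received while their target blob was frozen */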
TAILQ_HEAD(, spdk_bs_request_set) queued_io;
};
/** operation type */

test/unit/lib/blob/blob.c/blob_ut.c

@@ -635,6 +635,107 @@ blob_snapshot(void)
g_bs = NULL;
}
static void
blob_snapshot_freeze_io(void)
{
struct spdk_io_channel *channel;
struct spdk_bs_channel *bs_channel;
struct spdk_blob_store *bs;
struct spdk_bs_dev *dev;
struct spdk_blob *blob;
struct spdk_blob_opts opts;
spdk_blob_id blobid;
uint32_t num_of_pages = 10;
uint8_t payload_read[num_of_pages * SPDK_BS_PAGE_SIZE];
uint8_t payload_write[num_of_pages * SPDK_BS_PAGE_SIZE];
uint8_t payload_zero[num_of_pages * SPDK_BS_PAGE_SIZE];
memset(payload_write, 0xE5, sizeof(payload_write));
memset(payload_read, 0x00, sizeof(payload_read));
memset(payload_zero, 0x00, sizeof(payload_zero));
dev = init_dev();
memset(g_dev_buffer, 0, DEV_BUFFER_SIZE);
/* Test freeze I/O during snapshot */
spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_bs != NULL);
bs = g_bs;
channel = spdk_bs_alloc_io_channel(bs);
bs_channel = spdk_io_channel_get_ctx(channel);
/* Create blob with 10 clusters */
spdk_blob_opts_init(&opts);
opts.num_clusters = 10;
opts.thin_provision = false;
spdk_bs_create_blob_ext(bs, &opts, blob_op_with_id_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
blobid = g_blobid;
spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob;
CU_ASSERT(spdk_blob_get_num_clusters(blob) == 10);
/* Enable explicit invocation of callbacks. On each read/write to the back device,
* execution will stop and wait until _bs_flush_scheduler is called. */
g_scheduler_delay = true;
spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
/* This is implementation specific.
* The ctx 'frozen' flag is set in the _spdk_bs_snapshot_freeze_cpl callback.
* Four async I/O operations happen before that. */
_bs_flush_scheduler(4);
CU_ASSERT(TAILQ_EMPTY(&bs_channel->queued_io));
/* Blob I/O should be frozen here */
CU_ASSERT(blob->frozen_refcnt == 1);
/* Write to the blob */
spdk_blob_io_write(blob, channel, payload_write, 0, num_of_pages, blob_op_complete, NULL);
/* Verify that I/O is queued */
CU_ASSERT(!TAILQ_EMPTY(&bs_channel->queued_io));
/* Verify that payload is not written to disk */
CU_ASSERT(memcmp(payload_zero, &g_dev_buffer[blob->active.clusters[0] * SPDK_BS_PAGE_SIZE],
SPDK_BS_PAGE_SIZE) == 0);
/* Disable scheduler delay.
* Finish all operations including spdk_bs_create_snapshot */
g_scheduler_delay = false;
_bs_flush_scheduler(1);
/* Verify snapshot */
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
/* Verify that the blob's frozen_refcnt has dropped back to zero */
CU_ASSERT(blob->frozen_refcnt == 0);
/* Verify that postponed I/O completed successfully by comparing payload */
spdk_blob_io_read(blob, channel, payload_read, 0, num_of_pages, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(memcmp(payload_write, payload_read, num_of_pages * SPDK_BS_PAGE_SIZE) == 0);
spdk_blob_close(blob, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
spdk_bs_free_io_channel(channel);
spdk_bs_unload(g_bs, bs_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bs = NULL;
}
static void
blob_clone(void)
{
@@ -4314,7 +4415,8 @@ int main(int argc, char **argv)
CU_add_test(suite, "blob_snapshot_rw", blob_snapshot_rw) == NULL ||
CU_add_test(suite, "blob_snapshot_rw_iov", blob_snapshot_rw_iov) == NULL ||
CU_add_test(suite, "blob_inflate_rw", blob_inflate_rw) == NULL ||
CU_add_test(suite, "blob_relations", blob_relations) == NULL
CU_add_test(suite, "blob_relations", blob_relations) == NULL ||
CU_add_test(suite, "blob_snapshot_freeze_io", blob_snapshot_freeze_io) == NULL
) {
CU_cleanup_registry();
return CU_get_error();