diff --git a/lib/blob/blobstore.c b/lib/blob/blobstore.c
index 3835ce222..a1f7625cc 100644
--- a/lib/blob/blobstore.c
+++ b/lib/blob/blobstore.c
@@ -222,6 +222,105 @@ _spdk_blob_free(struct spdk_blob *blob)
 	free(blob);
 }
 
+struct freeze_io_ctx {
+	struct spdk_bs_cpl cpl;
+	struct spdk_blob *blob;
+};
+
+static void
+_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
+{
+	spdk_for_each_channel_continue(i, 0);
+}
+
+static void
+_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
+{
+	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
+	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
+	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+	struct spdk_bs_request_set *set;
+	struct spdk_bs_user_op_args *args;
+	spdk_bs_user_op_t *op, *tmp;
+
+	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
+		set = (struct spdk_bs_request_set *)op;
+		args = &set->u.user_op;
+
+		if (args->blob == ctx->blob) {
+			TAILQ_REMOVE(&ch->queued_io, op, link);
+			spdk_bs_user_op_execute(op);
+		}
+	}
+
+	spdk_for_each_channel_continue(i, 0);
+}
+
+static void
+_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
+{
+	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+
+	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);
+
+	free(ctx);
+}
+
+static void
+_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
+{
+	struct freeze_io_ctx *ctx;
+
+	ctx = calloc(1, sizeof(*ctx));
+	if (!ctx) {
+		cb_fn(cb_arg, -ENOMEM);
+		return;
+	}
+
+	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
+	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
+	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
+	ctx->blob = blob;
+
+	/* Freeze I/O on blob */
+	blob->frozen_refcnt++;
+
+	if (blob->frozen_refcnt == 1) {
+		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
+	} else {
+		cb_fn(cb_arg, 0);
+		free(ctx);
+	}
+}
+
+static void
+_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
+{
+	struct freeze_io_ctx *ctx;
+
+	ctx = calloc(1, sizeof(*ctx));
+	if (!ctx) {
+		cb_fn(cb_arg, -ENOMEM);
+		return;
+	}
+
+	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
+	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
+	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
+	ctx->blob = blob;
+
+	assert(blob->frozen_refcnt > 0);
+
+	blob->frozen_refcnt--;
+
+	if (blob->frozen_refcnt == 0) {
+		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
+	} else {
+		cb_fn(cb_arg, 0);
+		free(ctx);
+	}
+}
+
 static int
 _spdk_blob_mark_clean(struct spdk_blob *blob)
 {
@@ -1716,6 +1815,22 @@ _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blo
 
 	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
 
+	if (blob->frozen_refcnt) {
+		/* This blob I/O is frozen */
+		spdk_bs_user_op_t *op;
+		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);
+
+		op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
+		if (!op) {
+			cb_fn(cb_arg, -ENOMEM);
+			return;
+		}
+
+		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
+
+		return;
+	}
+
 	switch (op_type) {
 	case SPDK_BLOB_READ: {
 		spdk_bs_batch_t *batch;
@@ -1963,6 +2078,21 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel
 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
 	cpl.u.blob_basic.cb_fn = cb_fn;
 	cpl.u.blob_basic.cb_arg = cb_arg;
+	if (blob->frozen_refcnt) {
+		/* This blob I/O is frozen */
+		spdk_bs_user_op_t *op;
+		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);
+
+		op = spdk_bs_user_op_alloc(_channel, &cpl, read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, length);
+		if (!op) {
+			cb_fn(cb_arg, -ENOMEM);
+			return;
+		}
+
+		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
+
+		return;
+	}
 
 	if (read) {
 		spdk_bs_sequence_t *seq;
@@ -2074,6 +2204,7 @@ _spdk_bs_channel_create(void *io_device, void *ctx_buf)
 	}
 
 	TAILQ_INIT(&channel->need_cluster_alloc);
+	TAILQ_INIT(&channel->queued_io);
 
 	return 0;
 }
@@ -2090,6 +2221,12 @@ _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
 		spdk_bs_user_op_abort(op);
 	}
 
+	while (!TAILQ_EMPTY(&channel->queued_io)) {
+		op = TAILQ_FIRST(&channel->queued_io);
+		TAILQ_REMOVE(&channel->queued_io, op, link);
+		spdk_bs_user_op_abort(op);
+	}
+
 	free(channel->req_mem);
 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
 }
@@ -3757,6 +3894,7 @@ void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_
 struct spdk_clone_snapshot_ctx {
 	struct spdk_bs_cpl cpl;
 	int bserrno;
+	bool frozen;
 
 	struct spdk_io_channel *channel;
 
@@ -3806,6 +3944,24 @@ _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
 	free(ctx);
 }
 
+static void
+_spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
+{
+	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
+	struct spdk_blob *origblob = ctx->original.blob;
+
+	if (bserrno != 0) {
+		if (ctx->bserrno != 0) {
+			SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
+		} else {
+			ctx->bserrno = bserrno;
+		}
+	}
+
+	ctx->original.id = origblob->id;
+	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
+}
+
 static void
 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
 {
@@ -3820,8 +3976,13 @@ _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
 		}
 	}
 
-	ctx->original.id = origblob->id;
-	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
+	if (ctx->frozen) {
+		/* Unfreeze any outstanding I/O */
+		_spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
+	} else {
+		_spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
+	}
+
 }
 
 static void
@@ -3912,6 +4073,28 @@ _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
 	spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
 }
 
+static void
+_spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
+{
+	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
+	struct spdk_blob *origblob = ctx->original.blob;
+	struct spdk_blob *newblob = ctx->new.blob;
+
+	ctx->frozen = true;
+
+	/* set new back_bs_dev for snapshot */
+	newblob->back_bs_dev = origblob->back_bs_dev;
+	/* Set invalid flags from origblob */
+	newblob->invalid_flags = origblob->invalid_flags;
+
+	/* Copy cluster map to snapshot */
+	memcpy(newblob->active.clusters, origblob->active.clusters,
+	       origblob->active.num_clusters * sizeof(*origblob->active.clusters));
+
+	/* sync snapshot metadata */
+	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
+}
+
 static void
 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
 {
@@ -3926,17 +4109,7 @@ _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bs
 
 	ctx->new.blob = newblob;
 
-	/* set new back_bs_dev for snapshot */
-	newblob->back_bs_dev = origblob->back_bs_dev;
-	/* Set invalid flags from origblob */
-	newblob->invalid_flags = origblob->invalid_flags;
-
-	/* Copy cluster map to snapshot */
-	memcpy(newblob->active.clusters, origblob->active.clusters,
-	       origblob->active.num_clusters * sizeof(origblob->active.clusters));
-
-	/* sync snapshot metadata */
-	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
+	_spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
 }
 
 static void
@@ -4027,6 +4200,7 @@ void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
 	ctx->cpl.u.blobid.cb_arg = cb_arg;
 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
 	ctx->bserrno = 0;
+	ctx->frozen = false;
 	ctx->original.id = blobid;
 	ctx->xattrs = snapshot_xattrs;
 
diff --git a/lib/blob/blobstore.h b/lib/blob/blobstore.h
index 633c4a049..b2e3cd6ea 100644
--- a/lib/blob/blobstore.h
+++ b/lib/blob/blobstore.h
@@ -147,6 +147,8 @@ struct spdk_blob {
 	struct spdk_xattr_tailq xattrs_internal;
 
 	TAILQ_ENTRY(spdk_blob) link;
+
+	uint32_t frozen_refcnt;
 };
 
 struct spdk_blob_store {
@@ -194,6 +196,7 @@ struct spdk_bs_channel {
 	struct spdk_io_channel *dev_channel;
 
 	TAILQ_HEAD(, spdk_bs_request_set) need_cluster_alloc;
+	TAILQ_HEAD(, spdk_bs_request_set) queued_io;
 };
 
 /** operation type */
diff --git a/test/unit/lib/blob/blob.c/blob_ut.c b/test/unit/lib/blob/blob.c/blob_ut.c
index 38cc819dd..ad6462c43 100644
--- a/test/unit/lib/blob/blob.c/blob_ut.c
+++ b/test/unit/lib/blob/blob.c/blob_ut.c
@@ -635,6 +635,107 @@ blob_snapshot(void)
 	g_bs = NULL;
 }
 
+static void
+blob_snapshot_freeze_io(void)
+{
+	struct spdk_io_channel *channel;
+	struct spdk_bs_channel *bs_channel;
+	struct spdk_blob_store *bs;
+	struct spdk_bs_dev *dev;
+	struct spdk_blob *blob;
+	struct spdk_blob_opts opts;
+	spdk_blob_id blobid;
+	uint32_t num_of_pages = 10;
+	uint8_t payload_read[num_of_pages * SPDK_BS_PAGE_SIZE];
+	uint8_t payload_write[num_of_pages * SPDK_BS_PAGE_SIZE];
+	uint8_t payload_zero[num_of_pages * SPDK_BS_PAGE_SIZE];
+
+	memset(payload_write, 0xE5, sizeof(payload_write));
+	memset(payload_read, 0x00, sizeof(payload_read));
+	memset(payload_zero, 0x00, sizeof(payload_zero));
+
+	dev = init_dev();
+	memset(g_dev_buffer, 0, DEV_BUFFER_SIZE);
+
+	/* Test freeze I/O during snapshot */
+
+	spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+	SPDK_CU_ASSERT_FATAL(g_bs != NULL);
+	bs = g_bs;
+
+	channel = spdk_bs_alloc_io_channel(bs);
+	bs_channel = spdk_io_channel_get_ctx(channel);
+
+	/* Create blob with 10 clusters */
+	spdk_blob_opts_init(&opts);
+	opts.num_clusters = 10;
+	opts.thin_provision = false;
+
+	spdk_bs_create_blob_ext(bs, &opts, blob_op_with_id_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
+	blobid = g_blobid;
+
+	spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
+	blob = g_blob;
+	CU_ASSERT(spdk_blob_get_num_clusters(blob) == 10);
+
+	/* Enable explicitly calling callbacks. On each read/write to back device
+	 * execution will stop and wait until _bs_flush_scheduler is called */
+	g_scheduler_delay = true;
+
+	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
+
+	/* This is implementation specific.
+	 * Flag 'frozen' is set in _spdk_bs_snapshot_freeze_cpl callback.
+	 * Four async I/O operations happen before that. */
+
+	_bs_flush_scheduler(4);
+
+	CU_ASSERT(TAILQ_EMPTY(&bs_channel->queued_io));
+
+	/* Blob I/O should be frozen here */
+	CU_ASSERT(blob->frozen_refcnt == 1);
+
+	/* Write to the blob */
+	spdk_blob_io_write(blob, channel, payload_write, 0, num_of_pages, blob_op_complete, NULL);
+
+	/* Verify that I/O is queued */
+	CU_ASSERT(!TAILQ_EMPTY(&bs_channel->queued_io));
+	/* Verify that payload is not written to disk */
+	CU_ASSERT(memcmp(payload_zero, &g_dev_buffer[blob->active.clusters[0]*SPDK_BS_PAGE_SIZE],
+			 SPDK_BS_PAGE_SIZE) == 0);
+
+	/* Disable scheduler delay.
+	 * Finish all operations including spdk_bs_create_snapshot */
+	g_scheduler_delay = false;
+	_bs_flush_scheduler(1);
+
+	/* Verify snapshot */
+	CU_ASSERT(g_bserrno == 0);
+	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
+
+	/* Verify that blob's frozen_refcnt dropped back to zero */
+	CU_ASSERT(blob->frozen_refcnt == 0);
+
+	/* Verify that postponed I/O completed successfully by comparing payload */
+	spdk_blob_io_read(blob, channel, payload_read, 0, num_of_pages, blob_op_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+	CU_ASSERT(memcmp(payload_write, payload_read, num_of_pages * SPDK_BS_PAGE_SIZE) == 0);
+
+	spdk_blob_close(blob, blob_op_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+
+	spdk_bs_free_io_channel(channel);
+
+	spdk_bs_unload(g_bs, bs_op_complete, NULL);
+	CU_ASSERT(g_bserrno == 0);
+	g_bs = NULL;
+}
+
 static void
 blob_clone(void)
 {
@@ -4314,7 +4415,8 @@ int main(int argc, char **argv)
 		CU_add_test(suite, "blob_snapshot_rw", blob_snapshot_rw) == NULL ||
 		CU_add_test(suite, "blob_snapshot_rw_iov", blob_snapshot_rw_iov) == NULL ||
CU_add_test(suite, "blob_inflate_rw", blob_inflate_rw) == NULL || - CU_add_test(suite, "blob_relations", blob_relations) == NULL + CU_add_test(suite, "blob_relations", blob_relations) == NULL || + CU_add_test(suite, "blob_snapshot_freeze_io", blob_snapshot_freeze_io) == NULL ) { CU_cleanup_registry(); return CU_get_error();