diff --git a/lib/blob/blobstore.c b/lib/blob/blobstore.c index 3989b7d4f..e5eed5f7b 100644 --- a/lib/blob/blobstore.c +++ b/lib/blob/blobstore.c @@ -606,7 +606,13 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl /* This means that Extent RLE is present in MD, * both should never be at the same time. */ return -EINVAL; + } else if (blob->extent_table_found && + desc_extent_table->num_clusters != blob->num_clusters_in_et) { + /* Number of clusters in this ET does not match number + * from previously read EXTENT_TABLE. */ + return -EINVAL; } + blob->extent_table_found = true; if (desc_extent_table->length == 0 || @@ -690,6 +696,8 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl return -EINVAL; } } + assert(blob->num_clusters_in_et >= cluster_count); + blob->num_clusters_in_et -= cluster_count; } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { int rc; @@ -726,6 +734,22 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl return 0; } +static bool _spdk_bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page); + +static int +_spdk_blob_parse_extent_page(struct spdk_blob_md_page *extent_page, struct spdk_blob *blob) +{ + assert(blob != NULL); + assert(blob->state == SPDK_BLOB_STATE_LOADING); + assert(blob->active.clusters == NULL); + + if (_spdk_bs_load_cur_extent_page_valid(extent_page) == false) { + return -ENOENT; + } + + return _spdk_blob_parse_page(extent_page, blob); +} + static int _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, struct spdk_blob *blob) @@ -1186,6 +1210,7 @@ struct spdk_blob_load_ctx { struct spdk_blob_md_page *pages; uint32_t num_pages; + uint32_t next_extent_page; spdk_bs_sequence_t *seq; spdk_bs_sequence_cpl cb_fn; @@ -1275,6 +1300,90 @@ _spdk_blob_load_backing_dev(void *cb_arg) _spdk_blob_load_final(ctx, 0); } +static void +_spdk_blob_load_cpl_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) +{ + struct spdk_blob_load_ctx *ctx = cb_arg; + struct spdk_blob *blob = ctx->blob; + struct spdk_blob_md_page *page; + uint64_t i; + uint32_t crc; + uint64_t lba; + void *tmp; + uint64_t sz; + + if (bserrno) { + SPDK_ERRLOG("Extent page read failed: %d\n", bserrno); + _spdk_blob_load_final(ctx, bserrno); + return; + } + + if (ctx->pages == NULL) { + /* First iteration of this function, allocate buffer for single EXTENT_PAGE */ + ctx->pages = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, NULL, SPDK_ENV_SOCKET_ID_ANY, + SPDK_MALLOC_DMA); + if (!ctx->pages) { + _spdk_blob_load_final(ctx, -ENOMEM); + return; + } + ctx->num_pages = 1; + ctx->next_extent_page = 0; + } else { + page = &ctx->pages[0]; + crc = _spdk_blob_md_page_calc_crc(page); + if (crc != page->crc) { + _spdk_blob_load_final(ctx, -EINVAL); + return; + } + + if (page->next != SPDK_INVALID_MD_PAGE) { + _spdk_blob_load_final(ctx, -EINVAL); + return; + } + + bserrno = _spdk_blob_parse_extent_page(page, blob); + if (bserrno) { + _spdk_blob_load_final(ctx, bserrno); + return; + } + } + + for (i = ctx->next_extent_page; i < blob->active.num_extent_pages; i++) { + if (blob->active.extent_pages[i] != 0) { + /* Extent page was allocated, read and parse it. */ + lba = _spdk_bs_md_page_to_lba(blob->bs, blob->active.extent_pages[i]); + ctx->next_extent_page = i + 1; + + spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba, + _spdk_bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE), + _spdk_blob_load_cpl_extents_cpl, ctx); + return; + } else { + /* Thin provisioned blobs can point to unallocated extent pages. + * In this case blob size should be increased by up to the amount left in num_clusters_in_et. */ + + sz = spdk_min(blob->num_clusters_in_et, SPDK_EXTENTS_PER_EP); + blob->active.num_clusters += sz; + blob->num_clusters_in_et -= sz; + + assert(spdk_blob_is_thin_provisioned(blob)); + assert(i + 1 < blob->active.num_extent_pages || blob->num_clusters_in_et == 0); + + tmp = realloc(blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters)); + if (tmp == NULL) { + _spdk_blob_load_final(ctx, -ENOMEM); + return; + } + memset(tmp + blob->active.cluster_array_size, 0, + sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size)); + blob->active.clusters = tmp; + blob->active.cluster_array_size = blob->active.num_clusters; + } + } + + _spdk_blob_load_backing_dev(ctx); +} + static void _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) { @@ -1337,14 +1446,19 @@ _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) blob->use_extent_table = false; } - ctx->seq = seq; - /* Check the clear_method stored in metadata vs what may have been passed * via spdk_bs_open_blob_ext() and update accordingly. */ _spdk_blob_update_clear_method(blob); - _spdk_blob_load_backing_dev(ctx); + spdk_free(ctx->pages); + ctx->pages = NULL; + + if (blob->extent_table_found) { + _spdk_blob_load_cpl_extents_cpl(seq, ctx, 0); + } else { + _spdk_blob_load_backing_dev(ctx); + } } /* Load a blob from disk given a blobid */ @@ -3519,6 +3633,44 @@ _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct return 0; } +static bool _spdk_bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page) +{ + uint32_t crc; + struct spdk_blob_md_descriptor *desc = (struct spdk_blob_md_descriptor *)page->descriptors; + size_t desc_len; + + crc = _spdk_blob_md_page_calc_crc(page); + if (crc != page->crc) { + return false; + } + + /* Extent page should always be of sequence num 0. */ + if (page->sequence_num != 0) { + return false; + } + + /* Descriptor type must be EXTENT_PAGE. */ + if (desc->type != SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) { + return false; + } + + /* Descriptor length cannot exceed the page. */ + desc_len = sizeof(*desc) + desc->length; + if (desc_len > sizeof(page->descriptors)) { + return false; + } + + /* It has to be the only descriptor in the page. */ + if (desc_len + sizeof(*desc) <= sizeof(page->descriptors)) { + desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + desc_len); + if (desc->length != 0) { + return false; + } + } + + return true; +} + static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) { uint32_t crc;