lib/blob: read extents during blob load

When EXTENT_TABLE descriptor is found when parsing metadata
that means there can be extent pages to read.

If extent page was not allocated, number of clusters can be
increased depending on the num_clusters_in_et.
Unallocated extent page contains either SPDK_EXTENTS_PER_EP
or remainder of num_clusters_in_et worth of clusters.
Depending which is less.

Added decreasing fo num_clusters_in_et to parsing
extent pages as well.

While here, remove ctx->seq = seq assignment as that is
done at beginning of blob load.

Signed-off-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Change-Id: I57f54634b908ffb406f3e91e15841b7f36fd6de6
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/476429
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
This commit is contained in:
Tomasz Zawadzki 2020-01-23 10:47:29 -05:00
parent d1f863ca57
commit b5e993483f

View File

@ -606,7 +606,13 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
/* This means that Extent RLE is present in MD,
* both should never be at the same time. */
return -EINVAL;
} else if (blob->extent_table_found &&
desc_extent_table->num_clusters != blob->num_clusters_in_et) {
/* Number of clusters in this ET does not match number
* from previously read EXTENT_TABLE. */
return -EINVAL;
}
blob->extent_table_found = true;
if (desc_extent_table->length == 0 ||
@ -690,6 +696,8 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
return -EINVAL;
}
}
assert(blob->num_clusters_in_et >= cluster_count);
blob->num_clusters_in_et -= cluster_count;
} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
int rc;
@ -726,6 +734,22 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
return 0;
}
static bool _spdk_bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page);
static int
_spdk_blob_parse_extent_page(struct spdk_blob_md_page *extent_page, struct spdk_blob *blob)
{
assert(blob != NULL);
assert(blob->state == SPDK_BLOB_STATE_LOADING);
assert(blob->active.clusters == NULL);
if (_spdk_bs_load_cur_extent_page_valid(extent_page) == false) {
return -ENOENT;
}
return _spdk_blob_parse_page(extent_page, blob);
}
static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
struct spdk_blob *blob)
@ -1186,6 +1210,7 @@ struct spdk_blob_load_ctx {
struct spdk_blob_md_page *pages;
uint32_t num_pages;
uint32_t next_extent_page;
spdk_bs_sequence_t *seq;
spdk_bs_sequence_cpl cb_fn;
@ -1275,6 +1300,90 @@ _spdk_blob_load_backing_dev(void *cb_arg)
_spdk_blob_load_final(ctx, 0);
}
static void
_spdk_blob_load_cpl_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
struct spdk_blob_load_ctx *ctx = cb_arg;
struct spdk_blob *blob = ctx->blob;
struct spdk_blob_md_page *page;
uint64_t i;
uint32_t crc;
uint64_t lba;
void *tmp;
uint64_t sz;
if (bserrno) {
SPDK_ERRLOG("Extent page read failed: %d\n", bserrno);
_spdk_blob_load_final(ctx, bserrno);
return;
}
if (ctx->pages == NULL) {
/* First iteration of this function, allocate buffer for single EXTENT_PAGE */
ctx->pages = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, NULL, SPDK_ENV_SOCKET_ID_ANY,
SPDK_MALLOC_DMA);
if (!ctx->pages) {
_spdk_blob_load_final(ctx, -ENOMEM);
return;
}
ctx->num_pages = 1;
ctx->next_extent_page = 0;
} else {
page = &ctx->pages[0];
crc = _spdk_blob_md_page_calc_crc(page);
if (crc != page->crc) {
_spdk_blob_load_final(ctx, -EINVAL);
return;
}
if (page->next != SPDK_INVALID_MD_PAGE) {
_spdk_blob_load_final(ctx, -EINVAL);
return;
}
bserrno = _spdk_blob_parse_extent_page(page, blob);
if (bserrno) {
_spdk_blob_load_final(ctx, bserrno);
return;
}
}
for (i = ctx->next_extent_page; i < blob->active.num_extent_pages; i++) {
if (blob->active.extent_pages[i] != 0) {
/* Extent page was allocated, read and parse it. */
lba = _spdk_bs_md_page_to_lba(blob->bs, blob->active.extent_pages[i]);
ctx->next_extent_page = i + 1;
spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
_spdk_bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE),
_spdk_blob_load_cpl_extents_cpl, ctx);
return;
} else {
/* Thin provisioned blobs can point to unallocated extent pages.
* In this case blob size should be increased by up to the amount left in num_clusters_in_et. */
sz = spdk_min(blob->num_clusters_in_et, SPDK_EXTENTS_PER_EP);
blob->active.num_clusters += sz;
blob->num_clusters_in_et -= sz;
assert(spdk_blob_is_thin_provisioned(blob));
assert(i + 1 < blob->active.num_extent_pages || blob->num_clusters_in_et == 0);
tmp = realloc(blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
if (tmp == NULL) {
_spdk_blob_load_final(ctx, -ENOMEM);
return;
}
memset(tmp + blob->active.cluster_array_size, 0,
sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
blob->active.clusters = tmp;
blob->active.cluster_array_size = blob->active.num_clusters;
}
}
_spdk_blob_load_backing_dev(ctx);
}
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
@ -1337,14 +1446,19 @@ _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
blob->use_extent_table = false;
}
ctx->seq = seq;
/* Check the clear_method stored in metadata vs what may have been passed
* via spdk_bs_open_blob_ext() and update accordingly.
*/
_spdk_blob_update_clear_method(blob);
_spdk_blob_load_backing_dev(ctx);
spdk_free(ctx->pages);
ctx->pages = NULL;
if (blob->extent_table_found) {
_spdk_blob_load_cpl_extents_cpl(seq, ctx, 0);
} else {
_spdk_blob_load_backing_dev(ctx);
}
}
/* Load a blob from disk given a blobid */
@ -3519,6 +3633,44 @@ _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct
return 0;
}
static bool _spdk_bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page)
{
uint32_t crc;
struct spdk_blob_md_descriptor *desc = (struct spdk_blob_md_descriptor *)page->descriptors;
size_t desc_len;
crc = _spdk_blob_md_page_calc_crc(page);
if (crc != page->crc) {
return false;
}
/* Extent page should always be of sequence num 0. */
if (page->sequence_num != 0) {
return false;
}
/* Descriptor type must be EXTENT_PAGE. */
if (desc->type != SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) {
return false;
}
/* Descriptor length cannot exceed the page. */
desc_len = sizeof(*desc) + desc->length;
if (desc_len > sizeof(page->descriptors)) {
return false;
}
/* It has to be the only descriptor in the page. */
if (desc_len + sizeof(*desc) <= sizeof(page->descriptors)) {
desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + desc_len);
if (desc->length != 0) {
return false;
}
}
return true;
}
static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
{
uint32_t crc;