diff --git a/lib/blob/blobstore.c b/lib/blob/blobstore.c
index 9b91add42..be0580c81 100644
--- a/lib/blob/blobstore.c
+++ b/lib/blob/blobstore.c
@@ -73,6 +73,7 @@ bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
 static void
 bs_claim_md_page(struct spdk_blob_store *bs, uint32_t page)
 {
+	assert(spdk_spin_held(&bs->used_lock));
 	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
 	assert(spdk_bit_array_get(bs->used_md_pages, page) == false);
 
@@ -82,6 +83,7 @@ bs_claim_md_page(struct spdk_blob_store *bs, uint32_t page)
 static void
 bs_release_md_page(struct spdk_blob_store *bs, uint32_t page)
 {
+	assert(spdk_spin_held(&bs->used_lock));
 	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
 	assert(spdk_bit_array_get(bs->used_md_pages, page) == true);
 
@@ -93,6 +95,8 @@ bs_claim_cluster(struct spdk_blob_store *bs)
 {
 	uint32_t cluster_num;
 
+	assert(spdk_spin_held(&bs->used_lock));
+
 	cluster_num = spdk_bit_pool_allocate_bit(bs->used_clusters);
 	if (cluster_num == UINT32_MAX) {
 		return UINT32_MAX;
@@ -107,6 +111,7 @@ bs_claim_cluster(struct spdk_blob_store *bs)
 static void
 bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
 {
+	assert(spdk_spin_held(&bs->used_lock));
 	assert(cluster_num < spdk_bit_pool_capacity(bs->used_clusters));
 	assert(spdk_bit_pool_is_allocated(bs->used_clusters, cluster_num) == true);
 	assert(bs->num_free_clusters < bs->total_clusters);
@@ -138,6 +143,8 @@ bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
 {
 	uint32_t *extent_page = 0;
 
+	assert(spdk_spin_held(&blob->bs->used_lock));
+
 	*cluster = bs_claim_cluster(blob->bs);
 	if (*cluster == UINT32_MAX) {
 		/* No more free clusters. Cannot satisfy the request */
@@ -1666,6 +1673,8 @@ blob_persist_clear_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrn
 		return;
 	}
 
+	spdk_spin_lock(&bs->used_lock);
+
 	/* Release all extent_pages that were truncated */
 	for (i = blob->active.num_extent_pages; i < blob->active.extent_pages_array_size; i++) {
 		/* Nothing to release if it was not allocated */
@@ -1674,6 +1683,8 @@ blob_persist_clear_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrn
 		}
 	}
 
+	spdk_spin_unlock(&bs->used_lock);
+
 	if (blob->active.num_extent_pages == 0) {
 		free(blob->active.extent_pages);
 		blob->active.extent_pages = NULL;
@@ -1832,6 +1843,8 @@ blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 		return;
 	}
 
+	spdk_spin_lock(&bs->used_lock);
+
 	/* This loop starts at 1 because the first page is special and handled
 	 * below. The pages (except the first) are never written in place,
 	 * so any pages in the clean list must be zeroed.
@@ -1847,6 +1860,8 @@ blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 		bs_release_md_page(bs, page_num);
 	}
 
+	spdk_spin_unlock(&bs->used_lock);
+
 	/* Move on to clearing clusters */
 	blob_persist_clear_clusters(seq, ctx);
 }
@@ -1999,8 +2014,14 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)
 		current_num_ep = spdk_divide_round_up(num_clusters, SPDK_EXTENTS_PER_EP);
 	}
 
-	/* Check first that we have enough clusters and md pages before we start claiming them. */
+	assert(!spdk_spin_held(&bs->used_lock));
+
+	/* Check first that we have enough clusters and md pages before we start claiming them.
+	 * bs->used_lock is held to ensure that clusters we think are free are still free when we go
+	 * to claim them later in this function.
+	 */
 	if (sz > num_clusters && spdk_blob_is_thin_provisioned(blob) == false) {
+		spdk_spin_lock(&bs->used_lock);
 		if ((sz - num_clusters) > bs->num_free_clusters) {
 			rc = -ENOSPC;
 			goto out;
@@ -2049,12 +2070,10 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)
 	if (spdk_blob_is_thin_provisioned(blob) == false) {
 		cluster = 0;
 		lfmd = 0;
-		spdk_spin_lock(&blob->bs->used_lock);
 		for (i = num_clusters; i < sz; i++) {
 			bs_allocate_cluster(blob, i, &cluster, &lfmd, true);
 			lfmd++;
 		}
-		spdk_spin_unlock(&blob->bs->used_lock);
 	}
 
 	blob->active.num_clusters = sz;
@@ -2062,6 +2081,10 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)
 
 	rc = 0;
 out:
+	if (spdk_spin_held(&bs->used_lock)) {
+		spdk_spin_unlock(&bs->used_lock);
+	}
+
 	return rc;
 }
 
@@ -2093,14 +2116,17 @@ blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
 	}
 	blob->active.pages = tmp;
 
-	/* Assign this metadata to pages. This requires two passes -
-	 * one to verify that there are enough pages and a second
-	 * to actually claim them. */
+	/* Assign this metadata to pages. This requires two passes - one to verify that there are
+	 * enough pages and a second to actually claim them. The used_lock is held across
+	 * both passes to ensure things don't change in the middle.
+	 */
+	spdk_spin_lock(&bs->used_lock);
 	page_num = 0;
 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
 	for (i = 1; i < blob->active.num_pages; i++) {
 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
 		if (page_num == UINT32_MAX) {
+			spdk_spin_unlock(&bs->used_lock);
 			blob_persist_complete(seq, ctx, -ENOMEM);
 			return;
 		}
@@ -2119,6 +2145,7 @@ blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
 		SPDK_DEBUGLOG(blob, "Claiming page %u for blob %" PRIu64 "\n", page_num, blob->id);
 		page_num++;
 	}
+	spdk_spin_unlock(&bs->used_lock);
 	ctx->pages[i - 1].crc = blob_md_page_calc_crc(&ctx->pages[i - 1]);
 	/* Start writing the metadata from last page to first */
 	blob->state = SPDK_BLOB_STATE_CLEAN;
@@ -2351,10 +2378,10 @@ blob_insert_cluster_cpl(void *cb_arg, int bserrno)
 		}
 		spdk_spin_lock(&ctx->blob->bs->used_lock);
 		bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
-		spdk_spin_unlock(&ctx->blob->bs->used_lock);
 		if (ctx->new_extent_page != 0) {
 			bs_release_md_page(ctx->blob->bs, ctx->new_extent_page);
 		}
+		spdk_spin_unlock(&ctx->blob->bs->used_lock);
 	}
 
 	bs_sequence_finish(ctx->seq, bserrno);
@@ -4341,7 +4368,9 @@ bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 	page = ctx->page;
 	if (bs_load_cur_md_page_valid(ctx) == true) {
 		if (page->sequence_num == 0 || ctx->in_page_chain == true) {
+			spdk_spin_lock(&ctx->bs->used_lock);
 			bs_claim_md_page(ctx->bs, page_num);
+			spdk_spin_unlock(&ctx->bs->used_lock);
 			if (page->sequence_num == 0) {
 				SPDK_NOTICELOG("Recover: blob %" PRIu32 "\n", page_num);
 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
@@ -5667,8 +5696,10 @@ bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 	uint32_t page_idx = bs_blobid_to_page(blob->id);
 
 	if (bserrno != 0) {
+		spdk_spin_lock(&blob->bs->used_lock);
 		spdk_bit_array_clear(blob->bs->used_blobids, page_idx);
 		bs_release_md_page(blob->bs, page_idx);
+		spdk_spin_unlock(&blob->bs->used_lock);
 	}
 
 	blob_free(blob);
@@ -5748,13 +5779,16 @@ bs_create_blob(struct spdk_blob_store *bs,
 
 	assert(spdk_get_thread() == bs->md_thread);
 
+	spdk_spin_lock(&bs->used_lock);
 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
 	if (page_idx == UINT32_MAX) {
+		spdk_spin_unlock(&bs->used_lock);
 		cb_fn(cb_arg, 0, -ENOMEM);
 		return;
 	}
 	spdk_bit_array_set(bs->used_blobids, page_idx);
 	bs_claim_md_page(bs, page_idx);
+	spdk_spin_unlock(&bs->used_lock);
 
 	id = bs_page_to_blobid(page_idx);
 
@@ -5819,8 +5853,10 @@ error:
 	if (blob != NULL) {
 		blob_free(blob);
 	}
+	spdk_spin_lock(&bs->used_lock);
 	spdk_bit_array_clear(bs->used_blobids, page_idx);
 	bs_release_md_page(bs, page_idx);
+	spdk_spin_unlock(&bs->used_lock);
 	cb_fn(cb_arg, 0, rc);
 }
 
@@ -7528,8 +7564,10 @@ blob_insert_cluster_msg(void *arg)
 		 * different cluster in the same extent page. In such case proceed with
 		 * updating the existing extent page, but release the additional one. */
 		if (ctx->extent_page != 0) {
+			spdk_spin_lock(&ctx->blob->bs->used_lock);
 			assert(spdk_bit_array_get(ctx->blob->bs->used_md_pages, ctx->extent_page) == true);
 			bs_release_md_page(ctx->blob->bs, ctx->extent_page);
+			spdk_spin_unlock(&ctx->blob->bs->used_lock);
 			ctx->extent_page = 0;
 		}
 		/* Extent page already allocated.
diff --git a/test/unit/lib/blob/blob.c/blob_ut.c b/test/unit/lib/blob/blob.c/blob_ut.c
index 5119bd1c4..015dd3d16 100644
--- a/test/unit/lib/blob/blob.c/blob_ut.c
+++ b/test/unit/lib/blob/blob.c/blob_ut.c
@@ -4028,8 +4028,10 @@ blob_insert_cluster_msg_test(void)
 	/* Specify cluster_num to allocate and new_cluster will be returned to insert on md_thread.
 	 * This is to simulate behaviour when cluster is allocated after blob creation.
 	 * Such as _spdk_bs_allocate_and_copy_cluster(). */
+	spdk_spin_lock(&bs->used_lock);
 	bs_allocate_cluster(blob, cluster_num, &new_cluster, &extent_page, false);
 	CU_ASSERT(blob->active.clusters[cluster_num] == 0);
+	spdk_spin_unlock(&bs->used_lock);
 	blob_insert_cluster_on_md_thread(blob, cluster_num, new_cluster, extent_page, &page,
 					 blob_op_complete, NULL);
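With this change, bs_claim_md_page(), bs_release_md_page(), bs_claim_cluster(), bs_release_cluster() and bs_allocate_cluster() assert spdk_spin_held(&bs->used_lock), so every caller takes bs->used_lock around the claim or release and drops it before issuing I/O or running completion callbacks. Below is a minimal caller-side sketch of that pattern; claim_one_cluster_example() is a hypothetical name used only for illustration and assumes it lives inside lib/blob/blobstore.c, where the static helpers and bs->used_lock are visible.

/* Hypothetical illustration of the caller-side locking pattern; not part of the patch. */
static int
claim_one_cluster_example(struct spdk_blob_store *bs, uint32_t *out_cluster)
{
	uint32_t cluster_num;

	/* bs_claim_cluster() now asserts spdk_spin_held(&bs->used_lock),
	 * so take the lock before the claim and release it right after. */
	spdk_spin_lock(&bs->used_lock);
	cluster_num = bs_claim_cluster(bs);
	spdk_spin_unlock(&bs->used_lock);

	if (cluster_num == UINT32_MAX) {
		/* No free clusters were available. */
		return -ENOSPC;
	}

	*out_cluster = cluster_num;
	return 0;
}

The same discipline is what the blob_resize() hunks implement: the lock is taken before the free-cluster check, held while the clusters are actually claimed, and released at the out: label, so the check and the claims cannot be interleaved with another thread's allocation.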