blob: handle overlapping open case

We only create one spdk_blob object for a given blob, and just
increase the ref_count if it is opened multiple times.  bs_open_blob
would do the lookup for existing opened blobs.

But if the blob is opened again, before the previous open operation
has completed, we would end up with two spdk_blob objects for the same
blob.

Solution is to do another lookup when the open operation completes.
If we find the blob, free the one we just finished opening and return
the existing one instead.

Also added unit test that failed on the existing code but passes now
with this patch.

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Reported-by: Mike Cui
Change-Id: I00c3a913b413deddf06f0b63f7a669efb2b5658f

Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3855
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
This commit is contained in:
Jim Harris 2020-08-19 12:11:41 -07:00 committed by Tomasz Zawadzki
parent 361cddfd63
commit ed7848f2df
2 changed files with 49 additions and 3 deletions

View File

@ -6648,6 +6648,7 @@ static void
bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{ {
struct spdk_blob *blob = cb_arg; struct spdk_blob *blob = cb_arg;
struct spdk_blob *existing;
if (bserrno != 0) { if (bserrno != 0) {
blob_free(blob); blob_free(blob);
@ -6656,6 +6657,15 @@ bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
return; return;
} }
existing = blob_lookup(blob->bs, blob->id);
if (existing) {
blob_free(blob);
existing->open_ref++;
seq->cpl.u.blob_handle.blob = existing;
bs_sequence_finish(seq, 0);
return;
}
blob->open_ref++; blob->open_ref++;
spdk_bit_array_set(blob->bs->open_blobids, blob->id); spdk_bit_array_set(blob->bs->open_blobids, blob->id);

View File

@ -47,8 +47,8 @@
struct spdk_blob_store *g_bs; struct spdk_blob_store *g_bs;
spdk_blob_id g_blobid; spdk_blob_id g_blobid;
struct spdk_blob *g_blob; struct spdk_blob *g_blob, *g_blob2;
int g_bserrno; int g_bserrno, g_bserrno2;
struct spdk_xattr_names *g_names; struct spdk_xattr_names *g_names;
int g_done; int g_done;
char *g_xattr_names[] = {"first", "second", "third"}; char *g_xattr_names[] = {"first", "second", "third"};
@ -170,6 +170,18 @@ blob_op_with_handle_complete(void *cb_arg, struct spdk_blob *blb, int bserrno)
g_bserrno = bserrno; g_bserrno = bserrno;
} }
static void
blob_op_with_handle_complete2(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
if (g_blob == NULL) {
g_blob = blob;
g_bserrno = bserrno;
} else {
g_blob2 = blob;
g_bserrno2 = bserrno;
}
}
static void static void
ut_bs_reload(struct spdk_blob_store **bs, struct spdk_bs_opts *opts) ut_bs_reload(struct spdk_blob_store **bs, struct spdk_bs_opts *opts)
{ {
@ -322,8 +334,32 @@ blob_open(void)
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL); CU_ASSERT(g_blob != NULL);
blob = g_blob; blob = g_blob;
spdk_blob_close(blob, blob_op_complete, NULL);
poll_threads();
CU_ASSERT(g_bserrno == 0);
ut_blob_close_and_delete(bs, blob); /* Try to open file twice in succession. This should return the same
* blob object.
*/
g_blob = NULL;
g_blob2 = NULL;
g_bserrno = -1;
g_bserrno2 = -1;
spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete2, NULL);
spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete2, NULL);
poll_threads();
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_bserrno2 == 0);
CU_ASSERT(g_blob != NULL);
CU_ASSERT(g_blob2 != NULL);
CU_ASSERT(g_blob == g_blob2);
g_bserrno = -1;
spdk_blob_close(g_blob, blob_op_complete, NULL);
poll_threads();
CU_ASSERT(g_bserrno == 0);
ut_blob_close_and_delete(bs, g_blob);
} }
static void static void