blob: add per-blob feature flags

Similar flags will be added at the blobstore level in a future
patch.

This allows backwards compatibility - i.e. allow older blobstore
applications to open blobstores created by newer blobstore
applications with new features.  Any blob's using a new feature
should have an associated flag set in one of three new flag masks:

- invalid: if a bit is set in this mask that the application is not
	   aware of, do not allow the blob to be opened
- data_ro: if a bit is set in this mask that the application is not
	   aware of, allow the blob to be opened, but do not allow
	   write I/O nor any operation that changes metadata
- md_ro:   if a bit is set in this mask that the application is not
	   aware of, allow the blob to be opened for performing any
	   kind of I/O, but do not allow any operation that changes
	   metadata

While here, bump SPDK_BS_VERSION to 3.  We intend this to be the
last change made to SPDK_BS_VERSION - future versioning will be
done via blobstore or per-blob feature flags instead.

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: If059e38bfffbeec25c849a7629a81193b12302c4

Reviewed-on: https://review.gerrithub.io/388703
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
This commit is contained in:
Jim Harris 2017-11-21 16:10:28 -07:00
parent c45a589901
commit d12ba75bd1
4 changed files with 231 additions and 5 deletions

View File

@ -216,7 +216,7 @@ Each page is defined as:
<blob-descriptor> ::= <blob-descriptor-type> <blob-descriptor-length>
<blob-descriptor-data>
<blob-descriptor-type> ::= u8 # 0 means padding, 1 means "extent", 2 means
# xattr. The type
# xattr, 3 means flags. The type
# describes how to interpret the descriptor data.
<blob-descriptor-length> ::= u32 # Length of the entire descriptor
@ -233,6 +233,12 @@ Each page is defined as:
<xattr-name> ::= u8*
<xattr-value> ::= u8*
<blob-descriptor-data-flags> ::= <flags-invalid> <flags-data-ro> <flags-md-ro>
<flags-invalid> ::= u64
<flags-data-ro> ::= u64
<flags-md-ro> ::= u64
<blob-next> ::= u32 # The offset into the metadata region that contains the
# next page of metadata. 0 means no next page.
<blob-crc> ::= u32 # CRC of the entire page

View File

@ -188,6 +188,35 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
/* If padding and length are 0, this terminates the page */
break;
}
} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
struct spdk_blob_md_descriptor_flags *desc_flags;
desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
return -EINVAL;
}
if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
SPDK_BLOB_INVALID_FLAGS_MASK) {
return -EINVAL;
}
if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
SPDK_BLOB_DATA_RO_FLAGS_MASK) {
blob->data_ro = true;
blob->md_ro = true;
}
if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
SPDK_BLOB_MD_RO_FLAGS_MASK) {
blob->md_ro = true;
}
blob->invalid_flags = desc_flags->invalid_flags;
blob->data_ro_flags = desc_flags->data_ro_flags;
blob->md_ro_flags = desc_flags->md_ro_flags;
} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
struct spdk_blob_md_descriptor_extent *desc_extent;
unsigned int i, j;
@ -265,8 +294,12 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
} else {
/* Error */
return -EINVAL;
/* Unrecognized descriptor type. Do not fail - just continue to the
* next descriptor. If this descriptor is associated with some feature
* defined in a newer version of blobstore, that version of blobstore
* should create and set an associated feature flag to specify if this
* blob can be loaded or not.
*/
}
/* Advance to the next descriptor */
@ -455,6 +488,28 @@ _spdk_blob_serialize_extent(const struct spdk_blob *blob,
return;
}
static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
uint8_t *buf, size_t *buf_sz)
{
struct spdk_blob_md_descriptor_flags *desc;
/*
* Flags get serialized first, so we should always have room for the flags
* descriptor.
*/
assert(*buf_sz >= sizeof(*desc));
desc = (struct spdk_blob_md_descriptor_flags *)buf;
desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
desc->invalid_flags = blob->invalid_flags;
desc->data_ro_flags = blob->data_ro_flags;
desc->md_ro_flags = blob->md_ro_flags;
*buf_sz -= sizeof(*desc);
}
static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
uint32_t *page_count)
@ -483,6 +538,9 @@ _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pa
buf = (uint8_t *)cur_page->descriptors;
remaining_sz = sizeof(cur_page->descriptors);
/* Serialize flags */
_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
/* Serialize xattrs */
TAILQ_FOREACH(xattr, &blob->xattrs, link) {
size_t required_sz = 0;

View File

@ -129,6 +129,10 @@ struct spdk_blob {
bool data_ro;
bool md_ro;
uint64_t invalid_flags;
uint64_t data_ro_flags;
uint64_t md_ro_flags;
/* TODO: The xattrs are mutable, but we don't want to be
* copying them unecessarily. Figure this out.
*/
@ -194,7 +198,7 @@ enum spdk_blob_op_type {
* The following data structures exist on disk.
*/
#define SPDK_BS_INITIAL_VERSION 1
#define SPDK_BS_VERSION 2 /* current version */
#define SPDK_BS_VERSION 3 /* current version */
#pragma pack(push, 1)
@ -210,6 +214,7 @@ struct spdk_bs_md_mask {
#define SPDK_MD_DESCRIPTOR_TYPE_PADDING 0
#define SPDK_MD_DESCRIPTOR_TYPE_EXTENT 1
#define SPDK_MD_DESCRIPTOR_TYPE_XATTR 2
#define SPDK_MD_DESCRIPTOR_TYPE_FLAGS 3
struct spdk_blob_md_descriptor_xattr {
uint8_t type;
@ -232,6 +237,37 @@ struct spdk_blob_md_descriptor_extent {
} extents[0];
};
/*
* As new flags are defined, these values will be updated to reflect the
* mask of all flag values understood by this application.
*/
#define SPDK_BLOB_INVALID_FLAGS_MASK 0
#define SPDK_BLOB_DATA_RO_FLAGS_MASK 0
#define SPDK_BLOB_MD_RO_FLAGS_MASK 0
struct spdk_blob_md_descriptor_flags {
uint8_t type;
uint32_t length;
/*
* If a flag in invalid_flags is set that the application is not aware of,
* it will not allow the blob to be opened.
*/
uint64_t invalid_flags;
/*
* If a flag in data_ro_flags is set that the application is not aware of,
* allow the blob to be opened in data_read_only and md_read_only mode.
*/
uint64_t data_ro_flags;
/*
* If a flag in md_ro_flags is set the the application is not aware of,
* allow the blob to be opened in md_read_only mode.
*/
uint64_t md_ro_flags;
};
struct spdk_blob_md_descriptor {
uint8_t type;
uint32_t length;

View File

@ -1983,6 +1983,131 @@ blob_dirty_shutdown(void)
g_bs = NULL;
}
static void
blob_flags(void)
{
struct spdk_bs_dev *dev;
spdk_blob_id blobid_invalid, blobid_data_ro, blobid_md_ro;
struct spdk_blob *blob_invalid, *blob_data_ro, *blob_md_ro;
struct spdk_bs_opts opts;
dev = init_dev();
spdk_bs_opts_init(&opts);
/* Initialize a new blob store */
spdk_bs_init(dev, &opts, bs_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_bs != NULL);
/* Create three blobs - one each for testing invalid, data_ro and md_ro flags. */
spdk_bs_md_create_blob(g_bs, blob_op_with_id_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
blobid_invalid = g_blobid;
spdk_bs_md_create_blob(g_bs, blob_op_with_id_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
blobid_data_ro = g_blobid;
spdk_bs_md_create_blob(g_bs, blob_op_with_id_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
blobid_md_ro = g_blobid;
spdk_bs_md_open_blob(g_bs, blobid_invalid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL);
blob_invalid = g_blob;
spdk_bs_md_open_blob(g_bs, blobid_data_ro, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL);
blob_data_ro = g_blob;
spdk_bs_md_open_blob(g_bs, blobid_md_ro, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL);
blob_md_ro = g_blob;
blob_invalid->invalid_flags = (1ULL << 63);
blob_invalid->state = SPDK_BLOB_STATE_DIRTY;
blob_data_ro->data_ro_flags = (1ULL << 62);
blob_data_ro->state = SPDK_BLOB_STATE_DIRTY;
blob_md_ro->md_ro_flags = (1ULL << 61);
blob_md_ro->state = SPDK_BLOB_STATE_DIRTY;
g_bserrno = -1;
spdk_bs_md_sync_blob(blob_invalid, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bserrno = -1;
spdk_bs_md_sync_blob(blob_data_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bserrno = -1;
spdk_bs_md_sync_blob(blob_md_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bserrno = -1;
spdk_bs_md_close_blob(&blob_invalid, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
blob_invalid = NULL;
g_bserrno = -1;
spdk_bs_md_close_blob(&blob_data_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
blob_data_ro = NULL;
g_bserrno = -1;
spdk_bs_md_close_blob(&blob_md_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
blob_md_ro = NULL;
g_blob = NULL;
g_blobid = SPDK_BLOBID_INVALID;
/* Unload the blob store */
spdk_bs_unload(g_bs, bs_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bs = NULL;
/* Load an existing blob store */
dev = init_dev();
spdk_bs_load(dev, &opts, bs_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_bs != NULL);
g_blob = NULL;
g_bserrno = 0;
spdk_bs_md_open_blob(g_bs, blobid_invalid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno != 0);
CU_ASSERT(g_blob == NULL);
g_blob = NULL;
g_bserrno = -1;
spdk_bs_md_open_blob(g_bs, blobid_data_ro, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob_data_ro = g_blob;
/* If an unknown data_ro flag was found, the blob should be marked both data and md read-only. */
CU_ASSERT(blob_data_ro->data_ro == true);
CU_ASSERT(blob_data_ro->md_ro == true);
g_blob = NULL;
g_bserrno = -1;
spdk_bs_md_open_blob(g_bs, blobid_md_ro, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob_md_ro = g_blob;
CU_ASSERT(blob_md_ro->data_ro == false);
CU_ASSERT(blob_md_ro->md_ro == true);
spdk_bs_md_close_blob(&blob_data_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
spdk_bs_md_close_blob(&blob_md_ro, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
spdk_bs_unload(g_bs, bs_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
}
int main(int argc, char **argv)
{
CU_pSuite suite = NULL;
@ -2024,7 +2149,8 @@ int main(int argc, char **argv)
CU_add_test(suite, "blob_serialize", blob_serialize) == NULL ||
CU_add_test(suite, "blob_crc", blob_crc) == NULL ||
CU_add_test(suite, "super_block_crc", super_block_crc) == NULL ||
CU_add_test(suite, "blob_dirty_shutdown", blob_dirty_shutdown) == NULL
CU_add_test(suite, "blob_dirty_shutdown", blob_dirty_shutdown) == NULL ||
CU_add_test(suite, "blob_flags", blob_flags) == NULL
) {
CU_cleanup_registry();
return CU_get_error();