blob: add data_ro and md_ro flags to spdk_blob

data_ro means that write, write_zeroes and unmap operations are not
allowed.

md_ro means that resize, set_xattr and remove_xattr are not
allowed.

There is no code yet that can activate this - it is coming in a future
patch.  Two usages are planned though:

1) a user explicitly marks a blob as read-only - this is persisted so that
   future loads of the blob will ensure the blob cannot be modified - neither
   metadata nor data
2) a future feature flag framework (how's that for alliteration) may allow
   a blob to be opened, but not allow metadata modifications, if there are
   feature flags set in the blob's or blobstore's metadata that the
   application does not understand

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: I247fd900430c56f7176edfb80dddd5a1a6c8dc87

Reviewed-on: https://review.gerrithub.io/388663
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
This commit is contained in:
Jim Harris 2017-11-21 16:24:00 -07:00
parent 500f531200
commit f2223d7d21
3 changed files with 128 additions and 4 deletions

View File

@ -1067,6 +1067,11 @@ _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_ch
assert(blob != NULL); assert(blob != NULL);
if (blob->data_ro && op_type != SPDK_BLOB_READ) {
cb_fn(cb_arg, -EPERM);
return;
}
if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
cb_fn(cb_arg, -EINVAL); cb_fn(cb_arg, -EINVAL);
return; return;
@ -2552,6 +2557,10 @@ spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
if (blob->md_ro) {
return -EPERM;
}
if (sz == blob->active.num_clusters) { if (sz == blob->active.num_clusters) {
return 0; return 0;
} }
@ -2721,6 +2730,11 @@ void spdk_bs_md_sync_blob(struct spdk_blob *blob,
assert(blob->state != SPDK_BLOB_STATE_LOADING && assert(blob->state != SPDK_BLOB_STATE_LOADING &&
blob->state != SPDK_BLOB_STATE_SYNCING); blob->state != SPDK_BLOB_STATE_SYNCING);
if (blob->md_ro) {
assert(blob->state == SPDK_BLOB_STATE_CLEAN);
return;
}
if (blob->state == SPDK_BLOB_STATE_CLEAN) { if (blob->state == SPDK_BLOB_STATE_CLEAN) {
cb_fn(cb_arg, 0); cb_fn(cb_arg, 0);
return; return;
@ -2853,6 +2867,10 @@ void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *chan
struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
spdk_blob_op_complete cb_fn, void *cb_arg) spdk_blob_op_complete cb_fn, void *cb_arg)
{ {
if (blob->data_ro) {
cb_fn(cb_arg, -EPERM);
return;
}
_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
} }
@ -2970,6 +2988,10 @@ spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *val
assert(blob->state != SPDK_BLOB_STATE_LOADING && assert(blob->state != SPDK_BLOB_STATE_LOADING &&
blob->state != SPDK_BLOB_STATE_SYNCING); blob->state != SPDK_BLOB_STATE_SYNCING);
if (blob->md_ro) {
return -EPERM;
}
TAILQ_FOREACH(xattr, &blob->xattrs, link) { TAILQ_FOREACH(xattr, &blob->xattrs, link) {
if (!strcmp(name, xattr->name)) { if (!strcmp(name, xattr->name)) {
free(xattr->value); free(xattr->value);
@ -3008,6 +3030,10 @@ spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
assert(blob->state != SPDK_BLOB_STATE_LOADING && assert(blob->state != SPDK_BLOB_STATE_LOADING &&
blob->state != SPDK_BLOB_STATE_SYNCING); blob->state != SPDK_BLOB_STATE_SYNCING);
if (blob->md_ro) {
return -EPERM;
}
TAILQ_FOREACH(xattr, &blob->xattrs, link) { TAILQ_FOREACH(xattr, &blob->xattrs, link) {
if (!strcmp(name, xattr->name)) { if (!strcmp(name, xattr->name)) {
TAILQ_REMOVE(&blob->xattrs, xattr, link); TAILQ_REMOVE(&blob->xattrs, xattr, link);

View File

@ -125,6 +125,9 @@ struct spdk_blob {
struct spdk_blob_mut_data clean; struct spdk_blob_mut_data clean;
struct spdk_blob_mut_data active; struct spdk_blob_mut_data active;
bool data_ro;
bool md_ro;
/* TODO: The xattrs are mutable, but we don't want to be /* TODO: The xattrs are mutable, but we don't want to be
* copying them unecessarily. Figure this out. * copying them unecessarily. Figure this out.
*/ */

View File

@ -325,9 +325,15 @@ blob_resize(void)
spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL); spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL); SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob; blob = g_blob;
/* Confirm that resize fails if blob is marked read-only. */
blob->md_ro = true;
rc = spdk_bs_md_resize_blob(blob, 5);
CU_ASSERT(rc == -EPERM);
blob->md_ro = false;
/* The blob started at 0 clusters. Resize it to be 5. */ /* The blob started at 0 clusters. Resize it to be 5. */
rc = spdk_bs_md_resize_blob(blob, 5); rc = spdk_bs_md_resize_blob(blob, 5);
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
@ -417,7 +423,7 @@ blob_write(void)
spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL); spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL); SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob; blob = g_blob;
/* Write to a blob with 0 size */ /* Write to a blob with 0 size */
@ -428,6 +434,12 @@ blob_write(void)
rc = spdk_bs_md_resize_blob(blob, 5); rc = spdk_bs_md_resize_blob(blob, 5);
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
/* Confirm that write fails if blob is marked read-only. */
blob->data_ro = true;
spdk_bs_io_write_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == -EPERM);
blob->data_ro = false;
/* Write to the blob */ /* Write to the blob */
spdk_bs_io_write_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL); spdk_bs_io_write_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
@ -483,7 +495,7 @@ blob_read(void)
spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL); spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL); SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob; blob = g_blob;
/* Read from a blob with 0 size */ /* Read from a blob with 0 size */
@ -494,6 +506,12 @@ blob_read(void)
rc = spdk_bs_md_resize_blob(blob, 5); rc = spdk_bs_md_resize_blob(blob, 5);
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
/* Confirm that read passes if blob is marked read-only. */
blob->data_ro = true;
spdk_bs_io_read_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
blob->data_ro = false;
/* Read from the blob */ /* Read from the blob */
spdk_bs_io_read_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL); spdk_bs_io_read_blob(blob, channel, payload, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
@ -740,6 +758,67 @@ blob_rw_verify_iov_nomem(void)
g_bs = NULL; g_bs = NULL;
} }
static void
blob_rw_iov_read_only(void)
{
struct spdk_blob_store *bs;
struct spdk_bs_dev *dev;
struct spdk_blob *blob;
struct spdk_io_channel *channel;
spdk_blob_id blobid;
uint8_t payload_read[4096];
uint8_t payload_write[4096];
struct iovec iov_read;
struct iovec iov_write;
int rc;
dev = init_dev();
memset(g_dev_buffer, 0, DEV_BUFFER_SIZE);
spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_bs != NULL);
bs = g_bs;
channel = spdk_bs_alloc_io_channel(bs);
CU_ASSERT(channel != NULL);
spdk_bs_md_create_blob(bs, blob_op_with_id_complete, NULL);
CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
blobid = g_blobid;
spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0);
SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob;
rc = spdk_bs_md_resize_blob(blob, 2);
CU_ASSERT(rc == 0);
/* Verify that writev failed if read_only flag is set. */
blob->data_ro = true;
iov_write.iov_base = payload_write;
iov_write.iov_len = sizeof(payload_write);
spdk_bs_io_writev_blob(blob, channel, &iov_write, 1, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == -EPERM);
/* Verify that reads pass if data_ro flag is set. */
iov_read.iov_base = payload_read;
iov_read.iov_len = sizeof(payload_read);
spdk_bs_io_readv_blob(blob, channel, &iov_read, 1, 0, 1, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
spdk_bs_md_close_blob(&blob, blob_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
spdk_bs_free_io_channel(channel);
spdk_bs_unload(g_bs, bs_op_complete, NULL);
CU_ASSERT(g_bserrno == 0);
g_bs = NULL;
}
static void static void
blob_iter(void) blob_iter(void)
{ {
@ -807,9 +886,15 @@ blob_xattr(void)
spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL); spdk_bs_md_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
CU_ASSERT(g_bserrno == 0); CU_ASSERT(g_bserrno == 0);
CU_ASSERT(g_blob != NULL); SPDK_CU_ASSERT_FATAL(g_blob != NULL);
blob = g_blob; blob = g_blob;
/* Test that set_xattr fails if md_ro flag is set. */
blob->md_ro = true;
rc = spdk_blob_md_set_xattr(blob, "name", "log.txt", strlen("log.txt") + 1);
CU_ASSERT(rc == -EPERM);
blob->md_ro = false;
rc = spdk_blob_md_set_xattr(blob, "name", "log.txt", strlen("log.txt") + 1); rc = spdk_blob_md_set_xattr(blob, "name", "log.txt", strlen("log.txt") + 1);
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
@ -822,12 +907,15 @@ blob_xattr(void)
rc = spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); rc = spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
/* get_xattr should still work even if md_ro flag is set. */
value = NULL; value = NULL;
blob->md_ro = true;
rc = spdk_bs_md_get_xattr_value(blob, "length", &value, &value_len); rc = spdk_bs_md_get_xattr_value(blob, "length", &value, &value_len);
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
SPDK_CU_ASSERT_FATAL(value != NULL); SPDK_CU_ASSERT_FATAL(value != NULL);
CU_ASSERT(*(uint64_t *)value == length); CU_ASSERT(*(uint64_t *)value == length);
CU_ASSERT(value_len == 8); CU_ASSERT(value_len == 8);
blob->md_ro = false;
rc = spdk_bs_md_get_xattr_value(blob, "foobar", &value, &value_len); rc = spdk_bs_md_get_xattr_value(blob, "foobar", &value, &value_len);
CU_ASSERT(rc == -ENOENT); CU_ASSERT(rc == -ENOENT);
@ -846,6 +934,12 @@ blob_xattr(void)
CU_ASSERT(strcmp(name1, name2)); CU_ASSERT(strcmp(name1, name2));
spdk_xattr_names_free(names); spdk_xattr_names_free(names);
/* Confirm that remove_xattr fails if md_ro is set to true. */
blob->md_ro = true;
rc = spdk_blob_md_remove_xattr(blob, "name");
CU_ASSERT(rc == -EPERM);
blob->md_ro = false;
rc = spdk_blob_md_remove_xattr(blob, "name"); rc = spdk_blob_md_remove_xattr(blob, "name");
CU_ASSERT(rc == 0); CU_ASSERT(rc == 0);
@ -1916,6 +2010,7 @@ int main(int argc, char **argv)
CU_add_test(suite, "blob_rw_verify", blob_rw_verify) == NULL || CU_add_test(suite, "blob_rw_verify", blob_rw_verify) == NULL ||
CU_add_test(suite, "blob_rw_verify_iov", blob_rw_verify_iov) == NULL || CU_add_test(suite, "blob_rw_verify_iov", blob_rw_verify_iov) == NULL ||
CU_add_test(suite, "blob_rw_verify_iov_nomem", blob_rw_verify_iov_nomem) == NULL || CU_add_test(suite, "blob_rw_verify_iov_nomem", blob_rw_verify_iov_nomem) == NULL ||
CU_add_test(suite, "blob_rw_iov_read_only", blob_rw_iov_read_only) == NULL ||
CU_add_test(suite, "blob_iter", blob_iter) == NULL || CU_add_test(suite, "blob_iter", blob_iter) == NULL ||
CU_add_test(suite, "blob_xattr", blob_xattr) == NULL || CU_add_test(suite, "blob_xattr", blob_xattr) == NULL ||
CU_add_test(suite, "bs_load", bs_load) == NULL || CU_add_test(suite, "bs_load", bs_load) == NULL ||