lvol: exposed marking lvol bdev as read only

Added set_read_only_lvol_bdev() RPC that marks
lvol bdev as read only.
This enables to create clones off of a base lvol,
without having to create additional lvol bdev with snapshot.

Change-Id: Ic20bbcd8fbebcef157acce44bfa1da035b0da459
Signed-off-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-on: https://review.gerrithub.io/c/440534
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Tomasz Zawadzki 2019-01-15 10:31:05 -05:00 committed by Jim Harris
parent 8a3620ffe0
commit 3bb815ae9f
15 changed files with 377 additions and 2 deletions

View File

@ -81,6 +81,11 @@ allows virtual bdevs to be shut down cleanly as opposed to the
previous behavior that didn't differentiate between hotremove and
planned shutdown.
### logical volumes
Logical volume bdev can now be marked as read only using `set_read_only_lvol_bdev` RPC.
This allows for basing clones on top of lvol_bdev without first creating a snapshot.
### log
"trace flags" are now referred to as "log flags" in the SPDK log API. The

View File

@ -167,7 +167,7 @@ if [ $SPDK_TEST_LVOL -eq 1 ]; then
test_cases="1,50,51,52,53,100,101,102,150,200,201,250,251,252,253,254,255,"
test_cases+="300,301,450,451,452,550,551,552,553,"
test_cases+="600,601,650,651,652,654,655,"
test_cases+="700,701,702,750,751,752,753,754,755,756,757,758,759,"
test_cases+="700,701,702,750,751,752,753,754,755,756,757,758,759,760,"
test_cases+="800,801,802,803,804,10000"
run_test suite ./test/lvol/lvol.sh --test-cases=$test_cases
run_test suite ./test/blobstore/blob_io_wait/blob_io_wait.sh

View File

@ -53,7 +53,7 @@ The write operation is performed as shown in the diagram below:
![Writing cluster to the clone](lvol_clone_snapshot_write.svg)
User may also create clone of existing snapshot that will be thin provisioned and it will behave in the same way as logical volume from which snapshot is created.
There is no limit of clones and snapshots that may be created as long as there is enough space on logical volume store. Snapshots are read only. Clones may be created only from snapshots.
There is no limit of clones and snapshots that may be created as long as there is enough space on logical volume store. Snapshots are read only. Clones may be created only from snapshots or read only logical volumes.
## Inflation {#lvol_inflation}
@ -137,6 +137,10 @@ resize_lvol_bdev [-h] name size
Resize existing lvol bdev
optional arguments:
-h, --help show help
set_read_only_lvol_bdev [-h] name
Mark lvol bdev as read only
optional arguments:
-h, --help show help
inflate_lvol_bdev [-h] name
Inflate lvol bdev
optional arguments:

View File

@ -121,4 +121,7 @@ struct lvol_store_bdev *vbdev_lvol_store_next(struct lvol_store_bdev *prev);
void spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn,
void *cb_arg);
void spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn,
void *cb_arg);
#endif /* SPDK_INTERNAL_LVOLSTORE_H */

View File

@ -1158,6 +1158,46 @@ vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_
spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
}
static void
_vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
{
struct spdk_lvol_req *req = cb_arg;
struct spdk_lvol *lvol = req->lvol;
if (lvolerrno != 0) {
SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
}
req->cb_fn(req->cb_arg, lvolerrno);
free(req);
}
void
vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
{
struct spdk_lvol_req *req;
if (lvol == NULL) {
SPDK_ERRLOG("lvol does not exist\n");
cb_fn(cb_arg, -EINVAL);
return;
}
assert(lvol->bdev != NULL);
req = calloc(1, sizeof(*req));
if (req == NULL) {
cb_fn(cb_arg, -ENOMEM);
return;
}
req->cb_fn = cb_fn;
req->cb_arg = cb_arg;
req->lvol = lvol;
spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
}
static int
vbdev_lvs_init(void)
{

View File

@ -72,6 +72,14 @@ void vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
void vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn,
void *cb_arg);
/**
* \brief Mark lvol as read only
* \param lvol Handle to lvol
* \param cb_fn Completion callback
* \param cb_arg Completion callback custom arguments
*/
void vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg);
void vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
spdk_lvol_op_complete cb_fn, void *cb_arg);

View File

@ -888,6 +888,95 @@ invalid:
SPDK_RPC_REGISTER("resize_lvol_bdev", spdk_rpc_resize_lvol_bdev, SPDK_RPC_RUNTIME)
struct rpc_set_ro_lvol_bdev {
char *name;
};
static void
free_rpc_set_ro_lvol_bdev(struct rpc_set_ro_lvol_bdev *req)
{
free(req->name);
}
static const struct spdk_json_object_decoder rpc_set_ro_lvol_bdev_decoders[] = {
{"name", offsetof(struct rpc_set_ro_lvol_bdev, name), spdk_json_decode_string},
};
static void
_spdk_rpc_set_ro_lvol_bdev_cb(void *cb_arg, int lvolerrno)
{
struct spdk_json_write_ctx *w;
struct spdk_jsonrpc_request *request = cb_arg;
if (lvolerrno != 0) {
goto invalid;
}
w = spdk_jsonrpc_begin_result(request);
if (w == NULL) {
return;
}
spdk_json_write_bool(w, true);
spdk_jsonrpc_end_result(request, w);
return;
invalid:
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
spdk_strerror(-lvolerrno));
}
static void
spdk_rpc_set_ro_lvol_bdev(struct spdk_jsonrpc_request *request,
const struct spdk_json_val *params)
{
struct rpc_set_ro_lvol_bdev req = {};
struct spdk_bdev *bdev;
struct spdk_lvol *lvol;
int rc = 0;
SPDK_INFOLOG(SPDK_LOG_LVOL_RPC, "Setting lvol as read only\n");
if (spdk_json_decode_object(params, rpc_set_ro_lvol_bdev_decoders,
SPDK_COUNTOF(rpc_set_ro_lvol_bdev_decoders),
&req)) {
SPDK_INFOLOG(SPDK_LOG_LVOL_RPC, "spdk_json_decode_object failed\n");
rc = -EINVAL;
goto invalid;
}
if (req.name == NULL) {
SPDK_ERRLOG("missing name param\n");
rc = -EINVAL;
goto invalid;
}
bdev = spdk_bdev_get_by_name(req.name);
if (bdev == NULL) {
SPDK_ERRLOG("no bdev for provided name %s\n", req.name);
rc = -ENODEV;
goto invalid;
}
lvol = vbdev_lvol_get_from_bdev(bdev);
if (lvol == NULL) {
rc = -ENODEV;
goto invalid;
}
vbdev_lvol_set_read_only(lvol, _spdk_rpc_set_ro_lvol_bdev_cb, request);
free_rpc_set_ro_lvol_bdev(&req);
return;
invalid:
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
spdk_strerror(-rc));
free_rpc_set_ro_lvol_bdev(&req);
}
SPDK_RPC_REGISTER("set_read_only_lvol_bdev", spdk_rpc_set_ro_lvol_bdev, SPDK_RPC_RUNTIME)
struct rpc_destroy_lvol_bdev {
char *name;
};

View File

@ -1265,6 +1265,33 @@ spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
}
static void
_spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
{
struct spdk_lvol_req *req = cb_arg;
req->cb_fn(req->cb_arg, lvolerrno);
free(req);
}
void
spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
{
struct spdk_lvol_req *req;
req = calloc(1, sizeof(*req));
if (!req) {
SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
cb_fn(cb_arg, -ENOMEM);
return;
}
req->cb_fn = cb_fn;
req->cb_arg = cb_arg;
spdk_blob_set_read_only(lvol->blob);
spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
}
static void
_spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
{

View File

@ -1092,6 +1092,14 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse
p.add_argument('size', help='new size in MiB for this bdev', type=int)
p.set_defaults(func=resize_lvol_bdev)
def set_read_only_lvol_bdev(args):
rpc.lvol.set_read_only_lvol_bdev(args.client,
name=args.name)
p = subparsers.add_parser('set_read_only_lvol_bdev', help='Mark lvol bdev as read only')
p.add_argument('name', help='lvol bdev name')
p.set_defaults(func=set_read_only_lvol_bdev)
def destroy_lvol_bdev(args):
rpc.lvol.destroy_lvol_bdev(args.client,
name=args.name)

View File

@ -119,6 +119,18 @@ def resize_lvol_bdev(client, name, size):
return client.call('resize_lvol_bdev', params)
def set_read_only_lvol_bdev(client, name):
"""Mark logical volume as read only.
Args:
name: name of logical volume to set as read only
"""
params = {
'name': name,
}
return client.call('set_read_only_lvol_bdev', params)
def destroy_lvol_bdev(client, name):
"""Destroy a logical volume.

View File

@ -70,6 +70,7 @@ function usage() {
757: 'clone_inflate',
758: 'clone_decouple_parent',
759: 'clone_decouple_parent_rw',
760: 'set_read_only',
800: 'rename_positive',
801: 'rename_lvs_nonexistent',
802: 'rename_lvs_EEXIST',

View File

@ -165,6 +165,11 @@ class Commands_Rpc(object):
output, rc = self.rpc.resize_lvol_bdev(uuid, new_size)
return rc
def set_read_only_lvol_bdev(self, uuid):
print("INFO: RPC COMMAND set_read_only_lvol_bdev")
output, rc = self.rpc.set_read_only_lvol_bdev(uuid)
return rc
def start_nbd_disk(self, bdev_name, nbd_name):
print("INFO: RPC COMMAND start_nbd_disk")
output, rc = self.rpc.start_nbd_disk(bdev_name, nbd_name)

View File

@ -140,6 +140,7 @@ def case_message(func):
757: 'clone_inflate',
758: 'decouple_parent',
759: 'decouple_parent_rw',
760: 'set_read_only',
800: 'rename_positive',
801: 'rename_lvs_nonexistent',
802: 'rename_lvs_EEXIST',
@ -2366,6 +2367,70 @@ class TestCases(object):
# - no other operation fails
return fail_count
@case_message
def test_case760(self):
"""
set read only
Set lvol bdev as read only and perform clone on it.
"""
fail_count = 0
nbd_name0 = "/dev/nbd0"
nbd_name1 = "/dev/nbd1"
clone_name = "clone0"
# Construct malloc bdev
base_name = self.c.construct_malloc_bdev(self.total_size,
self.block_size)
# Construct lvol store on malloc bdev
uuid_store = self.c.construct_lvol_store(base_name,
self.lvs_name)
fail_count += self.c.check_get_lvol_stores(base_name, uuid_store,
self.cluster_size)
# Create lvol bdev with 50% of lvol store space
lvs = self.c.get_lvol_stores()[0]
free_clusters_start = int(lvs['free_clusters'])
bdev_size = self.get_lvs_divided_size(2)
bdev_name = self.c.construct_lvol_bdev(uuid_store, self.lbd_name,
bdev_size)
# Set lvol bdev as read only
lvol_bdev = self.c.get_lvol_bdev_with_name(bdev_name)
fail_count += self.c.set_read_only_lvol_bdev(lvol_bdev['name'])
# Try to perform write operation on lvol marked as read only
fail_count += self.c.start_nbd_disk(lvol_bdev['name'], nbd_name0)
size = bdev_size * MEGABYTE
fail_count += self.run_fio_test(nbd_name0, 0, size, "write", "0xcc", 1)
# Create clone of lvol set to read only
rv = self.c.clone_lvol_bdev(lvol_bdev['name'], clone_name)
if rv != 0:
print("ERROR: Creating clone of snapshot ended with unexpected failure")
fail_count += 1
clone_bdev = self.c.get_lvol_bdev_with_name(self.lvs_name + "/" + clone_name)
# Try to perform write operation on lvol clone
fail_count += self.c.start_nbd_disk(clone_bdev['name'], nbd_name1)
size = bdev_size * MEGABYTE
fail_count += self.run_fio_test(nbd_name1, 0, size, "write", "0xcc", 0)
# Stop nbd disks
fail_count += self.c.stop_nbd_disk(nbd_name0)
fail_count += self.c.stop_nbd_disk(nbd_name1)
# Destroy clone lvol bdev
fail_count += self.c.destroy_lvol_bdev(clone_bdev['name'])
# Destroy lvol bdev
fail_count += self.c.destroy_lvol_bdev(lvol_bdev['name'])
# Destroy lvol store
fail_count += self.c.destroy_lvol_store(uuid_store)
# Delete malloc bdev
fail_count += self.c.delete_malloc_bdev(base_name)
# Expected result:
# - calls successful, return code = 0
# - no other operation fails
return fail_count
@case_message
def test_case800(self):
fail_count = 0

View File

@ -405,6 +405,12 @@ spdk_lvol_resize(struct spdk_lvol *lvol, size_t sz, spdk_lvol_op_complete cb_fn
cb_fn(cb_arg, 0);
}
void
spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
{
cb_fn(cb_arg, 0);
}
int
spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
{
@ -682,6 +688,12 @@ vbdev_lvol_resize_complete(void *cb_arg, int lvolerrno)
g_lvolerrno = lvolerrno;
}
static void
vbdev_lvol_set_read_only_complete(void *cb_arg, int lvolerrno)
{
g_lvolerrno = lvolerrno;
}
static void
vbdev_lvol_rename_complete(void *cb_arg, int lvolerrno)
{
@ -1115,6 +1127,45 @@ ut_lvol_resize(void)
CU_ASSERT(g_lvol_store == NULL);
}
static void
ut_lvol_set_read_only(void)
{
struct spdk_lvol_store *lvs;
struct spdk_lvol *lvol;
int sz = 10;
int rc = 0;
/* Lvol store is successfully created */
rc = vbdev_lvs_create(&g_bdev, "lvs", 0, lvol_store_op_with_handle_complete, NULL);
CU_ASSERT(rc == 0);
CU_ASSERT(g_lvserrno == 0);
SPDK_CU_ASSERT_FATAL(g_lvol_store != NULL);
CU_ASSERT(g_lvol_store->bs_dev != NULL);
lvs = g_lvol_store;
/* Successful lvol create */
g_lvolerrno = -1;
rc = vbdev_lvol_create(lvs, "lvol", sz, false, vbdev_lvol_create_complete, NULL);
CU_ASSERT(rc == 0);
CU_ASSERT(g_lvolerrno == 0);
SPDK_CU_ASSERT_FATAL(g_lvol != NULL);
lvol = g_lvol;
/* Successful set lvol as read only */
g_lvolerrno = -1;
vbdev_lvol_set_read_only(lvol, vbdev_lvol_set_read_only_complete, NULL);
CU_ASSERT(g_lvolerrno == 0);
/* Successful lvol destroy */
vbdev_lvol_destroy(lvol, lvol_store_op_complete, NULL);
CU_ASSERT(g_lvol == NULL);
/* Destroy lvol store */
vbdev_lvs_destruct(lvs, lvol_store_op_complete, NULL);
CU_ASSERT(g_lvserrno == 0);
CU_ASSERT(g_lvol_store == NULL);
}
static void
ut_lvs_unload(void)
{
@ -1388,6 +1439,7 @@ int main(int argc, char **argv)
CU_add_test(suite, "ut_lvs_destroy", ut_lvs_destroy) == NULL ||
CU_add_test(suite, "ut_lvs_unload", ut_lvs_unload) == NULL ||
CU_add_test(suite, "ut_lvol_resize", ut_lvol_resize) == NULL ||
CU_add_test(suite, "ut_lvol_set_read_only", ut_lvol_set_read_only) == NULL ||
CU_add_test(suite, "lvol_hotremove", ut_lvol_hotremove) == NULL ||
CU_add_test(suite, "ut_vbdev_lvol_get_io_channel", ut_vbdev_lvol_get_io_channel) == NULL ||
CU_add_test(suite, "ut_vbdev_lvol_io_type_supported", ut_vbdev_lvol_io_type_supported) == NULL ||

View File

@ -369,6 +369,12 @@ spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_f
cb_fn(cb_arg, 0);
}
int
spdk_blob_set_read_only(struct spdk_blob *blob)
{
return 0;
}
void
spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
@ -977,6 +983,55 @@ lvol_resize(void)
free_dev(&dev);
}
static void
lvol_set_read_only(void)
{
struct lvol_ut_bs_dev dev;
struct spdk_lvs_opts opts;
int rc = 0;
struct spdk_lvol *lvol, *clone;
init_dev(&dev);
spdk_lvs_opts_init(&opts);
snprintf(opts.name, sizeof(opts.name), "lvs");
g_lvserrno = -1;
rc = spdk_lvs_init(&dev.bs_dev, &opts, lvol_store_op_with_handle_complete, NULL);
CU_ASSERT(rc == 0);
CU_ASSERT(g_lvserrno == 0);
SPDK_CU_ASSERT_FATAL(g_lvol_store != NULL);
spdk_lvol_create(g_lvol_store, "lvol", 10, false, lvol_op_with_handle_complete, NULL);
CU_ASSERT(g_lvolerrno == 0);
SPDK_CU_ASSERT_FATAL(g_lvol != NULL);
lvol = g_lvol;
/* Set lvol as read only */
spdk_lvol_set_read_only(lvol, lvol_op_complete, NULL);
CU_ASSERT(g_lvolerrno == 0);
/* Create lvol clone from read only lvol */
spdk_lvol_create_clone(lvol, "clone", lvol_op_with_handle_complete, NULL);
CU_ASSERT(g_lvolerrno == 0);
SPDK_CU_ASSERT_FATAL(g_lvol != NULL);
CU_ASSERT_STRING_EQUAL(g_lvol->name, "clone");
clone = g_lvol;
spdk_lvol_close(lvol, close_cb, NULL);
CU_ASSERT(g_lvserrno == 0);
spdk_lvol_close(clone, close_cb, NULL);
CU_ASSERT(g_lvserrno == 0);
g_lvserrno = -1;
rc = spdk_lvs_unload(g_lvol_store, lvol_store_op_complete, NULL);
CU_ASSERT(rc == 0);
CU_ASSERT(g_lvserrno == 0);
g_lvol_store = NULL;
free_dev(&dev);
}
static void
null_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
@ -1993,6 +2048,7 @@ int main(int argc, char **argv)
CU_add_test(suite, "lvol_close_fail", lvol_close_fail) == NULL ||
CU_add_test(suite, "lvol_close_success", lvol_close_success) == NULL ||
CU_add_test(suite, "lvol_resize", lvol_resize) == NULL ||
CU_add_test(suite, "lvol_set_read_only", lvol_set_read_only) == NULL ||
CU_add_test(suite, "lvs_load", lvs_load) == NULL ||
CU_add_test(suite, "lvols_load", lvols_load) == NULL ||
CU_add_test(suite, "lvol_open", lvol_open) == NULL ||