test/lvol: Check if snapshot with snapshot can be deleted

Change-Id: If4458e998abb6fc3415ebbd46c5e5081cddb9180
Signed-off-by: Pawel Kaminski <pawelx.kaminski@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/450515
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Maciej Szwed <maciej.szwed@intel.com>
This commit is contained in:
Pawel Kaminski 2019-04-08 08:37:09 -04:00 committed by Ben Walker
parent 60b36002bd
commit b585dcb1db
2 changed files with 123 additions and 0 deletions

View File

@ -74,6 +74,7 @@ function usage() {
759: 'clone_decouple_parent_rw',
760: 'set_read_only',
761: 'delete_snapshot',
762: 'delete_snapshot_with_snapshot',
800: 'rename_positive',
801: 'rename_lvs_nonexistent',
802: 'rename_lvs_EEXIST',

View File

@ -173,6 +173,7 @@ def case_message(func):
759: 'decouple_parent_rw',
760: 'set_read_only',
761: 'delete_snapshot',
762: 'delete_snapshot_with_snapshot',
# logical volume rename tests
800: 'rename_positive',
801: 'rename_lvs_nonexistent',
@ -2979,6 +2980,127 @@ class TestCases(object):
# - no other operation fails
return fail_count
@case_message
def test_case762(self):
"""
delete_snapshot_with_snapshot
Check if it is possible to delete snapshot with one snapshot on it
"""
fail_count = 0
nbd_name0 = "/dev/nbd0"
nbd_name1 = "/dev/nbd1"
nbd_name2 = "/dev/nbd2"
snapshot_name = "snapshot"
snapshot_name2 = "snapshot2"
# Construct malloc bdev
base_name = self.c.construct_malloc_bdev(self.total_size,
self.block_size)
# Construct lvol store on malloc bdev
uuid_store = self.c.construct_lvol_store(base_name,
self.lvs_name)
fail_count += self.c.check_get_lvol_stores(base_name, uuid_store,
self.cluster_size)
# Create lvol bdev with one third of lvol store space
lvs = self.c.get_lvol_stores()[0]
bdev_size = self.get_lvs_divided_size(3)
bdev_name = self.c.construct_lvol_bdev(uuid_store, self.lbd_name,
bdev_size)
lvol_bdev = self.c.get_lvol_bdev_with_name(bdev_name)
# Perform write operation on lvol
fail_count += self.c.start_nbd_disk(lvol_bdev['name'], nbd_name0)
size = bdev_size * MEGABYTE
fail_count += self.run_fio_test(nbd_name0, 0, size-1, "write", "0xcc")
# Create snapshot of lvol bdev
fail_count += self.c.snapshot_lvol_bdev(lvol_bdev['name'], snapshot_name)
snapshot_bdev = self.c.get_lvol_bdev_with_name(self.lvs_name + "/" + snapshot_name)
lvol_bdev = self.c.get_lvol_bdev_with_name(bdev_name)
if lvol_bdev['driver_specific']['lvol']['base_snapshot'] != snapshot_name:
fail_count += 1
# Fill second 1/3 of lvol bdev
first_part = int(size / 3)
second_part = int(size / 3 * 2)
fail_count += self.run_fio_test(nbd_name0, first_part, second_part-first_part, "write", "0xee")
# Check if snapshot was unchanged
fail_count += self.c.start_nbd_disk(snapshot_bdev['name'], nbd_name1)
fail_count += self.run_fio_test(nbd_name1, 0, size-1, "read", "0xcc")
# Create second snapshot of lvol_bdev
# First snapshot becomes snapshot of second snapshot
fail_count += self.c.snapshot_lvol_bdev(lvol_bdev['name'], snapshot_name2)
snapshot_bdev2 = self.c.get_lvol_bdev_with_name(self.lvs_name + "/" + snapshot_name2)
snapshot_bdev = self.c.get_lvol_bdev_with_name(self.lvs_name + "/" + snapshot_name)
if snapshot_bdev2['driver_specific']['lvol']['base_snapshot'] != snapshot_name:
fail_count += 1
if self.lbd_name not in snapshot_bdev2['driver_specific']['lvol']['clones']:
fail_count += 1
if snapshot_bdev2['driver_specific']['lvol']['clone'] is not True\
or snapshot_bdev2['driver_specific']['lvol']['snapshot'] is not True:
fail_count += 1
if snapshot_name2 not in snapshot_bdev['driver_specific']['lvol']['clones']:
fail_count += 1
# Verify snapshots
fail_count += self.run_fio_test(nbd_name1, 0, size-1, "read", "0xcc")
fail_count += self.c.start_nbd_disk(snapshot_bdev2['name'], nbd_name2)
fail_count += self.run_fio_test(nbd_name2, 0, first_part-1, "read", "0xcc")
fail_count += self.run_fio_test(nbd_name2, first_part, second_part-first_part, "read", "0xee")
fail_count += self.run_fio_test(nbd_name2, second_part, size-second_part, "read", "0xcc")
# Verify lvol bdev
fail_count += self.run_fio_test(nbd_name0, first_part, second_part-first_part, "read", "0xee")
fail_count += self.run_fio_test(nbd_name0, second_part, size-second_part, "read", "0xcc")
lvol_bdev = self.c.get_lvol_bdev_with_name(bdev_name)
if lvol_bdev['driver_specific']['lvol']['clone'] is not True:
fail_count += 1
if lvol_bdev['driver_specific']['lvol']['base_snapshot'] != snapshot_name2:
fail_count += 1
# Fill third part of lvol bdev
fail_count += self.run_fio_test(nbd_name0, second_part, size-second_part, "write", "0xdd")
# Verify snapshots
fail_count += self.run_fio_test(nbd_name1, 0, size-1, "read", "0xcc")
fail_count += self.c.stop_nbd_disk(nbd_name1)
fail_count += self.run_fio_test(nbd_name2, second_part, size-second_part, "read", "0xcc")
fail_count += self.c.stop_nbd_disk(nbd_name2)
# Delete snapshot - should succeed
fail_count += self.c.destroy_lvol_bdev(snapshot_bdev2['name'])
# Check data consistency
snapshot_bdev = self.c.get_lvol_bdev_with_name(self.lvs_name + "/" + snapshot_name)
lvol_bdev = self.c.get_lvol_bdev_with_name(bdev_name)
if lvol_bdev['driver_specific']['lvol']['clone'] is not True:
fail_count += 1
if lvol_bdev['driver_specific']['lvol']['base_snapshot'] != snapshot_name:
fail_count += 1
if self.lbd_name not in snapshot_bdev['driver_specific']['lvol']['clones']:
fail_count += 1
fail_count += self.run_fio_test(nbd_name0, first_part, second_part-first_part, "read", "0xee")
fail_count += self.run_fio_test(nbd_name0, second_part, size-second_part, "read", "0xdd")
fail_count += self.c.stop_nbd_disk(nbd_name0)
# Destroy snapshot
fail_count += self.c.destroy_lvol_bdev(snapshot_bdev['name'])
# Destroy lvol bdev
fail_count += self.c.destroy_lvol_bdev(lvol_bdev['name'])
# Destroy lvol store
fail_count += self.c.destroy_lvol_store(uuid_store)
# Delete malloc bdev
fail_count += self.c.delete_malloc_bdev(base_name)
# Expected result:
# - calls successful, return code = 0
# - no other operation fails
return fail_count
@case_message
def test_case800(self):
"""