sma: add volume cleanup thread

Since SMA keeps track of attached volumes and a volume might get
disconnected at any time (in which case it's also removed from the
host side), a mechanism is needed to monitor whether any of the volumes
are no longer accessible.

This patch implements that mechanism by adding a new thread running in
the background that will periodically (60s by default) send a
bdev_get_bdevs RPC and check that all previously attached volumes are
available.  Any volumes that are not will be removed and their
associated discovery services stopped (if their refcount goes down to
zero).  The period can be changed through the `volume_cleanup_period`
variable in the config file.

One important thing to note is that we assume that any intermittent
connection issues are handled internally by the SPDK application and a
bdev is only removed after all reconnection attempts are performed.

Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com>
Change-Id: I5b9e63698879527d9f79156a0eda1c8bc5e66def
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/12699
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
This commit is contained in:
Konrad Sztyber 2022-05-16 12:28:29 +02:00 committed by Tomasz Zawadzki
parent 3c60910eb8
commit 943088499b
4 changed files with 103 additions and 6 deletions

View File

@ -15,7 +15,8 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer):
self._devices = {} self._devices = {}
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
self._server.add_insecure_port(f'{addr}:{port}') self._server.add_insecure_port(f'{addr}:{port}')
self._volume_mgr = VolumeManager(client, config['discovery_timeout']) self._volume_mgr = VolumeManager(client, config['discovery_timeout'],
config['volume_cleanup_period'])
pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server) pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server)
def _grpc_method(f): def _grpc_method(f):
@ -28,10 +29,12 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer):
self._devices[device_manager.protocol] = device_manager self._devices[device_manager.protocol] = device_manager
def start(self): def start(self):
self._volume_mgr.start()
self._server.start() self._server.start()
def stop(self): def stop(self):
self._server.stop(None) self._server.stop(None)
self._volume_mgr.stop()
def _find_device_by_name(self, name): def _find_device_by_name(self, name):
return self._devices.get(name) return self._devices.get(name)

View File

@ -1,6 +1,7 @@
import grpc import grpc
import ipaddress import ipaddress
import logging import logging
import threading
import uuid import uuid
from dataclasses import dataclass from dataclasses import dataclass
from spdk.rpc.client import JSONRPCException from spdk.rpc.client import JSONRPCException
@ -25,13 +26,67 @@ class Volume:
class VolumeManager: class VolumeManager:
def __init__(self, client, discovery_timeout): def __init__(self, client, discovery_timeout, cleanup_period):
self._client = client self._client = client
# Discovery service map (name -> refcnt) # Discovery service map (name -> refcnt)
self._discovery = {} self._discovery = {}
# Volume map (volume_id -> Volume) # Volume map (volume_id -> Volume)
self._volumes = {} self._volumes = {}
self._discovery_timeout = int(discovery_timeout * 1000) self._discovery_timeout = int(discovery_timeout * 1000)
self._cleanup_period = cleanup_period
self._lock = threading.Lock()
self._cv = threading.Condition(self._lock)
self._running = False
self._thread = None
def _locked(f):
    """Decorator: run the wrapped method while holding self._lock."""
    def wrapper(self, *args, **kwargs):
        # 'with' is equivalent to acquire() + try/finally release()
        with self._lock:
            return f(self, *args, **kwargs)
    return wrapper
def start(self):
    """Launch the background volume-cleanup thread.

    Raises:
        ValueError: if the manager is already running (start() called
        twice without an intervening stop()).
    """
    if self._thread is not None:
        raise ValueError('Volume manager was already started')
    self._running = True
    # _cleanup_thread is a staticmethod, so the instance is handed over
    # explicitly through the thread's args tuple
    cleanup = threading.Thread(target=self._cleanup_thread, args=(self,))
    self._thread = cleanup
    cleanup.start()
def stop(self):
    """Stop the cleanup thread and wait for it to exit.

    Safe to call when the manager was never started (no-op).
    """
    if self._thread is None:
        return
    with self._lock:
        self._running = False
        # Wake the thread out of its timed cv.wait() so it notices
        # _running == False immediately instead of after a full period
        self._cv.notify_all()
    self._thread.join()
    self._thread = None
@staticmethod
def _cleanup_thread(*args):
    """Background loop: periodically scan for disconnected volumes.

    The VolumeManager instance is delivered via Thread(args=(self,)).
    """
    (mgr,) = args
    with mgr._lock:
        while mgr._running:
            mgr._cleanup_volumes()
            # The timed wait is also the cancellation point: stop()
            # clears _running and notifies the condition variable
            mgr._cv.wait(mgr._cleanup_period)
def _cleanup_volumes(self):
    """Disconnect tracked volumes whose bdevs are gone from the SPDK app.

    Assumes transient connection issues are handled internally by SPDK
    and a bdev only disappears once all reconnect attempts have failed.
    Errors are logged, never raised (runs on the cleanup thread).
    """
    try:
        with self._client() as client:
            bdevs = client.call('bdev_get_bdevs')
        # Build the uuid set once; the original rebuilt the uuid list for
        # every tracked volume, i.e. O(len(volumes) * len(bdevs))
        present = {b['uuid'] for b in bdevs}
        disconnected = [v for v in self._volumes if v not in present]
        for volume_id in disconnected:
            log.warning(f'Found disconnected volume: {volume_id}')
            self._disconnect_volume(volume_id)
    except VolumeException as ex:
        log.error(f'Failure when trying to disconnect volumes: {ex.message}')
    except JSONRPCException as ex:
        log.error(f'Failed to retrieve bdevs: {ex.message}')
def _get_discovery_info(self): def _get_discovery_info(self):
try: try:
@ -102,6 +157,7 @@ class VolumeManager:
raise VolumeException(grpc.StatusCode.INTERNAL, raise VolumeException(grpc.StatusCode.INTERNAL,
'Failed to stop discovery') 'Failed to stop discovery')
@_locked
def connect_volume(self, params, device_handle=None): def connect_volume(self, params, device_handle=None):
""" Connects a volume through a discovery service. Returns a tuple (volume_id, existing): """ Connects a volume through a discovery service. Returns a tuple (volume_id, existing):
the first item is a volume_id as str, while the second denotes whether the selected volume the first item is a volume_id as str, while the second denotes whether the selected volume
@ -173,8 +229,7 @@ class VolumeManager:
raise ex raise ex
return volume_id, False return volume_id, False
def disconnect_volume(self, volume_id): def _disconnect_volume(self, volume_id):
"""Disconnects a volume connected through discovery service"""
id = format_volume_id(volume_id) id = format_volume_id(volume_id)
if id is None: if id is None:
raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
@ -193,6 +248,12 @@ class VolumeManager:
log.error(f'Failed to stop discovery service: {name}') log.error(f'Failed to stop discovery service: {name}')
del self._volumes[id] del self._volumes[id]
@_locked
def disconnect_volume(self, volume_id):
"""Disconnects a volume connected through discovery service"""
return self._disconnect_volume(volume_id)
@_locked
def set_device(self, volume_id, device_handle): def set_device(self, volume_id, device_handle):
"""Marks a previously connected volume as being attached to specified device. This is only """Marks a previously connected volume as being attached to specified device. This is only
necessary if the device handle is not known at a time a volume is connected. necessary if the device handle is not known at a time a volume is connected.
@ -210,8 +271,9 @@ class VolumeManager:
'Volume is already attached to a different device') 'Volume is already attached to a different device')
volume.device_handle = device_handle volume.device_handle = device_handle
@_locked
def disconnect_device_volumes(self, device_handle): def disconnect_device_volumes(self, device_handle):
"""Disconnects all volumes attached to a specific device""" """Disconnects all volumes attached to a specific device"""
volumes = [i for i, v in self._volumes.items() if v.device_handle == device_handle] volumes = [i for i, v in self._volumes.items() if v.device_handle == device_handle]
for volume_id in volumes: for volume_id in volumes:
self.disconnect_volume(volume_id) self._disconnect_volume(volume_id)

View File

@ -33,7 +33,8 @@ def parse_argv():
defaults = {'address': 'localhost', defaults = {'address': 'localhost',
'socket': '/var/tmp/spdk.sock', 'socket': '/var/tmp/spdk.sock',
'port': 8080, 'port': 8080,
'discovery_timeout': 10.0} 'discovery_timeout': 10.0,
'volume_cleanup_period': 60.0}
# Merge the default values, config file, and the command-line # Merge the default values, config file, and the command-line
args = vars(parser.parse_args()) args = vars(parser.parse_args())
config = parse_config(args.get('config')) config = parse_config(args.get('config'))

View File

@ -18,6 +18,7 @@ t2dscport2=8011
t1nqn='nqn.2016-06.io.spdk:node1' t1nqn='nqn.2016-06.io.spdk:node1'
t2nqn='nqn.2016-06.io.spdk:node2' t2nqn='nqn.2016-06.io.spdk:node2'
hostnqn='nqn.2016-06.io.spdk:host0' hostnqn='nqn.2016-06.io.spdk:host0'
cleanup_period=1
function cleanup() { function cleanup() {
killprocess $smapid killprocess $smapid
@ -141,6 +142,7 @@ tgtpid=$!
$rootdir/scripts/sma.py -c <( $rootdir/scripts/sma.py -c <(
cat <<- EOF cat <<- EOF
discovery_timeout: 5 discovery_timeout: 5
volume_cleanup_period: $cleanup_period
devices: devices:
- name: 'nvmf_tcp' - name: 'nvmf_tcp'
EOF EOF
@ -398,6 +400,35 @@ NOT attach_volume $device_id $(uuidgen) $invalid_port
[[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 1 ]] [[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 1 ]]
$rpc_py bdev_nvme_get_discovery_info | jq -r '.[].trid.trsvcid' | grep $t1dscport $rpc_py bdev_nvme_get_discovery_info | jq -r '.[].trid.trsvcid' | grep $t1dscport
# Make sure that the discovery service is stopped if a volume is disconnected outside of SMA (e.g.
# by removing it from the target)
$rpc_py -s $t1sock nvmf_subsystem_remove_ns $t1nqn 1
# Give SMA some time to be notified about the change
sleep $((cleanup_period + 1))
# The cleanup thread should have dropped the volume and stopped its discovery service
[[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 0 ]]
# Restore the namespace for the tests that follow
$rpc_py -s $t1sock nvmf_subsystem_add_ns $t1nqn $t1uuid
# Do the same, but this time attach two volumes and check that the discovery service is only
# stopped once both volumes are disconnected
attach_volume $device_id $t2uuid $t2dscport1
attach_volume $device_id $t2uuid2 $t2dscport1
[[ $($rpc_py nvmf_get_subsystems $localnqn | jq -r '.[].namespaces | length') -eq 2 ]]
[[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 1 ]]
$rpc_py -s $t2sock nvmf_subsystem_remove_ns $t2nqn 2
# Give SMA some time to be notified about the change
sleep $((cleanup_period + 1))
# One of the volumes should be gone, but the discovery service should still be running
[[ $($rpc_py nvmf_get_subsystems $localnqn | jq -r '.[].namespaces | length') -eq 1 ]]
[[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 1 ]]
$rpc_py -s $t2sock nvmf_subsystem_remove_ns $t2nqn 1
# Give SMA some time to be notified about the change
sleep $((cleanup_period + 1))
# Now that both are gone, the discovery service should be stopped too
[[ $($rpc_py nvmf_get_subsystems $localnqn | jq -r '.[].namespaces | length') -eq 0 ]]
[[ $($rpc_py bdev_nvme_get_discovery_info | jq -r '. | length') -eq 0 ]]
# Re-add both namespaces so later tests see the original target state
$rpc_py -s $t2sock nvmf_subsystem_add_ns $t2nqn $t2uuid
$rpc_py -s $t2sock nvmf_subsystem_add_ns $t2nqn $t2uuid2
delete_device $device_id delete_device $device_id
cleanup cleanup