diff --git a/python/spdk/sma/sma.py b/python/spdk/sma/sma.py index 5865fc5d4..e8236be08 100644 --- a/python/spdk/sma/sma.py +++ b/python/spdk/sma/sma.py @@ -4,16 +4,18 @@ from multiprocessing import Lock import grpc import logging from .device import DeviceException +from .volume import VolumeException, VolumeManager from .proto import sma_pb2 as pb2 from .proto import sma_pb2_grpc as pb2_grpc class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): - def __init__(self, config): + def __init__(self, config, client): addr, port = config['address'], config['port'] self._devices = {} self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) self._server.add_insecure_port(f'{addr}:{port}') + self._volume_mgr = VolumeManager(client, config['discovery_timeout']) pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server) def _grpc_method(f): @@ -43,19 +45,37 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): pass return None + def _cleanup_volume(self, volume_id, existing): + if volume_id is None or existing: + return + try: + self._volume_mgr.disconnect_volume(volume_id) + except VolumeException: + logging.warning('Failed to cleanup volume {volume_id}') + @_grpc_method def CreateDevice(self, request, context): response = pb2.CreateDeviceResponse() + volume_id, existing = None, False try: + if request.HasField('volume'): + volume_id, existing = self._volume_mgr.connect_volume(request.volume) + manager = self._find_device_by_name(request.WhichOneof('params')) if manager is None: raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 'Unsupported device type') response = manager.create_device(request) - except DeviceException as ex: + # Now that we know the device handle, mark the volume as attached to + # that device + if volume_id is not None: + self._volume_mgr.set_device(volume_id, response.handle) + except (DeviceException, VolumeException) as ex: + self._cleanup_volume(volume_id, existing) context.set_details(ex.message) context.set_code(ex.code) except NotImplementedError: + self._cleanup_volume(volume_id, existing) context.set_details('Method is not implemented by selected device type') context.set_code(grpc.StatusCode.UNIMPLEMENTED) return response @@ -69,6 +89,8 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle') device.delete_device(request) + # Remove all volumes attached to that device + self._volume_mgr.disconnect_device_volumes(request.handle) except DeviceException as ex: context.set_details(ex.message) context.set_code(ex.code) @@ -80,15 +102,23 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): @_grpc_method def AttachVolume(self, request, context): response = pb2.AttachVolumeResponse() + volume_id, existing = None, False try: + if not request.HasField('volume'): + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Missing required field: volume') + volume_id, existing = self._volume_mgr.connect_volume(request.volume, + request.device_handle) device = self._find_device_by_handle(request.device_handle) if device is None: raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle') device.attach_volume(request) - except DeviceException as ex: + except (DeviceException, VolumeException) as ex: + self._cleanup_volume(volume_id, existing) context.set_details(ex.message) context.set_code(ex.code) except NotImplementedError: + self._cleanup_volume(volume_id, existing) context.set_details('Method is not implemented by selected device type') context.set_code(grpc.StatusCode.UNIMPLEMENTED) return response @@ -100,6 +130,7 @@ class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): device = self._find_device_by_handle(request.device_handle) if device is not None: device.detach_volume(request) + self._volume_mgr.disconnect_volume(request.volume_id) except DeviceException as ex: context.set_details(ex.message) context.set_code(ex.code) diff --git a/python/spdk/sma/volume/__init__.py b/python/spdk/sma/volume/__init__.py new file mode 100644 index 000000000..e21ebe6c5 --- /dev/null +++ b/python/spdk/sma/volume/__init__.py @@ -0,0 +1,2 @@ +from .volume import VolumeException +from .volume import VolumeManager diff --git a/python/spdk/sma/volume/volume.py b/python/spdk/sma/volume/volume.py new file mode 100644 index 000000000..ac01385c7 --- /dev/null +++ b/python/spdk/sma/volume/volume.py @@ -0,0 +1,217 @@ +import grpc +import ipaddress +import logging +import uuid +from dataclasses import dataclass +from spdk.rpc.client import JSONRPCException +from ..common import format_volume_id +from ..proto import sma_pb2 + + +log = logging.getLogger(__name__) + + +class VolumeException(Exception): + def __init__(self, code, message): + self.code = code + self.message = message + + +class Volume: + def __init__(self, volume_id, device_handle, discovery_services): + self.volume_id = volume_id + self.discovery_services = discovery_services + self.device_handle = device_handle + + +class VolumeManager: + def __init__(self, client, discovery_timeout): + self._client = client + # Discovery service map (name -> refcnt) + self._discovery = {} + # Volume map (volume_id -> Volume) + self._volumes = {} + self._discovery_timeout = int(discovery_timeout * 1000) + + def _get_discovery_info(self): + try: + with self._client() as client: + return client.call('bdev_nvme_get_discovery_info') + except JSONRPCException: + raise VolumeException(grpc.StatusCode.INTERNAL, + 'Failed to retrieve discovery service status') + + def _compare_trid(self, trid1, trid2): + return (trid1['trtype'].lower() == trid2['trtype'].lower() and + trid1['traddr'].lower() == trid2['traddr'].lower() and + trid1['trsvcid'].lower() == trid2['trsvcid'].lower() and + trid1['adrfam'].lower() == trid2['adrfam'].lower()) + + def _get_adrfam(self, traddr): + try: + return 'ipv{}'.format(ipaddress.ip_address(traddr).version) + except ValueError: + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Invalid traddr') + + def _get_volume_bdev(self, volume_id, timeout): + try: + with self._client() as client: + return client.call('bdev_get_bdevs', + {'name': volume_id, + 'timeout': timeout})[0] + except JSONRPCException: + return None + + def _start_discovery(self, trid, hostnqn): + try: + # Use random UUID as name + name = str(uuid.uuid4()) + log.debug(f'Starting discovery service {name}') + with self._client() as client: + client.call('bdev_nvme_start_discovery', + {'name': name, + 'wait_for_attach': True, + 'attach_timeout_ms': self._discovery_timeout, + 'hostnqn': hostnqn, + **trid}) + self._discovery[name] = 1 + return name + except JSONRPCException: + raise VolumeException(grpc.StatusCode.INTERNAL, + 'Failed to start discovery') + + def _stop_discovery(self, name): + refcnt = self._discovery.get(name) + log.debug(f'Stopping discovery service {name}, refcnt={refcnt}') + if refcnt is None: + # Should never happen + log.warning('Tried to stop discovery using non-existing name') + return + # Check the refcount to leave the service running if there are more volumes using it + if refcnt > 1: + self._discovery[name] = refcnt - 1 + return + del self._discovery[name] + try: + with self._client() as client: + client.call('bdev_nvme_stop_discovery', + {'name': name}) + log.debug(f'Stopped discovery service {name}') + except JSONRPCException: + raise VolumeException(grpc.StatusCode.INTERNAL, + 'Failed to stop discovery') + + def connect_volume(self, params, device_handle=None): + """ Connects a volume through a discovery service. Returns a tuple (volume_id, existing): + the first item is a volume_id as str, while the second denotes whether the selected volume + existed prior to calling this method. + """ + volume_id = format_volume_id(params.volume_id) + if volume_id is None: + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Invalid volume ID') + if volume_id in self._volumes: + volume = self._volumes[volume_id] + if device_handle is not None and volume.device_handle != device_handle: + raise VolumeException(grpc.StatusCode.ALREADY_EXISTS, + 'Volume is already attached to a different device') + return volume_id, True + discovery_services = set() + try: + # First start discovery connecting to specified endpoints + for req_ep in params.nvmf.discovery.discovery_endpoints: + info = self._get_discovery_info() + trid = {'trtype': req_ep.trtype, + 'traddr': req_ep.traddr, + 'trsvcid': req_ep.trsvcid, + 'adrfam': self._get_adrfam(req_ep.traddr)} + name = None + for discovery in info: + if self._compare_trid(discovery['trid'], trid): + name = discovery['name'] + break + if next(filter(lambda r: self._compare_trid(r['trid'], trid), + discovery['referrals']), None): + name = discovery['name'] + break + if name is not None: + # If we've already attached a discovery service, it probably means that the user + # specified a referred address + if name not in discovery_services: + refcnt = self._discovery.get(name) + if refcnt is None: + log.warning('Found a discovery service missing from internal map') + refcnt = 0 + self._discovery[name] = refcnt + 1 + else: + name = self._start_discovery(trid, params.nvmf.hostnqn) + discovery_services.add(name) + + # Now check if a bdev with specified volume_id exists, give it 1s to appear + bdev = self._get_volume_bdev(volume_id, timeout=1000) + if bdev is None: + raise VolumeException(grpc.StatusCode.NOT_FOUND, + 'Volume could not be found') + # Check subsystem's NQN if it's specified + if params.nvmf.subnqn: + nvme = bdev.get('driver_specific', {}).get('nvme', []) + # The NVMe bdev can report multiple subnqns, but they all should be the same, so + # don't bother checking more than the first one + subnqn = next(iter(nvme), {}).get('trid', {}).get('subnqn') + if subnqn != params.nvmf.subnqn: + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Unexpected subsystem NQN') + # Finally remember that volume + self._volumes[volume_id] = Volume(volume_id, device_handle, discovery_services) + except Exception as ex: + for name in discovery_services: + try: + self._stop_discovery(name) + except Exception: + log.warning(f'Failed to cleanup discovery service: {name}') + raise ex + return volume_id, False + + def disconnect_volume(self, volume_id): + """Disconnects a volume connected through discovery service""" + id = format_volume_id(volume_id) + if id is None: + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Invalid volume ID') + # Return immediately if the volume is not on our map + volume = self._volumes.get(id) + if volume is None: + return + # Delete the volume from the map and stop the services it uses + for name in volume.discovery_services: + try: + self._stop_discovery(name) + except Exception: + # There's no good way to handle this, so just print an error message and + # continue + log.error(f'Failed to stop discovery service: {name}') + del self._volumes[id] + + def set_device(self, volume_id, device_handle): + """Marks a previously connected volume as being attached to specified device. This is only + necessary if the device handle is not known at a time a volume is connected. + """ + id = format_volume_id(volume_id) + if id is None: + raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, + 'Invalid volume ID') + volume = self._volumes.get(id) + if volume is None: + raise VolumeException(grpc.StatusCode.NOT_FOUND, + 'Volume could not be found') + if volume.device_handle is not None and volume.device_handle != device_handle: + raise VolumeException(grpc.StatusCode.ALREADY_EXISTS, + 'Volume is already attached to a different device') + volume.device_handle = device_handle + + def disconnect_device_volumes(self, device_handle): + """Disconnects all volumes attached to a specific device""" + volumes = [i for i, v in self._volumes.items() if v.device_handle == device_handle] + for volume_id in volumes: + self.disconnect_volume(volume_id) diff --git a/scripts/sma.py b/scripts/sma.py index 1978abcfc..8ae9cd57b 100755 --- a/scripts/sma.py +++ b/scripts/sma.py @@ -32,7 +32,8 @@ def parse_argv(): parser.add_argument('--config', '-c', help='Path to config file') defaults = {'address': 'localhost', 'socket': '/var/tmp/spdk.sock', - 'port': 8080} + 'port': 8080, + 'discovery_timeout': 10.0} # Merge the default values, config file, and the command-line args = vars(parser.parse_args()) config = parse_config(args.get('config')) @@ -116,7 +117,7 @@ if __name__ == '__main__': # Wait until the SPDK process starts responding to RPCs wait_for_listen(client, timeout=60.0) - agent = sma.StorageManagementAgent(config) + agent = sma.StorageManagementAgent(config, client) devices = [sma.NvmfTcpDeviceManager(client)] devices += load_plugins(config.get('plugins') or [], client)