Since SMA keeps track of attached volumes and a volume might get disconnected at any time (in which case it's also removed from the host side), a mechanism is needed to monitor whether any of the volumes is no longer accessible. This patch implements that mechanism by adding a new thread running in the background that will periodically (60s by default) send a bdev_get_bdevs RPC and check that all previously attached volumes are available. If any of them is not, the thread will remove it and stop the associated discovery services (if their refcount goes down to zero). The period can be changed through the `volume_cleanup_period` variable in the config file. One important thing to note is that we assume that any intermittent connection issues are handled internally by the SPDK application and a bdev is only removed after all reconnection attempts are performed. Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com> Change-Id: I5b9e63698879527d9f79156a0eda1c8bc5e66def Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/12699 Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com> Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Reviewed-by: Jim Harris <james.r.harris@intel.com> Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
141 lines
5.7 KiB
Python
141 lines
5.7 KiB
Python
from concurrent import futures
|
|
from contextlib import contextmanager
|
|
from multiprocessing import Lock
|
|
import grpc
|
|
import logging
|
|
from .device import DeviceException
|
|
from .volume import VolumeException, VolumeManager
|
|
from .proto import sma_pb2 as pb2
|
|
from .proto import sma_pb2_grpc as pb2_grpc
|
|
|
|
|
|
class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer):
    """gRPC servicer that routes SMA requests to registered device managers.

    Device managers are registered per protocol via `register_device()`.
    Volume connections are delegated to a `VolumeManager`, which is started
    and stopped together with the gRPC server.
    """

    def __init__(self, config, client):
        """Create the gRPC server and the volume manager.

        Args:
            config: Mapping providing the 'address', 'port',
                'discovery_timeout' and 'volume_cleanup_period' keys.
            client: Object handed over to the VolumeManager as-is.
        """
        addr, port = config['address'], config['port']
        # Maps a protocol name to the device manager handling it
        self._devices = {}
        # Requests are served by a pool consisting of a single worker thread
        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
        self._server.add_insecure_port(f'{addr}:{port}')
        self._volume_mgr = VolumeManager(client, config['discovery_timeout'],
                                         config['volume_cleanup_period'])
        pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server)

    def _grpc_method(f):
        """Decorate a gRPC handler so that each call is logged at debug level."""
        # Imported locally so that the decorator doesn't rely on a module-level
        # import of functools
        from functools import wraps

        # Preserve the handler's name/docstring on the wrapper
        @wraps(f)
        def wrapper(self, request, context):
            logging.debug(f'{f.__name__}\n{request}')
            return f(self, request, context)
        return wrapper

    def register_device(self, device_manager):
        """Register a device manager under its `protocol` attribute.

        A manager registered earlier for the same protocol is replaced.
        """
        self._devices[device_manager.protocol] = device_manager

    def start(self):
        """Start the volume manager and then the gRPC server."""
        self._volume_mgr.start()
        self._server.start()

    def stop(self):
        """Stop the gRPC server (without a grace period), then the volume manager."""
        self._server.stop(None)
        self._volume_mgr.stop()

    def _find_device_by_name(self, name):
        """Return the device manager registered for `name` or None."""
        return self._devices.get(name)

    def _find_device_by_handle(self, handle):
        """Return the first device manager that owns `handle` or None.

        Managers that don't implement `owns_device()` are skipped.
        """
        for device in self._devices.values():
            try:
                if device.owns_device(handle):
                    return device
            except NotImplementedError:
                pass
        return None

    def _cleanup_volume(self, volume_id, existing):
        """Disconnect a volume connected while serving a request that failed.

        Nothing is done if no volume was connected (`volume_id` is None) or if
        the volume had already been connected before the request (`existing`).
        A failure to disconnect is only logged.
        """
        if volume_id is None or existing:
            return
        try:
            self._volume_mgr.disconnect_volume(volume_id)
        except VolumeException:
            # This used to be a plain string, so the literal '{volume_id}'
            # placeholder was logged instead of the actual volume ID
            logging.warning(f'Failed to cleanup volume {volume_id}')

    @_grpc_method
    def CreateDevice(self, request, context):
        """Create a device, optionally connecting a volume and attaching it.

        Errors are reported through `context` (status code and details). If
        the request fails, a volume connected by this call is disconnected.

        Returns:
            The response produced by the device manager, or an empty
            CreateDeviceResponse if the manager hasn't produced one.
        """
        response = pb2.CreateDeviceResponse()
        volume_id, existing = None, False
        try:
            if request.HasField('volume'):
                volume_id, existing = self._volume_mgr.connect_volume(request.volume)

            manager = self._find_device_by_name(request.WhichOneof('params'))
            if manager is None:
                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                      'Unsupported device type')
            response = manager.create_device(request)
            # Now that we know the device handle, mark the volume as attached to
            # that device
            if volume_id is not None:
                self._volume_mgr.set_device(volume_id, response.handle)
        except (DeviceException, VolumeException) as ex:
            self._cleanup_volume(volume_id, existing)
            context.set_details(ex.message)
            context.set_code(ex.code)
        except NotImplementedError:
            self._cleanup_volume(volume_id, existing)
            context.set_details('Method is not implemented by selected device type')
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return response

    @_grpc_method
    def DeleteDevice(self, request, context):
        """Delete a device and disconnect all volumes attached to it.

        An unknown handle is reported as NOT_FOUND through `context`.

        Returns:
            A DeleteDeviceResponse.
        """
        response = pb2.DeleteDeviceResponse()
        try:
            device = self._find_device_by_handle(request.handle)
            if device is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid device handle')
            device.delete_device(request)
            # Remove all volumes attached to that device
            self._volume_mgr.disconnect_device_volumes(request.handle)
        except DeviceException as ex:
            context.set_details(ex.message)
            context.set_code(ex.code)
        except NotImplementedError:
            context.set_details('Method is not implemented by selected device type')
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return response

    @_grpc_method
    def AttachVolume(self, request, context):
        """Connect a volume and attach it to an existing device.

        Errors are reported through `context` (status code and details). If
        the request fails, a volume connected by this call is disconnected.

        Returns:
            An AttachVolumeResponse.
        """
        response = pb2.AttachVolumeResponse()
        volume_id, existing = None, False
        try:
            if not request.HasField('volume'):
                raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                      'Missing required field: volume')
            volume_id, existing = self._volume_mgr.connect_volume(request.volume,
                                                                  request.device_handle)
            device = self._find_device_by_handle(request.device_handle)
            if device is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle')
            device.attach_volume(request)
        except (DeviceException, VolumeException) as ex:
            self._cleanup_volume(volume_id, existing)
            context.set_details(ex.message)
            context.set_code(ex.code)
        except NotImplementedError:
            self._cleanup_volume(volume_id, existing)
            context.set_details('Method is not implemented by selected device type')
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return response

    @_grpc_method
    def DetachVolume(self, request, context):
        """Detach a volume from a device and disconnect it.

        A device handle that isn't owned by any registered manager is not
        treated as an error: the request is a no-op in that case.

        Returns:
            A DetachVolumeResponse.
        """
        response = pb2.DetachVolumeResponse()
        try:
            device = self._find_device_by_handle(request.device_handle)
            if device is not None:
                device.detach_volume(request)
                self._volume_mgr.disconnect_volume(request.volume_id)
        except (DeviceException, VolumeException) as ex:
            # disconnect_volume() may raise VolumeException (see
            # _cleanup_volume()); report it the same way as the other handlers
            # instead of letting it escape to the gRPC runtime
            context.set_details(ex.message)
            context.set_code(ex.code)
        except NotImplementedError:
            context.set_details('Method is not implemented by selected device type')
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return response
|