Spdk/python/spdk/sma/device/nvmf_vfiouser.py
spdk 01471e63bc sma: fix multiple bus support for NVMe/vfio_user plugin
In the original code, the device search was performed only on the
first attached bus. If a device resided on a second attached bus, it
could not be found and properly deleted in QEMU.

This patch traverses all attached buses to find the required device.

Signed-off-by: Artsiom Koltun <artsiom.koltun@intel.com>
Change-Id: I0840d0a2367d3382c6723ee4cdb6d768128c8032
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/14651
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: <sebastian.brzezinka@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
2022-09-23 11:33:07 +00:00
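To make the multi-bus scenario concrete, here is a minimal sketch of how the code below maps a physical_id onto a bus and slot when several buses are configured. The bus names and counts are hypothetical, and map_physical_id is an illustration only; it replicates the bus-selection loop in _qmp_add_device(), and the dict mirrors the shape that init() reads via config.get('buses', []).

# Hypothetical two-bus configuration, shaped like the dict passed to init().
config = {
    'buses': [
        {'name': 'pci.spdk.0', 'count': 32},   # physical_id 0..31
        {'name': 'pci.spdk.1', 'count': 32},   # physical_id 32..63
    ],
}


def map_physical_id(buses, phid):
    """Replicates the bus-selection loop in _qmp_add_device()."""
    for bus in buses:
        if phid >= bus['count']:
            phid -= bus['count']
        else:
            return bus['name'], hex(phid)
    raise ValueError('Invalid physical_id')


# physical_id 40 lands on the second bridge at slot 0x8.
print(map_physical_id(config['buses'], 40))   # ('pci.spdk.1', '0x8')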


import logging
import os
import shutil
from contextlib import contextmanager
from socket import AddressFamily
import grpc
from google.protobuf import wrappers_pb2 as wrap
from spdk.rpc.client import JSONRPCException
from spdk.sma import qos
from ..common import format_volume_id, volume_id_to_nguid
from ..proto import sma_pb2
from ..qmp import QMPClient, QMPError
from ..volume import CryptoException, get_crypto_engine
from .device import DeviceException, DeviceManager
log = logging.getLogger(__name__)


class NvmfVfioDeviceManager(DeviceManager):
    def __init__(self, client):
        super().__init__('vfiouser', 'nvme', client)

    def init(self, config):
        self._buses = config.get('buses', [])
        try:
            if len(self._buses) != len(list({v['name']: v for v in self._buses}.values())):
                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                      'Duplicate PCI bridge names')
        except KeyError:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'PCI bridge name is missing')
        for bus in self._buses:
            bus['count'] = bus.get('count', 32)
            if bus['count'] < 0:
                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                      'Incorrect PCI bridge count')
        self._qmp_addr = (config.get('qmp_addr', '127.0.0.1'), config.get('qmp_port'))
        self._sock_path = config.get('sock_path', '/var/tmp/')
        self._prefix = f'{self.protocol}'
        if not self._create_transport():
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'NVMe/vfiouser transport is unavailable')
    def _create_transport(self):
        try:
            with self._client() as client:
                transports = client.call('nvmf_get_transports')
                for transport in transports:
                    if transport['trtype'].lower() == 'vfiouser':
                        return True
                return client.call('nvmf_create_transport', {'trtype': 'vfiouser'})
        except JSONRPCException:
            logging.error('Transport query NVMe/vfiouser failed')
            return False
    @contextmanager
    def _client_wrap(self):
        try:
            with self._client() as client:
                yield client
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to connect to SPDK application')
    def _get_subsys(self, client, nqn):
        try:
            return client.call('nvmf_get_subsystems', {'nqn': nqn})[0]
        except JSONRPCException:
            # Return None so callers' "is None" checks treat RPC failures
            # the same as a missing subsystem.
            return None
    def _create_subsystem(self, client, subnqn):
        try:
            if self._get_subsys(client, subnqn):
                return True
            return client.call('nvmf_create_subsystem', {'nqn': subnqn, 'allow_any_host': True})
        except JSONRPCException:
            logging.error('Failed to create subsystem')
            return False

    def _delete_subsystem(self, client, subnqn):
        try:
            if not self._get_subsys(client, subnqn):
                return True
            return client.call('nvmf_delete_subsystem', {'nqn': subnqn})
        except JSONRPCException:
            logging.error('Failed to delete subsystem')
            return False

    def _subsystem_add_listener(self, client, subnqn, addr):
        try:
            return client.call('nvmf_subsystem_add_listener',
                               {'nqn': subnqn,
                                'listen_address': {
                                    'trtype': 'vfiouser',
                                    'traddr': addr}})
        except JSONRPCException:
            logging.error('Failed to add listener')
            return False
    def _create_socket_dir(self, dev_id):
        try:
            path = os.path.join(self._sock_path, dev_id)
            if os.path.exists(path):
                shutil.rmtree(path)
            os.makedirs(path)
            if os.path.isdir(path):
                return path
        except OSError:
            raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
    def _find_pcidev(self, qclient, name):
        # Search every bus reported by QMP (not only the first one),
        # recursing into PCI bridges, for a device with a matching qdev_id.
        def rsearch(devices, name):
            found_dev = None
            for dev in devices:
                if dev['qdev_id'] == name:
                    found_dev = dev
                elif 'pci_bridge' in dev:
                    found_dev = rsearch(dev['pci_bridge']['devices'], name)
                if found_dev:
                    break
            return found_dev

        try:
            buses = qclient.query_pci()['return']
            for bus in buses:
                rc = rsearch(bus['devices'], name)
                if rc is not None:
                    return rc
        except QMPError:
            return None
    def _qmp_add_device(self, phid, dev_id):
        # Find a bus that the physical_id maps to
        for bus in self._buses:
            if phid >= bus['count']:
                phid = phid - bus['count']
            else:
                break
        else:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 'Invalid physical_id')
        try:
            with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
                if self._find_pcidev(qclient, dev_id) is None:
                    qclient.device_add({'driver': 'vfio-user-pci',
                                        'x-enable-migration': 'on',
                                        'socket': os.path.join(self._sock_path, dev_id, 'cntrl'),
                                        'bus': bus.get('name'),
                                        'addr': hex(phid),
                                        'id': dev_id})
            return True
        except QMPError:
            logging.error('QMP: Failed to add device')
            return False
    def _create_device(self, physical_id):
        with self._client_wrap() as client:
            dev_id = f'{self.name}-{physical_id}'
            subnqn = f'nqn.2016-06.io.spdk:{dev_id}'
            rc = self._create_subsystem(client, subnqn)
            if not rc:
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to create the NVMe/vfiouser subsystem')
            rc = self._subsystem_add_listener(client, subnqn,
                                              self._create_socket_dir(dev_id))
            if not rc:
                self._delete_subsystem(client, subnqn)
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to add the NVMe/vfiouser listener')
            rc = self._qmp_add_device(physical_id, dev_id)
            if not rc:
                self._delete_subsystem(client, subnqn)
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to create NVMe/vfiouser device')
            return subnqn

    def create_device(self, request):
        if request.nvme.virtual_id != 0:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Unsupported virtual_id value')
        subnqn = self._create_device(request.nvme.physical_id)
        return sma_pb2.CreateDeviceResponse(handle=f'{self._prefix}:{subnqn}')
    def _qmp_delete_device(self, dev_id):
        try:
            with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
                if self._find_pcidev(qclient, dev_id) is not None:
                    qclient.device_del({'id': dev_id})
            return True
        except QMPError:
            logging.error('QMP: Failed to delete device')
            return False

    def delete_device(self, request):
        with self._client_wrap() as client:
            nqn = request.handle[len(f'{self._prefix}:'):]
            dev_id = nqn[len('nqn.2016-06.io.spdk:'):]
            if not self._delete_subsystem(client, nqn):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to delete NVMe/vfiouser device')
            if not self._qmp_delete_device(dev_id):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to delete NVMe/vfiouser device')
            try:
                path = os.path.join(self._sock_path, dev_id)
                if os.path.exists(path):
                    shutil.rmtree(path)
            except OSError:
                raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
    def _get_bdev(self, client, guid):
        try:
            bdev_name = get_crypto_engine().get_crypto_bdev(guid) or guid
            return client.call('bdev_get_bdevs', {'name': bdev_name})[0]
        except (JSONRPCException, CryptoException):
            logging.error('Failed to find bdev')
            return None

    def _get_ns(self, bdev, subsystem):
        for ns in subsystem['namespaces']:
            if ns['name'] == bdev['name']:
                return ns

    def _subsystem_add_ns(self, client, bdev, subsystem, subnqn, volume_id):
        try:
            if self._get_ns(bdev, subsystem) is not None:
                return True
            return client.call('nvmf_subsystem_add_ns',
                               {'nqn': subnqn,
                                'namespace': {
                                    'bdev_name': bdev['name'],
                                    'uuid': volume_id,
                                    'nguid': volume_id_to_nguid(volume_id)}})
        except JSONRPCException:
            logging.error('Failed to add ns')
            return False
    def attach_volume(self, request):
        nqn = request.device_handle[len(f'{self._prefix}:'):]
        volume_id = format_volume_id(request.volume.volume_id)
        with self._client_wrap() as client:
            bdev = self._get_bdev(client, volume_id)
            if bdev is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid volume GUID')
            subsys = self._get_subsys(client, nqn)
            if subsys is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid device handle')
            result = self._subsystem_add_ns(client, bdev, subsys, nqn, volume_id)
            if not result:
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to attach volume')

    def _subsystem_remove_ns(self, client, bdev, subsystem, subnqn):
        try:
            ns = self._get_ns(bdev, subsystem)
            if ns is None:
                return True
            return client.call('nvmf_subsystem_remove_ns',
                               {'nqn': subnqn, 'nsid': ns['nsid']})
        except JSONRPCException:
            logging.error('Failed to remove ns')
            return False
    def detach_volume(self, request):
        nqn = request.device_handle[len(f'{self._prefix}:'):]
        volume_id = format_volume_id(request.volume_id)
        with self._client_wrap() as client:
            bdev = self._get_bdev(client, volume_id)
            if bdev is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid volume GUID')
            subsys = self._get_subsys(client, nqn)
            if subsys is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid device handle')
            result = self._subsystem_remove_ns(client, bdev, subsys, nqn)
            if not result:
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to detach volume')

    def owns_device(self, id):
        return id.startswith(self._prefix)

    def set_qos(self, request):
        nqn = request.device_handle[len(f'{self._prefix}:'):]
        volume = format_volume_id(request.volume_id)
        if volume is None:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        try:
            with self._client() as client:
                # Make sure that a volume exists and is attached to the device
                bdev = self._get_bdev(client, volume)
                if bdev is None:
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'No volume associated with volume_id could be found')
                subsys = self._get_subsys(client, nqn)
                if subsys is None:
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'No device associated with device_handle could be found')
                ns = self._get_ns(bdev, subsys)
                if ns is None:
                    raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                          'Specified volume is not attached to the device')
                qos.set_volume_bdev_qos(client, request)
        except qos.QosException as ex:
            raise DeviceException(ex.code, ex.message)
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to set QoS')

    def get_qos_capabilities(self, request):
        return qos.get_bdev_qos_capabilities()
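
For reference, a standalone sketch of the traversal this patch introduces: _find_pcidev() now walks every bus returned by QMP query-pci, recursing into PCI bridges, instead of searching only the first bus. The payload and the 'nvme-42' id below are abbreviated and illustrative, not a verbatim QMP response; find_pcidev mirrors the patched method outside the class.

# Abbreviated, illustrative shape of a QMP query-pci result with two buses.
query_pci_return = [
    {'devices': [{'qdev_id': 'other-dev'}]},                      # first bus: no match
    {'devices': [{'qdev_id': '',                                  # bridge with nested devices
                  'pci_bridge': {'devices': [{'qdev_id': 'nvme-42'}]}}]},
]


def find_pcidev(buses, name):
    """Mirrors the patched _find_pcidev(): search all buses, not just buses[0]."""
    def rsearch(devices, name):
        for dev in devices:
            if dev.get('qdev_id') == name:
                return dev
            if 'pci_bridge' in dev:
                found = rsearch(dev['pci_bridge']['devices'], name)
                if found:
                    return found
        return None

    for bus in buses:
        dev = rsearch(bus['devices'], name)
        if dev is not None:
            return dev
    return None


# The device on the second bus is now found; before the patch only buses[0] was searched.
assert find_pcidev(query_pci_return, 'nvme-42')['qdev_id'] == 'nvme-42'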