Spdk/python/spdk/sma/device/nvmf_vfiouser.py
Milosz Linkiewicz d5d6efd8a8 sma: nvmf/vfiouser device manager implementation
This patch utilize generic sma implementation by adding vfiouser
devices manager. It's allow to expose virtualized block devices to
QEMU instances or other arbitrary processes.

Max device capacity depend on available `pci-bridge`
```yaml
devices:
  - name: 'vfiouser'
    params:
      buses:
        - name: 'pci.spdk.0'
          count: 32
        - name: 'pci.spdk.1'
          count: 32
      qmp_addr: 127.0.0.1
      qmp_port: 9090
```

Signed-off-by: Milosz Linkiewicz <milosz.linkiewicz@intel.com>
Signed-off-by: Sebastian Brzezinka <sebastian.brzezinka@intel.com>
Change-Id: I5ab43f4b877c371fa16a4daf4212ac2686991bd4
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/13004
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
2022-08-01 08:45:58 +00:00

276 lines
11 KiB
Python

import os
import grpc
import logging
from ..common import format_volume_id
from socket import AddressFamily
from spdk.rpc.client import JSONRPCException
import shutil
from .device import DeviceManager, DeviceException
from google.protobuf import wrappers_pb2 as wrap
from ..qmp import QMPClient, QMPError
from ..proto import sma_pb2
from contextlib import contextmanager
log = logging.getLogger(__name__)
class NvmfVfioDeviceManager(DeviceManager):
def __init__(self, client):
super().__init__('vfiouser', 'nvme', client)
def init(self, config):
self._buses = config.get('buses', [])
try:
if len(self._buses) != len(list({v['name']: v for v in self._buses}.values())):
raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
'Duplicate PCI bridge names')
except KeyError:
raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
'PCI bridge name is missing')
for bus in self._buses:
bus['count'] = bus.get('count', 32)
if bus['count'] < 0:
raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
'Incorrect PCI bridge count')
self._qmp_addr = (config.get('qmp_addr', '127.0.0.1'), config.get('qmp_port'))
self._sock_path = config.get('sock_path', '/var/tmp/')
self._prefix = f'{self.protocol}'
if not self._create_transport():
raise DeviceException(grpc.StatusCode.INTERNAL,
'NVMe/vfiouser transport is unavailable')
def _create_transport(self):
try:
with self._client() as client:
transports = client.call('nvmf_get_transports')
for transport in transports:
if transport['trtype'].lower() == 'vfiouser':
return True
return client.call('nvmf_create_transport', {'trtype': 'vfiouser'})
except JSONRPCException:
logging.error(f'Transport query NVMe/vfiouser failed')
return False
@contextmanager
def _client_wrap(self):
try:
with self._client() as client:
yield client
except JSONRPCException:
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to connect to SPDK application')
def _get_subsys(self, client, nqn):
try:
return client.call('nvmf_get_subsystems', {'nqn': nqn})[0]
except JSONRPCException:
return False
def _create_subsystem(self, client, subnqn):
try:
if self._get_subsys(client, subnqn):
return True
return client.call('nvmf_create_subsystem', {'nqn': subnqn, 'allow_any_host': True})
except JSONRPCException:
logging.error('Failed to create subsystem')
return False
def _delete_subsystem(self, client, subnqn):
try:
if not self._get_subsys(client, subnqn):
return True
return client.call('nvmf_delete_subsystem', {'nqn': subnqn})
except JSONRPCException:
logging.error('Failed to delete subsystem')
return False
def _subsystem_add_listener(self, client, subnqn, addr):
try:
return client.call('nvmf_subsystem_add_listener',
{'nqn': subnqn,
'listen_address': {
'trtype': 'vfiouser',
'traddr': addr}})
except JSONRPCException:
logging.error('Failed to add listener')
return False
def _create_socket_dir(self, dev_id):
try:
path = os.path.join(self._sock_path, dev_id)
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(path)
if os.path.isdir(path):
return path
except OSError:
raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
def _find_pcidev(self, qclient, name):
def rsearch(devices, name):
for dev in devices:
if dev['qdev_id'] == name:
return dev
if 'pci_bridge' in dev:
return rsearch(dev['pci_bridge']['devices'], name)
else:
pass
try:
buses = qclient.query_pci()['return']
for bus in buses:
rc = rsearch(bus['devices'], name)
if rc is not None:
return rc
except QMPError:
return None
def _qmp_add_device(self, phid, dev_id):
# Find a bus that the physical_id maps to
for bus in self._buses:
if phid >= bus['count']:
phid = phid - bus['count']
else:
break
else:
raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 'Invalid physical_id')
try:
with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
if self._find_pcidev(qclient, dev_id) is None:
qclient.device_add({'driver': 'vfio-user-pci',
'x-enable-migration': 'on',
'socket': os.path.join(self._sock_path, dev_id, 'cntrl'),
'bus': bus.get('name'),
'addr': hex(phid),
'id': dev_id})
return True
except QMPError:
logging.error('QMP: Failed to add device')
return False
def _create_device(self, physical_id):
with self._client_wrap() as client:
dev_id = f'{self.name}-{physical_id}'
subnqn = f'nqn.2016-06.io.spdk:{dev_id}'
rc = self._create_subsystem(client, subnqn)
if not rc:
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to create the NVMe/vfiouser subsystem')
rc = self._subsystem_add_listener(client, subnqn,
self._create_socket_dir(dev_id))
if not rc:
self._delete_subsystem(client, subnqn)
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to add the NVMe/vfiouser listener')
rc = self._qmp_add_device(physical_id, dev_id)
if not rc:
self._delete_subsystem(client, subnqn)
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to create NVMe/vfiouser device')
return subnqn
def create_device(self, request):
if request.nvme.virtual_id != 0:
raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
'Unsupported virtual_id value')
subnqn = self._create_device(request.nvme.physical_id)
return sma_pb2.CreateDeviceResponse(handle=f'{self._prefix}:{subnqn}')
def _qmp_delete_device(self, dev_id):
try:
with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
if self._find_pcidev(qclient, dev_id) is not None:
qclient.device_del({'id': dev_id})
return True
except QMPError:
logging.error('QMP: Failed to delete device')
return False
def delete_device(self, request):
with self._client_wrap() as client:
nqn = request.handle[len(f'{self._prefix}:'):]
dev_id = nqn[len('nqn.2016-06.io.spdk:'):]
if not self._delete_subsystem(client, nqn):
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to delete NVMe/vfiouser device')
if not self._qmp_delete_device(dev_id):
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to delete NVMe/vfiouser device')
try:
path = os.path.join(self._sock_path, dev_id)
if os.path.exists(path):
shutil.rmtree(path)
except OSError:
raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
def _get_bdev(self, client, guid):
try:
return client.call('bdev_get_bdevs', {'name': guid})[0]
except JSONRPCException:
logging.error('Failed to find bdev')
return None
def _get_ns(self, bdev, subsystem):
for ns in subsystem['namespaces']:
if ns['name'] == bdev['name']:
return ns
def _subsystem_add_ns(self, client, bdev, subsystem, subnqn):
try:
if self._get_ns(bdev, subsystem) is not None:
return True
return client.call('nvmf_subsystem_add_ns',
{'nqn': subnqn,
'namespace': {
'bdev_name': bdev['name']}})
except JSONRPCException:
logging.error('Failed to add ns')
return False
def attach_volume(self, request):
nqn = request.device_handle[len(f'{self._prefix}:'):]
volume_id = format_volume_id(request.volume.volume_id)
with self._client_wrap() as client:
bdev = self._get_bdev(client, volume_id)
if bdev is None:
raise DeviceException(grpc.StatusCode.NOT_FOUND,
'Invalid volume GUID')
subsys = self._get_subsys(client, nqn)
if subsys is None:
raise DeviceException(grpc.StatusCode.NOT_FOUND,
'Invalid device handle')
result = self._subsystem_add_ns(client, bdev, subsys, nqn)
if not result:
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to attach volume')
def _subsystem_remove_ns(self, client, bdev, subsystem, subnqn):
try:
ns = self._get_ns(bdev, subsystem)
if ns is None:
return True
return client.call('nvmf_subsystem_remove_ns',
{'nqn': subnqn, 'nsid': ns['nsid']})
except JSONRPCException:
logging.error('Failed to remove ns')
return False
def detach_volume(self, request):
nqn = request.device_handle[len(f'{self._prefix}:'):]
volume_id = format_volume_id(request.volume_id)
with self._client_wrap() as client:
bdev = self._get_bdev(client, volume_id)
if bdev is None:
raise DeviceException(grpc.StatusCode.NOT_FOUND,
'Invalid volume GUID')
subsys = self._get_subsys(client, nqn)
if subsys is None:
raise DeviceException(grpc.StatusCode.NOT_FOUND,
'Invalid device handle')
result = self._subsystem_remove_ns(client, bdev, subsys, nqn)
if not result:
raise DeviceException(grpc.StatusCode.INTERNAL,
'Failed to detach volume')
def owns_device(self, id):
return id.startswith(self._prefix)