Spdk/python/spdk/sma/qmp.py
Milosz Linkiewicz 16cc2ade5d sma: QMP base definitions created
This patch adds python scripts for basic handling of QEMU Machine
Protocol connection for use with vfio_user. Basic low-level connection
and error handling added as extended socket class.
QMPClient: Wrapper class for handling connection with QMP was added.

Protected class method _receive relies on the fact, that QMP always
sends command 'in one line' - each one is always terminated with CRLF.
https://github.com/qemu/qemu/blob/master/docs/interop/qmp-spec.txt

Usage example, command line calls:
1) Without event parameter:
```json
{
    "request": {
        "execute": "device-list-properties",
        "arguments": {
            "typename": "vfiouser-0-1-1"
        }
    }
}
```
2) With event parameter specified. Specifying 'event'
parameter will set script to block wait for occurrence
of such one after a valid execution of specified request:
```json
{
    "event": {
        "event": "DEVICE_DELETED",
        "data": {
            "device": "vfiouser-0-1-1"
        }
    },
    "request": {
        "execute": "device_del",
        "arguments": {
            "id": "vfiouser-0-1-1"
        }
    }
}
```

Signed-off-by: mionsz <milosz.linkiewicz@intel.com>
Change-Id: I7a548f61d31097ad425994b2ea4bac8bc6800b5b
Signed-off-by: Sebastian Brzezinka <sebastian.brzezinka@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/11106
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
2022-05-20 09:16:54 +00:00

287 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
import socket
from socket import error as SocketError
import time
import json
import logging
import sys
from typing import (Any, Dict, Tuple)
from argparse import ArgumentParser
log = logging.getLogger(__name__)
QMPMessage = Dict[str, Any]
'''
Base class for all QMPBaseClass messages
'''
QMPEvent = QMPMessage
'''
Base class alias all asynchronous event messages
'''
class QMPError(Exception):
'''
Base Exception Class for QMPClient implementation
'''
def __init__(self, message, code='internal'):
self.code = repr(code)
self.message = repr(message)
self.description = f'QMP Error ({self.code}): {self.message}'
def __str__(self):
return repr(self.description)
class QMPSocketError(QMPError):
'''
Exception Class for socket exceptions in QMPClient implementation
'''
def __init__(self, message, code='socket'):
super().__init__(message, code)
class QMPRequestError(QMPError):
'''
Exception Class for handling request response errors
'''
def __init__(self, reply: QMPMessage):
self.error_class = reply.get('error', {}).get('class', 'Undefined')
self.error_msg = reply.get('error', {}).get('desc', 'Unknown')
super().__init__(self.error_msg, self.error_class)
class QMPClient():
'''
QMPBaseClass implements a low level connection to QMP socket
:param family is one of [socket.AF_INET, socket.AF_UNIX]
:param address is tuple(address, port) for socket.AF_INET
or a path string for socket.AF_UNIX
:param timeout: timeout in seconds to use for the connection
:raise QMPError: for most error cases
'''
def __init__(self,
address=('127.0.0.1', 10500),
family: socket.AddressFamily = socket.AF_INET,
timeout: float = 8.0):
self._exec_id = 0
self._capabilities = None
self._timeout = timeout
self._socketf = None
self._address = address
try:
self._socket = socket.socket(family, socket.SOCK_STREAM)
self._socket.settimeout(timeout)
except OSError as e:
raise QMPSocketError('Create: exception while creating') from e
def __enter__(self):
self._start()
return self
def __exit__(self, exception_type, exception_value, traceback):
self._disconnect_socket()
def _start(self):
'''
Exit negotiation mode and enter command mode
Based on: https://wiki.qemu.org/Documentation/QMP
Part of communication done after connect.
As stated in Capabilities Negotiation paragraph, for new connection
QMP sends greetings msg and enters capabilities negotiation mode.
To enter command mode, the qmp_capabilities command must be issued.
Can be issued only once per session or the QMP will report an error.
'''
self._connect()
self._capabilities = self._receive()[0]
if 'QMP' not in self._capabilities:
raise QMPError('NegotiateCap: protocol error, wrong message')
self.exec('qmp_capabilities')
def _get_next_exec_id(self):
self._exec_id += 1
return str(self._exec_id)
def _connect(self):
try:
if not self._is_connected():
self._socket.connect(self._address)
self._socketf = self._socket.makefile(mode='rw', encoding='utf-8')
except SocketError as e:
raise QMPSocketError('Connect: could not connect') from e
def _disconnect_socket(self):
if self._socket is not None:
self._socket.close()
if self._socketf is not None:
self._socketf.close()
self._socket = None
self._socketf = None
def _is_connected(self) -> bool:
return self._socketf is not None
def _check_event(self, event, received):
'''
Method for cheking if "received" is the "event" we are waiting for.
:param event: dictionary description of event, mandatory fields are
'event' = QMP name of the event
'data' = event specific params in form of a dict.
:param received: received QMP event to check.
'''
if event['event'].lower() != received['event'].lower():
return False
for it in event.get('data', {}).items():
if it not in received.get('data', {}).items():
return False
return True
def _receive(self, event=None) -> Tuple[QMPMessage, QMPEvent]:
response = None
timeout_begin = time.time()
while self._timeout > (time.time() - timeout_begin):
try:
data = self._socketf.readline()
if data is None:
raise QMPSocketError('Receive: socket got disconnected')
log.debug(f'Received: {data}')
msg = json.loads(data)
except SocketError as e:
raise QMPSocketError('Receive: socket read failed') from e
except EOFError as e:
raise QMPSocketError('Receive: socket read got unexpected EOF') from e
except json.JSONDecodeError as e:
raise QMPError('Receive: QMP message decode failed, JSONDecodeError') from e
if response is None:
if 'error' in msg:
return msg, None
elif 'return' in msg:
if event is None:
return msg, None
response = msg
# Sent only once per connection. Valid for capabilities negotiation mode only
elif 'QMP' in msg:
if self._capabilities is not None:
raise QMPError('Receive: QMP unexpected message type')
return msg, None
elif self._check_event(event, msg):
return response, msg
raise QMPSocketError('Receive: Timed out while processing QMP receive loop')
def _send(self, msg: Dict):
log.debug(f'Sending: {msg}')
try:
self._socket.sendall(bytes(json.dumps(msg) + '\r\n', 'utf-8'))
except TimeoutError as e:
raise QMPSocketError('Send: got socket timeout error') from e
except SocketError as e:
raise QMPSocketError('Send: got system socket error') from e
def exec(self, cmd: str, args: Dict = None, event: Dict = None) -> QMPMessage:
'''
Execute QMP cmd and read result. Returns resulting message, error or optionally
an event that the QMP client should wait for to be send by the server.
:param cmd: string name of the command to execute
:param args: optional arguments dictionary to pass
:param event: optional dictionary describing an event to wait for
:return command exec response or optionally execute result event
:raise QMPRequestError: on response from QMP server being of error type
:raise QMPSocketError: on timeout or socket errors
:raise QMPError: on id missmatch and JSONdecoder errors
'''
cmd_id = self._get_next_exec_id()
msg = {'execute': cmd, 'id': cmd_id}
if args is not None and len(args):
msg['arguments'] = args
self._send(msg)
response, result = self._receive(event)
if response.get('id') != cmd_id:
raise QMPError('QMP Protocol Error, invalid result id')
elif 'error' in response:
raise QMPRequestError(response)
if result is not None:
return result
return response
def device_add(self, params: Dict, event: Dict = None):
return self.exec('device_add', params, event)
def device_del(self, params: Dict, event: Dict = None):
return self.exec('device_del', params, event)
def chardev_add(self, params: Dict, event: Dict = None):
return self.exec('chardev-add', params, event)
def chardev_remove(self, params: Dict, event: Dict = None):
return self.exec('chardev-remove', params, event)
def query_pci(self):
return self.exec('query-pci')
def query_chardev(self):
return self.exec('query-chardev')
def device_list_properties(self, typename: str):
return self.exec('device-list-properties', {'typename': typename})
def parse_argv():
parser = ArgumentParser(description='QEMU Machine Protocol (QMP) client')
parser.add_argument('--address', '-a', default='127.0.0.1',
help='IP address of QMP server instance to connect to')
parser.add_argument('--port', '-p', default=10500, type=int,
help='Port number of QMP server instance to connect to')
return parser.parse_args()
def main(args):
argv = parse_argv()
data = json.loads(sys.stdin.read())
request = data.get('request')
event = data.get('event')
with QMPClient((argv.address, argv.port)) as cli:
result = cli.exec(request['execute'], request.get('arguments'), event)
print(json.dumps(result, indent=2))
# Example usage with command line calls:
# 1) Without event parameter:
# {
# "request": {
# "execute": "device-list-properties",
# "arguments": {
# "typename": "vfiouser-1-1"
# }
# }
# }
# 2) With event parameter specified. Specifying 'event'
# parameter will set script to block wait for occurrence
# of such one after a valid execution of specified request:
# {
# "event": {
# "event": "DEVICE_DELETED",
# "data": {
# "device": "vfiouser-1-1"
# }
# },
# "request": {
# "execute": "device_del",
# "arguments": {
# "id": "vfiouser-1-1"
# }
# }
# }
if __name__ == '__main__':
main(sys.argv[1:])