Spdk/test/bdev/bdevio/tests.py
Konrad Sztyber 7610bc38dc scripts: move python modules to python directory
Up until now, importing an SPDK RPC python module was just a matter of
`import rpc`.  It's fine until there's another module called `rpc`
installed on the system, in which case it's impossible to import both of
them.  Therefore, to avoid this problem, all of the modules were moved
to a separate directory under the "spdk" namespace.

The decision to move to a location under a separate directory was
motivated by the fact that a directory called scripts/spdk would look
pretty confusing.  Moreover, it should make it also easier to package
these scripts as a python package.

Other than moving the packages, all of the imports were updated to
reflect these changes.  Files under python now use relative imports,
while those under scripts/ use the "spdk" namespace and have their
PYTHONPATH extended with python directory.

Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com>
Change-Id: Ib43dee73921d590a551dd83885e22870e72451cf
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/9692
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
2022-04-05 14:40:47 +00:00

89 lines
3.0 KiB
Python
Executable File

#!/usr/bin/env python3
import logging
import argparse
import sys
import shlex
try:
from spdk.rpc.client import print_dict, JSONRPCException
import spdk.rpc as rpc
except ImportError:
print("SPDK RPC library missing. Please add spdk/scripts/ directory to PYTHONPATH:")
print("'export PYTHONPATH=$PYTHONPATH:./spdk/scripts/'")
exit(1)
try:
from shlex import quote
except ImportError:
from pipes import quote
def print_array(a):
print(" ".join((quote(v) for v in a)))
def perform_tests_func(client, name=None):
"""
Args:
name: bdev name to perform bdevio tests on (optional; if omitted, test all bdevs)
Returns:
Number of failures in tests. 0 means no errors found.
"""
params = {}
if name:
params['name'] = name
return client.call('perform_tests', params)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='SPDK RPC command line interface. NOTE: spdk/scripts/ is expected in PYTHONPATH')
parser.add_argument('-s', dest='server_addr',
help='RPC domain socket path or IP address', default='/var/tmp/spdk.sock')
parser.add_argument('-p', dest='port',
help='RPC port number (if server_addr is IP address)',
default=5260, type=int)
parser.add_argument('-t', dest='timeout',
help='Timeout as a floating point number expressed in seconds waiting for response. Default: 60.0',
default=60.0, type=float)
parser.add_argument('-v', dest='verbose', action='store_const', const="INFO",
help='Set verbose mode to INFO', default="ERROR")
parser.add_argument('--verbose', dest='verbose', choices=['DEBUG', 'INFO', 'ERROR'],
help="""Set verbose level. """)
subparsers = parser.add_subparsers(help='RPC methods')
def perform_tests(args):
print_dict(perform_tests_func(args.client, name=args.name))
p = subparsers.add_parser('perform_tests', help='Perform all bdevio tests on select bdev')
p.add_argument('-b', '--name', help="Name of the Blockdev. Example: Nvme0n1")
p.set_defaults(func=perform_tests)
def call_rpc_func(args):
try:
args.func(args)
except JSONRPCException as ex:
print(ex.message)
exit(1)
def execute_script(parser, client, fd):
for rpc_call in map(str.rstrip, fd):
if not rpc_call.strip():
continue
args = parser.parse_args(shlex.split(rpc_call))
args.client = client
call_rpc_func(args)
args = parser.parse_args()
args.client = rpc.client.JSONRPCClient(args.server_addr, args.port, args.timeout, log_level=getattr(logging, args.verbose.upper()))
if hasattr(args, 'func'):
call_rpc_func(args)
elif sys.stdin.isatty():
# No arguments and no data piped through stdin
parser.print_help()
exit(1)
else:
execute_script(parser, args.client, sys.stdin)