cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

iotests.py (51215B)


      1# Common utilities and Python wrappers for qemu-iotests
      2#
      3# Copyright (C) 2012 IBM Corp.
      4#
      5# This program is free software; you can redistribute it and/or modify
      6# it under the terms of the GNU General Public License as published by
      7# the Free Software Foundation; either version 2 of the License, or
      8# (at your option) any later version.
      9#
     10# This program is distributed in the hope that it will be useful,
     11# but WITHOUT ANY WARRANTY; without even the implied warranty of
     12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13# GNU General Public License for more details.
     14#
     15# You should have received a copy of the GNU General Public License
     16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17#
     18
     19import atexit
     20import bz2
     21from collections import OrderedDict
     22import faulthandler
     23import json
     24import logging
     25import os
     26import re
     27import shutil
     28import signal
     29import struct
     30import subprocess
     31import sys
     32import time
     33from typing import (Any, Callable, Dict, Iterable,
     34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
     35import unittest
     36
     37from contextlib import contextmanager
     38
     39from qemu.machine import qtest
     40from qemu.qmp import QMPMessage
     41
     42# Use this logger for logging messages directly from the iotests module
     43logger = logging.getLogger('qemu.iotests')
     44logger.addHandler(logging.NullHandler())
     45
     46# Use this logger for messages that ought to be used for diff output.
     47test_logger = logging.getLogger('qemu.iotests.diff_io')
     48
     49
     50faulthandler.enable()
     51
     52# This will not work if arguments contain spaces but is necessary if we
     53# want to support the override options that ./check supports.
     54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
     55if os.environ.get('QEMU_IMG_OPTIONS'):
     56    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
     57
     58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
     59if os.environ.get('QEMU_IO_OPTIONS'):
     60    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
     61
     62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
     63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
     64    qemu_io_args_no_fmt += \
     65        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
     66
     67qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
     68qemu_nbd_args = [qemu_nbd_prog]
     69if os.environ.get('QEMU_NBD_OPTIONS'):
     70    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
     71
     72qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
     73qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
     74
     75gdb_qemu_env = os.environ.get('GDB_OPTIONS')
     76qemu_gdb = []
     77if gdb_qemu_env:
     78    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
     79
     80qemu_print = os.environ.get('PRINT_QEMU', False)
     81
     82imgfmt = os.environ.get('IMGFMT', 'raw')
     83imgproto = os.environ.get('IMGPROTO', 'file')
     84output_dir = os.environ.get('OUTPUT_DIR', '.')
     85
     86try:
     87    test_dir = os.environ['TEST_DIR']
     88    sock_dir = os.environ['SOCK_DIR']
     89    cachemode = os.environ['CACHEMODE']
     90    aiomode = os.environ['AIOMODE']
     91    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
     92except KeyError:
     93    # We are using these variables as proxies to indicate that we're
     94    # not being run via "check". There may be other things set up by
     95    # "check" that individual test cases rely on.
     96    sys.stderr.write('Please run this test via the "check" script\n')
     97    sys.exit(os.EX_USAGE)
     98
     99qemu_valgrind = []
    100if os.environ.get('VALGRIND_QEMU') == "y" and \
    101    os.environ.get('NO_VALGRIND') != "y":
    102    valgrind_logfile = "--log-file=" + test_dir
    103    # %p allows to put the valgrind process PID, since
    104    # we don't know it a priori (subprocess.Popen is
    105    # not yet invoked)
    106    valgrind_logfile += "/%p.valgrind"
    107
    108    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
    109
    110luks_default_secret_object = 'secret,id=keysec0,data=' + \
    111                             os.environ.get('IMGKEYSECRET', '')
    112luks_default_key_secret_opt = 'key-secret=keysec0'
    113
    114sample_img_dir = os.environ['SAMPLE_IMG_DIR']
    115
    116
    117def unarchive_sample_image(sample, fname):
    118    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    119    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
    120        shutil.copyfileobj(f_in, f_out)
    121
    122
    123def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
    124                              connect_stderr: bool = True) -> Tuple[str, int]:
    125    """
    126    Run a tool and return both its output and its exit code
    127    """
    128    stderr = subprocess.STDOUT if connect_stderr else None
    129    with subprocess.Popen(args, stdout=subprocess.PIPE,
    130                          stderr=stderr, universal_newlines=True) as subp:
    131        output = subp.communicate()[0]
    132        if subp.returncode < 0:
    133            cmd = ' '.join(args)
    134            sys.stderr.write(f'{tool} received signal \
    135                               {-subp.returncode}: {cmd}\n')
    136        return (output, subp.returncode)
    137
    138def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    139    """
    140    Run qemu-img and return both its output and its exit code
    141    """
    142    full_args = qemu_img_args + list(args)
    143    return qemu_tool_pipe_and_status('qemu-img', full_args)
    144
    145def qemu_img(*args: str) -> int:
    146    '''Run qemu-img and return the exit code'''
    147    return qemu_img_pipe_and_status(*args)[1]
    148
    149def ordered_qmp(qmsg, conv_keys=True):
    150    # Dictionaries are not ordered prior to 3.6, therefore:
    151    if isinstance(qmsg, list):
    152        return [ordered_qmp(atom) for atom in qmsg]
    153    if isinstance(qmsg, dict):
    154        od = OrderedDict()
    155        for k, v in sorted(qmsg.items()):
    156            if conv_keys:
    157                k = k.replace('_', '-')
    158            od[k] = ordered_qmp(v, conv_keys=False)
    159        return od
    160    return qmsg
    161
    162def qemu_img_create(*args):
    163    args = list(args)
    164
    165    # default luks support
    166    if '-f' in args and args[args.index('-f') + 1] == 'luks':
    167        if '-o' in args:
    168            i = args.index('-o')
    169            if 'key-secret' not in args[i + 1]:
    170                args[i + 1].append(luks_default_key_secret_opt)
    171                args.insert(i + 2, '--object')
    172                args.insert(i + 3, luks_default_secret_object)
    173        else:
    174            args = ['-o', luks_default_key_secret_opt,
    175                    '--object', luks_default_secret_object] + args
    176
    177    args.insert(0, 'create')
    178
    179    return qemu_img(*args)
    180
    181def qemu_img_measure(*args):
    182    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
    183
    184def qemu_img_check(*args):
    185    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
    186
    187def qemu_img_verbose(*args):
    188    '''Run qemu-img without suppressing its output and return the exit code'''
    189    exitcode = subprocess.call(qemu_img_args + list(args))
    190    if exitcode < 0:
    191        sys.stderr.write('qemu-img received signal %i: %s\n'
    192                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
    193    return exitcode
    194
    195def qemu_img_pipe(*args: str) -> str:
    196    '''Run qemu-img and return its output'''
    197    return qemu_img_pipe_and_status(*args)[0]
    198
    199def qemu_img_log(*args):
    200    result = qemu_img_pipe(*args)
    201    log(result, filters=[filter_testfiles])
    202    return result
    203
    204def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    205    args = ['info']
    206    if imgopts:
    207        args.append('--image-opts')
    208    else:
    209        args += ['-f', imgfmt]
    210    args += extra_args
    211    args.append(filename)
    212
    213    output = qemu_img_pipe(*args)
    214    if not filter_path:
    215        filter_path = filename
    216    log(filter_img_info(output, filter_path))
    217
    218def qemu_io(*args):
    219    '''Run qemu-io and return the stdout data'''
    220    args = qemu_io_args + list(args)
    221    return qemu_tool_pipe_and_status('qemu-io', args)[0]
    222
    223def qemu_io_log(*args):
    224    result = qemu_io(*args)
    225    log(result, filters=[filter_testfiles, filter_qemu_io])
    226    return result
    227
    228def qemu_io_silent(*args):
    229    '''Run qemu-io and return the exit code, suppressing stdout'''
    230    if '-f' in args or '--image-opts' in args:
    231        default_args = qemu_io_args_no_fmt
    232    else:
    233        default_args = qemu_io_args
    234
    235    args = default_args + list(args)
    236    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
    237    if result.returncode < 0:
    238        sys.stderr.write('qemu-io received signal %i: %s\n' %
    239                         (-result.returncode, ' '.join(args)))
    240    return result.returncode
    241
    242def qemu_io_silent_check(*args):
    243    '''Run qemu-io and return the true if subprocess returned 0'''
    244    args = qemu_io_args + list(args)
    245    result = subprocess.run(args, stdout=subprocess.DEVNULL,
    246                            stderr=subprocess.STDOUT, check=False)
    247    return result.returncode == 0
    248
    249class QemuIoInteractive:
    250    def __init__(self, *args):
    251        self.args = qemu_io_args_no_fmt + list(args)
    252        # We need to keep the Popen objext around, and not
    253        # close it immediately. Therefore, disable the pylint check:
    254        # pylint: disable=consider-using-with
    255        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
    256                                   stdout=subprocess.PIPE,
    257                                   stderr=subprocess.STDOUT,
    258                                   universal_newlines=True)
    259        out = self._p.stdout.read(9)
    260        if out != 'qemu-io> ':
    261            # Most probably qemu-io just failed to start.
    262            # Let's collect the whole output and exit.
    263            out += self._p.stdout.read()
    264            self._p.wait(timeout=1)
    265            raise ValueError(out)
    266
    267    def close(self):
    268        self._p.communicate('q\n')
    269
    270    def _read_output(self):
    271        pattern = 'qemu-io> '
    272        n = len(pattern)
    273        pos = 0
    274        s = []
    275        while pos != n:
    276            c = self._p.stdout.read(1)
    277            # check unexpected EOF
    278            assert c != ''
    279            s.append(c)
    280            if c == pattern[pos]:
    281                pos += 1
    282            else:
    283                pos = 0
    284
    285        return ''.join(s[:-n])
    286
    287    def cmd(self, cmd):
    288        # quit command is in close(), '\n' is added automatically
    289        assert '\n' not in cmd
    290        cmd = cmd.strip()
    291        assert cmd not in ('q', 'quit')
    292        self._p.stdin.write(cmd + '\n')
    293        self._p.stdin.flush()
    294        return self._read_output()
    295
    296
    297def qemu_nbd(*args):
    298    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    299    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
    300
    301def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    302    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    303       and its output in case of an error'''
    304    full_args = qemu_nbd_args + ['--fork'] + list(args)
    305    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
    306                                                   connect_stderr=False)
    307    return returncode, output if returncode else ''
    308
    309def qemu_nbd_list_log(*args: str) -> str:
    310    '''Run qemu-nbd to list remote exports'''
    311    full_args = [qemu_nbd_prog, '-L'] + list(args)
    312    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    313    log(output, filters=[filter_testfiles, filter_nbd_exports])
    314    return output
    315
    316@contextmanager
    317def qemu_nbd_popen(*args):
    318    '''Context manager running qemu-nbd within the context'''
    319    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
    320
    321    assert not os.path.exists(pid_file)
    322
    323    cmd = list(qemu_nbd_args)
    324    cmd.extend(('--persistent', '--pid-file', pid_file))
    325    cmd.extend(args)
    326
    327    log('Start NBD server')
    328    with subprocess.Popen(cmd) as p:
    329        try:
    330            while not os.path.exists(pid_file):
    331                if p.poll() is not None:
    332                    raise RuntimeError(
    333                        "qemu-nbd terminated with exit code {}: {}"
    334                        .format(p.returncode, ' '.join(cmd)))
    335
    336                time.sleep(0.01)
    337            yield
    338        finally:
    339            if os.path.exists(pid_file):
    340                os.remove(pid_file)
    341            log('Kill NBD server')
    342            p.kill()
    343            p.wait()
    344
    345def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    346    '''Return True if two image files are identical'''
    347    return qemu_img('compare', '-f', fmt1,
    348                    '-F', fmt2, img1, img2) == 0
    349
    350def create_image(name, size):
    351    '''Create a fully-allocated raw image with sector markers'''
    352    with open(name, 'wb') as file:
    353        i = 0
    354        while i < size:
    355            sector = struct.pack('>l504xl', i // 512, i // 512)
    356            file.write(sector)
    357            i = i + 512
    358
    359def image_size(img):
    360    '''Return image's virtual size'''
    361    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    362    return json.loads(r)['virtual-size']
    363
    364def is_str(val):
    365    return isinstance(val, str)
    366
    367test_dir_re = re.compile(r"%s" % test_dir)
    368def filter_test_dir(msg):
    369    return test_dir_re.sub("TEST_DIR", msg)
    370
    371win32_re = re.compile(r"\r")
    372def filter_win32(msg):
    373    return win32_re.sub("", msg)
    374
    375qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
    376                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
    377                        r"and [0-9\/.inf]* ops\/sec\)")
    378def filter_qemu_io(msg):
    379    msg = filter_win32(msg)
    380    return qemu_io_re.sub("X ops; XX:XX:XX.X "
    381                          "(XXX YYY/sec and XXX ops/sec)", msg)
    382
    383chown_re = re.compile(r"chown [0-9]+:[0-9]+")
    384def filter_chown(msg):
    385    return chown_re.sub("chown UID:GID", msg)
    386
    387def filter_qmp_event(event):
    388    '''Filter a QMP event dict'''
    389    event = dict(event)
    390    if 'timestamp' in event:
    391        event['timestamp']['seconds'] = 'SECS'
    392        event['timestamp']['microseconds'] = 'USECS'
    393    return event
    394
    395def filter_qmp(qmsg, filter_fn):
    396    '''Given a string filter, filter a QMP object's values.
    397    filter_fn takes a (key, value) pair.'''
    398    # Iterate through either lists or dicts;
    399    if isinstance(qmsg, list):
    400        items = enumerate(qmsg)
    401    else:
    402        items = qmsg.items()
    403
    404    for k, v in items:
    405        if isinstance(v, (dict, list)):
    406            qmsg[k] = filter_qmp(v, filter_fn)
    407        else:
    408            qmsg[k] = filter_fn(k, v)
    409    return qmsg
    410
    411def filter_testfiles(msg):
    412    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
    413    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
    414    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
    415
    416def filter_qmp_testfiles(qmsg):
    417    def _filter(_key, value):
    418        if is_str(value):
    419            return filter_testfiles(value)
    420        return value
    421    return filter_qmp(qmsg, _filter)
    422
    423def filter_virtio_scsi(output: str) -> str:
    424    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
    425
    426def filter_qmp_virtio_scsi(qmsg):
    427    def _filter(_key, value):
    428        if is_str(value):
    429            return filter_virtio_scsi(value)
    430        return value
    431    return filter_qmp(qmsg, _filter)
    432
    433def filter_generated_node_ids(msg):
    434    return re.sub("#block[0-9]+", "NODE_NAME", msg)
    435
    436def filter_img_info(output, filename):
    437    lines = []
    438    for line in output.split('\n'):
    439        if 'disk size' in line or 'actual-size' in line:
    440            continue
    441        line = line.replace(filename, 'TEST_IMG')
    442        line = filter_testfiles(line)
    443        line = line.replace(imgfmt, 'IMGFMT')
    444        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
    445        line = re.sub('uuid: [-a-f0-9]+',
    446                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
    447                      line)
    448        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
    449        lines.append(line)
    450    return '\n'.join(lines)
    451
    452def filter_imgfmt(msg):
    453    return msg.replace(imgfmt, 'IMGFMT')
    454
    455def filter_qmp_imgfmt(qmsg):
    456    def _filter(_key, value):
    457        if is_str(value):
    458            return filter_imgfmt(value)
    459        return value
    460    return filter_qmp(qmsg, _filter)
    461
    462def filter_nbd_exports(output: str) -> str:
    463    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
    464
    465
    466Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
    467
    468def log(msg: Msg,
    469        filters: Iterable[Callable[[Msg], Msg]] = (),
    470        indent: Optional[int] = None) -> None:
    471    """
    472    Logs either a string message or a JSON serializable message (like QMP).
    473    If indent is provided, JSON serializable messages are pretty-printed.
    474    """
    475    for flt in filters:
    476        msg = flt(msg)
    477    if isinstance(msg, (dict, list)):
    478        # Don't sort if it's already sorted
    479        do_sort = not isinstance(msg, OrderedDict)
    480        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
    481    else:
    482        test_logger.info(msg)
    483
    484class Timeout:
    485    def __init__(self, seconds, errmsg="Timeout"):
    486        self.seconds = seconds
    487        self.errmsg = errmsg
    488    def __enter__(self):
    489        if qemu_gdb or qemu_valgrind:
    490            return self
    491        signal.signal(signal.SIGALRM, self.timeout)
    492        signal.setitimer(signal.ITIMER_REAL, self.seconds)
    493        return self
    494    def __exit__(self, exc_type, value, traceback):
    495        if qemu_gdb or qemu_valgrind:
    496            return False
    497        signal.setitimer(signal.ITIMER_REAL, 0)
    498        return False
    499    def timeout(self, signum, frame):
    500        raise Exception(self.errmsg)
    501
    502def file_pattern(name):
    503    return "{0}-{1}".format(os.getpid(), name)
    504
    505class FilePath:
    506    """
    507    Context manager generating multiple file names. The generated files are
    508    removed when exiting the context.
    509
    510    Example usage:
    511
    512        with FilePath('a.img', 'b.img') as (img_a, img_b):
    513            # Use img_a and img_b here...
    514
    515        # a.img and b.img are automatically removed here.
    516
    517    By default images are created in iotests.test_dir. To create sockets use
    518    iotests.sock_dir:
    519
    520       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
    521
    522    For convenience, calling with one argument yields a single file instead of
    523    a tuple with one item.
    524
    525    """
    526    def __init__(self, *names, base_dir=test_dir):
    527        self.paths = [os.path.join(base_dir, file_pattern(name))
    528                      for name in names]
    529
    530    def __enter__(self):
    531        if len(self.paths) == 1:
    532            return self.paths[0]
    533        else:
    534            return self.paths
    535
    536    def __exit__(self, exc_type, exc_val, exc_tb):
    537        for path in self.paths:
    538            try:
    539                os.remove(path)
    540            except OSError:
    541                pass
    542        return False
    543
    544
    545def try_remove(img):
    546    try:
    547        os.remove(img)
    548    except OSError:
    549        pass
    550
    551def file_path_remover():
    552    for path in reversed(file_path_remover.paths):
    553        try_remove(path)
    554
    555
    556def file_path(*names, base_dir=test_dir):
    557    ''' Another way to get auto-generated filename that cleans itself up.
    558
    559    Use is as simple as:
    560
    561    img_a, img_b = file_path('a.img', 'b.img')
    562    sock = file_path('socket')
    563    '''
    564
    565    if not hasattr(file_path_remover, 'paths'):
    566        file_path_remover.paths = []
    567        atexit.register(file_path_remover)
    568
    569    paths = []
    570    for name in names:
    571        filename = file_pattern(name)
    572        path = os.path.join(base_dir, filename)
    573        file_path_remover.paths.append(path)
    574        paths.append(path)
    575
    576    return paths[0] if len(paths) == 1 else paths
    577
    578def remote_filename(path):
    579    if imgproto == 'file':
    580        return path
    581    elif imgproto == 'ssh':
    582        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    583    else:
    584        raise Exception("Protocol %s not supported" % (imgproto))
    585
    586class VM(qtest.QEMUQtestMachine):
    587    '''A QEMU VM'''
    588
    589    def __init__(self, path_suffix=''):
    590        name = "qemu%s-%d" % (path_suffix, os.getpid())
    591        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
    592        if qemu_gdb and qemu_valgrind:
    593            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
    594            sys.exit(1)
    595        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
    596        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
    597                         name=name,
    598                         base_temp_dir=test_dir,
    599                         sock_dir=sock_dir, qmp_timer=timer)
    600        self._num_drives = 0
    601
    602    def _post_shutdown(self) -> None:
    603        super()._post_shutdown()
    604        if not qemu_valgrind or not self._popen:
    605            return
    606        valgrind_filename =  f"{test_dir}/{self._popen.pid}.valgrind"
    607        if self.exitcode() == 99:
    608            with open(valgrind_filename, encoding='utf-8') as f:
    609                print(f.read())
    610        else:
    611            os.remove(valgrind_filename)
    612
    613    def _pre_launch(self) -> None:
    614        super()._pre_launch()
    615        if qemu_print:
    616            # set QEMU binary output to stdout
    617            self._close_qemu_log_file()
    618
    619    def add_object(self, opts):
    620        self._args.append('-object')
    621        self._args.append(opts)
    622        return self
    623
    624    def add_device(self, opts):
    625        self._args.append('-device')
    626        self._args.append(opts)
    627        return self
    628
    629    def add_drive_raw(self, opts):
    630        self._args.append('-drive')
    631        self._args.append(opts)
    632        return self
    633
    634    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
    635        '''Add a virtio-blk drive to the VM'''
    636        options = ['if=%s' % interface,
    637                   'id=drive%d' % self._num_drives]
    638
    639        if path is not None:
    640            options.append('file=%s' % path)
    641            options.append('format=%s' % img_format)
    642            options.append('cache=%s' % cachemode)
    643            options.append('aio=%s' % aiomode)
    644
    645        if opts:
    646            options.append(opts)
    647
    648        if img_format == 'luks' and 'key-secret' not in opts:
    649            # default luks support
    650            if luks_default_secret_object not in self._args:
    651                self.add_object(luks_default_secret_object)
    652
    653            options.append(luks_default_key_secret_opt)
    654
    655        self._args.append('-drive')
    656        self._args.append(','.join(options))
    657        self._num_drives += 1
    658        return self
    659
    660    def add_blockdev(self, opts):
    661        self._args.append('-blockdev')
    662        if isinstance(opts, str):
    663            self._args.append(opts)
    664        else:
    665            self._args.append(','.join(opts))
    666        return self
    667
    668    def add_incoming(self, addr):
    669        self._args.append('-incoming')
    670        self._args.append(addr)
    671        return self
    672
    673    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
    674        cmd = 'human-monitor-command'
    675        kwargs: Dict[str, Any] = {'command-line': command_line}
    676        if use_log:
    677            return self.qmp_log(cmd, **kwargs)
    678        else:
    679            return self.qmp(cmd, **kwargs)
    680
    681    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
    682        """Pause drive r/w operations"""
    683        if not event:
    684            self.pause_drive(drive, "read_aio")
    685            self.pause_drive(drive, "write_aio")
    686            return
    687        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
    688
    689    def resume_drive(self, drive: str) -> None:
    690        """Resume drive r/w operations"""
    691        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
    692
    693    def hmp_qemu_io(self, drive: str, cmd: str,
    694                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
    695        """Write to a given drive using an HMP command"""
    696        d = '-d ' if qdev else ''
    697        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
    698
    699    def flatten_qmp_object(self, obj, output=None, basestr=''):
    700        if output is None:
    701            output = {}
    702        if isinstance(obj, list):
    703            for i, item in enumerate(obj):
    704                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
    705        elif isinstance(obj, dict):
    706            for key in obj:
    707                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
    708        else:
    709            output[basestr[:-1]] = obj # Strip trailing '.'
    710        return output
    711
    712    def qmp_to_opts(self, obj):
    713        obj = self.flatten_qmp_object(obj)
    714        output_list = []
    715        for key in obj:
    716            output_list += [key + '=' + obj[key]]
    717        return ','.join(output_list)
    718
    719    def get_qmp_events_filtered(self, wait=60.0):
    720        result = []
    721        for ev in self.get_qmp_events(wait=wait):
    722            result.append(filter_qmp_event(ev))
    723        return result
    724
    725    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
    726        full_cmd = OrderedDict((
    727            ("execute", cmd),
    728            ("arguments", ordered_qmp(kwargs))
    729        ))
    730        log(full_cmd, filters, indent=indent)
    731        result = self.qmp(cmd, **kwargs)
    732        log(result, filters, indent=indent)
    733        return result
    734
    735    # Returns None on success, and an error string on failure
    736    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
    737                pre_finalize=None, cancel=False, wait=60.0):
    738        """
    739        run_job moves a job from creation through to dismissal.
    740
    741        :param job: String. ID of recently-launched job
    742        :param auto_finalize: Bool. True if the job was launched with
    743                              auto_finalize. Defaults to True.
    744        :param auto_dismiss: Bool. True if the job was launched with
    745                             auto_dismiss=True. Defaults to False.
    746        :param pre_finalize: Callback. A callable that takes no arguments to be
    747                             invoked prior to issuing job-finalize, if any.
    748        :param cancel: Bool. When true, cancels the job after the pre_finalize
    749                       callback.
    750        :param wait: Float. Timeout value specifying how long to wait for any
    751                     event, in seconds. Defaults to 60.0.
    752        """
    753        match_device = {'data': {'device': job}}
    754        match_id = {'data': {'id': job}}
    755        events = [
    756            ('BLOCK_JOB_COMPLETED', match_device),
    757            ('BLOCK_JOB_CANCELLED', match_device),
    758            ('BLOCK_JOB_ERROR', match_device),
    759            ('BLOCK_JOB_READY', match_device),
    760            ('BLOCK_JOB_PENDING', match_id),
    761            ('JOB_STATUS_CHANGE', match_id)
    762        ]
    763        error = None
    764        while True:
    765            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
    766            if ev['event'] != 'JOB_STATUS_CHANGE':
    767                log(ev)
    768                continue
    769            status = ev['data']['status']
    770            if status == 'aborting':
    771                result = self.qmp('query-jobs')
    772                for j in result['return']:
    773                    if j['id'] == job:
    774                        error = j['error']
    775                        log('Job failed: %s' % (j['error']))
    776            elif status == 'ready':
    777                self.qmp_log('job-complete', id=job)
    778            elif status == 'pending' and not auto_finalize:
    779                if pre_finalize:
    780                    pre_finalize()
    781                if cancel:
    782                    self.qmp_log('job-cancel', id=job)
    783                else:
    784                    self.qmp_log('job-finalize', id=job)
    785            elif status == 'concluded' and not auto_dismiss:
    786                self.qmp_log('job-dismiss', id=job)
    787            elif status == 'null':
    788                return error
    789
    790    # Returns None on success, and an error string on failure
    791    def blockdev_create(self, options, job_id='job0', filters=None):
    792        if filters is None:
    793            filters = [filter_qmp_testfiles]
    794        result = self.qmp_log('blockdev-create', filters=filters,
    795                              job_id=job_id, options=options)
    796
    797        if 'return' in result:
    798            assert result['return'] == {}
    799            job_result = self.run_job(job_id)
    800        else:
    801            job_result = result['error']
    802
    803        log("")
    804        return job_result
    805
    806    def enable_migration_events(self, name):
    807        log('Enabling migration QMP events on %s...' % name)
    808        log(self.qmp('migrate-set-capabilities', capabilities=[
    809            {
    810                'capability': 'events',
    811                'state': True
    812            }
    813        ]))
    814
    815    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
    816        while True:
    817            event = self.event_wait('MIGRATION')
    818            # We use the default timeout, and with a timeout, event_wait()
    819            # never returns None
    820            assert event
    821
    822            log(event, filters=[filter_qmp_event])
    823            if event['data']['status'] in ('completed', 'failed'):
    824                break
    825
    826        if event['data']['status'] == 'completed':
    827            # The event may occur in finish-migrate, so wait for the expected
    828            # post-migration runstate
    829            runstate = None
    830            while runstate != expect_runstate:
    831                runstate = self.qmp('query-status')['return']['status']
    832            return True
    833        else:
    834            return False
    835
    836    def node_info(self, node_name):
    837        nodes = self.qmp('query-named-block-nodes')
    838        for x in nodes['return']:
    839            if x['node-name'] == node_name:
    840                return x
    841        return None
    842
    843    def query_bitmaps(self):
    844        res = self.qmp("query-named-block-nodes")
    845        return {device['node-name']: device['dirty-bitmaps']
    846                for device in res['return'] if 'dirty-bitmaps' in device}
    847
    848    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
    849        """
    850        get a specific bitmap from the object returned by query_bitmaps.
    851        :param recording: If specified, filter results by the specified value.
    852        :param bitmaps: If specified, use it instead of call query_bitmaps()
    853        """
    854        if bitmaps is None:
    855            bitmaps = self.query_bitmaps()
    856
    857        for bitmap in bitmaps[node_name]:
    858            if bitmap.get('name', '') == bitmap_name:
    859                if recording is None or bitmap.get('recording') == recording:
    860                    return bitmap
    861        return None
    862
    863    def check_bitmap_status(self, node_name, bitmap_name, fields):
    864        ret = self.get_bitmap(node_name, bitmap_name)
    865
    866        return fields.items() <= ret.items()
    867
    868    def assert_block_path(self, root, path, expected_node, graph=None):
    869        """
    870        Check whether the node under the given path in the block graph
    871        is @expected_node.
    872
    873        @root is the node name of the node where the @path is rooted.
    874
    875        @path is a string that consists of child names separated by
    876        slashes.  It must begin with a slash.
    877
    878        Examples for @root + @path:
    879          - root="qcow2-node", path="/backing/file"
    880          - root="quorum-node", path="/children.2/file"
    881
    882        Hypothetically, @path could be empty, in which case it would
    883        point to @root.  However, in practice this case is not useful
    884        and hence not allowed.
    885
    886        @expected_node may be None.  (All elements of the path but the
    887        leaf must still exist.)
    888
    889        @graph may be None or the result of an x-debug-query-block-graph
    890        call that has already been performed.
    891        """
    892        if graph is None:
    893            graph = self.qmp('x-debug-query-block-graph')['return']
    894
    895        iter_path = iter(path.split('/'))
    896
    897        # Must start with a /
    898        assert next(iter_path) == ''
    899
    900        node = next((node for node in graph['nodes'] if node['name'] == root),
    901                    None)
    902
    903        # An empty @path is not allowed, so the root node must be present
    904        assert node is not None, 'Root node %s not found' % root
    905
    906        for child_name in iter_path:
    907            assert node is not None, 'Cannot follow path %s%s' % (root, path)
    908
    909            try:
    910                node_id = next(edge['child'] for edge in graph['edges']
    911                               if (edge['parent'] == node['id'] and
    912                                   edge['name'] == child_name))
    913
    914                node = next(node for node in graph['nodes']
    915                            if node['id'] == node_id)
    916
    917            except StopIteration:
    918                node = None
    919
    920        if node is None:
    921            assert expected_node is None, \
    922                   'No node found under %s (but expected %s)' % \
    923                   (path, expected_node)
    924        else:
    925            assert node['name'] == expected_node, \
    926                   'Found node %s under %s (but expected %s)' % \
    927                   (node['name'], path, expected_node)
    928
    929index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
    930
    931class QMPTestCase(unittest.TestCase):
    932    '''Abstract base class for QMP test cases'''
    933
    934    def __init__(self, *args, **kwargs):
    935        super().__init__(*args, **kwargs)
    936        # Many users of this class set a VM property we rely on heavily
    937        # in the methods below.
    938        self.vm = None
    939
    940    def dictpath(self, d, path):
    941        '''Traverse a path in a nested dict'''
    942        for component in path.split('/'):
    943            m = index_re.match(component)
    944            if m:
    945                component, idx = m.groups()
    946                idx = int(idx)
    947
    948            if not isinstance(d, dict) or component not in d:
    949                self.fail(f'failed path traversal for "{path}" in "{d}"')
    950            d = d[component]
    951
    952            if m:
    953                if not isinstance(d, list):
    954                    self.fail(f'path component "{component}" in "{path}" '
    955                              f'is not a list in "{d}"')
    956                try:
    957                    d = d[idx]
    958                except IndexError:
    959                    self.fail(f'invalid index "{idx}" in path "{path}" '
    960                              f'in "{d}"')
    961        return d
    962
    963    def assert_qmp_absent(self, d, path):
    964        try:
    965            result = self.dictpath(d, path)
    966        except AssertionError:
    967            return
    968        self.fail('path "%s" has value "%s"' % (path, str(result)))
    969
    970    def assert_qmp(self, d, path, value):
    971        '''Assert that the value for a specific path in a QMP dict
    972           matches.  When given a list of values, assert that any of
    973           them matches.'''
    974
    975        result = self.dictpath(d, path)
    976
    977        # [] makes no sense as a list of valid values, so treat it as
    978        # an actual single value.
    979        if isinstance(value, list) and value != []:
    980            for v in value:
    981                if result == v:
    982                    return
    983            self.fail('no match for "%s" in %s' % (str(result), str(value)))
    984        else:
    985            self.assertEqual(result, value,
    986                             '"%s" is "%s", expected "%s"'
    987                             % (path, str(result), str(value)))
    988
    989    def assert_no_active_block_jobs(self):
    990        result = self.vm.qmp('query-block-jobs')
    991        self.assert_qmp(result, 'return', [])
    992
    993    def assert_has_block_node(self, node_name=None, file_name=None):
    994        """Issue a query-named-block-nodes and assert node_name and/or
    995        file_name is present in the result"""
    996        def check_equal_or_none(a, b):
    997            return a is None or b is None or a == b
    998        assert node_name or file_name
    999        result = self.vm.qmp('query-named-block-nodes')
   1000        for x in result["return"]:
   1001            if check_equal_or_none(x.get("node-name"), node_name) and \
   1002                    check_equal_or_none(x.get("file"), file_name):
   1003                return
   1004        self.fail("Cannot find %s %s in result:\n%s" %
   1005                  (node_name, file_name, result))
   1006
   1007    def assert_json_filename_equal(self, json_filename, reference):
   1008        '''Asserts that the given filename is a json: filename and that its
   1009           content is equal to the given reference object'''
   1010        self.assertEqual(json_filename[:5], 'json:')
   1011        self.assertEqual(
   1012            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
   1013            self.vm.flatten_qmp_object(reference)
   1014        )
   1015
   1016    def cancel_and_wait(self, drive='drive0', force=False,
   1017                        resume=False, wait=60.0):
   1018        '''Cancel a block job and wait for it to finish, returning the event'''
   1019        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
   1020        self.assert_qmp(result, 'return', {})
   1021
   1022        if resume:
   1023            self.vm.resume_drive(drive)
   1024
   1025        cancelled = False
   1026        result = None
   1027        while not cancelled:
   1028            for event in self.vm.get_qmp_events(wait=wait):
   1029                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
   1030                   event['event'] == 'BLOCK_JOB_CANCELLED':
   1031                    self.assert_qmp(event, 'data/device', drive)
   1032                    result = event
   1033                    cancelled = True
   1034                elif event['event'] == 'JOB_STATUS_CHANGE':
   1035                    self.assert_qmp(event, 'data/id', drive)
   1036
   1037
   1038        self.assert_no_active_block_jobs()
   1039        return result
   1040
   1041    def wait_until_completed(self, drive='drive0', check_offset=True,
   1042                             wait=60.0, error=None):
   1043        '''Wait for a block job to finish, returning the event'''
   1044        while True:
   1045            for event in self.vm.get_qmp_events(wait=wait):
   1046                if event['event'] == 'BLOCK_JOB_COMPLETED':
   1047                    self.assert_qmp(event, 'data/device', drive)
   1048                    if error is None:
   1049                        self.assert_qmp_absent(event, 'data/error')
   1050                        if check_offset:
   1051                            self.assert_qmp(event, 'data/offset',
   1052                                            event['data']['len'])
   1053                    else:
   1054                        self.assert_qmp(event, 'data/error', error)
   1055                    self.assert_no_active_block_jobs()
   1056                    return event
   1057                if event['event'] == 'JOB_STATUS_CHANGE':
   1058                    self.assert_qmp(event, 'data/id', drive)
   1059
   1060    def wait_ready(self, drive='drive0'):
   1061        """Wait until a BLOCK_JOB_READY event, and return the event."""
   1062        return self.vm.events_wait([
   1063            ('BLOCK_JOB_READY',
   1064             {'data': {'type': 'mirror', 'device': drive}}),
   1065            ('BLOCK_JOB_READY',
   1066             {'data': {'type': 'commit', 'device': drive}})
   1067        ])
   1068
   1069    def wait_ready_and_cancel(self, drive='drive0'):
   1070        self.wait_ready(drive=drive)
   1071        event = self.cancel_and_wait(drive=drive)
   1072        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
   1073        self.assert_qmp(event, 'data/type', 'mirror')
   1074        self.assert_qmp(event, 'data/offset', event['data']['len'])
   1075
   1076    def complete_and_wait(self, drive='drive0', wait_ready=True,
   1077                          completion_error=None):
   1078        '''Complete a block job and wait for it to finish'''
   1079        if wait_ready:
   1080            self.wait_ready(drive=drive)
   1081
   1082        result = self.vm.qmp('block-job-complete', device=drive)
   1083        self.assert_qmp(result, 'return', {})
   1084
   1085        event = self.wait_until_completed(drive=drive, error=completion_error)
   1086        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
   1087
   1088    def pause_wait(self, job_id='job0'):
   1089        with Timeout(3, "Timeout waiting for job to pause"):
   1090            while True:
   1091                result = self.vm.qmp('query-block-jobs')
   1092                found = False
   1093                for job in result['return']:
   1094                    if job['device'] == job_id:
   1095                        found = True
   1096                        if job['paused'] and not job['busy']:
   1097                            return job
   1098                        break
   1099                assert found
   1100
   1101    def pause_job(self, job_id='job0', wait=True):
   1102        result = self.vm.qmp('block-job-pause', device=job_id)
   1103        self.assert_qmp(result, 'return', {})
   1104        if wait:
   1105            return self.pause_wait(job_id)
   1106        return result
   1107
   1108    def case_skip(self, reason):
   1109        '''Skip this test case'''
   1110        case_notrun(reason)
   1111        self.skipTest(reason)
   1112
   1113
   1114def notrun(reason):
   1115    '''Skip this test suite'''
   1116    # Each test in qemu-iotests has a number ("seq")
   1117    seq = os.path.basename(sys.argv[0])
   1118
   1119    with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
   1120            as outfile:
   1121        outfile.write(reason + '\n')
   1122    logger.warning("%s not run: %s", seq, reason)
   1123    sys.exit(0)
   1124
   1125def case_notrun(reason):
   1126    '''Mark this test case as not having been run (without actually
   1127    skipping it, that is left to the caller).  See
   1128    QMPTestCase.case_skip() for a variant that actually skips the
   1129    current test case.'''
   1130
   1131    # Each test in qemu-iotests has a number ("seq")
   1132    seq = os.path.basename(sys.argv[0])
   1133
   1134    with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
   1135            as outfile:
   1136        outfile.write('    [case not run] ' + reason + '\n')
   1137
   1138def _verify_image_format(supported_fmts: Sequence[str] = (),
   1139                         unsupported_fmts: Sequence[str] = ()) -> None:
   1140    if 'generic' in supported_fmts and \
   1141            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
   1142        # similar to
   1143        #   _supported_fmt generic
   1144        # for bash tests
   1145        supported_fmts = ()
   1146
   1147    not_sup = supported_fmts and (imgfmt not in supported_fmts)
   1148    if not_sup or (imgfmt in unsupported_fmts):
   1149        notrun('not suitable for this image format: %s' % imgfmt)
   1150
   1151    if imgfmt == 'luks':
   1152        verify_working_luks()
   1153
   1154def _verify_protocol(supported: Sequence[str] = (),
   1155                     unsupported: Sequence[str] = ()) -> None:
   1156    assert not (supported and unsupported)
   1157
   1158    if 'generic' in supported:
   1159        return
   1160
   1161    not_sup = supported and (imgproto not in supported)
   1162    if not_sup or (imgproto in unsupported):
   1163        notrun('not suitable for this protocol: %s' % imgproto)
   1164
   1165def _verify_platform(supported: Sequence[str] = (),
   1166                     unsupported: Sequence[str] = ()) -> None:
   1167    if any((sys.platform.startswith(x) for x in unsupported)):
   1168        notrun('not suitable for this OS: %s' % sys.platform)
   1169
   1170    if supported:
   1171        if not any((sys.platform.startswith(x) for x in supported)):
   1172            notrun('not suitable for this OS: %s' % sys.platform)
   1173
   1174def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
   1175    if supported_cache_modes and (cachemode not in supported_cache_modes):
   1176        notrun('not suitable for this cache mode: %s' % cachemode)
   1177
   1178def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
   1179    if supported_aio_modes and (aiomode not in supported_aio_modes):
   1180        notrun('not suitable for this aio mode: %s' % aiomode)
   1181
   1182def _verify_formats(required_formats: Sequence[str] = ()) -> None:
   1183    usf_list = list(set(required_formats) - set(supported_formats()))
   1184    if usf_list:
   1185        notrun(f'formats {usf_list} are not whitelisted')
   1186
   1187
   1188def _verify_virtio_blk() -> None:
   1189    out = qemu_pipe('-M', 'none', '-device', 'help')
   1190    if 'virtio-blk' not in out:
   1191        notrun('Missing virtio-blk in QEMU binary')
   1192
   1193def _verify_virtio_scsi_pci_or_ccw() -> None:
   1194    out = qemu_pipe('-M', 'none', '-device', 'help')
   1195    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
   1196        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
   1197
   1198
   1199def supports_quorum():
   1200    return 'quorum' in qemu_img_pipe('--help')
   1201
   1202def verify_quorum():
   1203    '''Skip test suite if quorum support is not available'''
   1204    if not supports_quorum():
   1205        notrun('quorum support missing')
   1206
   1207def has_working_luks() -> Tuple[bool, str]:
   1208    """
   1209    Check whether our LUKS driver can actually create images
   1210    (this extends to LUKS encryption for qcow2).
   1211
   1212    If not, return the reason why.
   1213    """
   1214
   1215    img_file = f'{test_dir}/luks-test.luks'
   1216    (output, status) = \
   1217        qemu_img_pipe_and_status('create', '-f', 'luks',
   1218                                 '--object', luks_default_secret_object,
   1219                                 '-o', luks_default_key_secret_opt,
   1220                                 '-o', 'iter-time=10',
   1221                                 img_file, '1G')
   1222    try:
   1223        os.remove(img_file)
   1224    except OSError:
   1225        pass
   1226
   1227    if status != 0:
   1228        reason = output
   1229        for line in output.splitlines():
   1230            if img_file + ':' in line:
   1231                reason = line.split(img_file + ':', 1)[1].strip()
   1232                break
   1233
   1234        return (False, reason)
   1235    else:
   1236        return (True, '')
   1237
   1238def verify_working_luks():
   1239    """
   1240    Skip test suite if LUKS does not work
   1241    """
   1242    (working, reason) = has_working_luks()
   1243    if not working:
   1244        notrun(reason)
   1245
   1246def qemu_pipe(*args: str) -> str:
   1247    """
   1248    Run qemu with an option to print something and exit (e.g. a help option).
   1249
   1250    :return: QEMU's stdout output.
   1251    """
   1252    full_args = [qemu_prog] + qemu_opts + list(args)
   1253    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
   1254    return output
   1255
   1256def supported_formats(read_only=False):
   1257    '''Set 'read_only' to True to check ro-whitelist
   1258       Otherwise, rw-whitelist is checked'''
   1259
   1260    if not hasattr(supported_formats, "formats"):
   1261        supported_formats.formats = {}
   1262
   1263    if read_only not in supported_formats.formats:
   1264        format_message = qemu_pipe("-drive", "format=help")
   1265        line = 1 if read_only else 0
   1266        supported_formats.formats[read_only] = \
   1267            format_message.splitlines()[line].split(":")[1].split()
   1268
   1269    return supported_formats.formats[read_only]
   1270
   1271def skip_if_unsupported(required_formats=(), read_only=False):
   1272    '''Skip Test Decorator
   1273       Runs the test if all the required formats are whitelisted'''
   1274    def skip_test_decorator(func):
   1275        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
   1276                         **kwargs: Dict[str, Any]) -> None:
   1277            if callable(required_formats):
   1278                fmts = required_formats(test_case)
   1279            else:
   1280                fmts = required_formats
   1281
   1282            usf_list = list(set(fmts) - set(supported_formats(read_only)))
   1283            if usf_list:
   1284                msg = f'{test_case}: formats {usf_list} are not whitelisted'
   1285                test_case.case_skip(msg)
   1286            else:
   1287                func(test_case, *args, **kwargs)
   1288        return func_wrapper
   1289    return skip_test_decorator
   1290
   1291def skip_for_formats(formats: Sequence[str] = ()) \
   1292    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
   1293                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
   1294    '''Skip Test Decorator
   1295       Skips the test for the given formats'''
   1296    def skip_test_decorator(func):
   1297        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
   1298                         **kwargs: Dict[str, Any]) -> None:
   1299            if imgfmt in formats:
   1300                msg = f'{test_case}: Skipped for format {imgfmt}'
   1301                test_case.case_skip(msg)
   1302            else:
   1303                func(test_case, *args, **kwargs)
   1304        return func_wrapper
   1305    return skip_test_decorator
   1306
   1307def skip_if_user_is_root(func):
   1308    '''Skip Test Decorator
   1309       Runs the test only without root permissions'''
   1310    def func_wrapper(*args, **kwargs):
   1311        if os.getuid() == 0:
   1312            case_notrun('{}: cannot be run as root'.format(args[0]))
   1313            return None
   1314        else:
   1315            return func(*args, **kwargs)
   1316    return func_wrapper
   1317
   1318# We need to filter out the time taken from the output so that
   1319# qemu-iotest can reliably diff the results against master output,
   1320# and hide skipped tests from the reference output.
   1321
   1322class ReproducibleTestResult(unittest.TextTestResult):
   1323    def addSkip(self, test, reason):
   1324        # Same as TextTestResult, but print dot instead of "s"
   1325        unittest.TestResult.addSkip(self, test, reason)
   1326        if self.showAll:
   1327            self.stream.writeln("skipped {0!r}".format(reason))
   1328        elif self.dots:
   1329            self.stream.write(".")
   1330            self.stream.flush()
   1331
   1332class ReproducibleStreamWrapper:
   1333    def __init__(self, stream: TextIO):
   1334        self.stream = stream
   1335
   1336    def __getattr__(self, attr):
   1337        if attr in ('stream', '__getstate__'):
   1338            raise AttributeError(attr)
   1339        return getattr(self.stream, attr)
   1340
   1341    def write(self, arg=None):
   1342        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
   1343        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
   1344        self.stream.write(arg)
   1345
   1346class ReproducibleTestRunner(unittest.TextTestRunner):
   1347    def __init__(self, stream: Optional[TextIO] = None,
   1348             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
   1349             **kwargs: Any) -> None:
   1350        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
   1351        super().__init__(stream=rstream,           # type: ignore
   1352                         descriptions=True,
   1353                         resultclass=resultclass,
   1354                         **kwargs)
   1355
   1356def execute_unittest(argv: List[str], debug: bool = False) -> None:
   1357    """Executes unittests within the calling module."""
   1358
   1359    # Some tests have warnings, especially ResourceWarnings for unclosed
   1360    # files and sockets.  Ignore them for now to ensure reproducibility of
   1361    # the test output.
   1362    unittest.main(argv=argv,
   1363                  testRunner=ReproducibleTestRunner,
   1364                  verbosity=2 if debug else 1,
   1365                  warnings=None if sys.warnoptions else 'ignore')
   1366
   1367def execute_setup_common(supported_fmts: Sequence[str] = (),
   1368                         supported_platforms: Sequence[str] = (),
   1369                         supported_cache_modes: Sequence[str] = (),
   1370                         supported_aio_modes: Sequence[str] = (),
   1371                         unsupported_fmts: Sequence[str] = (),
   1372                         supported_protocols: Sequence[str] = (),
   1373                         unsupported_protocols: Sequence[str] = (),
   1374                         required_fmts: Sequence[str] = ()) -> bool:
   1375    """
   1376    Perform necessary setup for either script-style or unittest-style tests.
   1377
   1378    :return: Bool; Whether or not debug mode has been requested via the CLI.
   1379    """
   1380    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
   1381
   1382    debug = '-d' in sys.argv
   1383    if debug:
   1384        sys.argv.remove('-d')
   1385    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
   1386
   1387    _verify_image_format(supported_fmts, unsupported_fmts)
   1388    _verify_protocol(supported_protocols, unsupported_protocols)
   1389    _verify_platform(supported=supported_platforms)
   1390    _verify_cache_mode(supported_cache_modes)
   1391    _verify_aio_mode(supported_aio_modes)
   1392    _verify_formats(required_fmts)
   1393    _verify_virtio_blk()
   1394
   1395    return debug
   1396
   1397def execute_test(*args, test_function=None, **kwargs):
   1398    """Run either unittest or script-style tests."""
   1399
   1400    debug = execute_setup_common(*args, **kwargs)
   1401    if not test_function:
   1402        execute_unittest(sys.argv, debug)
   1403    else:
   1404        test_function()
   1405
   1406def activate_logging():
   1407    """Activate iotests.log() output to stdout for script-style tests."""
   1408    handler = logging.StreamHandler(stream=sys.stdout)
   1409    formatter = logging.Formatter('%(message)s')
   1410    handler.setFormatter(formatter)
   1411    test_logger.addHandler(handler)
   1412    test_logger.setLevel(logging.INFO)
   1413    test_logger.propagate = False
   1414
   1415# This is called from script-style iotests without a single point of entry
   1416def script_initialize(*args, **kwargs):
   1417    """Initialize script-style tests without running any tests."""
   1418    activate_logging()
   1419    execute_setup_common(*args, **kwargs)
   1420
   1421# This is called from script-style iotests with a single point of entry
   1422def script_main(test_function, *args, **kwargs):
   1423    """Run script-style tests outside of the unittest framework"""
   1424    activate_logging()
   1425    execute_test(*args, test_function=test_function, **kwargs)
   1426
   1427# This is called from unittest style iotests
   1428def main(*args, **kwargs):
   1429    """Run tests using the unittest framework"""
   1430    execute_test(*args, **kwargs)