iotests.py (51215B)
1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39from qemu.machine import qtest 40from qemu.qmp import QMPMessage 41 42# Use this logger for logging messages directly from the iotests module 43logger = logging.getLogger('qemu.iotests') 44logger.addHandler(logging.NullHandler()) 45 46# Use this logger for messages that ought to be used for diff output. 47test_logger = logging.getLogger('qemu.iotests.diff_io') 48 49 50faulthandler.enable() 51 52# This will not work if arguments contain spaces but is necessary if we 53# want to support the override options that ./check supports. 54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 55if os.environ.get('QEMU_IMG_OPTIONS'): 56 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 57 58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 59if os.environ.get('QEMU_IO_OPTIONS'): 60 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 61 62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 64 qemu_io_args_no_fmt += \ 65 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 66 67qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 68qemu_nbd_args = [qemu_nbd_prog] 69if os.environ.get('QEMU_NBD_OPTIONS'): 70 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 71 72qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 73qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 74 75gdb_qemu_env = os.environ.get('GDB_OPTIONS') 76qemu_gdb = [] 77if gdb_qemu_env: 78 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 79 80qemu_print = os.environ.get('PRINT_QEMU', False) 81 82imgfmt = os.environ.get('IMGFMT', 'raw') 83imgproto = os.environ.get('IMGPROTO', 'file') 84output_dir = os.environ.get('OUTPUT_DIR', '.') 85 86try: 87 test_dir = os.environ['TEST_DIR'] 88 sock_dir = os.environ['SOCK_DIR'] 89 cachemode = os.environ['CACHEMODE'] 90 aiomode = os.environ['AIOMODE'] 91 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 92except KeyError: 93 # We are using these variables as proxies to indicate that we're 94 # not being run via "check". There may be other things set up by 95 # "check" that individual test cases rely on. 96 sys.stderr.write('Please run this test via the "check" script\n') 97 sys.exit(os.EX_USAGE) 98 99qemu_valgrind = [] 100if os.environ.get('VALGRIND_QEMU') == "y" and \ 101 os.environ.get('NO_VALGRIND') != "y": 102 valgrind_logfile = "--log-file=" + test_dir 103 # %p allows to put the valgrind process PID, since 104 # we don't know it a priori (subprocess.Popen is 105 # not yet invoked) 106 valgrind_logfile += "/%p.valgrind" 107 108 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 109 110luks_default_secret_object = 'secret,id=keysec0,data=' + \ 111 os.environ.get('IMGKEYSECRET', '') 112luks_default_key_secret_opt = 'key-secret=keysec0' 113 114sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 115 116 117def unarchive_sample_image(sample, fname): 118 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 119 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 120 shutil.copyfileobj(f_in, f_out) 121 122 123def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 124 connect_stderr: bool = True) -> Tuple[str, int]: 125 """ 126 Run a tool and return both its output and its exit code 127 """ 128 stderr = subprocess.STDOUT if connect_stderr else None 129 with subprocess.Popen(args, stdout=subprocess.PIPE, 130 stderr=stderr, universal_newlines=True) as subp: 131 output = subp.communicate()[0] 132 if subp.returncode < 0: 133 cmd = ' '.join(args) 134 sys.stderr.write(f'{tool} received signal \ 135 {-subp.returncode}: {cmd}\n') 136 return (output, subp.returncode) 137 138def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 139 """ 140 Run qemu-img and return both its output and its exit code 141 """ 142 full_args = qemu_img_args + list(args) 143 return qemu_tool_pipe_and_status('qemu-img', full_args) 144 145def qemu_img(*args: str) -> int: 146 '''Run qemu-img and return the exit code''' 147 return qemu_img_pipe_and_status(*args)[1] 148 149def ordered_qmp(qmsg, conv_keys=True): 150 # Dictionaries are not ordered prior to 3.6, therefore: 151 if isinstance(qmsg, list): 152 return [ordered_qmp(atom) for atom in qmsg] 153 if isinstance(qmsg, dict): 154 od = OrderedDict() 155 for k, v in sorted(qmsg.items()): 156 if conv_keys: 157 k = k.replace('_', '-') 158 od[k] = ordered_qmp(v, conv_keys=False) 159 return od 160 return qmsg 161 162def qemu_img_create(*args): 163 args = list(args) 164 165 # default luks support 166 if '-f' in args and args[args.index('-f') + 1] == 'luks': 167 if '-o' in args: 168 i = args.index('-o') 169 if 'key-secret' not in args[i + 1]: 170 args[i + 1].append(luks_default_key_secret_opt) 171 args.insert(i + 2, '--object') 172 args.insert(i + 3, luks_default_secret_object) 173 else: 174 args = ['-o', luks_default_key_secret_opt, 175 '--object', luks_default_secret_object] + args 176 177 args.insert(0, 'create') 178 179 return qemu_img(*args) 180 181def qemu_img_measure(*args): 182 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 183 184def qemu_img_check(*args): 185 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 186 187def qemu_img_verbose(*args): 188 '''Run qemu-img without suppressing its output and return the exit code''' 189 exitcode = subprocess.call(qemu_img_args + list(args)) 190 if exitcode < 0: 191 sys.stderr.write('qemu-img received signal %i: %s\n' 192 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 193 return exitcode 194 195def qemu_img_pipe(*args: str) -> str: 196 '''Run qemu-img and return its output''' 197 return qemu_img_pipe_and_status(*args)[0] 198 199def qemu_img_log(*args): 200 result = qemu_img_pipe(*args) 201 log(result, filters=[filter_testfiles]) 202 return result 203 204def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 205 args = ['info'] 206 if imgopts: 207 args.append('--image-opts') 208 else: 209 args += ['-f', imgfmt] 210 args += extra_args 211 args.append(filename) 212 213 output = qemu_img_pipe(*args) 214 if not filter_path: 215 filter_path = filename 216 log(filter_img_info(output, filter_path)) 217 218def qemu_io(*args): 219 '''Run qemu-io and return the stdout data''' 220 args = qemu_io_args + list(args) 221 return qemu_tool_pipe_and_status('qemu-io', args)[0] 222 223def qemu_io_log(*args): 224 result = qemu_io(*args) 225 log(result, filters=[filter_testfiles, filter_qemu_io]) 226 return result 227 228def qemu_io_silent(*args): 229 '''Run qemu-io and return the exit code, suppressing stdout''' 230 if '-f' in args or '--image-opts' in args: 231 default_args = qemu_io_args_no_fmt 232 else: 233 default_args = qemu_io_args 234 235 args = default_args + list(args) 236 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 237 if result.returncode < 0: 238 sys.stderr.write('qemu-io received signal %i: %s\n' % 239 (-result.returncode, ' '.join(args))) 240 return result.returncode 241 242def qemu_io_silent_check(*args): 243 '''Run qemu-io and return the true if subprocess returned 0''' 244 args = qemu_io_args + list(args) 245 result = subprocess.run(args, stdout=subprocess.DEVNULL, 246 stderr=subprocess.STDOUT, check=False) 247 return result.returncode == 0 248 249class QemuIoInteractive: 250 def __init__(self, *args): 251 self.args = qemu_io_args_no_fmt + list(args) 252 # We need to keep the Popen objext around, and not 253 # close it immediately. Therefore, disable the pylint check: 254 # pylint: disable=consider-using-with 255 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 256 stdout=subprocess.PIPE, 257 stderr=subprocess.STDOUT, 258 universal_newlines=True) 259 out = self._p.stdout.read(9) 260 if out != 'qemu-io> ': 261 # Most probably qemu-io just failed to start. 262 # Let's collect the whole output and exit. 263 out += self._p.stdout.read() 264 self._p.wait(timeout=1) 265 raise ValueError(out) 266 267 def close(self): 268 self._p.communicate('q\n') 269 270 def _read_output(self): 271 pattern = 'qemu-io> ' 272 n = len(pattern) 273 pos = 0 274 s = [] 275 while pos != n: 276 c = self._p.stdout.read(1) 277 # check unexpected EOF 278 assert c != '' 279 s.append(c) 280 if c == pattern[pos]: 281 pos += 1 282 else: 283 pos = 0 284 285 return ''.join(s[:-n]) 286 287 def cmd(self, cmd): 288 # quit command is in close(), '\n' is added automatically 289 assert '\n' not in cmd 290 cmd = cmd.strip() 291 assert cmd not in ('q', 'quit') 292 self._p.stdin.write(cmd + '\n') 293 self._p.stdin.flush() 294 return self._read_output() 295 296 297def qemu_nbd(*args): 298 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 299 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 300 301def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 302 '''Run qemu-nbd in daemon mode and return both the parent's exit code 303 and its output in case of an error''' 304 full_args = qemu_nbd_args + ['--fork'] + list(args) 305 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 306 connect_stderr=False) 307 return returncode, output if returncode else '' 308 309def qemu_nbd_list_log(*args: str) -> str: 310 '''Run qemu-nbd to list remote exports''' 311 full_args = [qemu_nbd_prog, '-L'] + list(args) 312 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 313 log(output, filters=[filter_testfiles, filter_nbd_exports]) 314 return output 315 316@contextmanager 317def qemu_nbd_popen(*args): 318 '''Context manager running qemu-nbd within the context''' 319 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 320 321 assert not os.path.exists(pid_file) 322 323 cmd = list(qemu_nbd_args) 324 cmd.extend(('--persistent', '--pid-file', pid_file)) 325 cmd.extend(args) 326 327 log('Start NBD server') 328 with subprocess.Popen(cmd) as p: 329 try: 330 while not os.path.exists(pid_file): 331 if p.poll() is not None: 332 raise RuntimeError( 333 "qemu-nbd terminated with exit code {}: {}" 334 .format(p.returncode, ' '.join(cmd))) 335 336 time.sleep(0.01) 337 yield 338 finally: 339 if os.path.exists(pid_file): 340 os.remove(pid_file) 341 log('Kill NBD server') 342 p.kill() 343 p.wait() 344 345def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 346 '''Return True if two image files are identical''' 347 return qemu_img('compare', '-f', fmt1, 348 '-F', fmt2, img1, img2) == 0 349 350def create_image(name, size): 351 '''Create a fully-allocated raw image with sector markers''' 352 with open(name, 'wb') as file: 353 i = 0 354 while i < size: 355 sector = struct.pack('>l504xl', i // 512, i // 512) 356 file.write(sector) 357 i = i + 512 358 359def image_size(img): 360 '''Return image's virtual size''' 361 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 362 return json.loads(r)['virtual-size'] 363 364def is_str(val): 365 return isinstance(val, str) 366 367test_dir_re = re.compile(r"%s" % test_dir) 368def filter_test_dir(msg): 369 return test_dir_re.sub("TEST_DIR", msg) 370 371win32_re = re.compile(r"\r") 372def filter_win32(msg): 373 return win32_re.sub("", msg) 374 375qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 376 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 377 r"and [0-9\/.inf]* ops\/sec\)") 378def filter_qemu_io(msg): 379 msg = filter_win32(msg) 380 return qemu_io_re.sub("X ops; XX:XX:XX.X " 381 "(XXX YYY/sec and XXX ops/sec)", msg) 382 383chown_re = re.compile(r"chown [0-9]+:[0-9]+") 384def filter_chown(msg): 385 return chown_re.sub("chown UID:GID", msg) 386 387def filter_qmp_event(event): 388 '''Filter a QMP event dict''' 389 event = dict(event) 390 if 'timestamp' in event: 391 event['timestamp']['seconds'] = 'SECS' 392 event['timestamp']['microseconds'] = 'USECS' 393 return event 394 395def filter_qmp(qmsg, filter_fn): 396 '''Given a string filter, filter a QMP object's values. 397 filter_fn takes a (key, value) pair.''' 398 # Iterate through either lists or dicts; 399 if isinstance(qmsg, list): 400 items = enumerate(qmsg) 401 else: 402 items = qmsg.items() 403 404 for k, v in items: 405 if isinstance(v, (dict, list)): 406 qmsg[k] = filter_qmp(v, filter_fn) 407 else: 408 qmsg[k] = filter_fn(k, v) 409 return qmsg 410 411def filter_testfiles(msg): 412 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 413 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 414 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 415 416def filter_qmp_testfiles(qmsg): 417 def _filter(_key, value): 418 if is_str(value): 419 return filter_testfiles(value) 420 return value 421 return filter_qmp(qmsg, _filter) 422 423def filter_virtio_scsi(output: str) -> str: 424 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 425 426def filter_qmp_virtio_scsi(qmsg): 427 def _filter(_key, value): 428 if is_str(value): 429 return filter_virtio_scsi(value) 430 return value 431 return filter_qmp(qmsg, _filter) 432 433def filter_generated_node_ids(msg): 434 return re.sub("#block[0-9]+", "NODE_NAME", msg) 435 436def filter_img_info(output, filename): 437 lines = [] 438 for line in output.split('\n'): 439 if 'disk size' in line or 'actual-size' in line: 440 continue 441 line = line.replace(filename, 'TEST_IMG') 442 line = filter_testfiles(line) 443 line = line.replace(imgfmt, 'IMGFMT') 444 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 445 line = re.sub('uuid: [-a-f0-9]+', 446 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 447 line) 448 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 449 lines.append(line) 450 return '\n'.join(lines) 451 452def filter_imgfmt(msg): 453 return msg.replace(imgfmt, 'IMGFMT') 454 455def filter_qmp_imgfmt(qmsg): 456 def _filter(_key, value): 457 if is_str(value): 458 return filter_imgfmt(value) 459 return value 460 return filter_qmp(qmsg, _filter) 461 462def filter_nbd_exports(output: str) -> str: 463 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 464 465 466Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 467 468def log(msg: Msg, 469 filters: Iterable[Callable[[Msg], Msg]] = (), 470 indent: Optional[int] = None) -> None: 471 """ 472 Logs either a string message or a JSON serializable message (like QMP). 473 If indent is provided, JSON serializable messages are pretty-printed. 474 """ 475 for flt in filters: 476 msg = flt(msg) 477 if isinstance(msg, (dict, list)): 478 # Don't sort if it's already sorted 479 do_sort = not isinstance(msg, OrderedDict) 480 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 481 else: 482 test_logger.info(msg) 483 484class Timeout: 485 def __init__(self, seconds, errmsg="Timeout"): 486 self.seconds = seconds 487 self.errmsg = errmsg 488 def __enter__(self): 489 if qemu_gdb or qemu_valgrind: 490 return self 491 signal.signal(signal.SIGALRM, self.timeout) 492 signal.setitimer(signal.ITIMER_REAL, self.seconds) 493 return self 494 def __exit__(self, exc_type, value, traceback): 495 if qemu_gdb or qemu_valgrind: 496 return False 497 signal.setitimer(signal.ITIMER_REAL, 0) 498 return False 499 def timeout(self, signum, frame): 500 raise Exception(self.errmsg) 501 502def file_pattern(name): 503 return "{0}-{1}".format(os.getpid(), name) 504 505class FilePath: 506 """ 507 Context manager generating multiple file names. The generated files are 508 removed when exiting the context. 509 510 Example usage: 511 512 with FilePath('a.img', 'b.img') as (img_a, img_b): 513 # Use img_a and img_b here... 514 515 # a.img and b.img are automatically removed here. 516 517 By default images are created in iotests.test_dir. To create sockets use 518 iotests.sock_dir: 519 520 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 521 522 For convenience, calling with one argument yields a single file instead of 523 a tuple with one item. 524 525 """ 526 def __init__(self, *names, base_dir=test_dir): 527 self.paths = [os.path.join(base_dir, file_pattern(name)) 528 for name in names] 529 530 def __enter__(self): 531 if len(self.paths) == 1: 532 return self.paths[0] 533 else: 534 return self.paths 535 536 def __exit__(self, exc_type, exc_val, exc_tb): 537 for path in self.paths: 538 try: 539 os.remove(path) 540 except OSError: 541 pass 542 return False 543 544 545def try_remove(img): 546 try: 547 os.remove(img) 548 except OSError: 549 pass 550 551def file_path_remover(): 552 for path in reversed(file_path_remover.paths): 553 try_remove(path) 554 555 556def file_path(*names, base_dir=test_dir): 557 ''' Another way to get auto-generated filename that cleans itself up. 558 559 Use is as simple as: 560 561 img_a, img_b = file_path('a.img', 'b.img') 562 sock = file_path('socket') 563 ''' 564 565 if not hasattr(file_path_remover, 'paths'): 566 file_path_remover.paths = [] 567 atexit.register(file_path_remover) 568 569 paths = [] 570 for name in names: 571 filename = file_pattern(name) 572 path = os.path.join(base_dir, filename) 573 file_path_remover.paths.append(path) 574 paths.append(path) 575 576 return paths[0] if len(paths) == 1 else paths 577 578def remote_filename(path): 579 if imgproto == 'file': 580 return path 581 elif imgproto == 'ssh': 582 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 583 else: 584 raise Exception("Protocol %s not supported" % (imgproto)) 585 586class VM(qtest.QEMUQtestMachine): 587 '''A QEMU VM''' 588 589 def __init__(self, path_suffix=''): 590 name = "qemu%s-%d" % (path_suffix, os.getpid()) 591 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 592 if qemu_gdb and qemu_valgrind: 593 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 594 sys.exit(1) 595 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 596 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 597 name=name, 598 base_temp_dir=test_dir, 599 sock_dir=sock_dir, qmp_timer=timer) 600 self._num_drives = 0 601 602 def _post_shutdown(self) -> None: 603 super()._post_shutdown() 604 if not qemu_valgrind or not self._popen: 605 return 606 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 607 if self.exitcode() == 99: 608 with open(valgrind_filename, encoding='utf-8') as f: 609 print(f.read()) 610 else: 611 os.remove(valgrind_filename) 612 613 def _pre_launch(self) -> None: 614 super()._pre_launch() 615 if qemu_print: 616 # set QEMU binary output to stdout 617 self._close_qemu_log_file() 618 619 def add_object(self, opts): 620 self._args.append('-object') 621 self._args.append(opts) 622 return self 623 624 def add_device(self, opts): 625 self._args.append('-device') 626 self._args.append(opts) 627 return self 628 629 def add_drive_raw(self, opts): 630 self._args.append('-drive') 631 self._args.append(opts) 632 return self 633 634 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 635 '''Add a virtio-blk drive to the VM''' 636 options = ['if=%s' % interface, 637 'id=drive%d' % self._num_drives] 638 639 if path is not None: 640 options.append('file=%s' % path) 641 options.append('format=%s' % img_format) 642 options.append('cache=%s' % cachemode) 643 options.append('aio=%s' % aiomode) 644 645 if opts: 646 options.append(opts) 647 648 if img_format == 'luks' and 'key-secret' not in opts: 649 # default luks support 650 if luks_default_secret_object not in self._args: 651 self.add_object(luks_default_secret_object) 652 653 options.append(luks_default_key_secret_opt) 654 655 self._args.append('-drive') 656 self._args.append(','.join(options)) 657 self._num_drives += 1 658 return self 659 660 def add_blockdev(self, opts): 661 self._args.append('-blockdev') 662 if isinstance(opts, str): 663 self._args.append(opts) 664 else: 665 self._args.append(','.join(opts)) 666 return self 667 668 def add_incoming(self, addr): 669 self._args.append('-incoming') 670 self._args.append(addr) 671 return self 672 673 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 674 cmd = 'human-monitor-command' 675 kwargs: Dict[str, Any] = {'command-line': command_line} 676 if use_log: 677 return self.qmp_log(cmd, **kwargs) 678 else: 679 return self.qmp(cmd, **kwargs) 680 681 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 682 """Pause drive r/w operations""" 683 if not event: 684 self.pause_drive(drive, "read_aio") 685 self.pause_drive(drive, "write_aio") 686 return 687 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 688 689 def resume_drive(self, drive: str) -> None: 690 """Resume drive r/w operations""" 691 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 692 693 def hmp_qemu_io(self, drive: str, cmd: str, 694 use_log: bool = False, qdev: bool = False) -> QMPMessage: 695 """Write to a given drive using an HMP command""" 696 d = '-d ' if qdev else '' 697 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 698 699 def flatten_qmp_object(self, obj, output=None, basestr=''): 700 if output is None: 701 output = {} 702 if isinstance(obj, list): 703 for i, item in enumerate(obj): 704 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 705 elif isinstance(obj, dict): 706 for key in obj: 707 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 708 else: 709 output[basestr[:-1]] = obj # Strip trailing '.' 710 return output 711 712 def qmp_to_opts(self, obj): 713 obj = self.flatten_qmp_object(obj) 714 output_list = [] 715 for key in obj: 716 output_list += [key + '=' + obj[key]] 717 return ','.join(output_list) 718 719 def get_qmp_events_filtered(self, wait=60.0): 720 result = [] 721 for ev in self.get_qmp_events(wait=wait): 722 result.append(filter_qmp_event(ev)) 723 return result 724 725 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 726 full_cmd = OrderedDict(( 727 ("execute", cmd), 728 ("arguments", ordered_qmp(kwargs)) 729 )) 730 log(full_cmd, filters, indent=indent) 731 result = self.qmp(cmd, **kwargs) 732 log(result, filters, indent=indent) 733 return result 734 735 # Returns None on success, and an error string on failure 736 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 737 pre_finalize=None, cancel=False, wait=60.0): 738 """ 739 run_job moves a job from creation through to dismissal. 740 741 :param job: String. ID of recently-launched job 742 :param auto_finalize: Bool. True if the job was launched with 743 auto_finalize. Defaults to True. 744 :param auto_dismiss: Bool. True if the job was launched with 745 auto_dismiss=True. Defaults to False. 746 :param pre_finalize: Callback. A callable that takes no arguments to be 747 invoked prior to issuing job-finalize, if any. 748 :param cancel: Bool. When true, cancels the job after the pre_finalize 749 callback. 750 :param wait: Float. Timeout value specifying how long to wait for any 751 event, in seconds. Defaults to 60.0. 752 """ 753 match_device = {'data': {'device': job}} 754 match_id = {'data': {'id': job}} 755 events = [ 756 ('BLOCK_JOB_COMPLETED', match_device), 757 ('BLOCK_JOB_CANCELLED', match_device), 758 ('BLOCK_JOB_ERROR', match_device), 759 ('BLOCK_JOB_READY', match_device), 760 ('BLOCK_JOB_PENDING', match_id), 761 ('JOB_STATUS_CHANGE', match_id) 762 ] 763 error = None 764 while True: 765 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 766 if ev['event'] != 'JOB_STATUS_CHANGE': 767 log(ev) 768 continue 769 status = ev['data']['status'] 770 if status == 'aborting': 771 result = self.qmp('query-jobs') 772 for j in result['return']: 773 if j['id'] == job: 774 error = j['error'] 775 log('Job failed: %s' % (j['error'])) 776 elif status == 'ready': 777 self.qmp_log('job-complete', id=job) 778 elif status == 'pending' and not auto_finalize: 779 if pre_finalize: 780 pre_finalize() 781 if cancel: 782 self.qmp_log('job-cancel', id=job) 783 else: 784 self.qmp_log('job-finalize', id=job) 785 elif status == 'concluded' and not auto_dismiss: 786 self.qmp_log('job-dismiss', id=job) 787 elif status == 'null': 788 return error 789 790 # Returns None on success, and an error string on failure 791 def blockdev_create(self, options, job_id='job0', filters=None): 792 if filters is None: 793 filters = [filter_qmp_testfiles] 794 result = self.qmp_log('blockdev-create', filters=filters, 795 job_id=job_id, options=options) 796 797 if 'return' in result: 798 assert result['return'] == {} 799 job_result = self.run_job(job_id) 800 else: 801 job_result = result['error'] 802 803 log("") 804 return job_result 805 806 def enable_migration_events(self, name): 807 log('Enabling migration QMP events on %s...' % name) 808 log(self.qmp('migrate-set-capabilities', capabilities=[ 809 { 810 'capability': 'events', 811 'state': True 812 } 813 ])) 814 815 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 816 while True: 817 event = self.event_wait('MIGRATION') 818 # We use the default timeout, and with a timeout, event_wait() 819 # never returns None 820 assert event 821 822 log(event, filters=[filter_qmp_event]) 823 if event['data']['status'] in ('completed', 'failed'): 824 break 825 826 if event['data']['status'] == 'completed': 827 # The event may occur in finish-migrate, so wait for the expected 828 # post-migration runstate 829 runstate = None 830 while runstate != expect_runstate: 831 runstate = self.qmp('query-status')['return']['status'] 832 return True 833 else: 834 return False 835 836 def node_info(self, node_name): 837 nodes = self.qmp('query-named-block-nodes') 838 for x in nodes['return']: 839 if x['node-name'] == node_name: 840 return x 841 return None 842 843 def query_bitmaps(self): 844 res = self.qmp("query-named-block-nodes") 845 return {device['node-name']: device['dirty-bitmaps'] 846 for device in res['return'] if 'dirty-bitmaps' in device} 847 848 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 849 """ 850 get a specific bitmap from the object returned by query_bitmaps. 851 :param recording: If specified, filter results by the specified value. 852 :param bitmaps: If specified, use it instead of call query_bitmaps() 853 """ 854 if bitmaps is None: 855 bitmaps = self.query_bitmaps() 856 857 for bitmap in bitmaps[node_name]: 858 if bitmap.get('name', '') == bitmap_name: 859 if recording is None or bitmap.get('recording') == recording: 860 return bitmap 861 return None 862 863 def check_bitmap_status(self, node_name, bitmap_name, fields): 864 ret = self.get_bitmap(node_name, bitmap_name) 865 866 return fields.items() <= ret.items() 867 868 def assert_block_path(self, root, path, expected_node, graph=None): 869 """ 870 Check whether the node under the given path in the block graph 871 is @expected_node. 872 873 @root is the node name of the node where the @path is rooted. 874 875 @path is a string that consists of child names separated by 876 slashes. It must begin with a slash. 877 878 Examples for @root + @path: 879 - root="qcow2-node", path="/backing/file" 880 - root="quorum-node", path="/children.2/file" 881 882 Hypothetically, @path could be empty, in which case it would 883 point to @root. However, in practice this case is not useful 884 and hence not allowed. 885 886 @expected_node may be None. (All elements of the path but the 887 leaf must still exist.) 888 889 @graph may be None or the result of an x-debug-query-block-graph 890 call that has already been performed. 891 """ 892 if graph is None: 893 graph = self.qmp('x-debug-query-block-graph')['return'] 894 895 iter_path = iter(path.split('/')) 896 897 # Must start with a / 898 assert next(iter_path) == '' 899 900 node = next((node for node in graph['nodes'] if node['name'] == root), 901 None) 902 903 # An empty @path is not allowed, so the root node must be present 904 assert node is not None, 'Root node %s not found' % root 905 906 for child_name in iter_path: 907 assert node is not None, 'Cannot follow path %s%s' % (root, path) 908 909 try: 910 node_id = next(edge['child'] for edge in graph['edges'] 911 if (edge['parent'] == node['id'] and 912 edge['name'] == child_name)) 913 914 node = next(node for node in graph['nodes'] 915 if node['id'] == node_id) 916 917 except StopIteration: 918 node = None 919 920 if node is None: 921 assert expected_node is None, \ 922 'No node found under %s (but expected %s)' % \ 923 (path, expected_node) 924 else: 925 assert node['name'] == expected_node, \ 926 'Found node %s under %s (but expected %s)' % \ 927 (node['name'], path, expected_node) 928 929index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 930 931class QMPTestCase(unittest.TestCase): 932 '''Abstract base class for QMP test cases''' 933 934 def __init__(self, *args, **kwargs): 935 super().__init__(*args, **kwargs) 936 # Many users of this class set a VM property we rely on heavily 937 # in the methods below. 938 self.vm = None 939 940 def dictpath(self, d, path): 941 '''Traverse a path in a nested dict''' 942 for component in path.split('/'): 943 m = index_re.match(component) 944 if m: 945 component, idx = m.groups() 946 idx = int(idx) 947 948 if not isinstance(d, dict) or component not in d: 949 self.fail(f'failed path traversal for "{path}" in "{d}"') 950 d = d[component] 951 952 if m: 953 if not isinstance(d, list): 954 self.fail(f'path component "{component}" in "{path}" ' 955 f'is not a list in "{d}"') 956 try: 957 d = d[idx] 958 except IndexError: 959 self.fail(f'invalid index "{idx}" in path "{path}" ' 960 f'in "{d}"') 961 return d 962 963 def assert_qmp_absent(self, d, path): 964 try: 965 result = self.dictpath(d, path) 966 except AssertionError: 967 return 968 self.fail('path "%s" has value "%s"' % (path, str(result))) 969 970 def assert_qmp(self, d, path, value): 971 '''Assert that the value for a specific path in a QMP dict 972 matches. When given a list of values, assert that any of 973 them matches.''' 974 975 result = self.dictpath(d, path) 976 977 # [] makes no sense as a list of valid values, so treat it as 978 # an actual single value. 979 if isinstance(value, list) and value != []: 980 for v in value: 981 if result == v: 982 return 983 self.fail('no match for "%s" in %s' % (str(result), str(value))) 984 else: 985 self.assertEqual(result, value, 986 '"%s" is "%s", expected "%s"' 987 % (path, str(result), str(value))) 988 989 def assert_no_active_block_jobs(self): 990 result = self.vm.qmp('query-block-jobs') 991 self.assert_qmp(result, 'return', []) 992 993 def assert_has_block_node(self, node_name=None, file_name=None): 994 """Issue a query-named-block-nodes and assert node_name and/or 995 file_name is present in the result""" 996 def check_equal_or_none(a, b): 997 return a is None or b is None or a == b 998 assert node_name or file_name 999 result = self.vm.qmp('query-named-block-nodes') 1000 for x in result["return"]: 1001 if check_equal_or_none(x.get("node-name"), node_name) and \ 1002 check_equal_or_none(x.get("file"), file_name): 1003 return 1004 self.fail("Cannot find %s %s in result:\n%s" % 1005 (node_name, file_name, result)) 1006 1007 def assert_json_filename_equal(self, json_filename, reference): 1008 '''Asserts that the given filename is a json: filename and that its 1009 content is equal to the given reference object''' 1010 self.assertEqual(json_filename[:5], 'json:') 1011 self.assertEqual( 1012 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1013 self.vm.flatten_qmp_object(reference) 1014 ) 1015 1016 def cancel_and_wait(self, drive='drive0', force=False, 1017 resume=False, wait=60.0): 1018 '''Cancel a block job and wait for it to finish, returning the event''' 1019 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1020 self.assert_qmp(result, 'return', {}) 1021 1022 if resume: 1023 self.vm.resume_drive(drive) 1024 1025 cancelled = False 1026 result = None 1027 while not cancelled: 1028 for event in self.vm.get_qmp_events(wait=wait): 1029 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1030 event['event'] == 'BLOCK_JOB_CANCELLED': 1031 self.assert_qmp(event, 'data/device', drive) 1032 result = event 1033 cancelled = True 1034 elif event['event'] == 'JOB_STATUS_CHANGE': 1035 self.assert_qmp(event, 'data/id', drive) 1036 1037 1038 self.assert_no_active_block_jobs() 1039 return result 1040 1041 def wait_until_completed(self, drive='drive0', check_offset=True, 1042 wait=60.0, error=None): 1043 '''Wait for a block job to finish, returning the event''' 1044 while True: 1045 for event in self.vm.get_qmp_events(wait=wait): 1046 if event['event'] == 'BLOCK_JOB_COMPLETED': 1047 self.assert_qmp(event, 'data/device', drive) 1048 if error is None: 1049 self.assert_qmp_absent(event, 'data/error') 1050 if check_offset: 1051 self.assert_qmp(event, 'data/offset', 1052 event['data']['len']) 1053 else: 1054 self.assert_qmp(event, 'data/error', error) 1055 self.assert_no_active_block_jobs() 1056 return event 1057 if event['event'] == 'JOB_STATUS_CHANGE': 1058 self.assert_qmp(event, 'data/id', drive) 1059 1060 def wait_ready(self, drive='drive0'): 1061 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1062 return self.vm.events_wait([ 1063 ('BLOCK_JOB_READY', 1064 {'data': {'type': 'mirror', 'device': drive}}), 1065 ('BLOCK_JOB_READY', 1066 {'data': {'type': 'commit', 'device': drive}}) 1067 ]) 1068 1069 def wait_ready_and_cancel(self, drive='drive0'): 1070 self.wait_ready(drive=drive) 1071 event = self.cancel_and_wait(drive=drive) 1072 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1073 self.assert_qmp(event, 'data/type', 'mirror') 1074 self.assert_qmp(event, 'data/offset', event['data']['len']) 1075 1076 def complete_and_wait(self, drive='drive0', wait_ready=True, 1077 completion_error=None): 1078 '''Complete a block job and wait for it to finish''' 1079 if wait_ready: 1080 self.wait_ready(drive=drive) 1081 1082 result = self.vm.qmp('block-job-complete', device=drive) 1083 self.assert_qmp(result, 'return', {}) 1084 1085 event = self.wait_until_completed(drive=drive, error=completion_error) 1086 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1087 1088 def pause_wait(self, job_id='job0'): 1089 with Timeout(3, "Timeout waiting for job to pause"): 1090 while True: 1091 result = self.vm.qmp('query-block-jobs') 1092 found = False 1093 for job in result['return']: 1094 if job['device'] == job_id: 1095 found = True 1096 if job['paused'] and not job['busy']: 1097 return job 1098 break 1099 assert found 1100 1101 def pause_job(self, job_id='job0', wait=True): 1102 result = self.vm.qmp('block-job-pause', device=job_id) 1103 self.assert_qmp(result, 'return', {}) 1104 if wait: 1105 return self.pause_wait(job_id) 1106 return result 1107 1108 def case_skip(self, reason): 1109 '''Skip this test case''' 1110 case_notrun(reason) 1111 self.skipTest(reason) 1112 1113 1114def notrun(reason): 1115 '''Skip this test suite''' 1116 # Each test in qemu-iotests has a number ("seq") 1117 seq = os.path.basename(sys.argv[0]) 1118 1119 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1120 as outfile: 1121 outfile.write(reason + '\n') 1122 logger.warning("%s not run: %s", seq, reason) 1123 sys.exit(0) 1124 1125def case_notrun(reason): 1126 '''Mark this test case as not having been run (without actually 1127 skipping it, that is left to the caller). See 1128 QMPTestCase.case_skip() for a variant that actually skips the 1129 current test case.''' 1130 1131 # Each test in qemu-iotests has a number ("seq") 1132 seq = os.path.basename(sys.argv[0]) 1133 1134 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1135 as outfile: 1136 outfile.write(' [case not run] ' + reason + '\n') 1137 1138def _verify_image_format(supported_fmts: Sequence[str] = (), 1139 unsupported_fmts: Sequence[str] = ()) -> None: 1140 if 'generic' in supported_fmts and \ 1141 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1142 # similar to 1143 # _supported_fmt generic 1144 # for bash tests 1145 supported_fmts = () 1146 1147 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1148 if not_sup or (imgfmt in unsupported_fmts): 1149 notrun('not suitable for this image format: %s' % imgfmt) 1150 1151 if imgfmt == 'luks': 1152 verify_working_luks() 1153 1154def _verify_protocol(supported: Sequence[str] = (), 1155 unsupported: Sequence[str] = ()) -> None: 1156 assert not (supported and unsupported) 1157 1158 if 'generic' in supported: 1159 return 1160 1161 not_sup = supported and (imgproto not in supported) 1162 if not_sup or (imgproto in unsupported): 1163 notrun('not suitable for this protocol: %s' % imgproto) 1164 1165def _verify_platform(supported: Sequence[str] = (), 1166 unsupported: Sequence[str] = ()) -> None: 1167 if any((sys.platform.startswith(x) for x in unsupported)): 1168 notrun('not suitable for this OS: %s' % sys.platform) 1169 1170 if supported: 1171 if not any((sys.platform.startswith(x) for x in supported)): 1172 notrun('not suitable for this OS: %s' % sys.platform) 1173 1174def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1175 if supported_cache_modes and (cachemode not in supported_cache_modes): 1176 notrun('not suitable for this cache mode: %s' % cachemode) 1177 1178def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1179 if supported_aio_modes and (aiomode not in supported_aio_modes): 1180 notrun('not suitable for this aio mode: %s' % aiomode) 1181 1182def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1183 usf_list = list(set(required_formats) - set(supported_formats())) 1184 if usf_list: 1185 notrun(f'formats {usf_list} are not whitelisted') 1186 1187 1188def _verify_virtio_blk() -> None: 1189 out = qemu_pipe('-M', 'none', '-device', 'help') 1190 if 'virtio-blk' not in out: 1191 notrun('Missing virtio-blk in QEMU binary') 1192 1193def _verify_virtio_scsi_pci_or_ccw() -> None: 1194 out = qemu_pipe('-M', 'none', '-device', 'help') 1195 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1196 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1197 1198 1199def supports_quorum(): 1200 return 'quorum' in qemu_img_pipe('--help') 1201 1202def verify_quorum(): 1203 '''Skip test suite if quorum support is not available''' 1204 if not supports_quorum(): 1205 notrun('quorum support missing') 1206 1207def has_working_luks() -> Tuple[bool, str]: 1208 """ 1209 Check whether our LUKS driver can actually create images 1210 (this extends to LUKS encryption for qcow2). 1211 1212 If not, return the reason why. 1213 """ 1214 1215 img_file = f'{test_dir}/luks-test.luks' 1216 (output, status) = \ 1217 qemu_img_pipe_and_status('create', '-f', 'luks', 1218 '--object', luks_default_secret_object, 1219 '-o', luks_default_key_secret_opt, 1220 '-o', 'iter-time=10', 1221 img_file, '1G') 1222 try: 1223 os.remove(img_file) 1224 except OSError: 1225 pass 1226 1227 if status != 0: 1228 reason = output 1229 for line in output.splitlines(): 1230 if img_file + ':' in line: 1231 reason = line.split(img_file + ':', 1)[1].strip() 1232 break 1233 1234 return (False, reason) 1235 else: 1236 return (True, '') 1237 1238def verify_working_luks(): 1239 """ 1240 Skip test suite if LUKS does not work 1241 """ 1242 (working, reason) = has_working_luks() 1243 if not working: 1244 notrun(reason) 1245 1246def qemu_pipe(*args: str) -> str: 1247 """ 1248 Run qemu with an option to print something and exit (e.g. a help option). 1249 1250 :return: QEMU's stdout output. 1251 """ 1252 full_args = [qemu_prog] + qemu_opts + list(args) 1253 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1254 return output 1255 1256def supported_formats(read_only=False): 1257 '''Set 'read_only' to True to check ro-whitelist 1258 Otherwise, rw-whitelist is checked''' 1259 1260 if not hasattr(supported_formats, "formats"): 1261 supported_formats.formats = {} 1262 1263 if read_only not in supported_formats.formats: 1264 format_message = qemu_pipe("-drive", "format=help") 1265 line = 1 if read_only else 0 1266 supported_formats.formats[read_only] = \ 1267 format_message.splitlines()[line].split(":")[1].split() 1268 1269 return supported_formats.formats[read_only] 1270 1271def skip_if_unsupported(required_formats=(), read_only=False): 1272 '''Skip Test Decorator 1273 Runs the test if all the required formats are whitelisted''' 1274 def skip_test_decorator(func): 1275 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1276 **kwargs: Dict[str, Any]) -> None: 1277 if callable(required_formats): 1278 fmts = required_formats(test_case) 1279 else: 1280 fmts = required_formats 1281 1282 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1283 if usf_list: 1284 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1285 test_case.case_skip(msg) 1286 else: 1287 func(test_case, *args, **kwargs) 1288 return func_wrapper 1289 return skip_test_decorator 1290 1291def skip_for_formats(formats: Sequence[str] = ()) \ 1292 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1293 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1294 '''Skip Test Decorator 1295 Skips the test for the given formats''' 1296 def skip_test_decorator(func): 1297 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1298 **kwargs: Dict[str, Any]) -> None: 1299 if imgfmt in formats: 1300 msg = f'{test_case}: Skipped for format {imgfmt}' 1301 test_case.case_skip(msg) 1302 else: 1303 func(test_case, *args, **kwargs) 1304 return func_wrapper 1305 return skip_test_decorator 1306 1307def skip_if_user_is_root(func): 1308 '''Skip Test Decorator 1309 Runs the test only without root permissions''' 1310 def func_wrapper(*args, **kwargs): 1311 if os.getuid() == 0: 1312 case_notrun('{}: cannot be run as root'.format(args[0])) 1313 return None 1314 else: 1315 return func(*args, **kwargs) 1316 return func_wrapper 1317 1318# We need to filter out the time taken from the output so that 1319# qemu-iotest can reliably diff the results against master output, 1320# and hide skipped tests from the reference output. 1321 1322class ReproducibleTestResult(unittest.TextTestResult): 1323 def addSkip(self, test, reason): 1324 # Same as TextTestResult, but print dot instead of "s" 1325 unittest.TestResult.addSkip(self, test, reason) 1326 if self.showAll: 1327 self.stream.writeln("skipped {0!r}".format(reason)) 1328 elif self.dots: 1329 self.stream.write(".") 1330 self.stream.flush() 1331 1332class ReproducibleStreamWrapper: 1333 def __init__(self, stream: TextIO): 1334 self.stream = stream 1335 1336 def __getattr__(self, attr): 1337 if attr in ('stream', '__getstate__'): 1338 raise AttributeError(attr) 1339 return getattr(self.stream, attr) 1340 1341 def write(self, arg=None): 1342 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1343 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1344 self.stream.write(arg) 1345 1346class ReproducibleTestRunner(unittest.TextTestRunner): 1347 def __init__(self, stream: Optional[TextIO] = None, 1348 resultclass: Type[unittest.TestResult] = ReproducibleTestResult, 1349 **kwargs: Any) -> None: 1350 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1351 super().__init__(stream=rstream, # type: ignore 1352 descriptions=True, 1353 resultclass=resultclass, 1354 **kwargs) 1355 1356def execute_unittest(argv: List[str], debug: bool = False) -> None: 1357 """Executes unittests within the calling module.""" 1358 1359 # Some tests have warnings, especially ResourceWarnings for unclosed 1360 # files and sockets. Ignore them for now to ensure reproducibility of 1361 # the test output. 1362 unittest.main(argv=argv, 1363 testRunner=ReproducibleTestRunner, 1364 verbosity=2 if debug else 1, 1365 warnings=None if sys.warnoptions else 'ignore') 1366 1367def execute_setup_common(supported_fmts: Sequence[str] = (), 1368 supported_platforms: Sequence[str] = (), 1369 supported_cache_modes: Sequence[str] = (), 1370 supported_aio_modes: Sequence[str] = (), 1371 unsupported_fmts: Sequence[str] = (), 1372 supported_protocols: Sequence[str] = (), 1373 unsupported_protocols: Sequence[str] = (), 1374 required_fmts: Sequence[str] = ()) -> bool: 1375 """ 1376 Perform necessary setup for either script-style or unittest-style tests. 1377 1378 :return: Bool; Whether or not debug mode has been requested via the CLI. 1379 """ 1380 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1381 1382 debug = '-d' in sys.argv 1383 if debug: 1384 sys.argv.remove('-d') 1385 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1386 1387 _verify_image_format(supported_fmts, unsupported_fmts) 1388 _verify_protocol(supported_protocols, unsupported_protocols) 1389 _verify_platform(supported=supported_platforms) 1390 _verify_cache_mode(supported_cache_modes) 1391 _verify_aio_mode(supported_aio_modes) 1392 _verify_formats(required_fmts) 1393 _verify_virtio_blk() 1394 1395 return debug 1396 1397def execute_test(*args, test_function=None, **kwargs): 1398 """Run either unittest or script-style tests.""" 1399 1400 debug = execute_setup_common(*args, **kwargs) 1401 if not test_function: 1402 execute_unittest(sys.argv, debug) 1403 else: 1404 test_function() 1405 1406def activate_logging(): 1407 """Activate iotests.log() output to stdout for script-style tests.""" 1408 handler = logging.StreamHandler(stream=sys.stdout) 1409 formatter = logging.Formatter('%(message)s') 1410 handler.setFormatter(formatter) 1411 test_logger.addHandler(handler) 1412 test_logger.setLevel(logging.INFO) 1413 test_logger.propagate = False 1414 1415# This is called from script-style iotests without a single point of entry 1416def script_initialize(*args, **kwargs): 1417 """Initialize script-style tests without running any tests.""" 1418 activate_logging() 1419 execute_setup_common(*args, **kwargs) 1420 1421# This is called from script-style iotests with a single point of entry 1422def script_main(test_function, *args, **kwargs): 1423 """Run script-style tests outside of the unittest framework""" 1424 activate_logging() 1425 execute_test(*args, test_function=test_function, **kwargs) 1426 1427# This is called from unittest style iotests 1428def main(*args, **kwargs): 1429 """Run tests using the unittest framework""" 1430 execute_test(*args, **kwargs)