__init__.py (22392B)
# Test class and utilities for functional tests
#
# Copyright (c) 2018 Red Hat, Inc.
#
# Author:
#  Cleber Rosa <crosa@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
import shutil
import sys
import tempfile
import time
import uuid

import avocado
from avocado.utils import cloudinit, datadrainer, network, ssh, vmimage
from avocado.utils.path import find_command

#: The QEMU build root directory. It may also be the source directory
#: if building from the source dir, but it's safer to use BUILD_DIR for
#: that purpose. Be aware that if this code is moved outside of a source
#: and build tree, it will not be accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
    # The link to the acceptance tests dir in the source code directory
    lnk = os.path.dirname(os.path.dirname(__file__))
    #: The QEMU root source directory
    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
else:
    SOURCE_DIR = BUILD_DIR

# The "qemu" package imported below lives under SOURCE_DIR/python, so the
# path has to be extended before those imports run.
sys.path.append(os.path.join(SOURCE_DIR, 'python'))

from qemu.machine import QEMUMachine
from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
                        tcg_available)


def is_readable_executable_file(path):
    """
    Tells whether a path is a regular file that is readable and executable.

    :param path: the path to check
    :type path: str
    :returns: True if path is a file accessible with both read and
              execute permissions, False otherwise
    :rtype: bool
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def pick_default_qemu_bin(arch=None):
    """
    Picks the path of a QEMU binary, starting either in the current working
    directory or in the source tree root directory.

    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname()[4]
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    qemu_bin_relative_path = "./qemu-system-%s" % arch
    if is_readable_executable_file(qemu_bin_relative_path):
        return qemu_bin_relative_path

    qemu_bin_from_bld_dir_path = os.path.join(BUILD_DIR,
                                              qemu_bin_relative_path)
    if is_readable_executable_file(qemu_bin_from_bld_dir_path):
        return qemu_bin_from_bld_dir_path
    return None


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Optionally sends a string to a VM console and reads the console line
    by line, logging each non-empty line, until a line contains
    success_message (or, when success_message is None, until the first
    non-empty line is read).

    :param test: the test whose ``fail()`` is called when failure_message
                 is seen, and whose ``vm`` is used when vm is None
    :param success_message: text that ends the wait, or None
    :param failure_message: text that fails the test, or None
    :param send_string: string written to the console before reading,
                        or None to only read
    :param keep_sending: if True, send_string is re-sent before every
                         read; otherwise it is sent only once
    :param vm: the VM whose console is used (defaults to ``test.vm``)
    """
    assert not keep_sending or send_string
    if vm is None:
        vm = test.vm
    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
    console_logger = logging.getLogger('console')
    # The file object wrapping the socket is released on every exit path
    # (success, test failure or unexpected error), not only on failure.
    try:
        while True:
            if send_string:
                vm.console_socket.sendall(send_string.encode())
                if not keep_sending:
                    send_string = None # send only once
            try:
                msg = console.readline().decode().strip()
            except UnicodeDecodeError:
                msg = None
            # NOTE(review): an empty read (including end-of-stream) is
            # treated like a blank line and simply retried.
            if not msg:
                continue
            console_logger.debug(msg)
            if success_message is None or success_message in msg:
                break
            if failure_message and failure_message in msg:
                fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                        (failure_message, success_message)
                test.fail(fail)
    finally:
        console.close()


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt, such:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.Test`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.Test`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, ``test.vm`` is used
    """
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.Test`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.Test`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message, command + '\r')


class Test(avocado.Test):
    """Base test class that creates and tracks QEMUMachine instances."""

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        The tag collection is only read, never modified, so repeated
        lookups of the same tag keep returning the same value.

        :param tag_name: the tag key to look up
        :returns: the single value for the tag, or None when the tag is
                  absent or has zero or several values
        """
        vals = self.tags.get(tag_name) or []
        if len(vals) == 1:
            return next(iter(vals))
        return None

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def setUp(self):
        """
        Resolves arch, cpu, machine and the QEMU binary from the test
        parameters (falling back to unique tag values), canceling the
        test when no QEMU binary can be determined.
        """
        self._vms = {}
        # Socket directories of every VM created by this test; they are
        # all kept referenced until tearDown().
        self._sock_dirs = []

        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

        default_qemu_bin = pick_default_qemu_bin(arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def _new_vm(self, name, *args):
        """
        Creates a QEMUMachine with its own temporary socket directory.

        :param name: name used in the log messages for this VM
        :param args: extra command line arguments added to the VM
        :returns: the newly created :class:`QEMUMachine`
        """
        sock_dir = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_")
        # Keep every directory referenced: overwriting a single attribute
        # would let an earlier VM's directory be removed while in use.
        self._sock_dirs.append(sock_dir)
        self._sd = sock_dir
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         sock_dir=sock_dir.name, log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Returns the VM registered under name, creating it if necessary.

        :param args: extra command line arguments, used only when the VM
                     is created by this call
        :param name: the VM name; a random UUID is used when not given
        :returns: the :class:`QEMUMachine` registered under name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """
        Shuts down every VM created by the test and removes their socket
        directories, then runs the parent class tearDown.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Runs even if a shutdown raised, so directories are removed
            # and the parent tearDown is never skipped.
            for sock_dir in self._sock_dirs:
                sock_dir.cleanup()
            self._sock_dirs = []
            self._sd = None
            super().tearDown()

    def fetch_asset(self, name,
                    asset_hash=None, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forwards to the parent class ``fetch_asset``, with
        cancel_on_missing defaulting to True in this signature.
        """
        return super().fetch_asset(name,
                                   asset_hash=asset_hash,
                                   algorithm=algorithm,
                                   locations=locations,
                                   expire=expire,
                                   find_only=find_only,
                                   cancel_on_missing=cancel_on_missing)


class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Opens an SSH session to the guest through the user-mode network
        host forward reported by the monitor's "info usernet".

        Up to 10 connection attempts are made, sleeping for an increasing
        number of seconds after each failure; the test fails if none
        succeeds.

        :param username: the guest user to log in as
        :param credential: a private key path or a password
        :param credential_is_key: whether credential is a key (True) or a
                                  password (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.command('human-monitor-command',
                              command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                # connect() is expected to report failure through its
                # return value, so it is checked instead of assuming that
                # "no exception" means "connected".
                if self.ssh_session.connect():
                    return
            except Exception as details:
                self.log.debug('ssh connection attempt failed: %s', details)
            time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Runs a command on the guest over the established SSH session,
        logging the command, its stdout (info) and its stderr (warning),
        and asserting that it exited with status 0.

        :param command: the command line to run on the guest
        :returns: a tuple of (stdout lines, stderr lines), each line with
                  trailing whitespace removed
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines


class LinuxDistro:
    """Represents a Linux distribution

    Holds information of known distros.
    """
    #: A collection of known distros and their respective image checksum
    KNOWN_DISTROS = {
        'fedora': {
            '31': {
                'x86_64':
                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
                              '8a309c2d46215d8fc026954f3c5c27a0'),
                 'pxeboot_url': ('https://archives.fedoraproject.org/'
                                 'pub/archive/fedora/linux/releases/31/'
                                 'Everything/x86_64/os/images/pxeboot/'),
                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
                                   '08a96687f73c ro no_timer_check '
                                   'net.ifnames=0 console=tty1 '
                                   'console=ttyS0,115200n8'),
                },
                'aarch64':
                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
                              'd2af0ad0329383d5639c997fdf16fe49'),
                 'pxeboot_url': 'https://archives.fedoraproject.org/'
                                'pub/archive/fedora/linux/releases/31/'
                                'Everything/aarch64/os/images/pxeboot/',
                 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
                                   '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
                                   ' ignore_loglevel no_timer_check'
                                   ' printk.time=1 rd_NO_PLYMOUTH'
                                   ' console=ttyAMA0'),
                },
                'ppc64':
                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
                              '3f991c506f2cc390dc4efa2026ad2f58')},
                's390x':
                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
                              '1e354f834d355069ca982fdcaf5a122d')},
            },
            '32': {
                'aarch64':
                {'checksum': ('b367755c664a2d7a26955bbfff985855'
                              'adfa2ca15e908baf15b4b176d68d3967'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/32/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
                                   '14d95c0e90c5 ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'),
                },
            },
            '33': {
                'aarch64':
                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
                              'a81f386a17f969c1d1c7c87031008a6b'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/33/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
                                   '1126a0208f8a ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'
                                   ' console=tty0'),
                },
            },
        }
    }

    def __init__(self, name, version, arch):
        """
        :param name: the distro name, such as "fedora"
        :param version: the distro version, such as "31"
        :param arch: the architecture, such as "x86_64"

        Combinations not present in KNOWN_DISTROS are accepted; their
        checksum, pxeboot_url and default_kernel_params are then None.
        """
        self.name = name
        self.version = version
        self.arch = arch
        try:
            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
        except AttributeError:
            # Unknown distro
            info = None
        # Work on a private copy, so that the checksum setter can not
        # modify the class-wide KNOWN_DISTROS table.
        self._info = dict(info) if info else {}

    @property
    def checksum(self):
        """Gets the cloud-image file checksum"""
        return self._info.get('checksum', None)

    @checksum.setter
    def checksum(self, value):
        self._info['checksum'] = value

    @property
    def pxeboot_url(self):
        """Gets the repository url where pxeboot files can be found"""
        return self._info.get('pxeboot_url', None)

    @property
    def default_kernel_params(self):
        """Gets the default kernel parameters"""
        return self._info.get('kernel_params', None)


class LinuxTest(LinuxSSHMixIn, Test):
    """Facilitates having a cloud-image Linux based available.

    For tests that intend to interact with guests, this is a better choice
    to start with than the more vanilla `Test` class.
    """

    timeout = 900
    distro = None
    username = 'root'
    password = 'password'

    def _set_distro(self):
        """
        Sets ``self.distro`` from the 'distro' and 'distro_version'
        parameters or unique tags, defaulting to fedora 31, and applies
        the optional 'distro_checksum' parameter.
        """
        distro_name = self.params.get(
            'distro',
            default=self._get_unique_tag_val('distro'))
        if not distro_name:
            distro_name = 'fedora'

        distro_version = self.params.get(
            'distro_version',
            default=self._get_unique_tag_val('distro_version'))
        if not distro_version:
            distro_version = '31'

        self.distro = LinuxDistro(distro_name, distro_version, self.arch)

        # The distro checksum behaves differently than distro name and
        # version. First, it does not respect a tag with the same
        # name, given that it's not expected to be used for filtering
        # (distro name versions are the natural choice).  Second, the
        # order of precedence is: parameter, attribute and then value
        # from KNOWN_DISTROS.
        distro_checksum = self.params.get('distro_checksum',
                                          default=None)
        if distro_checksum:
            self.distro.checksum = distro_checksum

    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
        """
        Prepares the default VM with 2 CPUs, 1024 MB of memory, a user-mode
        network device forwarding a host port to guest port 22, the boot
        image and the cloudinit image.

        :param ssh_pubkey: path to the public key to authorize in the
                           guest; when None, the keys under
                           SOURCE_DIR/tests/keys are used and
                           ``self.ssh_key`` is set to the private key copy
        :param network_device_type: the QEMU network device model to use
        """
        super().setUp()
        self._set_distro()
        self.vm.add_args('-smp', '2')
        self.vm.add_args('-m', '1024')
        # The following network device allows for SSH connections
        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
                         '-device', '%s,netdev=vnet' % network_device_type)
        self.set_up_boot()
        if ssh_pubkey is None:
            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
        self.set_up_cloudinit(ssh_pubkey)

    def set_up_existing_ssh_keys(self):
        """
        Copies the private key from SOURCE_DIR/tests/keys into a new
        ".ssh" directory (mode 0700) under the test workdir, with mode
        0600.

        :returns: a tuple of (public key path in the source tree, path of
                  the private key copy)
        """
        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
        ssh_dir = os.path.join(self.workdir, '.ssh')
        os.mkdir(ssh_dir, mode=0o700)
        ssh_private_key = os.path.join(ssh_dir,
                                       os.path.basename(source_private_key))
        shutil.copyfile(source_private_key, ssh_private_key)
        os.chmod(ssh_private_key, 0o600)
        return (ssh_public_key, ssh_private_key)

    def download_boot(self):
        """
        Selects a qemu-img binary and obtains the boot image for the
        configured distro, canceling the test if either step fails.

        :returns: the path of the boot image
        """
        self.log.debug('Looking for and selecting a qemu-img binary to be '
                       'used to create the bootable snapshot image')
        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.  If none is available, the test will cancel.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img", which is required to '
                        'create the bootable image')
        vmimage.QEMU_IMG = qemu_img

        self.log.info('Downloading/preparing boot image')
        # Fedora 31 only provides ppc64le images
        image_arch = self.arch
        if self.distro.name == 'fedora':
            if image_arch == 'ppc64':
                image_arch = 'ppc64le'

        try:
            boot = vmimage.get(
                self.distro.name, arch=image_arch, version=self.distro.version,
                checksum=self.distro.checksum,
                algorithm='sha256',
                cache_dir=self.cache_dirs[0],
                snapshot_dir=self.workdir)
        except Exception:
            # Not a bare "except:", so that KeyboardInterrupt and
            # SystemExit are not turned into a test cancelation.
            self.cancel('Failed to download/prepare boot image')
        return boot.path

    def prepare_cloudinit(self, ssh_pubkey=None):
        """
        Creates the cloudinit ISO in the test workdir and picks a free
        port, stored in ``self.phone_home_port``, for the guest to phone
        home to.  The test is canceled if this fails.

        :param ssh_pubkey: optional path to a public key file whose content
                           is passed as the authorized key
        :returns: the path of the cloudinit ISO
        """
        self.log.info('Preparing cloudinit image')
        try:
            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
            self.phone_home_port = network.find_free_port()
            pubkey_content = None
            if ssh_pubkey:
                with open(ssh_pubkey) as pubkey:
                    pubkey_content = pubkey.read()
            cloudinit.iso(cloudinit_iso, self.name,
                          username=self.username,
                          password=self.password,
                          # QEMU's hard coded usermode router address
                          phone_home_host='10.0.2.2',
                          phone_home_port=self.phone_home_port,
                          authorized_key=pubkey_content)
        except Exception:
            self.cancel('Failed to prepare the cloudinit image')
        return cloudinit_iso

    def set_up_boot(self):
        """Adds the downloaded boot image to the default VM as a drive."""
        path = self.download_boot()
        self.vm.add_args('-drive', 'file=%s' % path)

    def set_up_cloudinit(self, ssh_pubkey=None):
        """Adds the cloudinit ISO to the default VM as a raw drive."""
        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)

    def launch_and_wait(self, set_up_ssh_connection=True):
        """
        Launches the default VM, logs its console output and blocks until
        the guest phones home.

        :param set_up_ssh_connection: whether to also open an SSH session
                                      using ``self.username`` and
                                      ``self.ssh_key``
        """
        self.vm.set_console()
        self.vm.launch()
        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
                                                 logger=self.log.getChild('console'))
        console_drainer.start()
        self.log.info('VM launched, waiting for boot confirmation from guest')
        cloudinit.wait_for_phone_home(('0.0.0.0', self.phone_home_port), self.name)
        if set_up_ssh_connection:
            self.log.info('Setting up the SSH connection')
            self.ssh_connect(self.username, self.ssh_key)