cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

testenv.py (11233B)


      1# TestEnv class to manage test environment variables.
      2#
      3# Copyright (c) 2020-2021 Virtuozzo International GmbH
      4#
      5# This program is free software; you can redistribute it and/or modify
      6# it under the terms of the GNU General Public License as published by
      7# the Free Software Foundation; either version 2 of the License, or
      8# (at your option) any later version.
      9#
     10# This program is distributed in the hope that it will be useful,
     11# but WITHOUT ANY WARRANTY; without even the implied warranty of
     12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13# GNU General Public License for more details.
     14#
     15# You should have received a copy of the GNU General Public License
     16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17#
     18
     19import os
     20import sys
     21import tempfile
     22from pathlib import Path
     23import shutil
     24import collections
     25import random
     26import subprocess
     27import glob
     28from typing import List, Dict, Any, Optional, ContextManager
     29
     30DEF_GDB_OPTIONS = 'localhost:12345'
     31
     32def isxfile(path: str) -> bool:
     33    return os.path.isfile(path) and os.access(path, os.X_OK)
     34
     35
     36def get_default_machine(qemu_prog: str) -> str:
     37    outp = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
     38                          universal_newlines=True,
     39                          stdout=subprocess.PIPE).stdout
     40
     41    machines = outp.split('\n')
     42    try:
     43        default_machine = next(m for m in machines if m.endswith(' (default)'))
     44    except StopIteration:
     45        return ''
     46    default_machine = default_machine.split(' ', 1)[0]
     47
     48    alias_suf = ' (alias of {})'.format(default_machine)
     49    alias = next((m for m in machines if m.endswith(alias_suf)), None)
     50    if alias is not None:
     51        default_machine = alias.split(' ', 1)[0]
     52
     53    return default_machine
     54
     55
     56class TestEnv(ContextManager['TestEnv']):
     57    """
     58    Manage system environment for running tests
     59
     60    The following variables are supported/provided. They are represented by
     61    lower-cased TestEnv attributes.
     62    """
     63
     64    # We store environment variables as instance attributes, and there are a
     65    # lot of them. Silence pylint:
     66    # pylint: disable=too-many-instance-attributes
     67
     68    env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
     69                     'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
     70                     'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
     71                     'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
     72                     'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
     73                     'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
     74                     'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
     75                     'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
     76                     'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_',
     77                     'GDB_OPTIONS', 'PRINT_QEMU']
     78
     79    def prepare_subprocess(self, args: List[str]) -> Dict[str, str]:
     80        if self.debug:
     81            args.append('-d')
     82
     83        with open(args[0], encoding="utf-8") as f:
     84            try:
     85                if f.readline().rstrip() == '#!/usr/bin/env python3':
     86                    args.insert(0, self.python)
     87            except UnicodeDecodeError:  # binary test? for future.
     88                pass
     89
     90        os_env = os.environ.copy()
     91        os_env.update(self.get_env())
     92        return os_env
     93
     94    def get_env(self) -> Dict[str, str]:
     95        env = {}
     96        for v in self.env_variables:
     97            val = getattr(self, v.lower(), None)
     98            if val is not None:
     99                env[v] = val
    100
    101        return env
    102
    103    def init_directories(self) -> None:
    104        """Init directory variables:
    105             PYTHONPATH
    106             TEST_DIR
    107             SOCK_DIR
    108             SAMPLE_IMG_DIR
    109             OUTPUT_DIR
    110        """
    111
    112        # Path where qemu goodies live in this source tree.
    113        qemu_srctree_path = Path(__file__, '../../../python').resolve()
    114
    115        self.pythonpath = os.pathsep.join(filter(None, (
    116            self.source_iotests,
    117            str(qemu_srctree_path),
    118            os.getenv('PYTHONPATH'),
    119        )))
    120
    121        self.test_dir = os.getenv('TEST_DIR',
    122                                  os.path.join(os.getcwd(), 'scratch'))
    123        Path(self.test_dir).mkdir(parents=True, exist_ok=True)
    124
    125        try:
    126            self.sock_dir = os.environ['SOCK_DIR']
    127            self.tmp_sock_dir = False
    128            Path(self.sock_dir).mkdir(parents=True, exist_ok=True)
    129        except KeyError:
    130            self.sock_dir = tempfile.mkdtemp()
    131            self.tmp_sock_dir = True
    132
    133        self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
    134                                        os.path.join(self.source_iotests,
    135                                                     'sample_images'))
    136
    137        self.output_dir = os.getcwd()  # OUTPUT_DIR
    138
    139    def init_binaries(self) -> None:
    140        """Init binary path variables:
    141             PYTHON (for bash tests)
    142             QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
    143        """
    144        self.python = sys.executable
    145
    146        def root(*names: str) -> str:
    147            return os.path.join(self.build_root, *names)
    148
    149        arch = os.uname().machine
    150        if 'ppc64' in arch:
    151            arch = 'ppc64'
    152
    153        self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
    154        if not os.path.exists(self.qemu_prog):
    155            pattern = root('qemu-system-*')
    156            try:
    157                progs = sorted(glob.iglob(pattern))
    158                self.qemu_prog = next(p for p in progs if isxfile(p))
    159            except StopIteration:
    160                sys.exit("Not found any Qemu executable binary by pattern "
    161                         f"'{pattern}'")
    162
    163        self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
    164        self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
    165        self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
    166        self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
    167                                                   'qemu-storage-daemon'))
    168
    169        for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
    170                  self.qemu_prog, self.qsd_prog]:
    171            if not os.path.exists(b):
    172                sys.exit('No such file: ' + b)
    173            if not isxfile(b):
    174                sys.exit('Not executable: ' + b)
    175
    176    def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
    177                 cachemode: Optional[str] = None,
    178                 imgopts: Optional[str] = None,
    179                 misalign: bool = False,
    180                 debug: bool = False,
    181                 valgrind: bool = False,
    182                 gdb: bool = False,
    183                 qprint: bool = False) -> None:
    184        self.imgfmt = imgfmt
    185        self.imgproto = imgproto
    186        self.aiomode = aiomode
    187        self.imgopts = imgopts
    188        self.misalign = misalign
    189        self.debug = debug
    190
    191        if qprint:
    192            self.print_qemu = 'y'
    193
    194        if gdb:
    195            self.gdb_options = os.getenv('GDB_OPTIONS', DEF_GDB_OPTIONS)
    196            if not self.gdb_options:
    197                # cover the case 'export GDB_OPTIONS='
    198                self.gdb_options = DEF_GDB_OPTIONS
    199        elif 'GDB_OPTIONS' in os.environ:
    200            # to not propagate it in prepare_subprocess()
    201            del os.environ['GDB_OPTIONS']
    202
    203        if valgrind:
    204            self.valgrind_qemu = 'y'
    205
    206        if cachemode is None:
    207            self.cachemode_is_default = 'true'
    208            self.cachemode = 'writeback'
    209        else:
    210            self.cachemode_is_default = 'false'
    211            self.cachemode = cachemode
    212
    213        # Initialize generic paths: build_root, build_iotests, source_iotests,
    214        # which are needed to initialize some environment variables. They are
    215        # used by init_*() functions as well.
    216
    217        if os.path.islink(sys.argv[0]):
    218            # called from the build tree
    219            self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
    220            self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
    221        else:
    222            # called from the source tree
    223            self.source_iotests = os.getcwd()
    224            self.build_iotests = self.source_iotests
    225
    226        self.build_root = os.path.join(self.build_iotests, '..', '..')
    227
    228        self.init_directories()
    229        self.init_binaries()
    230
    231        self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
    232                                         str(random.randrange(1, 255)))
    233
    234        # QEMU_OPTIONS
    235        self.qemu_options = '-nodefaults -display none -accel qtest'
    236        machine_map = (
    237            ('arm', 'virt'),
    238            ('aarch64', 'virt'),
    239            ('avr', 'mega2560'),
    240            ('m68k', 'virt'),
    241            ('rx', 'gdbsim-r5f562n8'),
    242            ('tricore', 'tricore_testboard')
    243        )
    244        for suffix, machine in machine_map:
    245            if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
    246                self.qemu_options += f' -machine {machine}'
    247
    248        # QEMU_DEFAULT_MACHINE
    249        self.qemu_default_machine = get_default_machine(self.qemu_prog)
    250
    251        self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
    252        self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
    253
    254        is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
    255        self.imgfmt_generic = 'true' if is_generic else 'false'
    256
    257        self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
    258        if self.misalign:
    259            self.qemu_io_options += ' --misalign'
    260
    261        self.qemu_io_options_no_fmt = self.qemu_io_options
    262
    263        if self.imgfmt == 'luks':
    264            self.imgoptssyntax = 'true'
    265            self.imgkeysecret = '123456'
    266            if not self.imgopts:
    267                self.imgopts = 'iter-time=10'
    268            elif 'iter-time=' not in self.imgopts:
    269                self.imgopts += ',iter-time=10'
    270        else:
    271            self.imgoptssyntax = 'false'
    272            self.qemu_io_options += ' -f ' + self.imgfmt
    273
    274        if self.imgfmt == 'vmdk':
    275            if not self.imgopts:
    276                self.imgopts = 'zeroed_grain=on'
    277            elif 'zeroed_grain=' not in self.imgopts:
    278                self.imgopts += ',zeroed_grain=on'
    279
    280    def close(self) -> None:
    281        if self.tmp_sock_dir:
    282            shutil.rmtree(self.sock_dir)
    283
    284    def __enter__(self) -> 'TestEnv':
    285        return self
    286
    287    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    288        self.close()
    289
    290    def print_env(self) -> None:
    291        template = """\
    292QEMU          -- "{QEMU_PROG}" {QEMU_OPTIONS}
    293QEMU_IMG      -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
    294QEMU_IO       -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
    295QEMU_NBD      -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
    296IMGFMT        -- {IMGFMT}{imgopts}
    297IMGPROTO      -- {IMGPROTO}
    298PLATFORM      -- {platform}
    299TEST_DIR      -- {TEST_DIR}
    300SOCK_DIR      -- {SOCK_DIR}
    301GDB_OPTIONS   -- {GDB_OPTIONS}
    302VALGRIND_QEMU -- {VALGRIND_QEMU}
    303PRINT_QEMU_OUTPUT -- {PRINT_QEMU}
    304"""
    305
    306        args = collections.defaultdict(str, self.get_env())
    307
    308        if 'IMGOPTS' in args:
    309            args['imgopts'] = f" ({args['IMGOPTS']})"
    310
    311        u = os.uname()
    312        args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'
    313
    314        print(template.format_map(args))