cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

testrunner.py (12029B)


      1# Class for actually running tests.
      2#
      3# Copyright (c) 2020-2021 Virtuozzo International GmbH
      4#
      5# This program is free software; you can redistribute it and/or modify
      6# it under the terms of the GNU General Public License as published by
      7# the Free Software Foundation; either version 2 of the License, or
      8# (at your option) any later version.
      9#
     10# This program is distributed in the hope that it will be useful,
     11# but WITHOUT ANY WARRANTY; without even the implied warranty of
     12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13# GNU General Public License for more details.
     14#
     15# You should have received a copy of the GNU General Public License
     16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17#
     18
     19import os
     20from pathlib import Path
     21import datetime
     22import time
     23import difflib
     24import subprocess
     25import contextlib
     26import json
     27import termios
     28import sys
     29from contextlib import contextmanager
     30from typing import List, Optional, Iterator, Any, Sequence, Dict, \
     31        ContextManager
     32
     33from testenv import TestEnv
     34
     35
     36def silent_unlink(path: Path) -> None:
     37    try:
     38        path.unlink()
     39    except OSError:
     40        pass
     41
     42
     43def file_diff(file1: str, file2: str) -> List[str]:
     44    with open(file1, encoding="utf-8") as f1, \
     45         open(file2, encoding="utf-8") as f2:
     46        # We want to ignore spaces at line ends. There are a lot of mess about
     47        # it in iotests.
     48        # TODO: fix all tests to not produce extra spaces, fix all .out files
     49        # and use strict diff here!
     50        seq1 = [line.rstrip() for line in f1]
     51        seq2 = [line.rstrip() for line in f2]
     52        res = [line.rstrip()
     53               for line in difflib.unified_diff(seq1, seq2, file1, file2)]
     54        return res
     55
     56
     57# We want to save current tty settings during test run,
     58# since an aborting qemu call may leave things screwed up.
     59@contextmanager
     60def savetty() -> Iterator[None]:
     61    isterm = sys.stdin.isatty()
     62    if isterm:
     63        fd = sys.stdin.fileno()
     64        attr = termios.tcgetattr(fd)
     65
     66    try:
     67        yield
     68    finally:
     69        if isterm:
     70            termios.tcsetattr(fd, termios.TCSADRAIN, attr)
     71
     72
     73class LastElapsedTime(ContextManager['LastElapsedTime']):
     74    """ Cache for elapsed time for tests, to show it during new test run
     75
     76    It is safe to use get() at any time.  To use update(), you must either
     77    use it inside with-block or use save() after update().
     78    """
     79    def __init__(self, cache_file: str, env: TestEnv) -> None:
     80        self.env = env
     81        self.cache_file = cache_file
     82        self.cache: Dict[str, Dict[str, Dict[str, float]]]
     83
     84        try:
     85            with open(cache_file, encoding="utf-8") as f:
     86                self.cache = json.load(f)
     87        except (OSError, ValueError):
     88            self.cache = {}
     89
     90    def get(self, test: str,
     91            default: Optional[float] = None) -> Optional[float]:
     92        if test not in self.cache:
     93            return default
     94
     95        if self.env.imgproto not in self.cache[test]:
     96            return default
     97
     98        return self.cache[test][self.env.imgproto].get(self.env.imgfmt,
     99                                                       default)
    100
    101    def update(self, test: str, elapsed: float) -> None:
    102        d = self.cache.setdefault(test, {})
    103        d.setdefault(self.env.imgproto, {})[self.env.imgfmt] = elapsed
    104
    105    def save(self) -> None:
    106        with open(self.cache_file, 'w', encoding="utf-8") as f:
    107            json.dump(self.cache, f)
    108
    109    def __enter__(self) -> 'LastElapsedTime':
    110        return self
    111
    112    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    113        self.save()
    114
    115
    116class TestResult:
    117    def __init__(self, status: str, description: str = '',
    118                 elapsed: Optional[float] = None, diff: Sequence[str] = (),
    119                 casenotrun: str = '', interrupted: bool = False) -> None:
    120        self.status = status
    121        self.description = description
    122        self.elapsed = elapsed
    123        self.diff = diff
    124        self.casenotrun = casenotrun
    125        self.interrupted = interrupted
    126
    127
    128class TestRunner(ContextManager['TestRunner']):
    129    def __init__(self, env: TestEnv, makecheck: bool = False,
    130                 color: str = 'auto') -> None:
    131        self.env = env
    132        self.makecheck = makecheck
    133        self.last_elapsed = LastElapsedTime('.last-elapsed-cache', env)
    134
    135        assert color in ('auto', 'on', 'off')
    136        self.color = (color == 'on') or (color == 'auto' and
    137                                         sys.stdout.isatty())
    138
    139        self._stack: contextlib.ExitStack
    140
    141    def __enter__(self) -> 'TestRunner':
    142        self._stack = contextlib.ExitStack()
    143        self._stack.enter_context(self.env)
    144        self._stack.enter_context(self.last_elapsed)
    145        self._stack.enter_context(savetty())
    146        return self
    147
    148    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    149        self._stack.close()
    150
    151    def test_print_one_line(self, test: str, starttime: str,
    152                            endtime: Optional[str] = None, status: str = '...',
    153                            lasttime: Optional[float] = None,
    154                            thistime: Optional[float] = None,
    155                            description: str = '',
    156                            test_field_width: Optional[int] = None,
    157                            end: str = '\n') -> None:
    158        """ Print short test info before/after test run """
    159        test = os.path.basename(test)
    160
    161        if test_field_width is None:
    162            test_field_width = 8
    163
    164        if self.makecheck and status != '...':
    165            if status and status != 'pass':
    166                status = f' [{status}]'
    167            else:
    168                status = ''
    169
    170            print(f'  TEST   iotest-{self.env.imgfmt}: {test}{status}')
    171            return
    172
    173        if lasttime:
    174            lasttime_s = f' (last: {lasttime:.1f}s)'
    175        else:
    176            lasttime_s = ''
    177        if thistime:
    178            thistime_s = f'{thistime:.1f}s'
    179        else:
    180            thistime_s = '...'
    181
    182        if endtime:
    183            endtime = f'[{endtime}]'
    184        else:
    185            endtime = ''
    186
    187        if self.color:
    188            if status == 'pass':
    189                col = '\033[32m'
    190            elif status == 'fail':
    191                col = '\033[1m\033[31m'
    192            elif status == 'not run':
    193                col = '\033[33m'
    194            else:
    195                col = ''
    196
    197            col_end = '\033[0m'
    198        else:
    199            col = ''
    200            col_end = ''
    201
    202        print(f'{test:{test_field_width}} {col}{status:10}{col_end} '
    203              f'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
    204              f'{description}', end=end)
    205
    206    def find_reference(self, test: str) -> str:
    207        if self.env.cachemode == 'none':
    208            ref = f'{test}.out.nocache'
    209            if os.path.isfile(ref):
    210                return ref
    211
    212        ref = f'{test}.out.{self.env.imgfmt}'
    213        if os.path.isfile(ref):
    214            return ref
    215
    216        ref = f'{test}.{self.env.qemu_default_machine}.out'
    217        if os.path.isfile(ref):
    218            return ref
    219
    220        return f'{test}.out'
    221
    222    def do_run_test(self, test: str) -> TestResult:
    223        f_test = Path(test)
    224        f_bad = Path(f_test.name + '.out.bad')
    225        f_notrun = Path(f_test.name + '.notrun')
    226        f_casenotrun = Path(f_test.name + '.casenotrun')
    227        f_reference = Path(self.find_reference(test))
    228
    229        if not f_test.exists():
    230            return TestResult(status='fail',
    231                              description=f'No such test file: {f_test}')
    232
    233        if not os.access(str(f_test), os.X_OK):
    234            sys.exit(f'Not executable: {f_test}')
    235
    236        if not f_reference.exists():
    237            return TestResult(status='not run',
    238                              description='No qualified output '
    239                                          f'(expected {f_reference})')
    240
    241        for p in (f_bad, f_notrun, f_casenotrun):
    242            silent_unlink(p)
    243
    244        args = [str(f_test.resolve())]
    245        env = self.env.prepare_subprocess(args)
    246
    247        t0 = time.time()
    248        with f_bad.open('w', encoding="utf-8") as f:
    249            with subprocess.Popen(args, cwd=str(f_test.parent), env=env,
    250                                  stdout=f, stderr=subprocess.STDOUT) as proc:
    251                try:
    252                    proc.wait()
    253                except KeyboardInterrupt:
    254                    proc.terminate()
    255                    proc.wait()
    256                    return TestResult(status='not run',
    257                                      description='Interrupted by user',
    258                                      interrupted=True)
    259                ret = proc.returncode
    260
    261        elapsed = round(time.time() - t0, 1)
    262
    263        if ret != 0:
    264            return TestResult(status='fail', elapsed=elapsed,
    265                              description=f'failed, exit status {ret}',
    266                              diff=file_diff(str(f_reference), str(f_bad)))
    267
    268        if f_notrun.exists():
    269            return TestResult(
    270                status='not run',
    271                description=f_notrun.read_text(encoding='utf-8').strip())
    272
    273        casenotrun = ''
    274        if f_casenotrun.exists():
    275            casenotrun = f_casenotrun.read_text(encoding='utf-8')
    276
    277        diff = file_diff(str(f_reference), str(f_bad))
    278        if diff:
    279            return TestResult(status='fail', elapsed=elapsed,
    280                              description=f'output mismatch (see {f_bad})',
    281                              diff=diff, casenotrun=casenotrun)
    282        else:
    283            f_bad.unlink()
    284            self.last_elapsed.update(test, elapsed)
    285            return TestResult(status='pass', elapsed=elapsed,
    286                              casenotrun=casenotrun)
    287
    288    def run_test(self, test: str,
    289                 test_field_width: Optional[int] = None) -> TestResult:
    290        last_el = self.last_elapsed.get(test)
    291        start = datetime.datetime.now().strftime('%H:%M:%S')
    292
    293        if not self.makecheck:
    294            self.test_print_one_line(test=test, starttime=start,
    295                                     lasttime=last_el, end='\r',
    296                                     test_field_width=test_field_width)
    297
    298        res = self.do_run_test(test)
    299
    300        end = datetime.datetime.now().strftime('%H:%M:%S')
    301        self.test_print_one_line(test=test, status=res.status,
    302                                 starttime=start, endtime=end,
    303                                 lasttime=last_el, thistime=res.elapsed,
    304                                 description=res.description,
    305                                 test_field_width=test_field_width)
    306
    307        if res.casenotrun:
    308            print(res.casenotrun)
    309
    310        return res
    311
    312    def run_tests(self, tests: List[str]) -> bool:
    313        n_run = 0
    314        failed = []
    315        notrun = []
    316        casenotrun = []
    317
    318        if not self.makecheck:
    319            self.env.print_env()
    320
    321        test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
    322
    323        for t in tests:
    324            name = os.path.basename(t)
    325            res = self.run_test(t, test_field_width=test_field_width)
    326
    327            assert res.status in ('pass', 'fail', 'not run')
    328
    329            if res.casenotrun:
    330                casenotrun.append(t)
    331
    332            if res.status != 'not run':
    333                n_run += 1
    334
    335            if res.status == 'fail':
    336                failed.append(name)
    337                if self.makecheck:
    338                    self.env.print_env()
    339                if res.diff:
    340                    print('\n'.join(res.diff))
    341            elif res.status == 'not run':
    342                notrun.append(name)
    343
    344            if res.interrupted:
    345                break
    346
    347        if notrun:
    348            print('Not run:', ' '.join(notrun))
    349
    350        if casenotrun:
    351            print('Some cases not run in:', ' '.join(casenotrun))
    352
    353        if failed:
    354            print('Failures:', ' '.join(failed))
    355            print(f'Failed {len(failed)} of {n_run} iotests')
    356            return False
    357        else:
    358            print(f'Passed all {n_run} iotests')
    359            return True