cachepc-qemu

Fork of AMDESE/qemu with changes for the cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

engine.py (17464B)


      1#
      2# Migration test main engine
      3#
      4# Copyright (c) 2016 Red Hat, Inc.
      5#
      6# This library is free software; you can redistribute it and/or
      7# modify it under the terms of the GNU Lesser General Public
      8# License as published by the Free Software Foundation; either
      9# version 2.1 of the License, or (at your option) any later version.
     10#
     11# This library is distributed in the hope that it will be useful,
     12# but WITHOUT ANY WARRANTY; without even the implied warranty of
     13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     14# Lesser General Public License for more details.
     15#
     16# You should have received a copy of the GNU Lesser General Public
     17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
     18#
     19
     20
     21import os
     22import re
     23import sys
     24import time
     25
     26from guestperf.progress import Progress, ProgressStats
     27from guestperf.report import Report
     28from guestperf.timings import TimingRecord, Timings
     29
     30sys.path.append(os.path.join(os.path.dirname(__file__),
     31                             '..', '..', '..', 'python'))
     32from qemu.machine import QEMUMachine
     33
     34
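       # Engine drives one migration measurement end to end: boot a source and
       # a destination guest, let the stress workload warm up, configure and
       # start the migration per the scenario, and sample host CPU usage until
       # the migration completes, fails or is cancelled.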
     35class Engine(object):
     36
     37    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
     38                 sleep=15, verbose=False, debug=False):
     39
     40        self._binary = binary # Path to QEMU binary
     41        self._dst_host = dst_host # Hostname of target host
     42        self._kernel = kernel # Path to kernel image
     43        self._initrd = initrd # Path to stress initrd
     44        self._transport = transport # 'unix' or 'tcp' or 'rdma'
     45        self._sleep = sleep
     46        self._verbose = verbose
     47        self._debug = debug
     48
     49        if debug:
     50            self._verbose = debug
     51
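           # Sample cumulative CPU time (user + system, in milliseconds) for
           # each vCPU thread from /proc/<pid>/task/<tid>/stat.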
     52    def _vcpu_timing(self, pid, tid_list):
     53        records = []
     54        now = time.time()
     55
     56        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
     57        for tid in tid_list:
     58            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
     59            with open(statfile, "r") as fh:
     60                stat = fh.readline()
     61                fields = stat.split(" ")
     62                utime = int(fields[13]) # field 14 of stat: user-mode jiffies
     63                stime = int(fields[14]) # field 15 of stat: kernel-mode jiffies
     64                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
     65        return records
     66
     67    def _cpu_timing(self, pid):
     68        records = []
     69        now = time.time()
     70
     71        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
     72        statfile = "/proc/%d/stat" % pid
     73        with open(statfile, "r") as fh:
     74            stat = fh.readline()
     75            fields = stat.split(" ")
     76            utime = int(fields[13]) # field 14 of stat: user-mode jiffies
     77            stime = int(fields[14]) # field 15 of stat: kernel-mode jiffies
     78            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
     79
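           # Convert a QMP 'query-migrate' reply into a Progress record,
           # treating any field the source QEMU does not report as zero.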
     80    def _migrate_progress(self, vm):
     81        info = vm.command("query-migrate")
     82
     83        if "ram" not in info:
     84            info["ram"] = {}
     85
     86        return Progress(
     87            info.get("status", "active"),
     88            ProgressStats(
     89                info["ram"].get("transferred", 0),
     90                info["ram"].get("remaining", 0),
     91                info["ram"].get("total", 0),
     92                info["ram"].get("duplicate", 0),
     93                info["ram"].get("skipped", 0),
     94                info["ram"].get("normal", 0),
     95                info["ram"].get("normal-bytes", 0),
     96                info["ram"].get("dirty-pages-rate", 0),
     97                info["ram"].get("mbps", 0),
     98                info["ram"].get("dirty-sync-count", 0)
     99            ),
    100            time.time(),
    101            info.get("total-time", 0),
    102            info.get("downtime", 0),
    103            info.get("expected-downtime", 0),
    104            info.get("setup-time", 0),
    105            info.get("cpu-throttle-percentage", 0),
    106        )
    107
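           # Core migration loop: apply the scenario's capabilities and
           # parameters, start the migration, then poll 'query-migrate' every
           # 50ms, sampling CPU usage each second and cancelling if the
           # scenario's iteration or time limits are exceeded.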
    108    def _migrate(self, hardware, scenario, src, dst, connect_uri):
    109        src_qemu_time = []
    110        src_vcpu_time = []
    111        src_pid = src.get_pid()
    112
    113        vcpus = src.command("query-cpus-fast")
    114        src_threads = []
    115        for vcpu in vcpus:
    116            src_threads.append(vcpu["thread-id"])
    117
    118        # XXX how to get dst timings on remote host ?
    119
    120        if self._verbose:
    121            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
    122        sleep_secs = self._sleep
    123        while sleep_secs > 1:
    124            src_qemu_time.append(self._cpu_timing(src_pid))
    125            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    126            time.sleep(1)
    127            sleep_secs -= 1
    128
    129        if self._verbose:
    130            print("Starting migration")
    131        if scenario._auto_converge:
    132            resp = src.command("migrate-set-capabilities",
    133                               capabilities = [
    134                                   { "capability": "auto-converge",
    135                                     "state": True }
    136                               ])
    137            resp = src.command("migrate-set-parameters",
    138                               cpu_throttle_increment=scenario._auto_converge_step)
    139
    140        if scenario._post_copy:
    141            resp = src.command("migrate-set-capabilities",
    142                               capabilities = [
    143                                   { "capability": "postcopy-ram",
    144                                     "state": True }
    145                               ])
    146            resp = dst.command("migrate-set-capabilities",
    147                               capabilities = [
    148                                   { "capability": "postcopy-ram",
    149                                     "state": True }
    150                               ])
    151
    152        resp = src.command("migrate-set-parameters",
    153                           max_bandwidth=scenario._bandwidth * 1024 * 1024)
    154
    155        resp = src.command("migrate-set-parameters",
    156                           downtime_limit=scenario._downtime)
    157
    158        if scenario._compression_mt:
    159            resp = src.command("migrate-set-capabilities",
    160                               capabilities = [
    161                                   { "capability": "compress",
    162                                     "state": True }
    163                               ])
    164            resp = src.command("migrate-set-parameters",
    165                               compress_threads=scenario._compression_mt_threads)
    166            resp = dst.command("migrate-set-capabilities",
    167                               capabilities = [
    168                                   { "capability": "compress",
    169                                     "state": True }
    170                               ])
    171            resp = dst.command("migrate-set-parameters",
    172                               decompress_threads=scenario._compression_mt_threads)
    173
    174        if scenario._compression_xbzrle:
    175            resp = src.command("migrate-set-capabilities",
    176                               capabilities = [
    177                                   { "capability": "xbzrle",
    178                                     "state": True }
    179                               ])
    180            resp = dst.command("migrate-set-capabilities",
    181                               capabilities = [
    182                                   { "capability": "xbzrle",
    183                                     "state": True }
    184                               ])
    185            resp = src.command("migrate-set-parameters",
    186                               xbzrle_cache_size=int(
    187                                   hardware._mem *
    188                                   1024 * 1024 * 1024 / 100 *
    189                                   scenario._compression_xbzrle_cache))
    190
    191        if scenario._multifd:
    192            resp = src.command("migrate-set-capabilities",
    193                               capabilities = [
    194                                   { "capability": "multifd",
    195                                     "state": True }
    196                               ])
    197            resp = src.command("migrate-set-parameters",
    198                               multifd_channels=scenario._multifd_channels)
    199            resp = dst.command("migrate-set-capabilities",
    200                               capabilities = [
    201                                   { "capability": "multifd",
    202                                     "state": True }
    203                               ])
    204            resp = dst.command("migrate-set-parameters",
    205                               multifd_channels=scenario._multifd_channels)
    206
    207        resp = src.command("migrate", uri=connect_uri)
    208
    209        post_copy = False
    210        paused = False
    211
    212        progress_history = []
    213
    214        start = time.time()
    215        loop = 0
    216        while True:
    217            loop = loop + 1
    218            time.sleep(0.05)
    219
    220            progress = self._migrate_progress(src)
    221            if (loop % 20) == 0:
    222                src_qemu_time.append(self._cpu_timing(src_pid))
    223                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    224
    225            if (len(progress_history) == 0 or
    226                (progress_history[-1]._ram._iterations <
    227                 progress._ram._iterations)):
    228                progress_history.append(progress)
    229
    230            if progress._status in ("completed", "failed", "cancelled"):
    231                if progress._status == "completed" and paused:
    232                    dst.command("cont")
    233                if progress_history[-1] != progress:
    234                    progress_history.append(progress)
    235
    236                if progress._status == "completed":
    237                    if self._verbose:
    238                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
    239                    sleep_secs = self._sleep
    240                    while sleep_secs > 1:
    241                        time.sleep(1)
    242                        src_qemu_time.append(self._cpu_timing(src_pid))
    243                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    244                        sleep_secs -= 1
    245
    246                return [progress_history, src_qemu_time, src_vcpu_time]
    247
    248            if self._verbose and (loop % 20) == 0:
    249                print("Iter %d: remain %5dMB of %5dMB (transferred %5dMB @ %5dMb/sec)" % (
    250                    progress._ram._iterations,
    251                    progress._ram._remaining_bytes / (1024 * 1024),
    252                    progress._ram._total_bytes / (1024 * 1024),
    253                    progress._ram._transferred_bytes / (1024 * 1024),
    254                    progress._ram._transfer_rate_mbs,
    255                ))
    256
    257            if progress._ram._iterations > scenario._max_iters:
    258                if self._verbose:
    259                    print("No completion after %d iterations over RAM" % scenario._max_iters)
    260                src.command("migrate_cancel")
    261                continue
    262
    263            if time.time() > (start + scenario._max_time):
    264                if self._verbose:
    265                    print("No completion after %d seconds" % scenario._max_time)
    266                src.command("migrate_cancel")
    267                continue
    268
    269            if (scenario._post_copy and
    270                progress._ram._iterations >= scenario._post_copy_iters and
    271                not post_copy):
    272                if self._verbose:
    273                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
    274                resp = src.command("migrate-start-postcopy")
    275                post_copy = True
    276
    277            if (scenario._pause and
    278                progress._ram._iterations >= scenario._pause_iters and
    279                not paused):
    280                if self._verbose:
    281                    print("Pausing VM after %d iterations" % scenario._pause_iters)
    282                resp = src.command("stop")
    283                paused = True
    284
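           # Build the kernel command line (quoted when it will be passed
           # through an ssh tunnel) and the QEMU argv shared by the source
           # and destination; -m is the scenario RAM plus 512MB of headroom.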
    285    def _get_common_args(self, hardware, tunnelled=False):
    286        args = [
    287            "noapic",
    288            "edd=off",
    289            "printk.time=1",
    290            "noreplace-smp",
    291            "cgroup_disable=memory",
    292            "pci=noearly",
    293            "console=ttyS0",
    294        ]
    295        if self._debug:
    296            args.append("debug")
    297        else:
    298            args.append("quiet")
    299
    300        args.append("ramsize=%s" % hardware._mem)
    301
    302        cmdline = " ".join(args)
    303        if tunnelled:
    304            cmdline = "'" + cmdline + "'"
    305
    306        argv = [
    307            "-accel", "kvm",
    308            "-cpu", "host",
    309            "-kernel", self._kernel,
    310            "-initrd", self._initrd,
    311            "-append", cmdline,
    312            "-chardev", "stdio,id=cdev0",
    313            "-device", "isa-serial,chardev=cdev0",
    314            "-m", str((hardware._mem * 1024) + 512),
    315            "-smp", str(hardware._cpus),
    316        ]
    317
    318        if self._debug:
    319            argv.extend(["-device", "sga"])
    320
    321        if hardware._prealloc_pages:
    322            argv += ["-mem-path", "/dev/shm",
    323                     "-mem-prealloc"]
    324        if hardware._locked_pages:
    325            argv += ["-overcommit", "mem-lock=on"]
    326        if hardware._huge_pages:
    327            pass
    328
    329        return argv
    330
    331    def _get_src_args(self, hardware):
    332        return self._get_common_args(hardware)
    333
    334    def _get_dst_args(self, hardware, uri):
    335        tunnelled = False
    336        if self._dst_host != "localhost":
    337            tunnelled = True
    338        argv = self._get_common_args(hardware, tunnelled)
    339        return argv + ["-incoming", uri]
    340
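           # Optionally prefix the QEMU command with numactl to bind vCPUs and
           # guest memory to the requested host CPUs/nodes; the destination
           # wrapper additionally runs over ssh for remote hosts.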
    341    @staticmethod
    342    def _get_common_wrapper(cpu_bind, mem_bind):
    343        wrapper = []
    344        if len(cpu_bind) > 0 or len(mem_bind) > 0:
    345            wrapper.append("numactl")
    346            if cpu_bind:
    347                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
    348            if mem_bind:
    349                wrapper.append("--membind=%s" % ",".join(mem_bind))
    350
    351        return wrapper
    352
    353    def _get_src_wrapper(self, hardware):
    354        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
    355
    356    def _get_dst_wrapper(self, hardware):
    357        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
    358        if self._dst_host != "localhost":
    359            return ["ssh",
    360                    "-R", "9001:localhost:9001",
    361                    self._dst_host] + wrapper
    362        else:
    363            return wrapper
    364
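           # Parse the guest stress program's console log, whose lines report
           # "<name> (<tid>): INFO: <ms>ms copied <n> GB in <ms>ms", into
           # TimingRecord entries.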
    365    def _get_timings(self, vm):
    366        log = vm.get_log()
    367        if not log:
    368            return []
    369        if self._debug:
    370            print(log)
    371
    372        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
    373        matcher = re.compile(regex)
    374        records = []
    375        for line in log.split("\n"):
    376            match = matcher.match(line)
    377            if match:
    378                records.append(TimingRecord(int(match.group(1)),
    379                                            int(match.group(2)) / 1000.0,
    380                                            int(match.group(3))))
    381        return records
    382
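           # Top-level entry point: build the migration URI for the chosen
           # transport, launch the source and destination VMs, run the
           # migration and return a Report combining guest, QEMU-process and
           # per-vCPU timings.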
    383    def run(self, hardware, scenario, result_dir=os.getcwd()):
    384        abs_result_dir = os.path.join(result_dir, scenario._name)
    385
    386        if self._transport == "tcp":
    387            uri = "tcp:%s:9000" % self._dst_host
    388        elif self._transport == "rdma":
    389            uri = "rdma:%s:9000" % self._dst_host
    390        elif self._transport == "unix":
    391            if self._dst_host != "localhost":
    392                raise Exception("Cannot use the unix migration transport with a non-local host")
    393            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
    394            try:
    395                # remove any stale socket left over from a previous run
    396                os.remove(uri[5:])
    397            except OSError:
    398                pass
    399
    400        if self._dst_host != "localhost":
    401            dstmonaddr = ("localhost", 9001)
    402        else:
    403            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
    404        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
    405
    406        src = QEMUMachine(self._binary,
    407                          args=self._get_src_args(hardware),
    408                          wrapper=self._get_src_wrapper(hardware),
    409                          name="qemu-src-%d" % os.getpid(),
    410                          monitor_address=srcmonaddr)
    411
    412        dst = QEMUMachine(self._binary,
    413                          args=self._get_dst_args(hardware, uri),
    414                          wrapper=self._get_dst_wrapper(hardware),
    415                          name="qemu-dst-%d" % os.getpid(),
    416                          monitor_address=dstmonaddr)
    417
    418        try:
    419            src.launch()
    420            dst.launch()
    421
    422            ret = self._migrate(hardware, scenario, src, dst, uri)
    423            progress_history = ret[0]
    424            qemu_timings = ret[1]
    425            vcpu_timings = ret[2]
    426            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
    427                os.remove(uri[5:])
    428
    429            if os.path.exists(srcmonaddr):
    430                os.remove(srcmonaddr)
    431
    432            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
    433                os.remove(dstmonaddr)
    434
    435            if self._verbose:
    436                print("Finished migration")
    437
    438            src.shutdown()
    439            dst.shutdown()
    440
    441            return Report(hardware, scenario, progress_history,
    442                          Timings(self._get_timings(src) + self._get_timings(dst)),
    443                          Timings(qemu_timings),
    444                          Timings(vcpu_timings),
    445                          self._binary, self._dst_host, self._kernel,
    446                          self._initrd, self._transport, self._sleep)
    447        except Exception as e:
    448            if self._debug:
    449                print("Failed: %s" % str(e))
    450            try:
    451                src.shutdown()
    452            except:
    453                pass
    454            try:
    455                dst.shutdown()
    456            except:
    457                pass
    458
    459            if self._debug:
    460                print(src.get_log())
    461                print(dst.get_log())
    462            raise
    463
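The Engine class is normally driven by the guestperf shell scripts rather than
used directly. A minimal sketch of such a driver is shown below, assuming the
Hardware and Scenario classes from the sibling guestperf.hardware and
guestperf.scenario modules accept keyword arguments along these lines, and that
it runs from the directory containing the guestperf package; the binary,
kernel, initrd and output paths are placeholders.

    # Hypothetical driver script; constructor keyword arguments for Hardware
    # and Scenario are assumptions, not verified against this repository.
    from guestperf.engine import Engine
    from guestperf.hardware import Hardware
    from guestperf.scenario import Scenario

    engine = Engine(binary="/usr/bin/qemu-system-x86_64",  # placeholder paths
                    dst_host="localhost",
                    kernel="/boot/vmlinuz",
                    initrd="/tmp/stress.initrd",
                    transport="unix",
                    verbose=True)

    hardware = Hardware(cpus=2, mem=2)              # 2 vCPUs, 2 GB guest RAM
    scenario = Scenario("demo-post-copy",
                        post_copy=True, post_copy_iters=3)

    # Launches both VMs, migrates over a unix socket and returns a Report
    # combining guest, QEMU-process and per-vCPU timings.
    report = engine.run(hardware, scenario, "/tmp/results")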