cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qtest.py (4728B)


      1"""
      2QEMU qtest library
      3
      4qtest offers the QEMUQtestProtocol and QEMUQtestMachine classes, which
      5offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
      6subclass of QEMUMachine, respectively.
      7"""
      8
      9# Copyright (C) 2015 Red Hat Inc.
     10#
     11# Authors:
     12#  Fam Zheng <famz@redhat.com>
     13#
     14# This work is licensed under the terms of the GNU GPL, version 2.  See
     15# the COPYING file in the top-level directory.
     16#
     17# Based on qmp.py.
     18#
     19
     20import os
     21import socket
     22from typing import (
     23    List,
     24    Optional,
     25    Sequence,
     26    TextIO,
     27)
     28
     29from qemu.qmp import SocketAddrT  # pylint: disable=import-error
     30
     31from .machine import QEMUMachine
     32
     33
     34class QEMUQtestProtocol:
     35    """
     36    QEMUQtestProtocol implements a connection to a qtest socket.
     37
     38    :param address: QEMU address, can be either a unix socket path (string)
     39                    or a tuple in the form ( address, port ) for a TCP
     40                    connection
     41    :param server: server mode, listens on the socket (bool)
     42    :raise socket.error: on socket connection errors
     43
     44    .. note::
     45       No conection is estabalished by __init__(), this is done
     46       by the connect() or accept() methods.
     47    """
     48    def __init__(self, address: SocketAddrT,
     49                 server: bool = False):
     50        self._address = address
     51        self._sock = self._get_sock()
     52        self._sockfile: Optional[TextIO] = None
     53        if server:
     54            self._sock.bind(self._address)
     55            self._sock.listen(1)
     56
     57    def _get_sock(self) -> socket.socket:
     58        if isinstance(self._address, tuple):
     59            family = socket.AF_INET
     60        else:
     61            family = socket.AF_UNIX
     62        return socket.socket(family, socket.SOCK_STREAM)
     63
     64    def connect(self) -> None:
     65        """
     66        Connect to the qtest socket.
     67
     68        @raise socket.error on socket connection errors
     69        """
     70        self._sock.connect(self._address)
     71        self._sockfile = self._sock.makefile(mode='r')
     72
     73    def accept(self) -> None:
     74        """
     75        Await connection from QEMU.
     76
     77        @raise socket.error on socket connection errors
     78        """
     79        self._sock, _ = self._sock.accept()
     80        self._sockfile = self._sock.makefile(mode='r')
     81
     82    def cmd(self, qtest_cmd: str) -> str:
     83        """
     84        Send a qtest command on the wire.
     85
     86        @param qtest_cmd: qtest command text to be sent
     87        """
     88        assert self._sockfile is not None
     89        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
     90        resp = self._sockfile.readline()
     91        return resp
     92
     93    def close(self) -> None:
     94        """
     95        Close this socket.
     96        """
     97        self._sock.close()
     98        if self._sockfile:
     99            self._sockfile.close()
    100            self._sockfile = None
    101
    102    def settimeout(self, timeout: Optional[float]) -> None:
    103        """Set a timeout, in seconds."""
    104        self._sock.settimeout(timeout)
    105
    106
    107class QEMUQtestMachine(QEMUMachine):
    108    """
    109    A QEMU VM, with a qtest socket available.
    110    """
    111
    112    def __init__(self,
    113                 binary: str,
    114                 args: Sequence[str] = (),
    115                 wrapper: Sequence[str] = (),
    116                 name: Optional[str] = None,
    117                 base_temp_dir: str = "/var/tmp",
    118                 sock_dir: Optional[str] = None,
    119                 qmp_timer: Optional[float] = None):
    120        # pylint: disable=too-many-arguments
    121
    122        if name is None:
    123            name = "qemu-%d" % os.getpid()
    124        if sock_dir is None:
    125            sock_dir = base_temp_dir
    126        super().__init__(binary, args, wrapper=wrapper, name=name,
    127                         base_temp_dir=base_temp_dir,
    128                         sock_dir=sock_dir, qmp_timer=qmp_timer)
    129        self._qtest: Optional[QEMUQtestProtocol] = None
    130        self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
    131
    132    @property
    133    def _base_args(self) -> List[str]:
    134        args = super()._base_args
    135        args.extend([
    136            '-qtest', f"unix:path={self._qtest_path}",
    137            '-accel', 'qtest'
    138        ])
    139        return args
    140
    141    def _pre_launch(self) -> None:
    142        super()._pre_launch()
    143        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
    144
    145    def _post_launch(self) -> None:
    146        assert self._qtest is not None
    147        super()._post_launch()
    148        self._qtest.accept()
    149
    150    def _post_shutdown(self) -> None:
    151        super()._post_shutdown()
    152        self._remove_if_exists(self._qtest_path)
    153
    154    def qtest(self, cmd: str) -> str:
    155        """
    156        Send a qtest command to the guest.
    157
    158        :param cmd: qtest command to send
    159        :return: qtest server response
    160        """
    161        if self._qtest is None:
    162            raise RuntimeError("qtest socket not available")
    163        return self._qtest.cmd(cmd)