cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

__init__.py (13793B)


      1"""
      2QEMU Monitor Protocol (QMP) development library & tooling.
      3
      4This package provides a fairly low-level class for communicating to QMP
      5protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
      6QEMU Storage Daemon. This library is not intended for production use.
      7
      8`QEMUMonitorProtocol` is the primary class of interest, and all errors
      9raised derive from `QMPError`.
     10"""
     11
     12# Copyright (C) 2009, 2010 Red Hat Inc.
     13#
     14# Authors:
     15#  Luiz Capitulino <lcapitulino@redhat.com>
     16#
     17# This work is licensed under the terms of the GNU GPL, version 2.  See
     18# the COPYING file in the top-level directory.
     19
     20import errno
     21import json
     22import logging
     23import socket
     24import struct
     25from types import TracebackType
     26from typing import (
     27    Any,
     28    Dict,
     29    List,
     30    Optional,
     31    TextIO,
     32    Tuple,
     33    Type,
     34    TypeVar,
     35    Union,
     36    cast,
     37)
     38
     39
     40#: QMPMessage is an entire QMP message of any kind.
     41QMPMessage = Dict[str, Any]
     42
     43#: QMPReturnValue is the 'return' value of a command.
     44QMPReturnValue = object
     45
     46#: QMPObject is any object in a QMP message.
     47QMPObject = Dict[str, object]
     48
     49# QMPMessage can be outgoing commands or incoming events/returns.
     50# QMPReturnValue is usually a dict/json object, but due to QAPI's
     51# 'returns-whitelist', it can actually be anything.
     52#
     53# {'return': {}} is a QMPMessage,
     54# {} is the QMPReturnValue.
     55
     56
     57InternetAddrT = Tuple[str, int]
     58UnixAddrT = str
     59SocketAddrT = Union[InternetAddrT, UnixAddrT]
     60
     61
     62class QMPError(Exception):
     63    """
     64    QMP base exception
     65    """
     66
     67
     68class QMPConnectError(QMPError):
     69    """
     70    QMP connection exception
     71    """
     72
     73
     74class QMPCapabilitiesError(QMPError):
     75    """
     76    QMP negotiate capabilities exception
     77    """
     78
     79
     80class QMPTimeoutError(QMPError):
     81    """
     82    QMP timeout exception
     83    """
     84
     85
     86class QMPProtocolError(QMPError):
     87    """
     88    QMP protocol error; unexpected response
     89    """
     90
     91
     92class QMPResponseError(QMPError):
     93    """
     94    Represents erroneous QMP monitor reply
     95    """
     96    def __init__(self, reply: QMPMessage):
     97        try:
     98            desc = reply['error']['desc']
     99        except KeyError:
    100            desc = reply
    101        super().__init__(desc)
    102        self.reply = reply
    103
    104
    105class QMPBadPortError(QMPError):
    106    """
    107    Unable to parse socket address: Port was non-numerical.
    108    """
    109
    110
    111class QEMUMonitorProtocol:
    112    """
    113    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
    114    allow to handle commands and events.
    115    """
    116
    117    #: Logger object for debugging messages
    118    logger = logging.getLogger('QMP')
    119
    120    def __init__(self, address: SocketAddrT,
    121                 server: bool = False,
    122                 nickname: Optional[str] = None):
    123        """
    124        Create a QEMUMonitorProtocol class.
    125
    126        @param address: QEMU address, can be either a unix socket path (string)
    127                        or a tuple in the form ( address, port ) for a TCP
    128                        connection
    129        @param server: server mode listens on the socket (bool)
    130        @raise OSError on socket connection errors
    131        @note No connection is established, this is done by the connect() or
    132              accept() methods
    133        """
    134        self.__events: List[QMPMessage] = []
    135        self.__address = address
    136        self.__sock = self.__get_sock()
    137        self.__sockfile: Optional[TextIO] = None
    138        self._nickname = nickname
    139        if self._nickname:
    140            self.logger = logging.getLogger('QMP').getChild(self._nickname)
    141        if server:
    142            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    143            self.__sock.bind(self.__address)
    144            self.__sock.listen(1)
    145
    146    def __get_sock(self) -> socket.socket:
    147        if isinstance(self.__address, tuple):
    148            family = socket.AF_INET
    149        else:
    150            family = socket.AF_UNIX
    151        return socket.socket(family, socket.SOCK_STREAM)
    152
    153    def __negotiate_capabilities(self) -> QMPMessage:
    154        greeting = self.__json_read()
    155        if greeting is None or "QMP" not in greeting:
    156            raise QMPConnectError
    157        # Greeting seems ok, negotiate capabilities
    158        resp = self.cmd('qmp_capabilities')
    159        if resp and "return" in resp:
    160            return greeting
    161        raise QMPCapabilitiesError
    162
    163    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
    164        assert self.__sockfile is not None
    165        while True:
    166            data = self.__sockfile.readline()
    167            if not data:
    168                return None
    169            # By definition, any JSON received from QMP is a QMPMessage,
    170            # and we are asserting only at static analysis time that it
    171            # has a particular shape.
    172            resp: QMPMessage = json.loads(data)
    173            if 'event' in resp:
    174                self.logger.debug("<<< %s", resp)
    175                self.__events.append(resp)
    176                if not only_event:
    177                    continue
    178            return resp
    179
    180    def __get_events(self, wait: Union[bool, float] = False) -> None:
    181        """
    182        Check for new events in the stream and cache them in __events.
    183
    184        @param wait (bool): block until an event is available.
    185        @param wait (float): If wait is a float, treat it as a timeout value.
    186
    187        @raise QMPTimeoutError: If a timeout float is provided and the timeout
    188                                period elapses.
    189        @raise QMPConnectError: If wait is True but no events could be
    190                                retrieved or if some other error occurred.
    191        """
    192
    193        # Current timeout and blocking status
    194        current_timeout = self.__sock.gettimeout()
    195
    196        # Check for new events regardless and pull them into the cache:
    197        self.__sock.settimeout(0)  # i.e. setblocking(False)
    198        try:
    199            self.__json_read()
    200        except OSError as err:
    201            # EAGAIN: No data available; not critical
    202            if err.errno != errno.EAGAIN:
    203                raise
    204        finally:
    205            self.__sock.settimeout(current_timeout)
    206
    207        # Wait for new events, if needed.
    208        # if wait is 0.0, this means "no wait" and is also implicitly false.
    209        if not self.__events and wait:
    210            if isinstance(wait, float):
    211                self.__sock.settimeout(wait)
    212            try:
    213                ret = self.__json_read(only_event=True)
    214            except socket.timeout as err:
    215                raise QMPTimeoutError("Timeout waiting for event") from err
    216            except Exception as err:
    217                msg = "Error while reading from socket"
    218                raise QMPConnectError(msg) from err
    219            finally:
    220                self.__sock.settimeout(current_timeout)
    221
    222            if ret is None:
    223                raise QMPConnectError("Error while reading from socket")
    224
    225    T = TypeVar('T')
    226
    227    def __enter__(self: T) -> T:
    228        # Implement context manager enter function.
    229        return self
    230
    231    def __exit__(self,
    232                 # pylint: disable=duplicate-code
    233                 # see https://github.com/PyCQA/pylint/issues/3619
    234                 exc_type: Optional[Type[BaseException]],
    235                 exc_val: Optional[BaseException],
    236                 exc_tb: Optional[TracebackType]) -> None:
    237        # Implement context manager exit function.
    238        self.close()
    239
    240    @classmethod
    241    def parse_address(cls, address: str) -> SocketAddrT:
    242        """
    243        Parse a string into a QMP address.
    244
    245        Figure out if the argument is in the port:host form.
    246        If it's not, it's probably a file path.
    247        """
    248        components = address.split(':')
    249        if len(components) == 2:
    250            try:
    251                port = int(components[1])
    252            except ValueError:
    253                msg = f"Bad port: '{components[1]}' in '{address}'."
    254                raise QMPBadPortError(msg) from None
    255            return (components[0], port)
    256
    257        # Treat as filepath.
    258        return address
    259
    260    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
    261        """
    262        Connect to the QMP Monitor and perform capabilities negotiation.
    263
    264        @return QMP greeting dict, or None if negotiate is false
    265        @raise OSError on socket connection errors
    266        @raise QMPConnectError if the greeting is not received
    267        @raise QMPCapabilitiesError if fails to negotiate capabilities
    268        """
    269        self.__sock.connect(self.__address)
    270        self.__sockfile = self.__sock.makefile(mode='r')
    271        if negotiate:
    272            return self.__negotiate_capabilities()
    273        return None
    274
    275    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
    276        """
    277        Await connection from QMP Monitor and perform capabilities negotiation.
    278
    279        @param timeout: timeout in seconds (nonnegative float number, or
    280                        None). The value passed will set the behavior of the
    281                        underneath QMP socket as described in [1].
    282                        Default value is set to 15.0.
    283
    284        @return QMP greeting dict
    285        @raise OSError on socket connection errors
    286        @raise QMPConnectError if the greeting is not received
    287        @raise QMPCapabilitiesError if fails to negotiate capabilities
    288
    289        [1]
    290        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
    291        """
    292        self.__sock.settimeout(timeout)
    293        self.__sock, _ = self.__sock.accept()
    294        self.__sockfile = self.__sock.makefile(mode='r')
    295        return self.__negotiate_capabilities()
    296
    297    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
    298        """
    299        Send a QMP command to the QMP Monitor.
    300
    301        @param qmp_cmd: QMP command to be sent as a Python dict
    302        @return QMP response as a Python dict
    303        """
    304        self.logger.debug(">>> %s", qmp_cmd)
    305        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
    306        resp = self.__json_read()
    307        if resp is None:
    308            raise QMPConnectError("Unexpected empty reply from server")
    309        self.logger.debug("<<< %s", resp)
    310        return resp
    311
    312    def cmd(self, name: str,
    313            args: Optional[Dict[str, object]] = None,
    314            cmd_id: Optional[object] = None) -> QMPMessage:
    315        """
    316        Build a QMP command and send it to the QMP Monitor.
    317
    318        @param name: command name (string)
    319        @param args: command arguments (dict)
    320        @param cmd_id: command id (dict, list, string or int)
    321        """
    322        qmp_cmd: QMPMessage = {'execute': name}
    323        if args:
    324            qmp_cmd['arguments'] = args
    325        if cmd_id:
    326            qmp_cmd['id'] = cmd_id
    327        return self.cmd_obj(qmp_cmd)
    328
    329    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
    330        """
    331        Build and send a QMP command to the monitor, report errors if any
    332        """
    333        ret = self.cmd(cmd, kwds)
    334        if 'error' in ret:
    335            raise QMPResponseError(ret)
    336        if 'return' not in ret:
    337            raise QMPProtocolError(
    338                "'return' key not found in QMP response '{}'".format(str(ret))
    339            )
    340        return cast(QMPReturnValue, ret['return'])
    341
    342    def pull_event(self,
    343                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
    344        """
    345        Pulls a single event.
    346
    347        @param wait (bool): block until an event is available.
    348        @param wait (float): If wait is a float, treat it as a timeout value.
    349
    350        @raise QMPTimeoutError: If a timeout float is provided and the timeout
    351                                period elapses.
    352        @raise QMPConnectError: If wait is True but no events could be
    353                                retrieved or if some other error occurred.
    354
    355        @return The first available QMP event, or None.
    356        """
    357        self.__get_events(wait)
    358
    359        if self.__events:
    360            return self.__events.pop(0)
    361        return None
    362
    363    def get_events(self, wait: bool = False) -> List[QMPMessage]:
    364        """
    365        Get a list of available QMP events and clear all pending events.
    366
    367        @param wait (bool): block until an event is available.
    368        @param wait (float): If wait is a float, treat it as a timeout value.
    369
    370        @raise QMPTimeoutError: If a timeout float is provided and the timeout
    371                                period elapses.
    372        @raise QMPConnectError: If wait is True but no events could be
    373                                retrieved or if some other error occurred.
    374
    375        @return The list of available QMP events.
    376        """
    377        self.__get_events(wait)
    378        events = self.__events
    379        self.__events = []
    380        return events
    381
    382    def clear_events(self) -> None:
    383        """
    384        Clear current list of pending events.
    385        """
    386        self.__events = []
    387
    388    def close(self) -> None:
    389        """
    390        Close the socket and socket file.
    391        """
    392        if self.__sock:
    393            self.__sock.close()
    394        if self.__sockfile:
    395            self.__sockfile.close()
    396
    397    def settimeout(self, timeout: Optional[float]) -> None:
    398        """
    399        Set the socket timeout.
    400
    401        @param timeout (float): timeout in seconds (non-zero), or None.
    402        @note This is a wrap around socket.settimeout
    403
    404        @raise ValueError: if timeout was set to 0.
    405        """
    406        if timeout == 0:
    407            msg = "timeout cannot be 0; this engages non-blocking mode."
    408            msg += " Use 'None' instead to disable timeouts."
    409            raise ValueError(msg)
    410        self.__sock.settimeout(timeout)
    411
    412    def send_fd_scm(self, fd: int) -> None:
    413        """
    414        Send a file descriptor to the remote via SCM_RIGHTS.
    415        """
    416        if self.__sock.family != socket.AF_UNIX:
    417            raise RuntimeError("Can't use SCM_RIGHTS on non-AF_UNIX socket.")
    418
    419        self.__sock.sendmsg(
    420            [b' '],
    421            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
    422        )