cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qmp_client.py (22375B)


      1"""
      2QMP Protocol Implementation
      3
      4This module provides the `QMPClient` class, which can be used to connect
      5and send commands to a QMP server such as QEMU. The QMP class can be
      6used to either connect to a listening server, or used to listen and
      7accept an incoming connection from that server.
      8"""
      9
     10import asyncio
     11import logging
     12import socket
     13import struct
     14from typing import (
     15    Dict,
     16    List,
     17    Mapping,
     18    Optional,
     19    Union,
     20    cast,
     21)
     22
     23from .error import AQMPError, ProtocolError
     24from .events import Events
     25from .message import Message
     26from .models import ErrorResponse, Greeting
     27from .protocol import AsyncProtocol, Runstate, require
     28from .util import (
     29    bottom_half,
     30    exception_summary,
     31    pretty_traceback,
     32    upper_half,
     33)
     34
     35
     36class _WrappedProtocolError(ProtocolError):
     37    """
     38    Abstract exception class for Protocol errors that wrap an Exception.
     39
     40    :param error_message: Human-readable string describing the error.
     41    :param exc: The root-cause exception.
     42    """
     43    def __init__(self, error_message: str, exc: Exception):
     44        super().__init__(error_message)
     45        self.exc = exc
     46
     47    def __str__(self) -> str:
     48        return f"{self.error_message}: {self.exc!s}"
     49
     50
     51class GreetingError(_WrappedProtocolError):
     52    """
     53    An exception occurred during the Greeting phase.
     54
     55    :param error_message: Human-readable string describing the error.
     56    :param exc: The root-cause exception.
     57    """
     58
     59
     60class NegotiationError(_WrappedProtocolError):
     61    """
     62    An exception occurred during the Negotiation phase.
     63
     64    :param error_message: Human-readable string describing the error.
     65    :param exc: The root-cause exception.
     66    """
     67
     68
     69class ExecuteError(AQMPError):
     70    """
     71    Exception raised by `QMPClient.execute()` on RPC failure.
     72
     73    :param error_response: The RPC error response object.
     74    :param sent: The sent RPC message that caused the failure.
     75    :param received: The raw RPC error reply received.
     76    """
     77    def __init__(self, error_response: ErrorResponse,
     78                 sent: Message, received: Message):
     79        super().__init__(error_response.error.desc)
     80        #: The sent `Message` that caused the failure
     81        self.sent: Message = sent
     82        #: The received `Message` that indicated failure
     83        self.received: Message = received
     84        #: The parsed error response
     85        self.error: ErrorResponse = error_response
     86        #: The QMP error class
     87        self.error_class: str = error_response.error.class_
     88
     89
     90class ExecInterruptedError(AQMPError):
     91    """
     92    Exception raised by `execute()` (et al) when an RPC is interrupted.
     93
     94    This error is raised when an `execute()` statement could not be
     95    completed.  This can occur because the connection itself was
     96    terminated before a reply was received.
     97
     98    The true cause of the interruption will be available via `disconnect()`.
     99    """
    100
    101
    102class _MsgProtocolError(ProtocolError):
    103    """
    104    Abstract error class for protocol errors that have a `Message` object.
    105
    106    This Exception class is used for protocol errors where the `Message`
    107    was mechanically understood, but was found to be inappropriate or
    108    malformed.
    109
    110    :param error_message: Human-readable string describing the error.
    111    :param msg: The QMP `Message` that caused the error.
    112    """
    113    def __init__(self, error_message: str, msg: Message):
    114        super().__init__(error_message)
    115        #: The received `Message` that caused the error.
    116        self.msg: Message = msg
    117
    118    def __str__(self) -> str:
    119        return "\n".join([
    120            super().__str__(),
    121            f"  Message was: {str(self.msg)}\n",
    122        ])
    123
    124
    125class ServerParseError(_MsgProtocolError):
    126    """
    127    The Server sent a `Message` indicating parsing failure.
    128
    129    i.e. A reply has arrived from the server, but it is missing the "ID"
    130    field, indicating a parsing error.
    131
    132    :param error_message: Human-readable string describing the error.
    133    :param msg: The QMP `Message` that caused the error.
    134    """
    135
    136
    137class BadReplyError(_MsgProtocolError):
    138    """
    139    An execution reply was successfully routed, but not understood.
    140
    141    If a QMP message is received with an 'id' field to allow it to be
    142    routed, but is otherwise malformed, this exception will be raised.
    143
    144    A reply message is malformed if it is missing either the 'return' or
    145    'error' keys, or if the 'error' value has missing keys or members of
    146    the wrong type.
    147
    148    :param error_message: Human-readable string describing the error.
    149    :param msg: The malformed reply that was received.
    150    :param sent: The message that was sent that prompted the error.
    151    """
    152    def __init__(self, error_message: str, msg: Message, sent: Message):
    153        super().__init__(error_message, msg)
    154        #: The sent `Message` that caused the failure
    155        self.sent = sent
    156
    157
    158class QMPClient(AsyncProtocol[Message], Events):
    159    """
    160    Implements a QMP client connection.
    161
    162    QMP can be used to establish a connection as either the transport
    163    client or server, though this class always acts as the QMP client.
    164
    165    :param name: Optional nickname for the connection, used for logging.
    166
    167    Basic script-style usage looks like this::
    168
    169      qmp = QMPClient('my_virtual_machine_name')
    170      await qmp.connect(('127.0.0.1', 1234))
    171      ...
    172      res = await qmp.execute('block-query')
    173      ...
    174      await qmp.disconnect()
    175
    176    Basic async client-style usage looks like this::
    177
    178      class Client:
    179          def __init__(self, name: str):
    180              self.qmp = QMPClient(name)
    181
    182          async def watch_events(self):
    183              try:
    184                  async for event in self.qmp.events:
    185                      print(f"Event: {event['event']}")
    186              except asyncio.CancelledError:
    187                  return
    188
    189          async def run(self, address='/tmp/qemu.socket'):
    190              await self.qmp.connect(address)
    191              asyncio.create_task(self.watch_events())
    192              await self.qmp.runstate_changed.wait()
    193              await self.disconnect()
    194
    195    See `aqmp.events` for more detail on event handling patterns.
    196    """
    197    #: Logger object used for debugging messages.
    198    logger = logging.getLogger(__name__)
    199
    200    # Read buffer limit; large enough to accept query-qmp-schema
    201    _limit = (256 * 1024)
    202
    203    # Type alias for pending execute() result items
    204    _PendingT = Union[Message, ExecInterruptedError]
    205
    206    def __init__(self, name: Optional[str] = None) -> None:
    207        super().__init__(name)
    208        Events.__init__(self)
    209
    210        #: Whether or not to await a greeting after establishing a connection.
    211        self.await_greeting: bool = True
    212
    213        #: Whether or not to perform capabilities negotiation upon connection.
    214        #: Implies `await_greeting`.
    215        self.negotiate: bool = True
    216
    217        # Cached Greeting, if one was awaited.
    218        self._greeting: Optional[Greeting] = None
    219
    220        # Command ID counter
    221        self._execute_id = 0
    222
    223        # Incoming RPC reply messages.
    224        self._pending: Dict[
    225            Union[str, None],
    226            'asyncio.Queue[QMPClient._PendingT]'
    227        ] = {}
    228
    229    @property
    230    def greeting(self) -> Optional[Greeting]:
    231        """The `Greeting` from the QMP server, if any."""
    232        return self._greeting
    233
    234    @upper_half
    235    async def _establish_session(self) -> None:
    236        """
    237        Initiate the QMP session.
    238
    239        Wait for the QMP greeting and perform capabilities negotiation.
    240
    241        :raise GreetingError: When the greeting is not understood.
    242        :raise NegotiationError: If the negotiation fails.
    243        :raise EOFError: When the server unexpectedly hangs up.
    244        :raise OSError: For underlying stream errors.
    245        """
    246        self._greeting = None
    247        self._pending = {}
    248
    249        if self.await_greeting or self.negotiate:
    250            self._greeting = await self._get_greeting()
    251
    252        if self.negotiate:
    253            await self._negotiate()
    254
    255        # This will start the reader/writers:
    256        await super()._establish_session()
    257
    258    @upper_half
    259    async def _get_greeting(self) -> Greeting:
    260        """
    261        :raise GreetingError: When the greeting is not understood.
    262        :raise EOFError: When the server unexpectedly hangs up.
    263        :raise OSError: For underlying stream errors.
    264
    265        :return: the Greeting object given by the server.
    266        """
    267        self.logger.debug("Awaiting greeting ...")
    268
    269        try:
    270            msg = await self._recv()
    271            return Greeting(msg)
    272        except (ProtocolError, KeyError, TypeError) as err:
    273            emsg = "Did not understand Greeting"
    274            self.logger.error("%s: %s", emsg, exception_summary(err))
    275            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
    276            raise GreetingError(emsg, err) from err
    277        except BaseException as err:
    278            # EOFError, OSError, or something unexpected.
    279            emsg = "Failed to receive Greeting"
    280            self.logger.error("%s: %s", emsg, exception_summary(err))
    281            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
    282            raise
    283
    284    @upper_half
    285    async def _negotiate(self) -> None:
    286        """
    287        Perform QMP capabilities negotiation.
    288
    289        :raise NegotiationError: When negotiation fails.
    290        :raise EOFError: When the server unexpectedly hangs up.
    291        :raise OSError: For underlying stream errors.
    292        """
    293        self.logger.debug("Negotiating capabilities ...")
    294
    295        arguments: Dict[str, List[str]] = {'enable': []}
    296        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
    297            arguments['enable'].append('oob')
    298        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
    299
    300        # It's not safe to use execute() here, because the reader/writers
    301        # aren't running. AsyncProtocol *requires* that a new session
    302        # does not fail after the reader/writers are running!
    303        try:
    304            await self._send(msg)
    305            reply = await self._recv()
    306            assert 'return' in reply
    307            assert 'error' not in reply
    308        except (ProtocolError, AssertionError) as err:
    309            emsg = "Negotiation failed"
    310            self.logger.error("%s: %s", emsg, exception_summary(err))
    311            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
    312            raise NegotiationError(emsg, err) from err
    313        except BaseException as err:
    314            # EOFError, OSError, or something unexpected.
    315            emsg = "Negotiation failed"
    316            self.logger.error("%s: %s", emsg, exception_summary(err))
    317            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
    318            raise
    319
    320    @bottom_half
    321    async def _bh_disconnect(self) -> None:
    322        try:
    323            await super()._bh_disconnect()
    324        finally:
    325            if self._pending:
    326                self.logger.debug("Cancelling pending executions")
    327            keys = self._pending.keys()
    328            for key in keys:
    329                self.logger.debug("Cancelling execution '%s'", key)
    330                self._pending[key].put_nowait(
    331                    ExecInterruptedError("Disconnected")
    332                )
    333
    334            self.logger.debug("QMP Disconnected.")
    335
    336    @upper_half
    337    def _cleanup(self) -> None:
    338        super()._cleanup()
    339        assert not self._pending
    340
    341    @bottom_half
    342    async def _on_message(self, msg: Message) -> None:
    343        """
    344        Add an incoming message to the appropriate queue/handler.
    345
    346        :raise ServerParseError: When Message indicates server parse failure.
    347        """
    348        # Incoming messages are not fully parsed/validated here;
    349        # do only light peeking to know how to route the messages.
    350
    351        if 'event' in msg:
    352            await self._event_dispatch(msg)
    353            return
    354
    355        # Below, we assume everything left is an execute/exec-oob response.
    356
    357        exec_id = cast(Optional[str], msg.get('id'))
    358
    359        if exec_id in self._pending:
    360            await self._pending[exec_id].put(msg)
    361            return
    362
    363        # We have a message we can't route back to a caller.
    364
    365        is_error = 'error' in msg
    366        has_id = 'id' in msg
    367
    368        if is_error and not has_id:
    369            # This is very likely a server parsing error.
    370            # It doesn't inherently belong to any pending execution.
    371            # Instead of performing clever recovery, just terminate.
    372            # See "NOTE" in qmp-spec.txt, section 2.4.2
    373            raise ServerParseError(
    374                ("Server sent an error response without an ID, "
    375                 "but there are no ID-less executions pending. "
    376                 "Assuming this is a server parser failure."),
    377                msg
    378            )
    379
    380        # qmp-spec.txt, section 2.4:
    381        # 'Clients should drop all the responses
    382        # that have an unknown "id" field.'
    383        self.logger.log(
    384            logging.ERROR if is_error else logging.WARNING,
    385            "Unknown ID '%s', message dropped.",
    386            exec_id,
    387        )
    388        self.logger.debug("Unroutable message: %s", str(msg))
    389
    390    @upper_half
    391    @bottom_half
    392    async def _do_recv(self) -> Message:
    393        """
    394        :raise OSError: When a stream error is encountered.
    395        :raise EOFError: When the stream is at EOF.
    396        :raise ProtocolError:
    397            When the Message is not understood.
    398            See also `Message._deserialize`.
    399
    400        :return: A single QMP `Message`.
    401        """
    402        msg_bytes = await self._readline()
    403        msg = Message(msg_bytes, eager=True)
    404        return msg
    405
    406    @upper_half
    407    @bottom_half
    408    def _do_send(self, msg: Message) -> None:
    409        """
    410        :raise ValueError: JSON serialization failure
    411        :raise TypeError: JSON serialization failure
    412        :raise OSError: When a stream error is encountered.
    413        """
    414        assert self._writer is not None
    415        self._writer.write(bytes(msg))
    416
    417    @upper_half
    418    def _get_exec_id(self) -> str:
    419        exec_id = f"__aqmp#{self._execute_id:05d}"
    420        self._execute_id += 1
    421        return exec_id
    422
    423    @upper_half
    424    async def _issue(self, msg: Message) -> Union[None, str]:
    425        """
    426        Issue a QMP `Message` and do not wait for a reply.
    427
    428        :param msg: The QMP `Message` to send to the server.
    429
    430        :return: The ID of the `Message` sent.
    431        """
    432        msg_id: Optional[str] = None
    433        if 'id' in msg:
    434            assert isinstance(msg['id'], str)
    435            msg_id = msg['id']
    436
    437        self._pending[msg_id] = asyncio.Queue(maxsize=1)
    438        await self._outgoing.put(msg)
    439
    440        return msg_id
    441
    442    @upper_half
    443    async def _reply(self, msg_id: Union[str, None]) -> Message:
    444        """
    445        Await a reply to a previously issued QMP message.
    446
    447        :param msg_id: The ID of the previously issued message.
    448
    449        :return: The reply from the server.
    450        :raise ExecInterruptedError:
    451            When the reply could not be retrieved because the connection
    452            was lost, or some other problem.
    453        """
    454        queue = self._pending[msg_id]
    455        result = await queue.get()
    456
    457        try:
    458            if isinstance(result, ExecInterruptedError):
    459                raise result
    460            return result
    461        finally:
    462            del self._pending[msg_id]
    463
    464    @upper_half
    465    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
    466        """
    467        Send a QMP `Message` to the server and await a reply.
    468
    469        This method *assumes* you are sending some kind of an execute
    470        statement that *will* receive a reply.
    471
    472        An execution ID will be assigned if assign_id is `True`. It can be
    473        disabled, but this requires that an ID is manually assigned
    474        instead. For manually assigned IDs, you must not use the string
    475        '__aqmp#' anywhere in the ID.
    476
    477        :param msg: The QMP `Message` to execute.
    478        :param assign_id: If True, assign a new execution ID.
    479
    480        :return: Execution reply from the server.
    481        :raise ExecInterruptedError:
    482            When the reply could not be retrieved because the connection
    483            was lost, or some other problem.
    484        """
    485        if assign_id:
    486            msg['id'] = self._get_exec_id()
    487        elif 'id' in msg:
    488            assert isinstance(msg['id'], str)
    489            assert '__aqmp#' not in msg['id']
    490
    491        exec_id = await self._issue(msg)
    492        return await self._reply(exec_id)
    493
    494    @upper_half
    495    @require(Runstate.RUNNING)
    496    async def _raw(
    497            self,
    498            msg: Union[Message, Mapping[str, object], bytes],
    499            assign_id: bool = True,
    500    ) -> Message:
    501        """
    502        Issue a raw `Message` to the QMP server and await a reply.
    503
    504        :param msg:
    505            A Message to send to the server. It may be a `Message`, any
    506            Mapping (including Dict), or raw bytes.
    507        :param assign_id:
    508            Assign an arbitrary execution ID to this message. If
    509            `False`, the existing id must either be absent (and no other
    510            such pending execution may omit an ID) or a string. If it is
    511            a string, it must not start with '__aqmp#' and no other such
    512            pending execution may currently be using that ID.
    513
    514        :return: Execution reply from the server.
    515
    516        :raise ExecInterruptedError:
    517            When the reply could not be retrieved because the connection
    518            was lost, or some other problem.
    519        :raise TypeError:
    520            When assign_id is `False`, an ID is given, and it is not a string.
    521        :raise ValueError:
    522            When assign_id is `False`, but the ID is not usable;
    523            Either because it starts with '__aqmp#' or it is already in-use.
    524        """
    525        # 1. convert generic Mapping or bytes to a QMP Message
    526        # 2. copy Message objects so that we assign an ID only to the copy.
    527        msg = Message(msg)
    528
    529        exec_id = msg.get('id')
    530        if not assign_id and 'id' in msg:
    531            if not isinstance(exec_id, str):
    532                raise TypeError(f"ID ('{exec_id}') must be a string.")
    533            if exec_id.startswith('__aqmp#'):
    534                raise ValueError(
    535                    f"ID ('{exec_id}') must not start with '__aqmp#'."
    536                )
    537
    538        if not assign_id and exec_id in self._pending:
    539            raise ValueError(
    540                f"ID '{exec_id}' is in-use and cannot be used."
    541            )
    542
    543        return await self._execute(msg, assign_id=assign_id)
    544
    545    @upper_half
    546    @require(Runstate.RUNNING)
    547    async def execute_msg(self, msg: Message) -> object:
    548        """
    549        Execute a QMP command and return its value.
    550
    551        :param msg: The QMP `Message` to execute.
    552
    553        :return:
    554            The command execution return value from the server. The type of
    555            object returned depends on the command that was issued,
    556            though most in QEMU return a `dict`.
    557        :raise ValueError:
    558            If the QMP `Message` does not have either the 'execute' or
    559            'exec-oob' fields set.
    560        :raise ExecuteError: When the server returns an error response.
    561        :raise ExecInterruptedError: if the connection was terminated early.
    562        """
    563        if not ('execute' in msg or 'exec-oob' in msg):
    564            raise ValueError("Requires 'execute' or 'exec-oob' message")
    565
    566        # Copy the Message so that the ID assigned by _execute() is
    567        # local to this method; allowing the ID to be seen in raised
    568        # Exceptions but without modifying the caller's held copy.
    569        msg = Message(msg)
    570        reply = await self._execute(msg)
    571
    572        if 'error' in reply:
    573            try:
    574                error_response = ErrorResponse(reply)
    575            except (KeyError, TypeError) as err:
    576                # Error response was malformed.
    577                raise BadReplyError(
    578                    "QMP error reply is malformed", reply, msg,
    579                ) from err
    580
    581            raise ExecuteError(error_response, msg, reply)
    582
    583        if 'return' not in reply:
    584            raise BadReplyError(
    585                "QMP reply is missing a 'error' or 'return' member",
    586                reply, msg,
    587            )
    588
    589        return reply['return']
    590
    591    @classmethod
    592    def make_execute_msg(cls, cmd: str,
    593                         arguments: Optional[Mapping[str, object]] = None,
    594                         oob: bool = False) -> Message:
    595        """
    596        Create an executable message to be sent by `execute_msg` later.
    597
    598        :param cmd: QMP command name.
    599        :param arguments: Arguments (if any). Must be JSON-serializable.
    600        :param oob: If `True`, execute "out of band".
    601
    602        :return: An executable QMP `Message`.
    603        """
    604        msg = Message({'exec-oob' if oob else 'execute': cmd})
    605        if arguments is not None:
    606            msg['arguments'] = arguments
    607        return msg
    608
    609    @upper_half
    610    async def execute(self, cmd: str,
    611                      arguments: Optional[Mapping[str, object]] = None,
    612                      oob: bool = False) -> object:
    613        """
    614        Execute a QMP command and return its value.
    615
    616        :param cmd: QMP command name.
    617        :param arguments: Arguments (if any). Must be JSON-serializable.
    618        :param oob: If `True`, execute "out of band".
    619
    620        :return:
    621            The command execution return value from the server. The type of
    622            object returned depends on the command that was issued,
    623            though most in QEMU return a `dict`.
    624        :raise ExecuteError: When the server returns an error response.
    625        :raise ExecInterruptedError: if the connection was terminated early.
    626        """
    627        msg = self.make_execute_msg(cmd, arguments, oob=oob)
    628        return await self.execute_msg(msg)
    629
    630    @upper_half
    631    @require(Runstate.RUNNING)
    632    def send_fd_scm(self, fd: int) -> None:
    633        """
    634        Send a file descriptor to the remote via SCM_RIGHTS.
    635        """
    636        assert self._writer is not None
    637        sock = self._writer.transport.get_extra_info('socket')
    638
    639        if sock.family != socket.AF_UNIX:
    640            raise AQMPError("Sending file descriptors requires a UNIX socket.")
    641
    642        # Void the warranty sticker.
    643        # Access to sendmsg in asyncio is scheduled for removal in Python 3.11.
    644        sock = sock._sock  # pylint: disable=protected-access
    645        sock.sendmsg(
    646            [b' '],
    647            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
    648        )