cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

message.py (6355B)


      1"""
      2QMP Message Format
      3
      4This module provides the `Message` class, which represents a single QMP
      5message sent to or from the server.
      6"""
      7
      8import json
      9from json import JSONDecodeError
     10from typing import (
     11    Dict,
     12    Iterator,
     13    Mapping,
     14    MutableMapping,
     15    Optional,
     16    Union,
     17)
     18
     19from .error import ProtocolError
     20
     21
     22class Message(MutableMapping[str, object]):
     23    """
     24    Represents a single QMP protocol message.
     25
     26    QMP uses JSON objects as its basic communicative unit; so this
     27    Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
     28    be instantiated from either another mapping (like a `dict`), or from
     29    raw `bytes` that still need to be deserialized.
     30
     31    Once instantiated, it may be treated like any other MutableMapping::
     32
     33        >>> msg = Message(b'{"hello": "world"}')
     34        >>> assert msg['hello'] == 'world'
     35        >>> msg['id'] = 'foobar'
     36        >>> print(msg)
     37        {
     38          "hello": "world",
     39          "id": "foobar"
     40        }
     41
     42    It can be converted to `bytes`::
     43
     44        >>> msg = Message({"hello": "world"})
     45        >>> print(bytes(msg))
     46        b'{"hello":"world","id":"foobar"}'
     47
     48    Or back into a garden-variety `dict`::
     49
     50       >>> dict(msg)
     51       {'hello': 'world'}
     52
     53
     54    :param value: Initial value, if any.
     55    :param eager:
     56        When `True`, attempt to serialize or deserialize the initial value
     57        immediately, so that conversion exceptions are raised during
     58        the call to ``__init__()``.
     59    """
     60    # pylint: disable=too-many-ancestors
     61
     62    def __init__(self,
     63                 value: Union[bytes, Mapping[str, object]] = b'{}', *,
     64                 eager: bool = True):
     65        self._data: Optional[bytes] = None
     66        self._obj: Optional[Dict[str, object]] = None
     67
     68        if isinstance(value, bytes):
     69            self._data = value
     70            if eager:
     71                self._obj = self._deserialize(self._data)
     72        else:
     73            self._obj = dict(value)
     74            if eager:
     75                self._data = self._serialize(self._obj)
     76
     77    # Methods necessary to implement the MutableMapping interface, see:
     78    # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
     79
     80    # We get pop, popitem, clear, update, setdefault, __contains__,
     81    # keys, items, values, get, __eq__ and __ne__ for free.
     82
     83    def __getitem__(self, key: str) -> object:
     84        return self._object[key]
     85
     86    def __setitem__(self, key: str, value: object) -> None:
     87        self._object[key] = value
     88        self._data = None
     89
     90    def __delitem__(self, key: str) -> None:
     91        del self._object[key]
     92        self._data = None
     93
     94    def __iter__(self) -> Iterator[str]:
     95        return iter(self._object)
     96
     97    def __len__(self) -> int:
     98        return len(self._object)
     99
    100    # Dunder methods not related to MutableMapping:
    101
    102    def __repr__(self) -> str:
    103        if self._obj is not None:
    104            return f"Message({self._object!r})"
    105        return f"Message({bytes(self)!r})"
    106
    107    def __str__(self) -> str:
    108        """Pretty-printed representation of this QMP message."""
    109        return json.dumps(self._object, indent=2)
    110
    111    def __bytes__(self) -> bytes:
    112        """bytes representing this QMP message."""
    113        if self._data is None:
    114            self._data = self._serialize(self._obj or {})
    115        return self._data
    116
    117    # Conversion Methods
    118
    119    @property
    120    def _object(self) -> Dict[str, object]:
    121        """
    122        A `dict` representing this QMP message.
    123
    124        Generated on-demand, if required. This property is private
    125        because it returns an object that could be used to invalidate
    126        the internal state of the `Message` object.
    127        """
    128        if self._obj is None:
    129            self._obj = self._deserialize(self._data or b'{}')
    130        return self._obj
    131
    132    @classmethod
    133    def _serialize(cls, value: object) -> bytes:
    134        """
    135        Serialize a JSON object as `bytes`.
    136
    137        :raise ValueError: When the object cannot be serialized.
    138        :raise TypeError: When the object cannot be serialized.
    139
    140        :return: `bytes` ready to be sent over the wire.
    141        """
    142        return json.dumps(value, separators=(',', ':')).encode('utf-8')
    143
    144    @classmethod
    145    def _deserialize(cls, data: bytes) -> Dict[str, object]:
    146        """
    147        Deserialize JSON `bytes` into a native Python `dict`.
    148
    149        :raise DeserializationError:
    150            If JSON deserialization fails for any reason.
    151        :raise UnexpectedTypeError:
    152            If the data does not represent a JSON object.
    153
    154        :return: A `dict` representing this QMP message.
    155        """
    156        try:
    157            obj = json.loads(data)
    158        except JSONDecodeError as err:
    159            emsg = "Failed to deserialize QMP message."
    160            raise DeserializationError(emsg, data) from err
    161        if not isinstance(obj, dict):
    162            raise UnexpectedTypeError(
    163                "QMP message is not a JSON object.",
    164                obj
    165            )
    166        return obj
    167
    168
    169class DeserializationError(ProtocolError):
    170    """
    171    A QMP message was not understood as JSON.
    172
    173    When this Exception is raised, ``__cause__`` will be set to the
    174    `json.JSONDecodeError` Exception, which can be interrogated for
    175    further details.
    176
    177    :param error_message: Human-readable string describing the error.
    178    :param raw: The raw `bytes` that prompted the failure.
    179    """
    180    def __init__(self, error_message: str, raw: bytes):
    181        super().__init__(error_message)
    182        #: The raw `bytes` that were not understood as JSON.
    183        self.raw: bytes = raw
    184
    185    def __str__(self) -> str:
    186        return "\n".join([
    187            super().__str__(),
    188            f"  raw bytes were: {str(self.raw)}",
    189        ])
    190
    191
    192class UnexpectedTypeError(ProtocolError):
    193    """
    194    A QMP message was JSON, but not a JSON object.
    195
    196    :param error_message: Human-readable string describing the error.
    197    :param value: The deserialized JSON value that wasn't an object.
    198    """
    199    def __init__(self, error_message: str, value: object):
    200        super().__init__(error_message)
    201        #: The JSON value that was expected to be an object.
    202        self.value: object = value
    203
    204    def __str__(self) -> str:
    205        strval = json.dumps(self.value, indent=2)
    206        return "\n".join([
    207            super().__str__(),
    208            f"  json value was: {strval}",
    209        ])