cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

events.py (22656B)


      1"""
      2AQMP Events and EventListeners
      3
      4Asynchronous QMP uses `EventListener` objects to listen for events. An
      5`EventListener` is a FIFO event queue that can be pre-filtered to listen
      6for only specific events. Each `EventListener` instance receives its own
      7copy of events that it hears, so events may be consumed without fear or
      8worry for depriving other listeners of events they need to hear.
      9
     10
     11EventListener Tutorial
     12----------------------
     13
     14In all of the following examples, we assume that we have a `QMPClient`
     15instantiated named ``qmp`` that is already connected.
     16
     17
     18`listener()` context blocks with one name
     19~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     20
     21The most basic usage is by using the `listener()` context manager to
     22construct them:
     23
     24.. code:: python
     25
     26   with qmp.listener('STOP') as listener:
     27       await qmp.execute('stop')
     28       await listener.get()
     29
     30The listener is active only for the duration of the ‘with’ block. This
     31instance listens only for ‘STOP’ events.
     32
     33
     34`listener()` context blocks with two or more names
     35~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     36
     37Multiple events can be selected for by providing any ``Iterable[str]``:
     38
     39.. code:: python
     40
     41   with qmp.listener(('STOP', 'RESUME')) as listener:
     42       await qmp.execute('stop')
     43       event = await listener.get()
     44       assert event['event'] == 'STOP'
     45
     46       await qmp.execute('cont')
     47       event = await listener.get()
     48       assert event['event'] == 'RESUME'
     49
     50
     51`listener()` context blocks with no names
     52~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     53
     54By omitting names entirely, you can listen to ALL events.
     55
     56.. code:: python
     57
     58   with qmp.listener() as listener:
     59       await qmp.execute('stop')
     60       event = await listener.get()
     61       assert event['event'] == 'STOP'
     62
     63This isn’t a very good use case for this feature: In a non-trivial
     64running system, we may not know what event will arrive next. Grabbing
     65the top of a FIFO queue returning multiple kinds of events may be prone
     66to error.
     67
     68
     69Using async iterators to retrieve events
     70~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     71
     72If you’d like to simply watch what events happen to arrive, you can use
     73the listener as an async iterator:
     74
     75.. code:: python
     76
     77   with qmp.listener() as listener:
     78       async for event in listener:
     79           print(f"Event arrived: {event['event']}")
     80
     81This is analogous to the following code:
     82
     83.. code:: python
     84
     85   with qmp.listener() as listener:
     86       while True:
     87           event = listener.get()
     88           print(f"Event arrived: {event['event']}")
     89
     90This event stream will never end, so these blocks will never terminate.
     91
     92
     93Using asyncio.Task to concurrently retrieve events
     94~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     95
     96Since a listener’s event stream will never terminate, it is not likely
     97useful to use that form in a script. For longer-running clients, we can
     98create event handlers by using `asyncio.Task` to create concurrent
     99coroutines:
    100
    101.. code:: python
    102
    103   async def print_events(listener):
    104       try:
    105           async for event in listener:
    106               print(f"Event arrived: {event['event']}")
    107       except asyncio.CancelledError:
    108           return
    109
    110   with qmp.listener() as listener:
    111       task = asyncio.Task(print_events(listener))
    112       await qmp.execute('stop')
    113       await qmp.execute('cont')
    114       task.cancel()
    115       await task
    116
    117However, there is no guarantee that these events will be received by the
    118time we leave this context block. Once the context block is exited, the
    119listener will cease to hear any new events, and becomes inert.
    120
    121Be mindful of the timing: the above example will *probably*– but does
    122not *guarantee*– that both STOP/RESUMED events will be printed. The
    123example below outlines how to use listeners outside of a context block.
    124
    125
    126Using `register_listener()` and `remove_listener()`
    127~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    128
    129To create a listener with a longer lifetime, beyond the scope of a
    130single block, create a listener and then call `register_listener()`:
    131
    132.. code:: python
    133
    134   class MyClient:
    135       def __init__(self, qmp):
    136           self.qmp = qmp
    137           self.listener = EventListener()
    138
    139       async def print_events(self):
    140           try:
    141               async for event in self.listener:
    142                   print(f"Event arrived: {event['event']}")
    143           except asyncio.CancelledError:
    144               return
    145
    146       async def run(self):
    147           self.task = asyncio.Task(self.print_events)
    148           self.qmp.register_listener(self.listener)
    149           await qmp.execute('stop')
    150           await qmp.execute('cont')
    151
    152       async def stop(self):
    153           self.task.cancel()
    154           await self.task
    155           self.qmp.remove_listener(self.listener)
    156
    157The listener can be deactivated by using `remove_listener()`. When it is
    158removed, any possible pending events are cleared and it can be
    159re-registered at a later time.
    160
    161
    162Using the built-in all events listener
    163~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    164
    165The `QMPClient` object creates its own default listener named
    166:py:obj:`~Events.events` that can be used for the same purpose without
    167having to create your own:
    168
    169.. code:: python
    170
    171   async def print_events(listener):
    172       try:
    173           async for event in listener:
    174               print(f"Event arrived: {event['event']}")
    175       except asyncio.CancelledError:
    176           return
    177
    178   task = asyncio.Task(print_events(qmp.events))
    179
    180   await qmp.execute('stop')
    181   await qmp.execute('cont')
    182
    183   task.cancel()
    184   await task
    185
    186
    187Using both .get() and async iterators
    188~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    189
    190The async iterator and `get()` methods pull events from the same FIFO
    191queue. If you mix the usage of both, be aware: Events are emitted
    192precisely once per listener.
    193
    194If multiple contexts try to pull events from the same listener instance,
    195events are still emitted only precisely once.
    196
    197This restriction can be lifted by creating additional listeners.
    198
    199
    200Creating multiple listeners
    201~~~~~~~~~~~~~~~~~~~~~~~~~~~
    202
    203Additional `EventListener` objects can be created at-will. Each one
    204receives its own copy of events, with separate FIFO event queues.
    205
    206.. code:: python
    207
    208   my_listener = EventListener()
    209   qmp.register_listener(my_listener)
    210
    211   await qmp.execute('stop')
    212   copy1 = await my_listener.get()
    213   copy2 = await qmp.events.get()
    214
    215   assert copy1 == copy2
    216
    217In this example, we await an event from both a user-created
    218`EventListener` and the built-in events listener. Both receive the same
    219event.
    220
    221
    222Clearing listeners
    223~~~~~~~~~~~~~~~~~~
    224
    225`EventListener` objects can be cleared, clearing all events seen thus far:
    226
    227.. code:: python
    228
    229   await qmp.execute('stop')
    230   qmp.events.clear()
    231   await qmp.execute('cont')
    232   event = await qmp.events.get()
    233   assert event['event'] == 'RESUME'
    234
    235`EventListener` objects are FIFO queues. If events are not consumed,
    236they will remain in the queue until they are witnessed or discarded via
    237`clear()`. FIFO queues will be drained automatically upon leaving a
    238context block, or when calling `remove_listener()`.
    239
    240
    241Accessing listener history
    242~~~~~~~~~~~~~~~~~~~~~~~~~~
    243
    244`EventListener` objects record their history. Even after being cleared,
    245you can obtain a record of all events seen so far:
    246
    247.. code:: python
    248
    249   await qmp.execute('stop')
    250   await qmp.execute('cont')
    251   qmp.events.clear()
    252
    253   assert len(qmp.events.history) == 2
    254   assert qmp.events.history[0]['event'] == 'STOP'
    255   assert qmp.events.history[1]['event'] == 'RESUME'
    256
    257The history is updated immediately and does not require the event to be
    258witnessed first.
    259
    260
    261Using event filters
    262~~~~~~~~~~~~~~~~~~~
    263
    264`EventListener` objects can be given complex filtering criteria if names
    265are not sufficient:
    266
    267.. code:: python
    268
    269   def job1_filter(event) -> bool:
    270       event_data = event.get('data', {})
    271       event_job_id = event_data.get('id')
    272       return event_job_id == "job1"
    273
    274   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
    275       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
    276       async for event in listener:
    277           if event['data']['status'] == 'concluded':
    278               break
    279
    280These filters might be most useful when parameterized. `EventListener`
    281objects expect a function that takes only a single argument (the raw
    282event, as a `Message`) and returns a bool; True if the event should be
    283accepted into the stream. You can create a function that adapts this
    284signature to accept configuration parameters:
    285
    286.. code:: python
    287
    288   def job_filter(job_id: str) -> EventFilter:
    289       def filter(event: Message) -> bool:
    290           return event['data']['id'] == job_id
    291       return filter
    292
    293   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
    294       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
    295       async for event in listener:
    296           if event['data']['status'] == 'concluded':
    297               break
    298
    299
    300Activating an existing listener with `listen()`
    301~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    302
    303Listeners with complex, long configurations can also be created manually
    304and activated temporarily by using `listen()` instead of `listener()`:
    305
    306.. code:: python
    307
    308   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
    309                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
    310                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
    311
    312   with qmp.listen(listener):
    313       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
    314       async for event in listener:
    315           print(event)
    316           if event['event'] == 'BLOCK_JOB_COMPLETED':
    317               break
    318
    319Any events that are not witnessed by the time the block is left will be
    320cleared from the queue; entering the block is an implicit
    321`register_listener()` and leaving the block is an implicit
    322`remove_listener()`.
    323
    324
    325Activating multiple existing listeners with `listen()`
    326~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    327
    328While `listener()` is only capable of creating a single listener,
    329`listen()` is capable of activating multiple listeners simultaneously:
    330
    331.. code:: python
    332
    333   def job_filter(job_id: str) -> EventFilter:
    334       def filter(event: Message) -> bool:
    335           return event['data']['id'] == job_id
    336       return filter
    337
    338   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
    339   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
    340
    341   with qmp.listen(jobA, jobB):
    342       qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
    343       qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
    344
    345       async for event in jobA.get():
    346           if event['data']['status'] == 'concluded':
    347               break
    348       async for event in jobB.get():
    349           if event['data']['status'] == 'concluded':
    350               break
    351
    352
    353Extending the `EventListener` class
    354~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    355
    356In the case that a more specialized `EventListener` is desired to
    357provide either more functionality or more compact syntax for specialized
    358cases, it can be extended.
    359
    360One of the key methods to extend or override is
    361:py:meth:`~EventListener.accept()`. The default implementation checks an
    362incoming message for:
    363
    3641. A qualifying name, if any :py:obj:`~EventListener.names` were
    365   specified at initialization time
    3662. That :py:obj:`~EventListener.event_filter()` returns True.
    367
    368This can be modified however you see fit to change the criteria for
    369inclusion in the stream.
    370
    371For convenience, a ``JobListener`` class could be created that simply
    372bakes in configuration so it does not need to be repeated:
    373
    374.. code:: python
    375
    376   class JobListener(EventListener):
    377       def __init__(self, job_id: str):
    378           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
    379                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
    380                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
    381           self.job_id = job_id
    382
    383       def accept(self, event) -> bool:
    384           if not super().accept(event):
    385               return False
    386           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
    387               return event['data']['id'] == job_id
    388           return event['data']['device'] == job_id
    389
    390From here on out, you can conjure up a custom-purpose listener that
    391listens only for job-related events for a specific job-id easily:
    392
    393.. code:: python
    394
    395   listener = JobListener('job4')
    396   with qmp.listener(listener):
    397       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
    398       async for event in listener:
    399           print(event)
    400           if event['event'] == 'BLOCK_JOB_COMPLETED':
    401               break
    402
    403
    404Experimental Interfaces & Design Issues
    405---------------------------------------
    406
    407These interfaces are not ones I am sure I will keep or otherwise modify
    408heavily.
    409
    410qmp.listener()’s type signature
    411~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    412
    413`listener()` does not return anything, because it was assumed the caller
    414already had a handle to the listener. However, for
    415``qmp.listener(EventListener())`` forms, the caller will not have saved
    416a handle to the listener.
    417
    418Because this function can accept *many* listeners, I found it hard to
    419accurately type in a way where it could be used in both “one” or “many”
    420forms conveniently and in a statically type-safe manner.
    421
    422Ultimately, I removed the return altogether, but perhaps with more time
    423I can work out a way to re-add it.
    424
    425
    426API Reference
    427-------------
    428
    429"""
    430
    431import asyncio
    432from contextlib import contextmanager
    433import logging
    434from typing import (
    435    AsyncIterator,
    436    Callable,
    437    Iterable,
    438    Iterator,
    439    List,
    440    Optional,
    441    Set,
    442    Tuple,
    443    Union,
    444)
    445
    446from .error import AQMPError
    447from .message import Message
    448
    449
    450EventNames = Union[str, Iterable[str], None]
    451EventFilter = Callable[[Message], bool]
    452
    453
    454class ListenerError(AQMPError):
    455    """
    456    Generic error class for `EventListener`-related problems.
    457    """
    458
    459
    460class EventListener:
    461    """
    462    Selectively listens for events with runtime configurable filtering.
    463
    464    This class is designed to be directly usable for the most common cases,
    465    but it can be extended to provide more rigorous control.
    466
    467    :param names:
    468        One or more names of events to listen for.
    469        When not provided, listen for ALL events.
    470    :param event_filter:
    471        An optional event filtering function.
    472        When names are also provided, this acts as a secondary filter.
    473
    474    When ``names`` and ``event_filter`` are both provided, the names
    475    will be filtered first, and then the filter function will be called
    476    second. The event filter function can assume that the format of the
    477    event is a known format.
    478    """
    479    def __init__(
    480        self,
    481        names: EventNames = None,
    482        event_filter: Optional[EventFilter] = None,
    483    ):
    484        # Queue of 'heard' events yet to be witnessed by a caller.
    485        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
    486
    487        # Intended as a historical record, NOT a processing queue or backlog.
    488        self._history: List[Message] = []
    489
    490        #: Primary event filter, based on one or more event names.
    491        self.names: Set[str] = set()
    492        if isinstance(names, str):
    493            self.names.add(names)
    494        elif names is not None:
    495            self.names.update(names)
    496
    497        #: Optional, secondary event filter.
    498        self.event_filter: Optional[EventFilter] = event_filter
    499
    500    @property
    501    def history(self) -> Tuple[Message, ...]:
    502        """
    503        A read-only history of all events seen so far.
    504
    505        This represents *every* event, including those not yet witnessed
    506        via `get()` or ``async for``. It persists between `clear()`
    507        calls and is immutable.
    508        """
    509        return tuple(self._history)
    510
    511    def accept(self, event: Message) -> bool:
    512        """
    513        Determine if this listener accepts this event.
    514
    515        This method determines which events will appear in the stream.
    516        The default implementation simply checks the event against the
    517        list of names and the event_filter to decide if this
    518        `EventListener` accepts a given event. It can be
    519        overridden/extended to provide custom listener behavior.
    520
    521        User code is not expected to need to invoke this method.
    522
    523        :param event: The event under consideration.
    524        :return: `True`, if this listener accepts this event.
    525        """
    526        name_ok = (not self.names) or (event['event'] in self.names)
    527        return name_ok and (
    528            (not self.event_filter) or self.event_filter(event)
    529        )
    530
    531    async def put(self, event: Message) -> None:
    532        """
    533        Conditionally put a new event into the FIFO queue.
    534
    535        This method is not designed to be invoked from user code, and it
    536        should not need to be overridden. It is a public interface so
    537        that `QMPClient` has an interface by which it can inform
    538        registered listeners of new events.
    539
    540        The event will be put into the queue if
    541        :py:meth:`~EventListener.accept()` returns `True`.
    542
    543        :param event: The new event to put into the FIFO queue.
    544        """
    545        if not self.accept(event):
    546            return
    547
    548        self._history.append(event)
    549        await self._queue.put(event)
    550
    551    async def get(self) -> Message:
    552        """
    553        Wait for the very next event in this stream.
    554
    555        If one is already available, return that one.
    556        """
    557        return await self._queue.get()
    558
    559    def empty(self) -> bool:
    560        """
    561        Return `True` if there are no pending events.
    562        """
    563        return self._queue.empty()
    564
    565    def clear(self) -> List[Message]:
    566        """
    567        Clear this listener of all pending events.
    568
    569        Called when an `EventListener` is being unregistered, this clears the
    570        pending FIFO queue synchronously. It can be also be used to
    571        manually clear any pending events, if desired.
    572
    573        :return: The cleared events, if any.
    574
    575        .. warning::
    576            Take care when discarding events. Cleared events will be
    577            silently tossed on the floor. All events that were ever
    578            accepted by this listener are visible in `history()`.
    579        """
    580        events = []
    581        while True:
    582            try:
    583                events.append(self._queue.get_nowait())
    584            except asyncio.QueueEmpty:
    585                break
    586
    587        return events
    588
    589    def __aiter__(self) -> AsyncIterator[Message]:
    590        return self
    591
    592    async def __anext__(self) -> Message:
    593        """
    594        Enables the `EventListener` to function as an async iterator.
    595
    596        It may be used like this:
    597
    598        .. code:: python
    599
    600            async for event in listener:
    601                print(event)
    602
    603        These iterators will never terminate of their own accord; you
    604        must provide break conditions or otherwise prepare to run them
    605        in an `asyncio.Task` that can be cancelled.
    606        """
    607        return await self.get()
    608
    609
    610class Events:
    611    """
    612    Events is a mix-in class that adds event functionality to the QMP class.
    613
    614    It's designed specifically as a mix-in for `QMPClient`, and it
    615    relies upon the class it is being mixed into having a 'logger'
    616    property.
    617    """
    618    def __init__(self) -> None:
    619        self._listeners: List[EventListener] = []
    620
    621        #: Default, all-events `EventListener`.
    622        self.events: EventListener = EventListener()
    623        self.register_listener(self.events)
    624
    625        # Parent class needs to have a logger
    626        self.logger: logging.Logger
    627
    628    async def _event_dispatch(self, msg: Message) -> None:
    629        """
    630        Given a new event, propagate it to all of the active listeners.
    631
    632        :param msg: The event to propagate.
    633        """
    634        for listener in self._listeners:
    635            await listener.put(msg)
    636
    637    def register_listener(self, listener: EventListener) -> None:
    638        """
    639        Register and activate an `EventListener`.
    640
    641        :param listener: The listener to activate.
    642        :raise ListenerError: If the given listener is already registered.
    643        """
    644        if listener in self._listeners:
    645            raise ListenerError("Attempted to re-register existing listener")
    646        self.logger.debug("Registering %s.", str(listener))
    647        self._listeners.append(listener)
    648
    649    def remove_listener(self, listener: EventListener) -> None:
    650        """
    651        Unregister and deactivate an `EventListener`.
    652
    653        The removed listener will have its pending events cleared via
    654        `clear()`. The listener can be re-registered later when
    655        desired.
    656
    657        :param listener: The listener to deactivate.
    658        :raise ListenerError: If the given listener is not registered.
    659        """
    660        if listener == self.events:
    661            raise ListenerError("Cannot remove the default listener.")
    662        self.logger.debug("Removing %s.", str(listener))
    663        listener.clear()
    664        self._listeners.remove(listener)
    665
    666    @contextmanager
    667    def listen(self, *listeners: EventListener) -> Iterator[None]:
    668        r"""
    669        Context manager: Temporarily listen with an `EventListener`.
    670
    671        Accepts one or more `EventListener` objects and registers them,
    672        activating them for the duration of the context block.
    673
    674        `EventListener` objects will have any pending events in their
    675        FIFO queue cleared upon exiting the context block, when they are
    676        deactivated.
    677
    678        :param \*listeners: One or more EventListeners to activate.
    679        :raise ListenerError: If the given listener(s) are already active.
    680        """
    681        _added = []
    682
    683        try:
    684            for listener in listeners:
    685                self.register_listener(listener)
    686                _added.append(listener)
    687
    688            yield
    689
    690        finally:
    691            for listener in _added:
    692                self.remove_listener(listener)
    693
    694    @contextmanager
    695    def listener(
    696        self,
    697        names: EventNames = (),
    698        event_filter: Optional[EventFilter] = None
    699    ) -> Iterator[EventListener]:
    700        """
    701        Context manager: Temporarily listen with a new `EventListener`.
    702
    703        Creates an `EventListener` object and registers it, activating
    704        it for the duration of the context block.
    705
    706        :param names:
    707            One or more names of events to listen for.
    708            When not provided, listen for ALL events.
    709        :param event_filter:
    710            An optional event filtering function.
    711            When names are also provided, this acts as a secondary filter.
    712
    713        :return: The newly created and active `EventListener`.
    714        """
    715        listener = EventListener(names, event_filter)
    716        with self.listen(listener):
    717            yield listener