cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

util.py (6185B)


      1"""
      2Miscellaneous Utilities
      3
      4This module provides asyncio utilities and compatibility wrappers for
      5Python 3.6 to provide some features that otherwise become available in
      6Python 3.7+.
      7
      8Various logging and debugging utilities are also provided, such as
      9`exception_summary()` and `pretty_traceback()`, used primarily for
     10adding information into the logging stream.
     11"""
     12
     13import asyncio
     14import sys
     15import traceback
     16from typing import (
     17    Any,
     18    Coroutine,
     19    Optional,
     20    TypeVar,
     21    cast,
     22)
     23
     24
     25T = TypeVar('T')
     26
     27
     28# --------------------------
     29# Section: Utility Functions
     30# --------------------------
     31
     32
     33async def flush(writer: asyncio.StreamWriter) -> None:
     34    """
     35    Utility function to ensure a StreamWriter is *fully* drained.
     36
     37    `asyncio.StreamWriter.drain` only promises we will return to below
     38    the "high-water mark". This function ensures we flush the entire
     39    buffer -- by setting the high water mark to 0 and then calling
     40    drain. The flow control limits are restored after the call is
     41    completed.
     42    """
     43    transport = cast(asyncio.WriteTransport, writer.transport)
     44
     45    # https://github.com/python/typeshed/issues/5779
     46    low, high = transport.get_write_buffer_limits()  # type: ignore
     47    transport.set_write_buffer_limits(0, 0)
     48    try:
     49        await writer.drain()
     50    finally:
     51        transport.set_write_buffer_limits(high, low)
     52
     53
     54def upper_half(func: T) -> T:
     55    """
     56    Do-nothing decorator that annotates a method as an "upper-half" method.
     57
     58    These methods must not call bottom-half functions directly, but can
     59    schedule them to run.
     60    """
     61    return func
     62
     63
     64def bottom_half(func: T) -> T:
     65    """
     66    Do-nothing decorator that annotates a method as a "bottom-half" method.
     67
     68    These methods must take great care to handle their own exceptions whenever
     69    possible. If they go unhandled, they will cause termination of the loop.
     70
     71    These methods do not, in general, have the ability to directly
     72    report information to a caller’s context and will usually be
     73    collected as a Task result instead.
     74
     75    They must not call upper-half functions directly.
     76    """
     77    return func
     78
     79
     80# -------------------------------
     81# Section: Compatibility Wrappers
     82# -------------------------------
     83
     84
     85def create_task(coro: Coroutine[Any, Any, T],
     86                loop: Optional[asyncio.AbstractEventLoop] = None
     87                ) -> 'asyncio.Future[T]':
     88    """
     89    Python 3.6-compatible `asyncio.create_task` wrapper.
     90
     91    :param coro: The coroutine to execute in a task.
     92    :param loop: Optionally, the loop to create the task in.
     93
     94    :return: An `asyncio.Future` object.
     95    """
     96    if sys.version_info >= (3, 7):
     97        if loop is not None:
     98            return loop.create_task(coro)
     99        return asyncio.create_task(coro)  # pylint: disable=no-member
    100
    101    # Python 3.6:
    102    return asyncio.ensure_future(coro, loop=loop)
    103
    104
    105def is_closing(writer: asyncio.StreamWriter) -> bool:
    106    """
    107    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
    108
    109    :param writer: The `asyncio.StreamWriter` object.
    110    :return: `True` if the writer is closing, or closed.
    111    """
    112    if sys.version_info >= (3, 7):
    113        return writer.is_closing()
    114
    115    # Python 3.6:
    116    transport = writer.transport
    117    assert isinstance(transport, asyncio.WriteTransport)
    118    return transport.is_closing()
    119
    120
    121async def wait_closed(writer: asyncio.StreamWriter) -> None:
    122    """
    123    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
    124
    125    :param writer: The `asyncio.StreamWriter` to wait on.
    126    """
    127    if sys.version_info >= (3, 7):
    128        await writer.wait_closed()
    129        return
    130
    131    # Python 3.6
    132    transport = writer.transport
    133    assert isinstance(transport, asyncio.WriteTransport)
    134
    135    while not transport.is_closing():
    136        await asyncio.sleep(0)
    137
    138    # This is an ugly workaround, but it's the best I can come up with.
    139    sock = transport.get_extra_info('socket')
    140
    141    if sock is None:
    142        # Our transport doesn't have a socket? ...
    143        # Nothing we can reasonably do.
    144        return
    145
    146    while sock.fileno() != -1:
    147        await asyncio.sleep(0)
    148
    149
    150def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
    151    """
    152    Python 3.6-compatible `asyncio.run` wrapper.
    153
    154    :param coro: A coroutine to execute now.
    155    :return: The return value from the coroutine.
    156    """
    157    if sys.version_info >= (3, 7):
    158        return asyncio.run(coro, debug=debug)
    159
    160    # Python 3.6
    161    loop = asyncio.get_event_loop()
    162    loop.set_debug(debug)
    163    ret = loop.run_until_complete(coro)
    164    loop.close()
    165
    166    return ret
    167
    168
    169# ----------------------------
    170# Section: Logging & Debugging
    171# ----------------------------
    172
    173
    174def exception_summary(exc: BaseException) -> str:
    175    """
    176    Return a summary string of an arbitrary exception.
    177
    178    It will be of the form "ExceptionType: Error Message", if the error
    179    string is non-empty, and just "ExceptionType" otherwise.
    180    """
    181    name = type(exc).__qualname__
    182    smod = type(exc).__module__
    183    if smod not in ("__main__", "builtins"):
    184        name = smod + '.' + name
    185
    186    error = str(exc)
    187    if error:
    188        return f"{name}: {error}"
    189    return name
    190
    191
    192def pretty_traceback(prefix: str = "  | ") -> str:
    193    """
    194    Formats the current traceback, indented to provide visual distinction.
    195
    196    This is useful for printing a traceback within a traceback for
    197    debugging purposes when encapsulating errors to deliver them up the
    198    stack; when those errors are printed, this helps provide a nice
    199    visual grouping to quickly identify the parts of the error that
    200    belong to the inner exception.
    201
    202    :param prefix: The prefix to append to each line of the traceback.
    203    :return: A string, formatted something like the following::
    204
    205      | Traceback (most recent call last):
    206      |   File "foobar.py", line 42, in arbitrary_example
    207      |     foo.baz()
    208      | ArbitraryError: [Errno 42] Something bad happened!
    209    """
    210    output = "".join(traceback.format_exception(*sys.exc_info()))
    211
    212    exc_lines = []
    213    for line in output.split('\n'):
    214        exc_lines.append(prefix + line)
    215
    216    # The last line is always empty, omit it
    217    return "\n".join(exc_lines[:-1])