cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

aqmp_tui.py (22156B)


      1# Copyright (c) 2021
      2#
      3# Authors:
      4#  Niteesh Babu G S <niteesh.gs@gmail.com>
      5#
      6# This work is licensed under the terms of the GNU GPL, version 2 or
      7# later.  See the COPYING file in the top-level directory.
      8"""
      9AQMP TUI
     10
     11AQMP TUI is an asynchronous interface built on top of the AQMP library.
     12It is the successor of QMP-shell and is brought in as a replacement for it.
     13
     14Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
     15Full Usage: aqmp-tui --help
     16"""
     17
     18import argparse
     19import asyncio
     20import json
     21import logging
     22from logging import Handler, LogRecord
     23import signal
     24from typing import (
     25    List,
     26    Optional,
     27    Tuple,
     28    Type,
     29    Union,
     30    cast,
     31)
     32
     33from pygments import lexers
     34from pygments import token as Token
     35import urwid
     36import urwid_readline
     37
     38from ..qmp import QEMUMonitorProtocol, QMPBadPortError
     39from .error import ProtocolError
     40from .message import DeserializationError, Message, UnexpectedTypeError
     41from .protocol import ConnectError, Runstate
     42from .qmp_client import ExecInterruptedError, QMPClient
     43from .util import create_task, pretty_traceback
     44
     45
     46# The name of the signal that is used to update the history list
     47UPDATE_MSG: str = 'UPDATE_MSG'
     48
     49
     50palette = [
     51    (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
     52    (Token.Text, '', '', '', '', 'g7'),
     53    (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
     54    (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
     55    (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
     56    (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
     57    ('DEBUG', '', '', '', '#ddf', 'g7'),
     58    ('INFO', '', '', '', 'g100', 'g7'),
     59    ('WARNING', '', '', '', '#ff6', 'g7'),
     60    ('ERROR', '', '', '', '#a00', 'g7'),
     61    ('CRITICAL', '', '', '', '#a00', 'g7'),
     62    ('background', '', 'black', '', '', 'g7'),
     63]
     64
     65
     66def format_json(msg: str) -> str:
     67    """
     68    Formats valid/invalid multi-line JSON message into a single-line message.
     69
     70    Formatting is first tried using the standard json module. If that fails
     71    due to an decoding error then a simple string manipulation is done to
     72    achieve a single line JSON string.
     73
     74    Converting into single line is more asthetically pleasing when looking
     75    along with error messages.
     76
     77    Eg:
     78    Input:
     79          [ 1,
     80            true,
     81            3 ]
     82    The above input is not a valid QMP message and produces the following error
     83    "QMP message is not a JSON object."
     84    When displaying this in TUI in multiline mode we get
     85
     86        [ 1,
     87          true,
     88          3 ]: QMP message is not a JSON object.
     89
     90    whereas in singleline mode we get the following
     91
     92        [1, true, 3]: QMP message is not a JSON object.
     93
     94    The single line mode is more asthetically pleasing.
     95
     96    :param msg:
     97        The message to formatted into single line.
     98
     99    :return: Formatted singleline message.
    100    """
    101    try:
    102        msg = json.loads(msg)
    103        return str(json.dumps(msg))
    104    except json.decoder.JSONDecodeError:
    105        msg = msg.replace('\n', '')
    106        words = msg.split(' ')
    107        words = list(filter(None, words))
    108        return ' '.join(words)
    109
    110
    111def has_handler_type(logger: logging.Logger,
    112                     handler_type: Type[Handler]) -> bool:
    113    """
    114    The Logger class has no interface to check if a certain type of handler is
    115    installed or not. So we provide an interface to do so.
    116
    117    :param logger:
    118        Logger object
    119    :param handler_type:
    120        The type of the handler to be checked.
    121
    122    :return: returns True if handler of type `handler_type`.
    123    """
    124    for handler in logger.handlers:
    125        if isinstance(handler, handler_type):
    126            return True
    127    return False
    128
    129
    130class App(QMPClient):
    131    """
    132    Implements the AQMP TUI.
    133
    134    Initializes the widgets and starts the urwid event loop.
    135
    136    :param address:
    137        Address of the server to connect to.
    138    :param num_retries:
    139        The number of times to retry before stopping to reconnect.
    140    :param retry_delay:
    141        The delay(sec) before each retry
    142    """
    143    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
    144                 retry_delay: Optional[int]) -> None:
    145        urwid.register_signal(type(self), UPDATE_MSG)
    146        self.window = Window(self)
    147        self.address = address
    148        self.aloop: Optional[asyncio.AbstractEventLoop] = None
    149        self.num_retries = num_retries
    150        self.retry_delay = retry_delay if retry_delay else 2
    151        self.retry: bool = False
    152        self.exiting: bool = False
    153        super().__init__()
    154
    155    def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
    156        """
    157        Appends the msg to the history list.
    158
    159        :param msg:
    160            The raw message to be appended in string type.
    161        """
    162        urwid.emit_signal(self, UPDATE_MSG, msg, level)
    163
    164    def _cb_outbound(self, msg: Message) -> Message:
    165        """
    166        Callback: outbound message hook.
    167
    168        Appends the outgoing messages to the history box.
    169
    170        :param msg: raw outbound message.
    171        :return: final outbound message.
    172        """
    173        str_msg = str(msg)
    174
    175        if not has_handler_type(logging.getLogger(), TUILogHandler):
    176            logging.debug('Request: %s', str_msg)
    177        self.add_to_history('<-- ' + str_msg)
    178        return msg
    179
    180    def _cb_inbound(self, msg: Message) -> Message:
    181        """
    182        Callback: outbound message hook.
    183
    184        Appends the incoming messages to the history box.
    185
    186        :param msg: raw inbound message.
    187        :return: final inbound message.
    188        """
    189        str_msg = str(msg)
    190
    191        if not has_handler_type(logging.getLogger(), TUILogHandler):
    192            logging.debug('Request: %s', str_msg)
    193        self.add_to_history('--> ' + str_msg)
    194        return msg
    195
    196    async def _send_to_server(self, msg: Message) -> None:
    197        """
    198        This coroutine sends the message to the server.
    199        The message has to be pre-validated.
    200
    201        :param msg:
    202            Pre-validated message to be to sent to the server.
    203
    204        :raise Exception: When an unhandled exception is caught.
    205        """
    206        try:
    207            await self._raw(msg, assign_id='id' not in msg)
    208        except ExecInterruptedError as err:
    209            logging.info('Error server disconnected before reply %s', str(err))
    210            self.add_to_history('Server disconnected before reply', 'ERROR')
    211        except Exception as err:
    212            logging.error('Exception from _send_to_server: %s', str(err))
    213            raise err
    214
    215    def cb_send_to_server(self, raw_msg: str) -> None:
    216        """
    217        Validates and sends the message to the server.
    218        The raw string message is first converted into a Message object
    219        and is then sent to the server.
    220
    221        :param raw_msg:
    222            The raw string message to be sent to the server.
    223
    224        :raise Exception: When an unhandled exception is caught.
    225        """
    226        try:
    227            msg = Message(bytes(raw_msg, encoding='utf-8'))
    228            create_task(self._send_to_server(msg))
    229        except (DeserializationError, UnexpectedTypeError) as err:
    230            raw_msg = format_json(raw_msg)
    231            logging.info('Invalid message: %s', err.error_message)
    232            self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
    233
    234    def unhandled_input(self, key: str) -> None:
    235        """
    236        Handle's keys which haven't been handled by the child widgets.
    237
    238        :param key:
    239            Unhandled key
    240        """
    241        if key == 'esc':
    242            self.kill_app()
    243
    244    def kill_app(self) -> None:
    245        """
    246        Initiates killing of app. A bridge between asynchronous and synchronous
    247        code.
    248        """
    249        create_task(self._kill_app())
    250
    251    async def _kill_app(self) -> None:
    252        """
    253        This coroutine initiates the actual disconnect process and calls
    254        urwid.ExitMainLoop() to kill the TUI.
    255
    256        :raise Exception: When an unhandled exception is caught.
    257        """
    258        self.exiting = True
    259        await self.disconnect()
    260        logging.debug('Disconnect finished. Exiting app')
    261        raise urwid.ExitMainLoop()
    262
    263    async def disconnect(self) -> None:
    264        """
    265        Overrides the disconnect method to handle the errors locally.
    266        """
    267        try:
    268            await super().disconnect()
    269        except (OSError, EOFError) as err:
    270            logging.info('disconnect: %s', str(err))
    271            self.retry = True
    272        except ProtocolError as err:
    273            logging.info('disconnect: %s', str(err))
    274        except Exception as err:
    275            logging.error('disconnect: Unhandled exception %s', str(err))
    276            raise err
    277
    278    def _set_status(self, msg: str) -> None:
    279        """
    280        Sets the message as the status.
    281
    282        :param msg:
    283            The message to be displayed in the status bar.
    284        """
    285        self.window.footer.set_text(msg)
    286
    287    def _get_formatted_address(self) -> str:
    288        """
    289        Returns a formatted version of the server's address.
    290
    291        :return: formatted address
    292        """
    293        if isinstance(self.address, tuple):
    294            host, port = self.address
    295            addr = f'{host}:{port}'
    296        else:
    297            addr = f'{self.address}'
    298        return addr
    299
    300    async def _initiate_connection(self) -> Optional[ConnectError]:
    301        """
    302        Tries connecting to a server a number of times with a delay between
    303        each try. If all retries failed then return the error faced during
    304        the last retry.
    305
    306        :return: Error faced during last retry.
    307        """
    308        current_retries = 0
    309        err = None
    310
    311        # initial try
    312        await self.connect_server()
    313        while self.retry and current_retries < self.num_retries:
    314            logging.info('Connection Failed, retrying in %d', self.retry_delay)
    315            status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
    316            self._set_status(status)
    317
    318            await asyncio.sleep(self.retry_delay)
    319
    320            err = await self.connect_server()
    321            current_retries += 1
    322        # If all retries failed report the last error
    323        if err:
    324            logging.info('All retries failed: %s', err)
    325            return err
    326        return None
    327
    328    async def manage_connection(self) -> None:
    329        """
    330        Manage the connection based on the current run state.
    331
    332        A reconnect is issued when the current state is IDLE and the number
    333        of retries is not exhausted.
    334        A disconnect is issued when the current state is DISCONNECTING.
    335        """
    336        while not self.exiting:
    337            if self.runstate == Runstate.IDLE:
    338                err = await self._initiate_connection()
    339                # If retry is still true then, we have exhausted all our tries.
    340                if err:
    341                    self._set_status(f'[Error: {err.error_message}]')
    342                else:
    343                    addr = self._get_formatted_address()
    344                    self._set_status(f'[Connected {addr}]')
    345            elif self.runstate == Runstate.DISCONNECTING:
    346                self._set_status('[Disconnected]')
    347                await self.disconnect()
    348                # check if a retry is needed
    349                if self.runstate == Runstate.IDLE:
    350                    continue
    351            await self.runstate_changed()
    352
    353    async def connect_server(self) -> Optional[ConnectError]:
    354        """
    355        Initiates a connection to the server at address `self.address`
    356        and in case of a failure, sets the status to the respective error.
    357        """
    358        try:
    359            await self.connect(self.address)
    360            self.retry = False
    361        except ConnectError as err:
    362            logging.info('connect_server: ConnectError %s', str(err))
    363            self.retry = True
    364            return err
    365        return None
    366
    367    def run(self, debug: bool = False) -> None:
    368        """
    369        Starts the long running co-routines and the urwid event loop.
    370
    371        :param debug:
    372            Enables/Disables asyncio event loop debugging
    373        """
    374        screen = urwid.raw_display.Screen()
    375        screen.set_terminal_properties(256)
    376
    377        self.aloop = asyncio.get_event_loop()
    378        self.aloop.set_debug(debug)
    379
    380        # Gracefully handle SIGTERM and SIGINT signals
    381        cancel_signals = [signal.SIGTERM, signal.SIGINT]
    382        for sig in cancel_signals:
    383            self.aloop.add_signal_handler(sig, self.kill_app)
    384
    385        event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
    386        main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
    387                                   unhandled_input=self.unhandled_input,
    388                                   screen=screen,
    389                                   palette=palette,
    390                                   handle_mouse=True,
    391                                   event_loop=event_loop)
    392
    393        create_task(self.manage_connection(), self.aloop)
    394        try:
    395            main_loop.run()
    396        except Exception as err:
    397            logging.error('%s\n%s\n', str(err), pretty_traceback())
    398            raise err
    399
    400
    401class StatusBar(urwid.Text):
    402    """
    403    A simple statusbar modelled using the Text widget. The status can be
    404    set using the set_text function. All text set is aligned to right.
    405
    406    :param text: Initial text to be displayed. Default is empty str.
    407    """
    408    def __init__(self, text: str = ''):
    409        super().__init__(text, align='right')
    410
    411
    412class Editor(urwid_readline.ReadlineEdit):
    413    """
    414    A simple editor modelled using the urwid_readline.ReadlineEdit widget.
    415    Mimcs GNU readline shortcuts and provides history support.
    416
    417    The readline shortcuts can be found below:
    418    https://github.com/rr-/urwid_readline#features
    419
    420    Along with the readline features, this editor also has support for
    421    history. Pressing the 'up'/'down' switches between the prev/next messages
    422    available in the history.
    423
    424    Currently there is no support to save the history to a file. The history of
    425    previous commands is lost on exit.
    426
    427    :param parent: Reference to the TUI object.
    428    """
    429    def __init__(self, parent: App) -> None:
    430        super().__init__(caption='> ', multiline=True)
    431        self.parent = parent
    432        self.history: List[str] = []
    433        self.last_index: int = -1
    434        self.show_history: bool = False
    435
    436    def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
    437        """
    438        Handles the keypress on this widget.
    439
    440        :param size:
    441            The current size of the widget.
    442        :param key:
    443            The key to be handled.
    444
    445        :return: Unhandled key if any.
    446        """
    447        msg = self.get_edit_text()
    448        if key == 'up' and not msg:
    449            # Show the history when 'up arrow' is pressed with no input text.
    450            # NOTE: The show_history logic is necessary because in 'multiline'
    451            # mode (which we use) 'up arrow' is used to move between lines.
    452            if not self.history:
    453                return None
    454            self.show_history = True
    455            last_msg = self.history[self.last_index]
    456            self.set_edit_text(last_msg)
    457            self.edit_pos = len(last_msg)
    458        elif key == 'up' and self.show_history:
    459            self.last_index = max(self.last_index - 1, -len(self.history))
    460            self.set_edit_text(self.history[self.last_index])
    461            self.edit_pos = len(self.history[self.last_index])
    462        elif key == 'down' and self.show_history:
    463            if self.last_index == -1:
    464                self.set_edit_text('')
    465                self.show_history = False
    466            else:
    467                self.last_index += 1
    468                self.set_edit_text(self.history[self.last_index])
    469                self.edit_pos = len(self.history[self.last_index])
    470        elif key == 'meta enter':
    471            # When using multiline, enter inserts a new line into the editor
    472            # send the input to the server on alt + enter
    473            self.parent.cb_send_to_server(msg)
    474            self.history.append(msg)
    475            self.set_edit_text('')
    476            self.last_index = -1
    477            self.show_history = False
    478        else:
    479            self.show_history = False
    480            self.last_index = -1
    481            return cast(Optional[str], super().keypress(size, key))
    482        return None
    483
    484
    485class EditorWidget(urwid.Filler):
    486    """
    487    Wrapper around the editor widget.
    488
    489    The Editor is a flow widget and has to wrapped inside a box widget.
    490    This class wraps the Editor inside filler widget.
    491
    492    :param parent: Reference to the TUI object.
    493    """
    494    def __init__(self, parent: App) -> None:
    495        super().__init__(Editor(parent), valign='top')
    496
    497
    498class HistoryBox(urwid.ListBox):
    499    """
    500    This widget is modelled using the ListBox widget, contains the list of
    501    all messages both QMP messages and log messsages to be shown in the TUI.
    502
    503    The messages are urwid.Text widgets. On every append of a message, the
    504    focus is shifted to the last appended message.
    505
    506    :param parent: Reference to the TUI object.
    507    """
    508    def __init__(self, parent: App) -> None:
    509        self.parent = parent
    510        self.history = urwid.SimpleFocusListWalker([])
    511        super().__init__(self.history)
    512
    513    def add_to_history(self,
    514                       history: Union[str, List[Tuple[str, str]]]) -> None:
    515        """
    516        Appends a message to the list and set the focus to the last appended
    517        message.
    518
    519        :param history:
    520            The history item(message/event) to be appended to the list.
    521        """
    522        self.history.append(urwid.Text(history))
    523        self.history.set_focus(len(self.history) - 1)
    524
    525    def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
    526                    _x: int, _y: int, focus: bool) -> None:
    527        # Unfortunately there are no urwid constants that represent the mouse
    528        # events.
    529        if button == 4:  # Scroll up event
    530            super().keypress(size, 'up')
    531        elif button == 5:  # Scroll down event
    532            super().keypress(size, 'down')
    533
    534
    535class HistoryWindow(urwid.Frame):
    536    """
    537    This window composes the HistoryBox and EditorWidget in a horizontal split.
    538    By default the first focus is given to the history box.
    539
    540    :param parent: Reference to the TUI object.
    541    """
    542    def __init__(self, parent: App) -> None:
    543        self.parent = parent
    544        self.editor_widget = EditorWidget(parent)
    545        self.editor = urwid.LineBox(self.editor_widget)
    546        self.history = HistoryBox(parent)
    547        self.body = urwid.Pile([('weight', 80, self.history),
    548                                ('weight', 20, self.editor)])
    549        super().__init__(self.body)
    550        urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
    551
    552    def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
    553        """
    554        Appends a message to the history box
    555
    556        :param msg:
    557            The message to be appended to the history box.
    558        :param level:
    559            The log level of the message, if it is a log message.
    560        """
    561        formatted = []
    562        if level:
    563            msg = f'[{level}]: {msg}'
    564            formatted.append((level, msg))
    565        else:
    566            lexer = lexers.JsonLexer()  # pylint: disable=no-member
    567            for token in lexer.get_tokens(msg):
    568                formatted.append(token)
    569        self.history.add_to_history(formatted)
    570
    571
    572class Window(urwid.Frame):
    573    """
    574    This window is the top most widget of the TUI and will contain other
    575    windows. Each child of this widget is responsible for displaying a specific
    576    functionality.
    577
    578    :param parent: Reference to the TUI object.
    579    """
    580    def __init__(self, parent: App) -> None:
    581        self.parent = parent
    582        footer = StatusBar()
    583        body = HistoryWindow(parent)
    584        super().__init__(body, footer=footer)
    585
    586
    587class TUILogHandler(Handler):
    588    """
    589    This handler routes all the log messages to the TUI screen.
    590    It is installed to the root logger to so that the log message from all
    591    libraries begin used is routed to the screen.
    592
    593    :param tui: Reference to the TUI object.
    594    """
    595    def __init__(self, tui: App) -> None:
    596        super().__init__()
    597        self.tui = tui
    598
    599    def emit(self, record: LogRecord) -> None:
    600        """
    601        Emits a record to the TUI screen.
    602
    603        Appends the log message to the TUI screen
    604        """
    605        level = record.levelname
    606        msg = record.getMessage()
    607        self.tui.add_to_history(msg, level)
    608
    609
    610def main() -> None:
    611    """
    612    Driver of the whole script, parses arguments, initialize the TUI and
    613    the logger.
    614    """
    615    parser = argparse.ArgumentParser(description='AQMP TUI')
    616    parser.add_argument('qmp_server', help='Address of the QMP server. '
    617                        'Format <UNIX socket path | TCP addr:port>')
    618    parser.add_argument('--num-retries', type=int, default=10,
    619                        help='Number of times to reconnect before giving up.')
    620    parser.add_argument('--retry-delay', type=int,
    621                        help='Time(s) to wait before next retry. '
    622                        'Default action is to wait 2s between each retry.')
    623    parser.add_argument('--log-file', help='The Log file name')
    624    parser.add_argument('--log-level', default='WARNING',
    625                        help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
    626    parser.add_argument('--asyncio-debug', action='store_true',
    627                        help='Enable debug mode for asyncio loop. '
    628                        'Generates lot of output, makes TUI unusable when '
    629                        'logs are logged in the TUI. '
    630                        'Use only when logging to a file.')
    631    args = parser.parse_args()
    632
    633    try:
    634        address = QEMUMonitorProtocol.parse_address(args.qmp_server)
    635    except QMPBadPortError as err:
    636        parser.error(str(err))
    637
    638    app = App(address, args.num_retries, args.retry_delay)
    639
    640    root_logger = logging.getLogger()
    641    root_logger.setLevel(logging.getLevelName(args.log_level))
    642
    643    if args.log_file:
    644        root_logger.addHandler(logging.FileHandler(args.log_file))
    645    else:
    646        root_logger.addHandler(TUILogHandler(app))
    647
    648    app.run(args.asyncio_debug)
    649
    650
    651if __name__ == '__main__':
    652    main()