cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qmp_shell.py (17272B)


      1#
      2# Copyright (C) 2009, 2010 Red Hat Inc.
      3#
      4# Authors:
      5#  Luiz Capitulino <lcapitulino@redhat.com>
      6#
      7# This work is licensed under the terms of the GNU GPL, version 2.  See
      8# the COPYING file in the top-level directory.
      9#
     10
     11"""
     12Low-level QEMU shell on top of QMP.
     13
     14usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
     15
     16positional arguments:
     17  qmp_server            < UNIX socket path | TCP address:port >
     18
     19optional arguments:
     20  -h, --help            show this help message and exit
     21  -H, --hmp             Use HMP interface
     22  -N, --skip-negotiation
     23                        Skip negotiate (for qemu-ga)
     24  -v, --verbose         Verbose (echo commands sent and received)
     25  -p, --pretty          Pretty-print JSON
     26
     27
     28Start QEMU with:
     29
     30# qemu [...] -qmp unix:./qmp-sock,server
     31
     32Run the shell:
     33
     34$ qmp-shell ./qmp-sock
     35
     36Commands have the following format:
     37
     38   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
     39
     40For example:
     41
     42(QEMU) device_add driver=e1000 id=net1
     43{'return': {}}
     44(QEMU)
     45
     46key=value pairs also support Python or JSON object literal subset notations,
     47without spaces. Dictionaries/objects {} are supported as are arrays [].
     48
     49   example-command arg-name1={'key':'value','obj':{'prop':"value"}}
     50
     51Both JSON and Python formatting should work, including both styles of
     52string literal quotes. Both paradigms of literal values should work,
     53including null/true/false for JSON and None/True/False for Python.
     54
     55
     56Transactions have the following multi-line format:
     57
     58   transaction(
     59   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
     60   ...
     61   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
     62   )
     63
     64One line transactions are also supported:
     65
     66   transaction( action-name1 ... )
     67
     68For example:
     69
     70    (QEMU) transaction(
     71    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
     72    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
     73    TRANS> )
     74    {"return": {}}
     75    (QEMU)
     76
     77Use the -v and -p options to activate the verbose and pretty-print options,
     78which will echo back the properly formatted JSON-compliant QMP that is being
     79sent to QEMU, which is useful for debugging and documentation generation.
     80"""
     81
     82import argparse
     83import ast
     84import json
     85import logging
     86import os
     87import re
     88import readline
     89import sys
     90from typing import (
     91    Iterator,
     92    List,
     93    NoReturn,
     94    Optional,
     95    Sequence,
     96)
     97
     98from qemu import qmp
     99from qemu.qmp import QMPMessage
    100
    101
# Module-level logger named after this module; used below to report
# failures to read or write the readline history file.
LOG = logging.getLogger(__name__)
    103
    104
    105class QMPCompleter:
    106    """
    107    QMPCompleter provides a readline library tab-complete behavior.
    108    """
    109    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    110    # but pylint as of today does not know that List[str] is simply 'list'.
    111    def __init__(self) -> None:
    112        self._matches: List[str] = []
    113
    114    def append(self, value: str) -> None:
    115        """Append a new valid completion to the list of possibilities."""
    116        return self._matches.append(value)
    117
    118    def complete(self, text: str, state: int) -> Optional[str]:
    119        """readline.set_completer() callback implementation."""
    120        for cmd in self._matches:
    121            if cmd.startswith(text):
    122                if state == 0:
    123                    return cmd
    124                state -= 1
    125        return None
    126
    127
class QMPShellError(qmp.QMPError):
    """
    QMP Shell Base error class.

    Raised by QMPShell while parsing a user command line (for example a
    token that is not a key=value pair); QMPShell._execute_cmd catches it
    and prints the message together with a usage hint.
    """
    132
    133
    134class FuzzyJSON(ast.NodeTransformer):
    135    """
    136    This extension of ast.NodeTransformer filters literal "true/false/null"
    137    values in a Python AST and replaces them by proper "True/False/None" values
    138    that Python can properly evaluate.
    139    """
    140
    141    @classmethod
    142    def visit_Name(cls,  # pylint: disable=invalid-name
    143                   node: ast.Name) -> ast.AST:
    144        """
    145        Transform Name nodes with certain values into Constant (keyword) nodes.
    146        """
    147        if node.id == 'true':
    148            return ast.Constant(value=True)
    149        if node.id == 'false':
    150            return ast.Constant(value=False)
    151        if node.id == 'null':
    152            return ast.Constant(value=None)
    153        return node
    154
    155
    156class QMPShell(qmp.QEMUMonitorProtocol):
    157    """
    158    QMPShell provides a basic readline-based QMP shell.
    159
    160    :param address: Address of the QMP server.
    161    :param pretty: Pretty-print QMP messages.
    162    :param verbose: Echo outgoing QMP messages to console.
    163    """
    164    def __init__(self, address: qmp.SocketAddrT,
    165                 pretty: bool = False, verbose: bool = False):
    166        super().__init__(address)
    167        self._greeting: Optional[QMPMessage] = None
    168        self._completer = QMPCompleter()
    169        self._transmode = False
    170        self._actions: List[QMPMessage] = []
    171        self._histfile = os.path.join(os.path.expanduser('~'),
    172                                      '.qmp-shell_history')
    173        self.pretty = pretty
    174        self.verbose = verbose
    175
    176    def close(self) -> None:
    177        # Hook into context manager of parent to save shell history.
    178        self._save_history()
    179        super().close()
    180
    181    def _fill_completion(self) -> None:
    182        cmds = self.cmd('query-commands')
    183        if 'error' in cmds:
    184            return
    185        for cmd in cmds['return']:
    186            self._completer.append(cmd['name'])
    187
    188    def _completer_setup(self) -> None:
    189        self._completer = QMPCompleter()
    190        self._fill_completion()
    191        readline.set_history_length(1024)
    192        readline.set_completer(self._completer.complete)
    193        readline.parse_and_bind("tab: complete")
    194        # NB: default delimiters conflict with some command names
    195        # (eg. query-), clearing everything as it doesn't seem to matter
    196        readline.set_completer_delims('')
    197        try:
    198            readline.read_history_file(self._histfile)
    199        except FileNotFoundError:
    200            pass
    201        except IOError as err:
    202            msg = f"Failed to read history '{self._histfile}': {err!s}"
    203            LOG.warning(msg)
    204
    205    def _save_history(self) -> None:
    206        try:
    207            readline.write_history_file(self._histfile)
    208        except IOError as err:
    209            msg = f"Failed to save history file '{self._histfile}': {err!s}"
    210            LOG.warning(msg)
    211
    212    @classmethod
    213    def _parse_value(cls, val: str) -> object:
    214        try:
    215            return int(val)
    216        except ValueError:
    217            pass
    218
    219        if val.lower() == 'true':
    220            return True
    221        if val.lower() == 'false':
    222            return False
    223        if val.startswith(('{', '[')):
    224            # Try first as pure JSON:
    225            try:
    226                return json.loads(val)
    227            except ValueError:
    228                pass
    229            # Try once again as FuzzyJSON:
    230            try:
    231                tree = ast.parse(val, mode='eval')
    232                transformed = FuzzyJSON().visit(tree)
    233                return ast.literal_eval(transformed)
    234            except (SyntaxError, ValueError):
    235                pass
    236        return val
    237
    238    def _cli_expr(self,
    239                  tokens: Sequence[str],
    240                  parent: qmp.QMPObject) -> None:
    241        for arg in tokens:
    242            (key, sep, val) = arg.partition('=')
    243            if sep != '=':
    244                raise QMPShellError(
    245                    f"Expected a key=value pair, got '{arg!s}'"
    246                )
    247
    248            value = self._parse_value(val)
    249            optpath = key.split('.')
    250            curpath = []
    251            for path in optpath[:-1]:
    252                curpath.append(path)
    253                obj = parent.get(path, {})
    254                if not isinstance(obj, dict):
    255                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
    256                    raise QMPShellError(msg.format('.'.join(curpath)))
    257                parent[path] = obj
    258                parent = obj
    259            if optpath[-1] in parent:
    260                if isinstance(parent[optpath[-1]], dict):
    261                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
    262                    raise QMPShellError(msg.format('.'.join(curpath)))
    263                raise QMPShellError(f'Cannot set "{key}" multiple times')
    264            parent[optpath[-1]] = value
    265
    266    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
    267        """
    268        Build a QMP input object from a user provided command-line in the
    269        following format:
    270
    271            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
    272        """
    273        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
    274        cmdargs = re.findall(argument_regex, cmdline)
    275        qmpcmd: QMPMessage
    276
    277        # Transactional CLI entry:
    278        if cmdargs and cmdargs[0] == 'transaction(':
    279            self._transmode = True
    280            self._actions = []
    281            cmdargs.pop(0)
    282
    283        # Transactional CLI exit:
    284        if cmdargs and cmdargs[0] == ')' and self._transmode:
    285            self._transmode = False
    286            if len(cmdargs) > 1:
    287                msg = 'Unexpected input after close of Transaction sub-shell'
    288                raise QMPShellError(msg)
    289            qmpcmd = {
    290                'execute': 'transaction',
    291                'arguments': {'actions': self._actions}
    292            }
    293            return qmpcmd
    294
    295        # No args, or no args remaining
    296        if not cmdargs:
    297            return None
    298
    299        if self._transmode:
    300            # Parse and cache this Transactional Action
    301            finalize = False
    302            action = {'type': cmdargs[0], 'data': {}}
    303            if cmdargs[-1] == ')':
    304                cmdargs.pop(-1)
    305                finalize = True
    306            self._cli_expr(cmdargs[1:], action['data'])
    307            self._actions.append(action)
    308            return self._build_cmd(')') if finalize else None
    309
    310        # Standard command: parse and return it to be executed.
    311        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
    312        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
    313        return qmpcmd
    314
    315    def _print(self, qmp_message: object) -> None:
    316        jsobj = json.dumps(qmp_message,
    317                           indent=4 if self.pretty else None,
    318                           sort_keys=self.pretty)
    319        print(str(jsobj))
    320
    321    def _execute_cmd(self, cmdline: str) -> bool:
    322        try:
    323            qmpcmd = self._build_cmd(cmdline)
    324        except QMPShellError as err:
    325            print(
    326                f"Error while parsing command line: {err!s}\n"
    327                "command format: <command-name> "
    328                "[arg-name1=arg1] ... [arg-nameN=argN",
    329                file=sys.stderr
    330            )
    331            return True
    332        # For transaction mode, we may have just cached the action:
    333        if qmpcmd is None:
    334            return True
    335        if self.verbose:
    336            self._print(qmpcmd)
    337        resp = self.cmd_obj(qmpcmd)
    338        if resp is None:
    339            print('Disconnected')
    340            return False
    341        self._print(resp)
    342        return True
    343
    344    def connect(self, negotiate: bool = True) -> None:
    345        self._greeting = super().connect(negotiate)
    346        self._completer_setup()
    347
    348    def show_banner(self,
    349                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
    350        """
    351        Print to stdio a greeting, and the QEMU version if available.
    352        """
    353        print(msg)
    354        if not self._greeting:
    355            print('Connected')
    356            return
    357        version = self._greeting['QMP']['version']['qemu']
    358        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
    359
    360    @property
    361    def prompt(self) -> str:
    362        """
    363        Return the current shell prompt, including a trailing space.
    364        """
    365        if self._transmode:
    366            return 'TRANS> '
    367        return '(QEMU) '
    368
    369    def read_exec_command(self) -> bool:
    370        """
    371        Read and execute a command.
    372
    373        @return True if execution was ok, return False if disconnected.
    374        """
    375        try:
    376            cmdline = input(self.prompt)
    377        except EOFError:
    378            print()
    379            return False
    380
    381        if cmdline == '':
    382            for event in self.get_events():
    383                print(event)
    384            return True
    385
    386        return self._execute_cmd(cmdline)
    387
    388    def repl(self) -> Iterator[None]:
    389        """
    390        Return an iterator that implements the REPL.
    391        """
    392        self.show_banner()
    393        while self.read_exec_command():
    394            yield
    395        self.close()
    396
    397
    398class HMPShell(QMPShell):
    399    """
    400    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
    401
    402    :param address: Address of the QMP server.
    403    :param pretty: Pretty-print QMP messages.
    404    :param verbose: Echo outgoing QMP messages to console.
    405    """
    406    def __init__(self, address: qmp.SocketAddrT,
    407                 pretty: bool = False, verbose: bool = False):
    408        super().__init__(address, pretty, verbose)
    409        self._cpu_index = 0
    410
    411    def _cmd_completion(self) -> None:
    412        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
    413            if cmd and cmd[0] != '[' and cmd[0] != '\t':
    414                name = cmd.split()[0]  # drop help text
    415                if name == 'info':
    416                    continue
    417                if name.find('|') != -1:
    418                    # Command in the form 'foobar|f' or 'f|foobar', take the
    419                    # full name
    420                    opt = name.split('|')
    421                    if len(opt[0]) == 1:
    422                        name = opt[1]
    423                    else:
    424                        name = opt[0]
    425                self._completer.append(name)
    426                self._completer.append('help ' + name)  # help completion
    427
    428    def _info_completion(self) -> None:
    429        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
    430            if cmd:
    431                self._completer.append('info ' + cmd.split()[1])
    432
    433    def _other_completion(self) -> None:
    434        # special cases
    435        self._completer.append('help info')
    436
    437    def _fill_completion(self) -> None:
    438        self._cmd_completion()
    439        self._info_completion()
    440        self._other_completion()
    441
    442    def _cmd_passthrough(self, cmdline: str,
    443                         cpu_index: int = 0) -> QMPMessage:
    444        return self.cmd_obj({
    445            'execute': 'human-monitor-command',
    446            'arguments': {
    447                'command-line': cmdline,
    448                'cpu-index': cpu_index
    449            }
    450        })
    451
    452    def _execute_cmd(self, cmdline: str) -> bool:
    453        if cmdline.split()[0] == "cpu":
    454            # trap the cpu command, it requires special setting
    455            try:
    456                idx = int(cmdline.split()[1])
    457                if 'return' not in self._cmd_passthrough('info version', idx):
    458                    print('bad CPU index')
    459                    return True
    460                self._cpu_index = idx
    461            except ValueError:
    462                print('cpu command takes an integer argument')
    463                return True
    464        resp = self._cmd_passthrough(cmdline, self._cpu_index)
    465        if resp is None:
    466            print('Disconnected')
    467            return False
    468        assert 'return' in resp or 'error' in resp
    469        if 'return' in resp:
    470            # Success
    471            if len(resp['return']) > 0:
    472                print(resp['return'], end=' ')
    473        else:
    474            # Error
    475            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
    476        return True
    477
    478    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
    479        QMPShell.show_banner(self, msg)
    480
    481
    482def die(msg: str) -> NoReturn:
    483    """Write an error to stderr, then exit with a return code of 1."""
    484    sys.stderr.write('ERROR: %s\n' % msg)
    485    sys.exit(1)
    486
    487
    488def main() -> None:
    489    """
    490    qmp-shell entry point: parse command line arguments and start the REPL.
    491    """
    492    parser = argparse.ArgumentParser()
    493    parser.add_argument('-H', '--hmp', action='store_true',
    494                        help='Use HMP interface')
    495    parser.add_argument('-N', '--skip-negotiation', action='store_true',
    496                        help='Skip negotiate (for qemu-ga)')
    497    parser.add_argument('-v', '--verbose', action='store_true',
    498                        help='Verbose (echo commands sent and received)')
    499    parser.add_argument('-p', '--pretty', action='store_true',
    500                        help='Pretty-print JSON')
    501
    502    default_server = os.environ.get('QMP_SOCKET')
    503    parser.add_argument('qmp_server', action='store',
    504                        default=default_server,
    505                        help='< UNIX socket path | TCP address:port >')
    506
    507    args = parser.parse_args()
    508    if args.qmp_server is None:
    509        parser.error("QMP socket or TCP address must be specified")
    510
    511    shell_class = HMPShell if args.hmp else QMPShell
    512
    513    try:
    514        address = shell_class.parse_address(args.qmp_server)
    515    except qmp.QMPBadPortError:
    516        parser.error(f"Bad port number: {args.qmp_server}")
    517        return  # pycharm doesn't know error() is noreturn
    518
    519    with shell_class(address, args.pretty, args.verbose) as qemu:
    520        try:
    521            qemu.connect(negotiate=not args.skip_negotiation)
    522        except qmp.QMPConnectError:
    523            die("Didn't get QMP greeting message")
    524        except qmp.QMPCapabilitiesError:
    525            die("Couldn't negotiate capabilities")
    526        except OSError as err:
    527            die(f"Couldn't connect to {args.qmp_server}: {err!s}")
    528
    529        for _ in qemu.repl():
    530            pass
    531
    532
# Start the shell only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()