qmp_shell.py (17272B)
1# 2# Copyright (C) 2009, 2010 Red Hat Inc. 3# 4# Authors: 5# Luiz Capitulino <lcapitulino@redhat.com> 6# 7# This work is licensed under the terms of the GNU GPL, version 2. See 8# the COPYING file in the top-level directory. 9# 10 11""" 12Low-level QEMU shell on top of QMP. 13 14usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 15 16positional arguments: 17 qmp_server < UNIX socket path | TCP address:port > 18 19optional arguments: 20 -h, --help show this help message and exit 21 -H, --hmp Use HMP interface 22 -N, --skip-negotiation 23 Skip negotiate (for qemu-ga) 24 -v, --verbose Verbose (echo commands sent and received) 25 -p, --pretty Pretty-print JSON 26 27 28Start QEMU with: 29 30# qemu [...] -qmp unix:./qmp-sock,server 31 32Run the shell: 33 34$ qmp-shell ./qmp-sock 35 36Commands have the following format: 37 38 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 39 40For example: 41 42(QEMU) device_add driver=e1000 id=net1 43{'return': {}} 44(QEMU) 45 46key=value pairs also support Python or JSON object literal subset notations, 47without spaces. Dictionaries/objects {} are supported as are arrays []. 48 49 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 50 51Both JSON and Python formatting should work, including both styles of 52string literal quotes. Both paradigms of literal values should work, 53including null/true/false for JSON and None/True/False for Python. 54 55 56Transactions have the following multi-line format: 57 58 transaction( 59 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 60 ... 61 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 62 ) 63 64One line transactions are also supported: 65 66 transaction( action-name1 ... ) 67 68For example: 69 70 (QEMU) transaction( 71 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 72 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 73 TRANS> ) 74 {"return": {}} 75 (QEMU) 76 77Use the -v and -p options to activate the verbose and pretty-print options, 78which will echo back the properly formatted JSON-compliant QMP that is being 79sent to QEMU, which is useful for debugging and documentation generation. 80""" 81 82import argparse 83import ast 84import json 85import logging 86import os 87import re 88import readline 89import sys 90from typing import ( 91 Iterator, 92 List, 93 NoReturn, 94 Optional, 95 Sequence, 96) 97 98from qemu import qmp 99from qemu.qmp import QMPMessage 100 101 102LOG = logging.getLogger(__name__) 103 104 105class QMPCompleter: 106 """ 107 QMPCompleter provides a readline library tab-complete behavior. 108 """ 109 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 110 # but pylint as of today does not know that List[str] is simply 'list'. 111 def __init__(self) -> None: 112 self._matches: List[str] = [] 113 114 def append(self, value: str) -> None: 115 """Append a new valid completion to the list of possibilities.""" 116 return self._matches.append(value) 117 118 def complete(self, text: str, state: int) -> Optional[str]: 119 """readline.set_completer() callback implementation.""" 120 for cmd in self._matches: 121 if cmd.startswith(text): 122 if state == 0: 123 return cmd 124 state -= 1 125 return None 126 127 128class QMPShellError(qmp.QMPError): 129 """ 130 QMP Shell Base error class. 131 """ 132 133 134class FuzzyJSON(ast.NodeTransformer): 135 """ 136 This extension of ast.NodeTransformer filters literal "true/false/null" 137 values in a Python AST and replaces them by proper "True/False/None" values 138 that Python can properly evaluate. 139 """ 140 141 @classmethod 142 def visit_Name(cls, # pylint: disable=invalid-name 143 node: ast.Name) -> ast.AST: 144 """ 145 Transform Name nodes with certain values into Constant (keyword) nodes. 146 """ 147 if node.id == 'true': 148 return ast.Constant(value=True) 149 if node.id == 'false': 150 return ast.Constant(value=False) 151 if node.id == 'null': 152 return ast.Constant(value=None) 153 return node 154 155 156class QMPShell(qmp.QEMUMonitorProtocol): 157 """ 158 QMPShell provides a basic readline-based QMP shell. 159 160 :param address: Address of the QMP server. 161 :param pretty: Pretty-print QMP messages. 162 :param verbose: Echo outgoing QMP messages to console. 163 """ 164 def __init__(self, address: qmp.SocketAddrT, 165 pretty: bool = False, verbose: bool = False): 166 super().__init__(address) 167 self._greeting: Optional[QMPMessage] = None 168 self._completer = QMPCompleter() 169 self._transmode = False 170 self._actions: List[QMPMessage] = [] 171 self._histfile = os.path.join(os.path.expanduser('~'), 172 '.qmp-shell_history') 173 self.pretty = pretty 174 self.verbose = verbose 175 176 def close(self) -> None: 177 # Hook into context manager of parent to save shell history. 178 self._save_history() 179 super().close() 180 181 def _fill_completion(self) -> None: 182 cmds = self.cmd('query-commands') 183 if 'error' in cmds: 184 return 185 for cmd in cmds['return']: 186 self._completer.append(cmd['name']) 187 188 def _completer_setup(self) -> None: 189 self._completer = QMPCompleter() 190 self._fill_completion() 191 readline.set_history_length(1024) 192 readline.set_completer(self._completer.complete) 193 readline.parse_and_bind("tab: complete") 194 # NB: default delimiters conflict with some command names 195 # (eg. query-), clearing everything as it doesn't seem to matter 196 readline.set_completer_delims('') 197 try: 198 readline.read_history_file(self._histfile) 199 except FileNotFoundError: 200 pass 201 except IOError as err: 202 msg = f"Failed to read history '{self._histfile}': {err!s}" 203 LOG.warning(msg) 204 205 def _save_history(self) -> None: 206 try: 207 readline.write_history_file(self._histfile) 208 except IOError as err: 209 msg = f"Failed to save history file '{self._histfile}': {err!s}" 210 LOG.warning(msg) 211 212 @classmethod 213 def _parse_value(cls, val: str) -> object: 214 try: 215 return int(val) 216 except ValueError: 217 pass 218 219 if val.lower() == 'true': 220 return True 221 if val.lower() == 'false': 222 return False 223 if val.startswith(('{', '[')): 224 # Try first as pure JSON: 225 try: 226 return json.loads(val) 227 except ValueError: 228 pass 229 # Try once again as FuzzyJSON: 230 try: 231 tree = ast.parse(val, mode='eval') 232 transformed = FuzzyJSON().visit(tree) 233 return ast.literal_eval(transformed) 234 except (SyntaxError, ValueError): 235 pass 236 return val 237 238 def _cli_expr(self, 239 tokens: Sequence[str], 240 parent: qmp.QMPObject) -> None: 241 for arg in tokens: 242 (key, sep, val) = arg.partition('=') 243 if sep != '=': 244 raise QMPShellError( 245 f"Expected a key=value pair, got '{arg!s}'" 246 ) 247 248 value = self._parse_value(val) 249 optpath = key.split('.') 250 curpath = [] 251 for path in optpath[:-1]: 252 curpath.append(path) 253 obj = parent.get(path, {}) 254 if not isinstance(obj, dict): 255 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 256 raise QMPShellError(msg.format('.'.join(curpath))) 257 parent[path] = obj 258 parent = obj 259 if optpath[-1] in parent: 260 if isinstance(parent[optpath[-1]], dict): 261 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 262 raise QMPShellError(msg.format('.'.join(curpath))) 263 raise QMPShellError(f'Cannot set "{key}" multiple times') 264 parent[optpath[-1]] = value 265 266 def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 267 """ 268 Build a QMP input object from a user provided command-line in the 269 following format: 270 271 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 272 """ 273 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 274 cmdargs = re.findall(argument_regex, cmdline) 275 qmpcmd: QMPMessage 276 277 # Transactional CLI entry: 278 if cmdargs and cmdargs[0] == 'transaction(': 279 self._transmode = True 280 self._actions = [] 281 cmdargs.pop(0) 282 283 # Transactional CLI exit: 284 if cmdargs and cmdargs[0] == ')' and self._transmode: 285 self._transmode = False 286 if len(cmdargs) > 1: 287 msg = 'Unexpected input after close of Transaction sub-shell' 288 raise QMPShellError(msg) 289 qmpcmd = { 290 'execute': 'transaction', 291 'arguments': {'actions': self._actions} 292 } 293 return qmpcmd 294 295 # No args, or no args remaining 296 if not cmdargs: 297 return None 298 299 if self._transmode: 300 # Parse and cache this Transactional Action 301 finalize = False 302 action = {'type': cmdargs[0], 'data': {}} 303 if cmdargs[-1] == ')': 304 cmdargs.pop(-1) 305 finalize = True 306 self._cli_expr(cmdargs[1:], action['data']) 307 self._actions.append(action) 308 return self._build_cmd(')') if finalize else None 309 310 # Standard command: parse and return it to be executed. 311 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 312 self._cli_expr(cmdargs[1:], qmpcmd['arguments']) 313 return qmpcmd 314 315 def _print(self, qmp_message: object) -> None: 316 jsobj = json.dumps(qmp_message, 317 indent=4 if self.pretty else None, 318 sort_keys=self.pretty) 319 print(str(jsobj)) 320 321 def _execute_cmd(self, cmdline: str) -> bool: 322 try: 323 qmpcmd = self._build_cmd(cmdline) 324 except QMPShellError as err: 325 print( 326 f"Error while parsing command line: {err!s}\n" 327 "command format: <command-name> " 328 "[arg-name1=arg1] ... [arg-nameN=argN", 329 file=sys.stderr 330 ) 331 return True 332 # For transaction mode, we may have just cached the action: 333 if qmpcmd is None: 334 return True 335 if self.verbose: 336 self._print(qmpcmd) 337 resp = self.cmd_obj(qmpcmd) 338 if resp is None: 339 print('Disconnected') 340 return False 341 self._print(resp) 342 return True 343 344 def connect(self, negotiate: bool = True) -> None: 345 self._greeting = super().connect(negotiate) 346 self._completer_setup() 347 348 def show_banner(self, 349 msg: str = 'Welcome to the QMP low-level shell!') -> None: 350 """ 351 Print to stdio a greeting, and the QEMU version if available. 352 """ 353 print(msg) 354 if not self._greeting: 355 print('Connected') 356 return 357 version = self._greeting['QMP']['version']['qemu'] 358 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 359 360 @property 361 def prompt(self) -> str: 362 """ 363 Return the current shell prompt, including a trailing space. 364 """ 365 if self._transmode: 366 return 'TRANS> ' 367 return '(QEMU) ' 368 369 def read_exec_command(self) -> bool: 370 """ 371 Read and execute a command. 372 373 @return True if execution was ok, return False if disconnected. 374 """ 375 try: 376 cmdline = input(self.prompt) 377 except EOFError: 378 print() 379 return False 380 381 if cmdline == '': 382 for event in self.get_events(): 383 print(event) 384 return True 385 386 return self._execute_cmd(cmdline) 387 388 def repl(self) -> Iterator[None]: 389 """ 390 Return an iterator that implements the REPL. 391 """ 392 self.show_banner() 393 while self.read_exec_command(): 394 yield 395 self.close() 396 397 398class HMPShell(QMPShell): 399 """ 400 HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. 401 402 :param address: Address of the QMP server. 403 :param pretty: Pretty-print QMP messages. 404 :param verbose: Echo outgoing QMP messages to console. 405 """ 406 def __init__(self, address: qmp.SocketAddrT, 407 pretty: bool = False, verbose: bool = False): 408 super().__init__(address, pretty, verbose) 409 self._cpu_index = 0 410 411 def _cmd_completion(self) -> None: 412 for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): 413 if cmd and cmd[0] != '[' and cmd[0] != '\t': 414 name = cmd.split()[0] # drop help text 415 if name == 'info': 416 continue 417 if name.find('|') != -1: 418 # Command in the form 'foobar|f' or 'f|foobar', take the 419 # full name 420 opt = name.split('|') 421 if len(opt[0]) == 1: 422 name = opt[1] 423 else: 424 name = opt[0] 425 self._completer.append(name) 426 self._completer.append('help ' + name) # help completion 427 428 def _info_completion(self) -> None: 429 for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): 430 if cmd: 431 self._completer.append('info ' + cmd.split()[1]) 432 433 def _other_completion(self) -> None: 434 # special cases 435 self._completer.append('help info') 436 437 def _fill_completion(self) -> None: 438 self._cmd_completion() 439 self._info_completion() 440 self._other_completion() 441 442 def _cmd_passthrough(self, cmdline: str, 443 cpu_index: int = 0) -> QMPMessage: 444 return self.cmd_obj({ 445 'execute': 'human-monitor-command', 446 'arguments': { 447 'command-line': cmdline, 448 'cpu-index': cpu_index 449 } 450 }) 451 452 def _execute_cmd(self, cmdline: str) -> bool: 453 if cmdline.split()[0] == "cpu": 454 # trap the cpu command, it requires special setting 455 try: 456 idx = int(cmdline.split()[1]) 457 if 'return' not in self._cmd_passthrough('info version', idx): 458 print('bad CPU index') 459 return True 460 self._cpu_index = idx 461 except ValueError: 462 print('cpu command takes an integer argument') 463 return True 464 resp = self._cmd_passthrough(cmdline, self._cpu_index) 465 if resp is None: 466 print('Disconnected') 467 return False 468 assert 'return' in resp or 'error' in resp 469 if 'return' in resp: 470 # Success 471 if len(resp['return']) > 0: 472 print(resp['return'], end=' ') 473 else: 474 # Error 475 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 476 return True 477 478 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 479 QMPShell.show_banner(self, msg) 480 481 482def die(msg: str) -> NoReturn: 483 """Write an error to stderr, then exit with a return code of 1.""" 484 sys.stderr.write('ERROR: %s\n' % msg) 485 sys.exit(1) 486 487 488def main() -> None: 489 """ 490 qmp-shell entry point: parse command line arguments and start the REPL. 491 """ 492 parser = argparse.ArgumentParser() 493 parser.add_argument('-H', '--hmp', action='store_true', 494 help='Use HMP interface') 495 parser.add_argument('-N', '--skip-negotiation', action='store_true', 496 help='Skip negotiate (for qemu-ga)') 497 parser.add_argument('-v', '--verbose', action='store_true', 498 help='Verbose (echo commands sent and received)') 499 parser.add_argument('-p', '--pretty', action='store_true', 500 help='Pretty-print JSON') 501 502 default_server = os.environ.get('QMP_SOCKET') 503 parser.add_argument('qmp_server', action='store', 504 default=default_server, 505 help='< UNIX socket path | TCP address:port >') 506 507 args = parser.parse_args() 508 if args.qmp_server is None: 509 parser.error("QMP socket or TCP address must be specified") 510 511 shell_class = HMPShell if args.hmp else QMPShell 512 513 try: 514 address = shell_class.parse_address(args.qmp_server) 515 except qmp.QMPBadPortError: 516 parser.error(f"Bad port number: {args.qmp_server}") 517 return # pycharm doesn't know error() is noreturn 518 519 with shell_class(address, args.pretty, args.verbose) as qemu: 520 try: 521 qemu.connect(negotiate=not args.skip_negotiation) 522 except qmp.QMPConnectError: 523 die("Didn't get QMP greeting message") 524 except qmp.QMPCapabilitiesError: 525 die("Couldn't negotiate capabilities") 526 except OSError as err: 527 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 528 529 for _ in qemu.repl(): 530 pass 531 532 533if __name__ == '__main__': 534 main()