qmp_client.py (22375B)
"""
QMP Protocol Implementation

This module provides the `QMPClient` class, which can be used to connect
and send commands to a QMP server such as QEMU. The QMP class can be
used to either connect to a listening server, or used to listen and
accept an incoming connection from that server.
"""

import asyncio
import logging
import socket
import struct
from typing import (
    Dict,
    List,
    Mapping,
    Optional,
    Union,
    cast,
)

from .error import AQMPError, ProtocolError
from .events import Events
from .message import Message
from .models import ErrorResponse, Greeting
from .protocol import AsyncProtocol, Runstate, require
from .util import (
    bottom_half,
    exception_summary,
    pretty_traceback,
    upper_half,
)


class _WrappedProtocolError(ProtocolError):
    """
    Abstract exception class for Protocol errors that wrap an Exception.

    The string form is the error message followed by the string form of
    the wrapped exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        #: The root-cause exception that was wrapped.
        self.exc = exc

    def __str__(self) -> str:
        return f"{self.error_message}: {self.exc!s}"


class GreetingError(_WrappedProtocolError):
    """
    An exception occurred during the Greeting phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """


class NegotiationError(_WrappedProtocolError):
    """
    An exception occurred during the Negotiation phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """


class ExecuteError(AQMPError):
    """
    Exception raised by `QMPClient.execute()` on RPC failure.

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """
    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message):
        super().__init__(error_response.error.desc)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = error_response.error.class_


class ExecInterruptedError(AQMPError):
    """
    Exception raised by `execute()` (et al) when an RPC is interrupted.

    This error is raised when an `execute()` statement could not be
    completed.  This can occur because the connection itself was
    terminated before a reply was received.

    The true cause of the interruption will be available via `disconnect()`.
    """


class _MsgProtocolError(ProtocolError):
    """
    Abstract error class for protocol errors that have a `Message` object.

    This Exception class is used for protocol errors where the `Message`
    was mechanically understood, but was found to be inappropriate or
    malformed.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
    def __init__(self, error_message: str, msg: Message):
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        # Parent's description first, then the offending message on its
        # own line.
        return "\n".join([
            super().__str__(),
            f" Message was: {str(self.msg)}\n",
        ])


class ServerParseError(_MsgProtocolError):
    """
    The Server sent a `Message` indicating parsing failure.

    i.e. A reply has arrived from the server, but it is missing the "ID"
    field, indicating a parsing error.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """


class BadReplyError(_MsgProtocolError):
    """
    An execution reply was successfully routed, but not understood.

    If a QMP message is received with an 'id' field to allow it to be
    routed, but is otherwise malformed, this exception will be raised.

    A reply message is malformed if it is missing either the 'return' or
    'error' keys, or if the 'error' value has missing keys or members of
    the wrong type.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """
    def __init__(self, error_message: str, msg: Message, sent: Message):
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent = sent


class QMPClient(AsyncProtocol[Message], Events):
    """
    Implements a QMP client connection.

    QMP can be used to establish a connection as either the transport
    client or server, though this class always acts as the QMP client.

    :param name: Optional nickname for the connection, used for logging.

    Basic script-style usage looks like this::

      qmp = QMPClient('my_virtual_machine_name')
      await qmp.connect(('127.0.0.1', 1234))
      ...
      res = await qmp.execute('query-block')
      ...
      await qmp.disconnect()

    Basic async client-style usage looks like this::

      class Client:
          def __init__(self, name: str):
              self.qmp = QMPClient(name)

          async def watch_events(self):
              try:
                  async for event in self.qmp.events:
                      print(f"Event: {event['event']}")
              except asyncio.CancelledError:
                  return

          async def run(self, address='/tmp/qemu.socket'):
              await self.qmp.connect(address)
              asyncio.create_task(self.watch_events())
              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()

    See `aqmp.events` for more detail on event handling patterns.
    """
    #: Logger object used for debugging messages.
    logger = logging.getLogger(__name__)

    # Read buffer limit; large enough to accept query-qmp-schema
    _limit = (256 * 1024)

    # Type alias for pending execute() result items
    _PendingT = Union[Message, ExecInterruptedError]

    def __init__(self, name: Optional[str] = None) -> None:
        super().__init__(name)
        Events.__init__(self)

        #: Whether or not to await a greeting after establishing a connection.
        self.await_greeting: bool = True

        #: Whether or not to perform capabilities negotiation upon connection.
        #: Implies `await_greeting`.
        self.negotiate: bool = True

        # Cached Greeting, if one was awaited.
        self._greeting: Optional[Greeting] = None

        # Command ID counter
        self._execute_id = 0

        # Incoming RPC reply messages, keyed by execution ID.
        # The key is None for an execution that was issued without an ID.
        self._pending: Dict[
            Union[str, None],
            'asyncio.Queue[QMPClient._PendingT]'
        ] = {}

    @property
    def greeting(self) -> Optional[Greeting]:
        """The `Greeting` from the QMP server, if any."""
        return self._greeting

    @upper_half
    async def _establish_session(self) -> None:
        """
        Initiate the QMP session.

        Wait for the QMP greeting and perform capabilities negotiation.

        :raise GreetingError: When the greeting is not understood.
        :raise NegotiationError: If the negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        # Reset per-session state before talking to the server.
        self._greeting = None
        self._pending = {}

        # Negotiation implies awaiting the greeting first.
        if self.await_greeting or self.negotiate:
            self._greeting = await self._get_greeting()

        if self.negotiate:
            await self._negotiate()

        # This will start the reader/writers:
        await super()._establish_session()

    @upper_half
    async def _get_greeting(self) -> Greeting:
        """
        Receive one message and interpret it as the server `Greeting`.

        :raise GreetingError: When the greeting is not understood.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.

        :return: the Greeting object given by the server.
        """
        self.logger.debug("Awaiting greeting ...")

        try:
            msg = await self._recv()
            return Greeting(msg)
        except (ProtocolError, KeyError, TypeError) as err:
            emsg = "Did not understand Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise GreetingError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Logged here, then propagated unchanged.
            emsg = "Failed to receive Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @upper_half
    async def _negotiate(self) -> None:
        """
        Perform QMP capabilities negotiation.

        Sends 'qmp_capabilities', enabling 'oob' when the cached greeting
        advertises it, and checks that the reply is a success reply.

        :raise NegotiationError: When negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self.logger.debug("Negotiating capabilities ...")

        arguments: Dict[str, List[str]] = {'enable': []}
        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
            arguments['enable'].append('oob')
        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)

        # It's not safe to use execute() here, because the reader/writers
        # aren't running. AsyncProtocol *requires* that a new session
        # does not fail after the reader/writers are running!
        try:
            await self._send(msg)
            reply = await self._recv()
            # NOTE(review): these checks are assert statements and are
            # therefore skipped when Python runs with -O.
            assert 'return' in reply
            assert 'error' not in reply
        except (ProtocolError, AssertionError) as err:
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise NegotiationError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Logged here, then propagated unchanged.
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @bottom_half
    async def _bh_disconnect(self) -> None:
        """
        Disconnect, then wake every pending execution with an error.

        Each pending reply queue receives an `ExecInterruptedError`, so
        that callers blocked in `_reply()` are released.
        """
        try:
            await super()._bh_disconnect()
        finally:
            if self._pending:
                self.logger.debug("Cancelling pending executions")
            for key, queue in self._pending.items():
                self.logger.debug("Cancelling execution '%s'", key)
                queue.put_nowait(
                    ExecInterruptedError("Disconnected")
                )

            self.logger.debug("QMP Disconnected.")

    @upper_half
    def _cleanup(self) -> None:
        """Run the parent cleanup; no executions may still be pending."""
        super()._cleanup()
        assert not self._pending

    @bottom_half
    async def _on_message(self, msg: Message) -> None:
        """
        Add an incoming message to the appropriate queue/handler.

        :raise ServerParseError: When Message indicates server parse failure.
        """
        # Incoming messages are not fully parsed/validated here;
        # do only light peeking to know how to route the messages.

        if 'event' in msg:
            await self._event_dispatch(msg)
            return

        # Below, we assume everything left is an execute/exec-oob response.

        exec_id = cast(Optional[str], msg.get('id'))

        if exec_id in self._pending:
            await self._pending[exec_id].put(msg)
            return

        # We have a message we can't route back to a caller.

        is_error = 'error' in msg
        has_id = 'id' in msg

        if is_error and not has_id:
            # This is very likely a server parsing error.
            # It doesn't inherently belong to any pending execution.
            # Instead of performing clever recovery, just terminate.
            # See "NOTE" in qmp-spec.txt, section 2.4.2
            raise ServerParseError(
                ("Server sent an error response without an ID, "
                 "but there are no ID-less executions pending. "
                 "Assuming this is a server parser failure."),
                msg
            )

        # qmp-spec.txt, section 2.4:
        # 'Clients should drop all the responses
        # that have an unknown "id" field.'
        self.logger.log(
            logging.ERROR if is_error else logging.WARNING,
            "Unknown ID '%s', message dropped.",
            exec_id,
        )
        self.logger.debug("Unroutable message: %s", str(msg))

    @upper_half
    @bottom_half
    async def _do_recv(self) -> Message:
        """
        Read one line from the stream and build a `Message` from it.

        :raise OSError: When a stream error is encountered.
        :raise EOFError: When the stream is at EOF.
        :raise ProtocolError:
            When the Message is not understood.
            See also `Message._deserialize`.

        :return: A single QMP `Message`.
        """
        msg_bytes = await self._readline()
        msg = Message(msg_bytes, eager=True)
        return msg

    @upper_half
    @bottom_half
    def _do_send(self, msg: Message) -> None:
        """
        Write the serialized `Message` to the stream writer.

        :raise ValueError: JSON serialization failure
        :raise TypeError: JSON serialization failure
        :raise OSError: When a stream error is encountered.
        """
        assert self._writer is not None
        self._writer.write(bytes(msg))

    @upper_half
    def _get_exec_id(self) -> str:
        """Return the next automatic execution ID and advance the counter."""
        exec_id = f"__aqmp#{self._execute_id:05d}"
        self._execute_id += 1
        return exec_id

    @upper_half
    async def _issue(self, msg: Message) -> Union[None, str]:
        """
        Issue a QMP `Message` and do not wait for a reply.

        A reply queue is registered under the message's ID (or `None`
        when the message has no ID) before the message is queued for
        sending.

        :param msg: The QMP `Message` to send to the server.

        :return: The ID of the `Message` sent.
        """
        msg_id: Optional[str] = None
        if 'id' in msg:
            assert isinstance(msg['id'], str)
            msg_id = msg['id']

        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        try:
            await self._outgoing.put(msg)
        except BaseException:
            # The await may be interrupted (e.g. task cancellation or a
            # timeout). The message was never queued, so nobody will
            # ever collect a reply: drop the record instead of leaking it.
            del self._pending[msg_id]
            raise

        return msg_id

    @upper_half
    async def _reply(self, msg_id: Union[str, None]) -> Message:
        """
        Await a reply to a previously issued QMP message.

        The pending record for ``msg_id`` is always removed on the way
        out, including when the wait itself is interrupted.

        :param msg_id: The ID of the previously issued message.

        :return: The reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        queue = self._pending[msg_id]

        try:
            # The wait lives inside the try block on purpose: an exception
            # can be injected at this await (cancellation, timeout), and
            # the pending record must still be cleared in that case.
            result = await queue.get()
            if isinstance(result, ExecInterruptedError):
                raise result
            return result
        finally:
            del self._pending[msg_id]

    @upper_half
    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
        """
        Send a QMP `Message` to the server and await a reply.

        This method *assumes* you are sending some kind of an execute
        statement that *will* receive a reply.

        An execution ID will be assigned if assign_id is `True`. It can be
        disabled, but this requires that an ID is manually assigned
        instead. For manually assigned IDs, you must not use the string
        '__aqmp#' anywhere in the ID.

        :param msg: The QMP `Message` to execute.
        :param assign_id: If True, assign a new execution ID.

        :return: Execution reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        if assign_id:
            msg['id'] = self._get_exec_id()
        elif 'id' in msg:
            assert isinstance(msg['id'], str)
            assert '__aqmp#' not in msg['id']

        exec_id = await self._issue(msg)
        return await self._reply(exec_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def _raw(
            self,
            msg: Union[Message, Mapping[str, object], bytes],
            assign_id: bool = True,
    ) -> Message:
        """
        Issue a raw `Message` to the QMP server and await a reply.

        :param msg:
            A Message to send to the server. It may be a `Message`, any
            Mapping (including Dict), or raw bytes.
        :param assign_id:
            Assign an arbitrary execution ID to this message. If
            `False`, the existing id must either be absent (and no other
            such pending execution may omit an ID) or a string. If it is
            a string, it must not start with '__aqmp#' and no other such
            pending execution may currently be using that ID.

        :return: Execution reply from the server.

        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        :raise TypeError:
            When assign_id is `False`, an ID is given, and it is not a string.
        :raise ValueError:
            When assign_id is `False`, but the ID is not usable;
            Either because it starts with '__aqmp#' or it is already in-use.
        """
        # 1. convert generic Mapping or bytes to a QMP Message
        # 2. copy Message objects so that we assign an ID only to the copy.
        msg = Message(msg)

        exec_id = msg.get('id')
        if not assign_id and 'id' in msg:
            if not isinstance(exec_id, str):
                raise TypeError(f"ID ('{exec_id}') must be a string.")
            if exec_id.startswith('__aqmp#'):
                raise ValueError(
                    f"ID ('{exec_id}') must not start with '__aqmp#'."
                )

        # Also covers the ID-less case: exec_id is None when 'id' is absent.
        if not assign_id and exec_id in self._pending:
            raise ValueError(
                f"ID '{exec_id}' is in-use and cannot be used."
            )

        return await self._execute(msg, assign_id=assign_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def execute_msg(self, msg: Message) -> object:
        """
        Execute a QMP command and return its value.

        :param msg: The QMP `Message` to execute.

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ValueError:
            If the QMP `Message` does not have either the 'execute' or
            'exec-oob' fields set.
        :raise ExecuteError: When the server returns an error response.
        :raise BadReplyError:
            When the reply has neither a usable 'error' nor a 'return'
            member.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        if not ('execute' in msg or 'exec-oob' in msg):
            raise ValueError("Requires 'execute' or 'exec-oob' message")

        # Copy the Message so that the ID assigned by _execute() is
        # local to this method; allowing the ID to be seen in raised
        # Exceptions but without modifying the caller's held copy.
        msg = Message(msg)
        reply = await self._execute(msg)

        if 'error' in reply:
            try:
                error_response = ErrorResponse(reply)
            except (KeyError, TypeError) as err:
                # Error response was malformed.
                raise BadReplyError(
                    "QMP error reply is malformed", reply, msg,
                ) from err

            raise ExecuteError(error_response, msg, reply)

        if 'return' not in reply:
            raise BadReplyError(
                "QMP reply is missing a 'error' or 'return' member",
                reply, msg,
            )

        return reply['return']

    @classmethod
    def make_execute_msg(cls, cmd: str,
                         arguments: Optional[Mapping[str, object]] = None,
                         oob: bool = False) -> Message:
        """
        Create an executable message to be sent by `execute_msg` later.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return: An executable QMP `Message`.
        """
        msg = Message({'exec-oob' if oob else 'execute': cmd})
        if arguments is not None:
            msg['arguments'] = arguments
        return msg

    @upper_half
    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        """
        Execute a QMP command and return its value.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        msg = self.make_execute_msg(cmd, arguments, oob=oob)
        return await self.execute_msg(msg)

    @upper_half
    @require(Runstate.RUNNING)
    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor number to send.

        :raise AQMPError:
            When the connection is not backed by a UNIX socket.
        """
        assert self._writer is not None
        sock = self._writer.transport.get_extra_info('socket')

        # get_extra_info() yields None when the transport has no socket.
        if sock is None or sock.family != socket.AF_UNIX:
            raise AQMPError("Sending file descriptors requires a UNIX socket.")

        # Void the warranty sticker.
        # Access to sendmsg in asyncio is scheduled for removal in Python 3.11.
        # When asyncio hands back a wrapper object, reach through it to the
        # real backing socket; when it is already a plain socket (which has
        # no '_sock' attribute), use it as-is.
        sock = getattr(sock, '_sock', sock)
        sock.sendmsg(
            [b' '],
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
        )