__init__.py (13793B)
1""" 2QEMU Monitor Protocol (QMP) development library & tooling. 3 4This package provides a fairly low-level class for communicating to QMP 5protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the 6QEMU Storage Daemon. This library is not intended for production use. 7 8`QEMUMonitorProtocol` is the primary class of interest, and all errors 9raised derive from `QMPError`. 10""" 11 12# Copyright (C) 2009, 2010 Red Hat Inc. 13# 14# Authors: 15# Luiz Capitulino <lcapitulino@redhat.com> 16# 17# This work is licensed under the terms of the GNU GPL, version 2. See 18# the COPYING file in the top-level directory. 19 20import errno 21import json 22import logging 23import socket 24import struct 25from types import TracebackType 26from typing import ( 27 Any, 28 Dict, 29 List, 30 Optional, 31 TextIO, 32 Tuple, 33 Type, 34 TypeVar, 35 Union, 36 cast, 37) 38 39 40#: QMPMessage is an entire QMP message of any kind. 41QMPMessage = Dict[str, Any] 42 43#: QMPReturnValue is the 'return' value of a command. 44QMPReturnValue = object 45 46#: QMPObject is any object in a QMP message. 47QMPObject = Dict[str, object] 48 49# QMPMessage can be outgoing commands or incoming events/returns. 50# QMPReturnValue is usually a dict/json object, but due to QAPI's 51# 'returns-whitelist', it can actually be anything. 52# 53# {'return': {}} is a QMPMessage, 54# {} is the QMPReturnValue. 55 56 57InternetAddrT = Tuple[str, int] 58UnixAddrT = str 59SocketAddrT = Union[InternetAddrT, UnixAddrT] 60 61 62class QMPError(Exception): 63 """ 64 QMP base exception 65 """ 66 67 68class QMPConnectError(QMPError): 69 """ 70 QMP connection exception 71 """ 72 73 74class QMPCapabilitiesError(QMPError): 75 """ 76 QMP negotiate capabilities exception 77 """ 78 79 80class QMPTimeoutError(QMPError): 81 """ 82 QMP timeout exception 83 """ 84 85 86class QMPProtocolError(QMPError): 87 """ 88 QMP protocol error; unexpected response 89 """ 90 91 92class QMPResponseError(QMPError): 93 """ 94 Represents erroneous QMP monitor reply 95 """ 96 def __init__(self, reply: QMPMessage): 97 try: 98 desc = reply['error']['desc'] 99 except KeyError: 100 desc = reply 101 super().__init__(desc) 102 self.reply = reply 103 104 105class QMPBadPortError(QMPError): 106 """ 107 Unable to parse socket address: Port was non-numerical. 108 """ 109 110 111class QEMUMonitorProtocol: 112 """ 113 Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then 114 allow to handle commands and events. 115 """ 116 117 #: Logger object for debugging messages 118 logger = logging.getLogger('QMP') 119 120 def __init__(self, address: SocketAddrT, 121 server: bool = False, 122 nickname: Optional[str] = None): 123 """ 124 Create a QEMUMonitorProtocol class. 125 126 @param address: QEMU address, can be either a unix socket path (string) 127 or a tuple in the form ( address, port ) for a TCP 128 connection 129 @param server: server mode listens on the socket (bool) 130 @raise OSError on socket connection errors 131 @note No connection is established, this is done by the connect() or 132 accept() methods 133 """ 134 self.__events: List[QMPMessage] = [] 135 self.__address = address 136 self.__sock = self.__get_sock() 137 self.__sockfile: Optional[TextIO] = None 138 self._nickname = nickname 139 if self._nickname: 140 self.logger = logging.getLogger('QMP').getChild(self._nickname) 141 if server: 142 self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 143 self.__sock.bind(self.__address) 144 self.__sock.listen(1) 145 146 def __get_sock(self) -> socket.socket: 147 if isinstance(self.__address, tuple): 148 family = socket.AF_INET 149 else: 150 family = socket.AF_UNIX 151 return socket.socket(family, socket.SOCK_STREAM) 152 153 def __negotiate_capabilities(self) -> QMPMessage: 154 greeting = self.__json_read() 155 if greeting is None or "QMP" not in greeting: 156 raise QMPConnectError 157 # Greeting seems ok, negotiate capabilities 158 resp = self.cmd('qmp_capabilities') 159 if resp and "return" in resp: 160 return greeting 161 raise QMPCapabilitiesError 162 163 def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]: 164 assert self.__sockfile is not None 165 while True: 166 data = self.__sockfile.readline() 167 if not data: 168 return None 169 # By definition, any JSON received from QMP is a QMPMessage, 170 # and we are asserting only at static analysis time that it 171 # has a particular shape. 172 resp: QMPMessage = json.loads(data) 173 if 'event' in resp: 174 self.logger.debug("<<< %s", resp) 175 self.__events.append(resp) 176 if not only_event: 177 continue 178 return resp 179 180 def __get_events(self, wait: Union[bool, float] = False) -> None: 181 """ 182 Check for new events in the stream and cache them in __events. 183 184 @param wait (bool): block until an event is available. 185 @param wait (float): If wait is a float, treat it as a timeout value. 186 187 @raise QMPTimeoutError: If a timeout float is provided and the timeout 188 period elapses. 189 @raise QMPConnectError: If wait is True but no events could be 190 retrieved or if some other error occurred. 191 """ 192 193 # Current timeout and blocking status 194 current_timeout = self.__sock.gettimeout() 195 196 # Check for new events regardless and pull them into the cache: 197 self.__sock.settimeout(0) # i.e. setblocking(False) 198 try: 199 self.__json_read() 200 except OSError as err: 201 # EAGAIN: No data available; not critical 202 if err.errno != errno.EAGAIN: 203 raise 204 finally: 205 self.__sock.settimeout(current_timeout) 206 207 # Wait for new events, if needed. 208 # if wait is 0.0, this means "no wait" and is also implicitly false. 209 if not self.__events and wait: 210 if isinstance(wait, float): 211 self.__sock.settimeout(wait) 212 try: 213 ret = self.__json_read(only_event=True) 214 except socket.timeout as err: 215 raise QMPTimeoutError("Timeout waiting for event") from err 216 except Exception as err: 217 msg = "Error while reading from socket" 218 raise QMPConnectError(msg) from err 219 finally: 220 self.__sock.settimeout(current_timeout) 221 222 if ret is None: 223 raise QMPConnectError("Error while reading from socket") 224 225 T = TypeVar('T') 226 227 def __enter__(self: T) -> T: 228 # Implement context manager enter function. 229 return self 230 231 def __exit__(self, 232 # pylint: disable=duplicate-code 233 # see https://github.com/PyCQA/pylint/issues/3619 234 exc_type: Optional[Type[BaseException]], 235 exc_val: Optional[BaseException], 236 exc_tb: Optional[TracebackType]) -> None: 237 # Implement context manager exit function. 238 self.close() 239 240 @classmethod 241 def parse_address(cls, address: str) -> SocketAddrT: 242 """ 243 Parse a string into a QMP address. 244 245 Figure out if the argument is in the port:host form. 246 If it's not, it's probably a file path. 247 """ 248 components = address.split(':') 249 if len(components) == 2: 250 try: 251 port = int(components[1]) 252 except ValueError: 253 msg = f"Bad port: '{components[1]}' in '{address}'." 254 raise QMPBadPortError(msg) from None 255 return (components[0], port) 256 257 # Treat as filepath. 258 return address 259 260 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: 261 """ 262 Connect to the QMP Monitor and perform capabilities negotiation. 263 264 @return QMP greeting dict, or None if negotiate is false 265 @raise OSError on socket connection errors 266 @raise QMPConnectError if the greeting is not received 267 @raise QMPCapabilitiesError if fails to negotiate capabilities 268 """ 269 self.__sock.connect(self.__address) 270 self.__sockfile = self.__sock.makefile(mode='r') 271 if negotiate: 272 return self.__negotiate_capabilities() 273 return None 274 275 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: 276 """ 277 Await connection from QMP Monitor and perform capabilities negotiation. 278 279 @param timeout: timeout in seconds (nonnegative float number, or 280 None). The value passed will set the behavior of the 281 underneath QMP socket as described in [1]. 282 Default value is set to 15.0. 283 284 @return QMP greeting dict 285 @raise OSError on socket connection errors 286 @raise QMPConnectError if the greeting is not received 287 @raise QMPCapabilitiesError if fails to negotiate capabilities 288 289 [1] 290 https://docs.python.org/3/library/socket.html#socket.socket.settimeout 291 """ 292 self.__sock.settimeout(timeout) 293 self.__sock, _ = self.__sock.accept() 294 self.__sockfile = self.__sock.makefile(mode='r') 295 return self.__negotiate_capabilities() 296 297 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: 298 """ 299 Send a QMP command to the QMP Monitor. 300 301 @param qmp_cmd: QMP command to be sent as a Python dict 302 @return QMP response as a Python dict 303 """ 304 self.logger.debug(">>> %s", qmp_cmd) 305 self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) 306 resp = self.__json_read() 307 if resp is None: 308 raise QMPConnectError("Unexpected empty reply from server") 309 self.logger.debug("<<< %s", resp) 310 return resp 311 312 def cmd(self, name: str, 313 args: Optional[Dict[str, object]] = None, 314 cmd_id: Optional[object] = None) -> QMPMessage: 315 """ 316 Build a QMP command and send it to the QMP Monitor. 317 318 @param name: command name (string) 319 @param args: command arguments (dict) 320 @param cmd_id: command id (dict, list, string or int) 321 """ 322 qmp_cmd: QMPMessage = {'execute': name} 323 if args: 324 qmp_cmd['arguments'] = args 325 if cmd_id: 326 qmp_cmd['id'] = cmd_id 327 return self.cmd_obj(qmp_cmd) 328 329 def command(self, cmd: str, **kwds: object) -> QMPReturnValue: 330 """ 331 Build and send a QMP command to the monitor, report errors if any 332 """ 333 ret = self.cmd(cmd, kwds) 334 if 'error' in ret: 335 raise QMPResponseError(ret) 336 if 'return' not in ret: 337 raise QMPProtocolError( 338 "'return' key not found in QMP response '{}'".format(str(ret)) 339 ) 340 return cast(QMPReturnValue, ret['return']) 341 342 def pull_event(self, 343 wait: Union[bool, float] = False) -> Optional[QMPMessage]: 344 """ 345 Pulls a single event. 346 347 @param wait (bool): block until an event is available. 348 @param wait (float): If wait is a float, treat it as a timeout value. 349 350 @raise QMPTimeoutError: If a timeout float is provided and the timeout 351 period elapses. 352 @raise QMPConnectError: If wait is True but no events could be 353 retrieved or if some other error occurred. 354 355 @return The first available QMP event, or None. 356 """ 357 self.__get_events(wait) 358 359 if self.__events: 360 return self.__events.pop(0) 361 return None 362 363 def get_events(self, wait: bool = False) -> List[QMPMessage]: 364 """ 365 Get a list of available QMP events and clear all pending events. 366 367 @param wait (bool): block until an event is available. 368 @param wait (float): If wait is a float, treat it as a timeout value. 369 370 @raise QMPTimeoutError: If a timeout float is provided and the timeout 371 period elapses. 372 @raise QMPConnectError: If wait is True but no events could be 373 retrieved or if some other error occurred. 374 375 @return The list of available QMP events. 376 """ 377 self.__get_events(wait) 378 events = self.__events 379 self.__events = [] 380 return events 381 382 def clear_events(self) -> None: 383 """ 384 Clear current list of pending events. 385 """ 386 self.__events = [] 387 388 def close(self) -> None: 389 """ 390 Close the socket and socket file. 391 """ 392 if self.__sock: 393 self.__sock.close() 394 if self.__sockfile: 395 self.__sockfile.close() 396 397 def settimeout(self, timeout: Optional[float]) -> None: 398 """ 399 Set the socket timeout. 400 401 @param timeout (float): timeout in seconds (non-zero), or None. 402 @note This is a wrap around socket.settimeout 403 404 @raise ValueError: if timeout was set to 0. 405 """ 406 if timeout == 0: 407 msg = "timeout cannot be 0; this engages non-blocking mode." 408 msg += " Use 'None' instead to disable timeouts." 409 raise ValueError(msg) 410 self.__sock.settimeout(timeout) 411 412 def send_fd_scm(self, fd: int) -> None: 413 """ 414 Send a file descriptor to the remote via SCM_RIGHTS. 415 """ 416 if self.__sock.family != socket.AF_UNIX: 417 raise RuntimeError("Can't use SCM_RIGHTS on non-AF_UNIX socket.") 418 419 self.__sock.sendmsg( 420 [b' '], 421 [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] 422 )