cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qemu_ga_client.py (9399B)


      1"""
      2QEMU Guest Agent Client
      3
      4Usage:
      5
      6Start QEMU with:
      7
      8# qemu [...] -chardev socket,path=/tmp/qga.sock,server,wait=off,id=qga0 \
      9  -device virtio-serial \
     10  -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0
     11
     12Run the script:
     13
     14$ qemu-ga-client --address=/tmp/qga.sock <command> [args...]
     15
     16or
     17
     18$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
     19$ qemu-ga-client <command> [args...]
     20
     21For example:
     22
     23$ qemu-ga-client cat /etc/resolv.conf
     24# Generated by NetworkManager
     25nameserver 10.0.2.3
     26$ qemu-ga-client fsfreeze status
     27thawed
     28$ qemu-ga-client fsfreeze freeze
     292 filesystems frozen
     30
     31See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
     32"""
     33
     34# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
     35#
     36# This work is licensed under the terms of the GNU GPL, version 2.  See
     37# the COPYING file in the top-level directory.
     38
     39import argparse
     40import base64
     41import errno
     42import os
     43import random
     44import sys
     45from typing import (
     46    Any,
     47    Callable,
     48    Dict,
     49    Optional,
     50    Sequence,
     51)
     52
     53from qemu import qmp
     54from qemu.qmp import SocketAddrT
     55
     56
     57# This script has not seen many patches or careful attention in quite
     58# some time. If you would like to improve it, please review the design
     59# carefully and add docstrings at that point in time. Until then:
     60
     61# pylint: disable=missing-docstring
     62
     63
     64class QemuGuestAgent(qmp.QEMUMonitorProtocol):
     65    def __getattr__(self, name: str) -> Callable[..., Any]:
     66        def wrapper(**kwds: object) -> object:
     67            return self.command('guest-' + name.replace('_', '-'), **kwds)
     68        return wrapper
     69
     70
     71class QemuGuestAgentClient:
     72    def __init__(self, address: SocketAddrT):
     73        self.qga = QemuGuestAgent(address)
     74        self.qga.connect(negotiate=False)
     75
     76    def sync(self, timeout: Optional[float] = 3) -> None:
     77        # Avoid being blocked forever
     78        if not self.ping(timeout):
     79            raise EnvironmentError('Agent seems not alive')
     80        uid = random.randint(0, (1 << 32) - 1)
     81        while True:
     82            ret = self.qga.sync(id=uid)
     83            if isinstance(ret, int) and int(ret) == uid:
     84                break
     85
     86    def __file_read_all(self, handle: int) -> bytes:
     87        eof = False
     88        data = b''
     89        while not eof:
     90            ret = self.qga.file_read(handle=handle, count=1024)
     91            _data = base64.b64decode(ret['buf-b64'])
     92            data += _data
     93            eof = ret['eof']
     94        return data
     95
     96    def read(self, path: str) -> bytes:
     97        handle = self.qga.file_open(path=path)
     98        try:
     99            data = self.__file_read_all(handle)
    100        finally:
    101            self.qga.file_close(handle=handle)
    102        return data
    103
    104    def info(self) -> str:
    105        info = self.qga.info()
    106
    107        msgs = []
    108        msgs.append('version: ' + info['version'])
    109        msgs.append('supported_commands:')
    110        enabled = [c['name'] for c in info['supported_commands']
    111                   if c['enabled']]
    112        msgs.append('\tenabled: ' + ', '.join(enabled))
    113        disabled = [c['name'] for c in info['supported_commands']
    114                    if not c['enabled']]
    115        msgs.append('\tdisabled: ' + ', '.join(disabled))
    116
    117        return '\n'.join(msgs)
    118
    119    @classmethod
    120    def __gen_ipv4_netmask(cls, prefixlen: int) -> str:
    121        mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
    122        return '.'.join([str(mask >> 24),
    123                         str((mask >> 16) & 0xff),
    124                         str((mask >> 8) & 0xff),
    125                         str(mask & 0xff)])
    126
    127    def ifconfig(self) -> str:
    128        nifs = self.qga.network_get_interfaces()
    129
    130        msgs = []
    131        for nif in nifs:
    132            msgs.append(nif['name'] + ':')
    133            if 'ip-addresses' in nif:
    134                for ipaddr in nif['ip-addresses']:
    135                    if ipaddr['ip-address-type'] == 'ipv4':
    136                        addr = ipaddr['ip-address']
    137                        mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
    138                        msgs.append(f"\tinet {addr}  netmask {mask}")
    139                    elif ipaddr['ip-address-type'] == 'ipv6':
    140                        addr = ipaddr['ip-address']
    141                        prefix = ipaddr['prefix']
    142                        msgs.append(f"\tinet6 {addr}  prefixlen {prefix}")
    143            if nif['hardware-address'] != '00:00:00:00:00:00':
    144                msgs.append("\tether " + nif['hardware-address'])
    145
    146        return '\n'.join(msgs)
    147
    148    def ping(self, timeout: Optional[float]) -> bool:
    149        self.qga.settimeout(timeout)
    150        try:
    151            self.qga.ping()
    152        except TimeoutError:
    153            return False
    154        return True
    155
    156    def fsfreeze(self, cmd: str) -> object:
    157        if cmd not in ['status', 'freeze', 'thaw']:
    158            raise Exception('Invalid command: ' + cmd)
    159        # Can be int (freeze, thaw) or GuestFsfreezeStatus (status)
    160        return getattr(self.qga, 'fsfreeze' + '_' + cmd)()
    161
    162    def fstrim(self, minimum: int) -> Dict[str, object]:
    163        # returns GuestFilesystemTrimResponse
    164        ret = getattr(self.qga, 'fstrim')(minimum=minimum)
    165        assert isinstance(ret, dict)
    166        return ret
    167
    168    def suspend(self, mode: str) -> None:
    169        if mode not in ['disk', 'ram', 'hybrid']:
    170            raise Exception('Invalid mode: ' + mode)
    171
    172        try:
    173            getattr(self.qga, 'suspend' + '_' + mode)()
    174            # On error exception will raise
    175        except TimeoutError:
    176            # On success command will timed out
    177            return
    178
    179    def shutdown(self, mode: str = 'powerdown') -> None:
    180        if mode not in ['powerdown', 'halt', 'reboot']:
    181            raise Exception('Invalid mode: ' + mode)
    182
    183        try:
    184            self.qga.shutdown(mode=mode)
    185        except TimeoutError:
    186            pass
    187
    188
    189def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    190    if len(args) != 1:
    191        print('Invalid argument')
    192        print('Usage: cat <file>')
    193        sys.exit(1)
    194    print(client.read(args[0]))
    195
    196
    197def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    198    usage = 'Usage: fsfreeze status|freeze|thaw'
    199    if len(args) != 1:
    200        print('Invalid argument')
    201        print(usage)
    202        sys.exit(1)
    203    if args[0] not in ['status', 'freeze', 'thaw']:
    204        print('Invalid command: ' + args[0])
    205        print(usage)
    206        sys.exit(1)
    207    cmd = args[0]
    208    ret = client.fsfreeze(cmd)
    209    if cmd == 'status':
    210        print(ret)
    211        return
    212
    213    assert isinstance(ret, int)
    214    verb = 'frozen' if cmd == 'freeze' else 'thawed'
    215    print(f"{ret:d} filesystems {verb}")
    216
    217
    218def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    219    if len(args) == 0:
    220        minimum = 0
    221    else:
    222        minimum = int(args[0])
    223    print(client.fstrim(minimum))
    224
    225
    226def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    227    assert not args
    228    print(client.ifconfig())
    229
    230
    231def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    232    assert not args
    233    print(client.info())
    234
    235
    236def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    237    timeout = 3.0 if len(args) == 0 else float(args[0])
    238    alive = client.ping(timeout)
    239    if not alive:
    240        print("Not responded in %s sec" % args[0])
    241        sys.exit(1)
    242
    243
    244def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    245    usage = 'Usage: suspend disk|ram|hybrid'
    246    if len(args) != 1:
    247        print('Less argument')
    248        print(usage)
    249        sys.exit(1)
    250    if args[0] not in ['disk', 'ram', 'hybrid']:
    251        print('Invalid command: ' + args[0])
    252        print(usage)
    253        sys.exit(1)
    254    client.suspend(args[0])
    255
    256
    257def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    258    assert not args
    259    client.shutdown()
    260
    261
    262_cmd_powerdown = _cmd_shutdown
    263
    264
    265def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    266    assert not args
    267    client.shutdown('halt')
    268
    269
    270def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    271    assert not args
    272    client.shutdown('reboot')
    273
    274
    275commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m]
    276
    277
    278def send_command(address: str, cmd: str, args: Sequence[str]) -> None:
    279    if not os.path.exists(address):
    280        print('%s not found' % address)
    281        sys.exit(1)
    282
    283    if cmd not in commands:
    284        print('Invalid command: ' + cmd)
    285        print('Available commands: ' + ', '.join(commands))
    286        sys.exit(1)
    287
    288    try:
    289        client = QemuGuestAgentClient(address)
    290    except OSError as err:
    291        print(err)
    292        if err.errno == errno.ECONNREFUSED:
    293            print('Hint: qemu is not running?')
    294        sys.exit(1)
    295
    296    if cmd == 'fsfreeze' and args[0] == 'freeze':
    297        client.sync(60)
    298    elif cmd != 'ping':
    299        client.sync()
    300
    301    globals()['_cmd_' + cmd](client, args)
    302
    303
    304def main() -> None:
    305    address = os.environ.get('QGA_CLIENT_ADDRESS')
    306
    307    parser = argparse.ArgumentParser()
    308    parser.add_argument('--address', action='store',
    309                        default=address,
    310                        help='Specify a ip:port pair or a unix socket path')
    311    parser.add_argument('command', choices=commands)
    312    parser.add_argument('args', nargs='*')
    313
    314    args = parser.parse_args()
    315    if args.address is None:
    316        parser.error('address is not specified')
    317        sys.exit(1)
    318
    319    send_command(args.address, args.command, args.args)
    320
    321
    322if __name__ == '__main__':
    323    main()