cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qom_fuse.py (5990B)


      1"""
      2QEMU Object Model FUSE filesystem tool
      3
      4This script offers a simple FUSE filesystem within which the QOM tree
      5may be browsed, queried and edited using traditional shell tooling.
      6
      7This script requires the 'fusepy' python package.
      8
      9
     10usage: qom-fuse [-h] [--socket SOCKET] <mount>
     11
     12Mount a QOM tree as a FUSE filesystem
     13
     14positional arguments:
     15  <mount>               Mount point
     16
     17optional arguments:
     18  -h, --help            show this help message and exit
     19  --socket SOCKET, -s SOCKET
     20                        QMP socket path or address (addr:port). May also be
     21                        set via QMP_SOCKET environment variable.
     22"""
     23##
     24# Copyright IBM, Corp. 2012
     25# Copyright (C) 2020 Red Hat, Inc.
     26#
     27# Authors:
     28#  Anthony Liguori   <aliguori@us.ibm.com>
     29#  Markus Armbruster <armbru@redhat.com>
     30#
     31# This work is licensed under the terms of the GNU GPL, version 2 or later.
     32# See the COPYING file in the top-level directory.
     33##
     34
     35import argparse
     36from errno import ENOENT, EPERM
     37import stat
     38import sys
     39from typing import (
     40    IO,
     41    Dict,
     42    Iterator,
     43    Mapping,
     44    Optional,
     45    Union,
     46)
     47
     48import fuse
     49from fuse import FUSE, FuseOSError, Operations
     50
     51from . import QMPResponseError
     52from .qom_common import QOMCommand
     53
     54
     55fuse.fuse_python_api = (0, 2)
     56
     57
     58class QOMFuse(QOMCommand, Operations):
     59    """
     60    QOMFuse implements both fuse.Operations and QOMCommand.
     61
     62    Operations implements the FS, and QOMCommand implements the CLI command.
     63    """
     64    name = 'fuse'
     65    help = 'Mount a QOM tree as a FUSE filesystem'
     66    fuse: FUSE
     67
     68    @classmethod
     69    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
     70        super().configure_parser(parser)
     71        parser.add_argument(
     72            'mount',
     73            metavar='<mount>',
     74            action='store',
     75            help="Mount point",
     76        )
     77
     78    def __init__(self, args: argparse.Namespace):
     79        super().__init__(args)
     80        self.mount = args.mount
     81        self.ino_map: Dict[str, int] = {}
     82        self.ino_count = 1
     83
     84    def run(self) -> int:
     85        print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr)
     86        self.fuse = FUSE(self, self.mount, foreground=True)
     87        return 0
     88
     89    def get_ino(self, path: str) -> int:
     90        """Get an inode number for a given QOM path."""
     91        if path in self.ino_map:
     92            return self.ino_map[path]
     93        self.ino_map[path] = self.ino_count
     94        self.ino_count += 1
     95        return self.ino_map[path]
     96
     97    def is_object(self, path: str) -> bool:
     98        """Is the given QOM path an object?"""
     99        try:
    100            self.qom_list(path)
    101            return True
    102        except QMPResponseError:
    103            return False
    104
    105    def is_property(self, path: str) -> bool:
    106        """Is the given QOM path a property?"""
    107        path, prop = path.rsplit('/', 1)
    108        if path == '':
    109            path = '/'
    110        try:
    111            for item in self.qom_list(path):
    112                if item.name == prop:
    113                    return True
    114            return False
    115        except QMPResponseError:
    116            return False
    117
    118    def is_link(self, path: str) -> bool:
    119        """Is the given QOM path a link?"""
    120        path, prop = path.rsplit('/', 1)
    121        if path == '':
    122            path = '/'
    123        try:
    124            for item in self.qom_list(path):
    125                if item.name == prop and item.link:
    126                    return True
    127            return False
    128        except QMPResponseError:
    129            return False
    130
    131    def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes:
    132        if not self.is_property(path):
    133            raise FuseOSError(ENOENT)
    134
    135        path, prop = path.rsplit('/', 1)
    136        if path == '':
    137            path = '/'
    138        try:
    139            data = str(self.qmp.command('qom-get', path=path, property=prop))
    140            data += '\n'  # make values shell friendly
    141        except QMPResponseError as err:
    142            raise FuseOSError(EPERM) from err
    143
    144        if offset > len(data):
    145            return b''
    146
    147        return bytes(data[offset:][:size], encoding='utf-8')
    148
    149    def readlink(self, path: str) -> Union[bool, str]:
    150        if not self.is_link(path):
    151            return False
    152        path, prop = path.rsplit('/', 1)
    153        prefix = '/'.join(['..'] * (len(path.split('/')) - 1))
    154        return prefix + str(self.qmp.command('qom-get', path=path,
    155                                             property=prop))
    156
    157    def getattr(self, path: str,
    158                fh: Optional[IO[bytes]] = None) -> Mapping[str, object]:
    159        if self.is_link(path):
    160            value = {
    161                'st_mode': 0o755 | stat.S_IFLNK,
    162                'st_ino': self.get_ino(path),
    163                'st_dev': 0,
    164                'st_nlink': 2,
    165                'st_uid': 1000,
    166                'st_gid': 1000,
    167                'st_size': 4096,
    168                'st_atime': 0,
    169                'st_mtime': 0,
    170                'st_ctime': 0
    171            }
    172        elif self.is_object(path):
    173            value = {
    174                'st_mode': 0o755 | stat.S_IFDIR,
    175                'st_ino': self.get_ino(path),
    176                'st_dev': 0,
    177                'st_nlink': 2,
    178                'st_uid': 1000,
    179                'st_gid': 1000,
    180                'st_size': 4096,
    181                'st_atime': 0,
    182                'st_mtime': 0,
    183                'st_ctime': 0
    184            }
    185        elif self.is_property(path):
    186            value = {
    187                'st_mode': 0o644 | stat.S_IFREG,
    188                'st_ino': self.get_ino(path),
    189                'st_dev': 0,
    190                'st_nlink': 1,
    191                'st_uid': 1000,
    192                'st_gid': 1000,
    193                'st_size': 4096,
    194                'st_atime': 0,
    195                'st_mtime': 0,
    196                'st_ctime': 0
    197            }
    198        else:
    199            raise FuseOSError(ENOENT)
    200        return value
    201
    202    def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]:
    203        yield '.'
    204        yield '..'
    205        for item in self.qom_list(path):
    206            yield item.name