cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

qcow2_format.py (14526B)


      1# Library for manipulations with qcow2 image
      2#
      3# Copyright (c) 2020 Virtuozzo International GmbH.
      4# Copyright (C) 2012 Red Hat, Inc.
      5#
      6# This program is free software; you can redistribute it and/or modify
      7# it under the terms of the GNU General Public License as published by
      8# the Free Software Foundation; either version 2 of the License, or
      9# (at your option) any later version.
     10#
     11# This program is distributed in the hope that it will be useful,
     12# but WITHOUT ANY WARRANTY; without even the implied warranty of
     13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14# GNU General Public License for more details.
     15#
     16# You should have received a copy of the GNU General Public License
     17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     18#
     19
     20import struct
     21import string
     22import json
     23
     24
     25class ComplexEncoder(json.JSONEncoder):
     26    def default(self, obj):
     27        if hasattr(obj, 'to_json'):
     28            return obj.to_json()
     29        else:
     30            return json.JSONEncoder.default(self, obj)
     31
     32
     33class Qcow2Field:
     34
     35    def __init__(self, value):
     36        self.value = value
     37
     38    def __str__(self):
     39        return str(self.value)
     40
     41
     42class Flags64(Qcow2Field):
     43
     44    def __str__(self):
     45        bits = []
     46        for bit in range(64):
     47            if self.value & (1 << bit):
     48                bits.append(bit)
     49        return str(bits)
     50
     51
     52class BitmapFlags(Qcow2Field):
     53
     54    flags = {
     55        0x1: 'in-use',
     56        0x2: 'auto'
     57    }
     58
     59    def __str__(self):
     60        bits = []
     61        for bit in range(64):
     62            flag = self.value & (1 << bit)
     63            if flag:
     64                bits.append(self.flags.get(flag, f'bit-{bit}'))
     65        return f'{self.value:#x} ({bits})'
     66
     67
     68class Enum(Qcow2Field):
     69
     70    def __str__(self):
     71        return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
     72
     73
     74class Qcow2StructMeta(type):
     75
     76    # Mapping from c types to python struct format
     77    ctypes = {
     78        'u8': 'B',
     79        'u16': 'H',
     80        'u32': 'I',
     81        'u64': 'Q'
     82    }
     83
     84    def __init__(self, name, bases, attrs):
     85        if 'fields' in attrs:
     86            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
     87
     88
     89class Qcow2Struct(metaclass=Qcow2StructMeta):
     90
     91    """Qcow2Struct: base class for qcow2 data structures
     92
     93    Successors should define fields class variable, which is: list of tuples,
     94    each of three elements:
     95        - c-type (one of 'u8', 'u16', 'u32', 'u64')
     96        - format (format_spec to use with .format() when dump or 'mask' to dump
     97                  bitmasks)
     98        - field name
     99    """
    100
    101    def __init__(self, fd=None, offset=None, data=None):
    102        """
    103        Two variants:
    104            1. Specify data. fd and offset must be None.
    105            2. Specify fd and offset, data must be None. offset may be omitted
    106               in this case, than current position of fd is used.
    107        """
    108        if data is None:
    109            assert fd is not None
    110            buf_size = struct.calcsize(self.fmt)
    111            if offset is not None:
    112                fd.seek(offset)
    113            data = fd.read(buf_size)
    114        else:
    115            assert fd is None and offset is None
    116
    117        values = struct.unpack(self.fmt, data)
    118        self.__dict__ = dict((field[2], values[i])
    119                             for i, field in enumerate(self.fields))
    120
    121    def dump(self, is_json=False):
    122        if is_json:
    123            print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
    124            return
    125
    126        for f in self.fields:
    127            value = self.__dict__[f[2]]
    128            if isinstance(f[1], str):
    129                value_str = f[1].format(value)
    130            else:
    131                value_str = str(f[1](value))
    132
    133            print('{:<25} {}'.format(f[2], value_str))
    134
    135    def to_json(self):
    136        return dict((f[2], self.__dict__[f[2]]) for f in self.fields)
    137
    138
    139class Qcow2BitmapExt(Qcow2Struct):
    140
    141    fields = (
    142        ('u32', '{}', 'nb_bitmaps'),
    143        ('u32', '{}', 'reserved32'),
    144        ('u64', '{:#x}', 'bitmap_directory_size'),
    145        ('u64', '{:#x}', 'bitmap_directory_offset')
    146    )
    147
    148    def __init__(self, fd, cluster_size):
    149        super().__init__(fd=fd)
    150        tail = struct.calcsize(self.fmt) % 8
    151        if tail:
    152            fd.seek(8 - tail, 1)
    153        position = fd.tell()
    154        self.cluster_size = cluster_size
    155        self.read_bitmap_directory(fd)
    156        fd.seek(position)
    157
    158    def read_bitmap_directory(self, fd):
    159        fd.seek(self.bitmap_directory_offset)
    160        self.bitmap_directory = \
    161            [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
    162             for _ in range(self.nb_bitmaps)]
    163
    164    def dump(self):
    165        super().dump()
    166        for entry in self.bitmap_directory:
    167            print()
    168            entry.dump()
    169
    170    def to_json(self):
    171        fields_dict = super().to_json()
    172        fields_dict['bitmap_directory'] = self.bitmap_directory
    173        return fields_dict
    174
    175
    176class Qcow2BitmapDirEntry(Qcow2Struct):
    177
    178    fields = (
    179        ('u64', '{:#x}', 'bitmap_table_offset'),
    180        ('u32', '{}', 'bitmap_table_size'),
    181        ('u32', BitmapFlags, 'flags'),
    182        ('u8',  '{}', 'type'),
    183        ('u8',  '{}', 'granularity_bits'),
    184        ('u16', '{}', 'name_size'),
    185        ('u32', '{}', 'extra_data_size')
    186    )
    187
    188    def __init__(self, fd, cluster_size):
    189        super().__init__(fd=fd)
    190        self.cluster_size = cluster_size
    191        # Seek relative to the current position in the file
    192        fd.seek(self.extra_data_size, 1)
    193        bitmap_name = fd.read(self.name_size)
    194        self.name = bitmap_name.decode('ascii')
    195        # Move position to the end of the entry in the directory
    196        entry_raw_size = self.bitmap_dir_entry_raw_size()
    197        padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
    198        fd.seek(padding, 1)
    199        self.bitmap_table = Qcow2BitmapTable(fd=fd,
    200                                             offset=self.bitmap_table_offset,
    201                                             nb_entries=self.bitmap_table_size,
    202                                             cluster_size=self.cluster_size)
    203
    204    def bitmap_dir_entry_raw_size(self):
    205        return struct.calcsize(self.fmt) + self.name_size + \
    206            self.extra_data_size
    207
    208    def dump(self):
    209        print(f'{"Bitmap name":<25} {self.name}')
    210        super(Qcow2BitmapDirEntry, self).dump()
    211        self.bitmap_table.dump()
    212
    213    def to_json(self):
    214        # Put the name ahead of the dict
    215        return {
    216            'name': self.name,
    217            **super().to_json(),
    218            'bitmap_table': self.bitmap_table
    219        }
    220
    221
    222class Qcow2BitmapTableEntry(Qcow2Struct):
    223
    224    fields = (
    225        ('u64',  '{}', 'entry'),
    226    )
    227
    228    BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
    229    BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
    230    BME_TABLE_ENTRY_FLAG_ALL_ONES = 1
    231
    232    def __init__(self, fd):
    233        super().__init__(fd=fd)
    234        self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
    235        self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
    236        if self.offset:
    237            if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
    238                self.type = 'invalid'
    239            else:
    240                self.type = 'serialized'
    241        elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
    242            self.type = 'all-ones'
    243        else:
    244            self.type = 'all-zeroes'
    245
    246    def to_json(self):
    247        return {'type': self.type, 'offset': self.offset,
    248                'reserved': self.reserved}
    249
    250
    251class Qcow2BitmapTable:
    252
    253    def __init__(self, fd, offset, nb_entries, cluster_size):
    254        self.cluster_size = cluster_size
    255        position = fd.tell()
    256        fd.seek(offset)
    257        self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
    258        fd.seek(position)
    259
    260    def dump(self):
    261        bitmap_table = enumerate(self.entries)
    262        print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
    263        for i, entry in bitmap_table:
    264            if entry.type == 'serialized':
    265                size = self.cluster_size
    266            else:
    267                size = 0
    268            print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
    269
    270    def to_json(self):
    271        return self.entries
    272
    273
    274QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
    275
    276
    277class QcowHeaderExtension(Qcow2Struct):
    278
    279    class Magic(Enum):
    280        mapping = {
    281            0xe2792aca: 'Backing format',
    282            0x6803f857: 'Feature table',
    283            0x0537be77: 'Crypto header',
    284            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
    285            0x44415441: 'Data file'
    286        }
    287
    288        def to_json(self):
    289            return self.mapping.get(self.value, "<unknown>")
    290
    291    fields = (
    292        ('u32', Magic, 'magic'),
    293        ('u32', '{}', 'length')
    294        # length bytes of data follows
    295        # then padding to next multiply of 8
    296    )
    297
    298    def __init__(self, magic=None, length=None, data=None, fd=None,
    299                 cluster_size=None):
    300        """
    301        Support both loading from fd and creation from user data.
    302        For fd-based creation current position in a file will be used to read
    303        the data.
    304        The cluster_size value may be obtained by dependent structures.
    305
    306        This should be somehow refactored and functionality should be moved to
    307        superclass (to allow creation of any qcow2 struct), but then, fields
    308        of variable length (data here) should be supported in base class
    309        somehow. Note also, that we probably want to parse different
    310        extensions. Should they be subclasses of this class, or how to do it
    311        better? Should it be something like QAPI union with discriminator field
    312        (magic here). So, it's a TODO. We'll see how to properly refactor this
    313        when we have more qcow2 structures.
    314        """
    315        if fd is None:
    316            assert all(v is not None for v in (magic, length, data))
    317            self.magic = magic
    318            self.length = length
    319            if length % 8 != 0:
    320                padding = 8 - (length % 8)
    321                data += b'\0' * padding
    322            self.data = data
    323        else:
    324            assert all(v is None for v in (magic, length, data))
    325            super().__init__(fd=fd)
    326            if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
    327                self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
    328                self.data = None
    329            else:
    330                padded = (self.length + 7) & ~7
    331                self.data = fd.read(padded)
    332                assert self.data is not None
    333                self.obj = None
    334
    335        if self.data is not None:
    336            data_str = self.data[:self.length]
    337            if all(c in string.printable.encode(
    338                'ascii') for c in data_str):
    339                data_str = f"'{ data_str.decode('ascii') }'"
    340            else:
    341                data_str = '<binary>'
    342            self.data_str = data_str
    343
    344
    345    def dump(self):
    346        super().dump()
    347
    348        if self.obj is None:
    349            print(f'{"data":<25} {self.data_str}')
    350        else:
    351            self.obj.dump()
    352
    353    def to_json(self):
    354        # Put the name ahead of the dict
    355        res = {'name': self.Magic(self.magic), **super().to_json()}
    356        if self.obj is not None:
    357            res['data'] = self.obj
    358        else:
    359            res['data_str'] = self.data_str
    360
    361        return res
    362
    363    @classmethod
    364    def create(cls, magic, data):
    365        return QcowHeaderExtension(magic, len(data), data)
    366
    367
    368class QcowHeader(Qcow2Struct):
    369
    370    fields = (
    371        # Version 2 header fields
    372        ('u32', '{:#x}', 'magic'),
    373        ('u32', '{}', 'version'),
    374        ('u64', '{:#x}', 'backing_file_offset'),
    375        ('u32', '{:#x}', 'backing_file_size'),
    376        ('u32', '{}', 'cluster_bits'),
    377        ('u64', '{}', 'size'),
    378        ('u32', '{}', 'crypt_method'),
    379        ('u32', '{}', 'l1_size'),
    380        ('u64', '{:#x}', 'l1_table_offset'),
    381        ('u64', '{:#x}', 'refcount_table_offset'),
    382        ('u32', '{}', 'refcount_table_clusters'),
    383        ('u32', '{}', 'nb_snapshots'),
    384        ('u64', '{:#x}', 'snapshot_offset'),
    385
    386        # Version 3 header fields
    387        ('u64', Flags64, 'incompatible_features'),
    388        ('u64', Flags64, 'compatible_features'),
    389        ('u64', Flags64, 'autoclear_features'),
    390        ('u32', '{}', 'refcount_order'),
    391        ('u32', '{}', 'header_length'),
    392    )
    393
    394    def __init__(self, fd):
    395        super().__init__(fd=fd, offset=0)
    396
    397        self.set_defaults()
    398        self.cluster_size = 1 << self.cluster_bits
    399
    400        fd.seek(self.header_length)
    401        self.load_extensions(fd)
    402
    403        if self.backing_file_offset:
    404            fd.seek(self.backing_file_offset)
    405            self.backing_file = fd.read(self.backing_file_size)
    406        else:
    407            self.backing_file = None
    408
    409    def set_defaults(self):
    410        if self.version == 2:
    411            self.incompatible_features = 0
    412            self.compatible_features = 0
    413            self.autoclear_features = 0
    414            self.refcount_order = 4
    415            self.header_length = 72
    416
    417    def load_extensions(self, fd):
    418        self.extensions = []
    419
    420        if self.backing_file_offset != 0:
    421            end = min(self.cluster_size, self.backing_file_offset)
    422        else:
    423            end = self.cluster_size
    424
    425        while fd.tell() < end:
    426            ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
    427            if ext.magic == 0:
    428                break
    429            else:
    430                self.extensions.append(ext)
    431
    432    def update_extensions(self, fd):
    433
    434        fd.seek(self.header_length)
    435        extensions = self.extensions
    436        extensions.append(QcowHeaderExtension(0, 0, b''))
    437        for ex in extensions:
    438            buf = struct.pack('>II', ex.magic, ex.length)
    439            fd.write(buf)
    440            fd.write(ex.data)
    441
    442        if self.backing_file is not None:
    443            self.backing_file_offset = fd.tell()
    444            fd.write(self.backing_file)
    445
    446        if fd.tell() > self.cluster_size:
    447            raise Exception('I think I just broke the image...')
    448
    449    def update(self, fd):
    450        header_bytes = self.header_length
    451
    452        self.update_extensions(fd)
    453
    454        fd.seek(0)
    455        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
    456        buf = struct.pack(QcowHeader.fmt, *header)
    457        buf = buf[0:header_bytes-1]
    458        fd.write(buf)
    459
    460    def dump_extensions(self, is_json=False):
    461        if is_json:
    462            print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
    463            return
    464
    465        for ex in self.extensions:
    466            print('Header extension:')
    467            ex.dump()
    468            print()