qcow2_format.py (14526B)
# Library for manipulations with qcow2 image
#
# Copyright (c) 2020 Virtuozzo International GmbH.
# Copyright (C) 2012 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import struct
import string
import json


class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that serializes objects through their to_json() method."""

    def default(self, obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return super().default(obj)


class Qcow2Field:
    """Wrapper around a raw field value; str() gives its dump form."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return str(self.value)


class Flags64(Qcow2Field):
    """64-bit flag word, printed as the list of set bit numbers."""

    def __str__(self):
        return str([bit for bit in range(64) if self.value & (1 << bit)])


class BitmapFlags(Qcow2Field):
    """Bitmap directory entry flags, printed as hex value plus flag names."""

    flags = {
        0x1: 'in-use',
        0x2: 'auto'
    }

    def __str__(self):
        bits = []
        for bit in range(64):
            flag = self.value & (1 << bit)
            if flag:
                # Bits without a known name are shown as 'bit-<n>'
                bits.append(self.flags.get(flag, f'bit-{bit}'))
        return f'{self.value:#x} ({bits})'


class Enum(Qcow2Field):
    """Enumerated value; successors must define a `mapping` dict."""

    def __str__(self):
        return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'


class Qcow2StructMeta(type):
    """Metaclass that derives the struct format string from `fields`."""

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(self, name, bases, attrs):
        # Only classes that declare their own `fields` get a new `fmt`;
        # '>' selects big-endian byte order without alignment padding.
        if 'fields' in attrs:
            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)


class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Successors should define fields class variable, which is: list of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format (format string to use with .format() when dumping, or a
          callable such as a Qcow2Field subclass whose str() is dumped)
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be omitted
               in this case, then current position of fd is used.
        """
        if data is None:
            assert fd is not None
            buf_size = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(buf_size)
        else:
            assert fd is None and offset is None

        values = struct.unpack(self.fmt, data)
        # Replaces the whole instance dict: any attribute set before this
        # call is dropped, so successors must set theirs afterwards.
        self.__dict__ = {field[2]: value
                         for field, value in zip(self.fields, values)}

    def dump(self, is_json=False):
        """Print the fields, either as JSON or as aligned 'name value' rows."""
        if is_json:
            print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
            return

        for _, fmt, name in self.fields:
            value = self.__dict__[name]
            if isinstance(fmt, str):
                value_str = fmt.format(value)
            else:
                value_str = str(fmt(value))

            print(f'{name:<25} {value_str}')

    def to_json(self):
        """Return a dict of the declared fields and their raw values."""
        return {f[2]: self.__dict__[f[2]] for f in self.fields}


class Qcow2BitmapExt(Qcow2Struct):
    """Bitmaps header extension together with its bitmap directory."""

    fields = (
        ('u32', '{}', 'nb_bitmaps'),
        ('u32', '{}', 'reserved32'),
        ('u64', '{:#x}', 'bitmap_directory_size'),
        ('u64', '{:#x}', 'bitmap_directory_offset')
    )

    def __init__(self, fd, cluster_size):
        super().__init__(fd=fd)
        # Skip padding up to the next multiple of 8 after the fixed part
        tail = struct.calcsize(self.fmt) % 8
        if tail:
            fd.seek(8 - tail, 1)
        # Reading the directory seeks elsewhere; restore position afterwards
        position = fd.tell()
        self.cluster_size = cluster_size
        self.read_bitmap_directory(fd)
        fd.seek(position)

    def read_bitmap_directory(self, fd):
        """Load nb_bitmaps directory entries from bitmap_directory_offset."""
        fd.seek(self.bitmap_directory_offset)
        self.bitmap_directory = \
            [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
             for _ in range(self.nb_bitmaps)]

    def dump(self):
        super().dump()
        for entry in self.bitmap_directory:
            print()
            entry.dump()

    def to_json(self):
        fields_dict = super().to_json()
        fields_dict['bitmap_directory'] = self.bitmap_directory
        return fields_dict


class Qcow2BitmapDirEntry(Qcow2Struct):
    """One bitmap directory entry: fixed fields, name and bitmap table."""

    fields = (
        ('u64', '{:#x}', 'bitmap_table_offset'),
        ('u32', '{}', 'bitmap_table_size'),
        ('u32', BitmapFlags, 'flags'),
        ('u8', '{}', 'type'),
        ('u8', '{}', 'granularity_bits'),
        ('u16', '{}', 'name_size'),
        ('u32', '{}', 'extra_data_size')
    )

    def __init__(self, fd, cluster_size):
        super().__init__(fd=fd)
        self.cluster_size = cluster_size
        # Seek relative to the current position in the file
        fd.seek(self.extra_data_size, 1)
        bitmap_name = fd.read(self.name_size)
        self.name = bitmap_name.decode('ascii')
        # Move position to the end of the entry in the directory
        entry_raw_size = self.bitmap_dir_entry_raw_size()
        padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
        fd.seek(padding, 1)
        # Qcow2BitmapTable restores the file position after reading
        self.bitmap_table = Qcow2BitmapTable(fd=fd,
                                             offset=self.bitmap_table_offset,
                                             nb_entries=self.bitmap_table_size,
                                             cluster_size=self.cluster_size)

    def bitmap_dir_entry_raw_size(self):
        """Return the unpadded entry size: fixed part, name and extra data."""
        return struct.calcsize(self.fmt) + self.name_size + \
            self.extra_data_size

    def dump(self):
        print(f'{"Bitmap name":<25} {self.name}')
        super().dump()
        self.bitmap_table.dump()

    def to_json(self):
        # Put the name ahead of the dict
        return {
            'name': self.name,
            **super().to_json(),
            'bitmap_table': self.bitmap_table
        }


class Qcow2BitmapTableEntry(Qcow2Struct):
    """One 64-bit bitmap table entry, split into offset, flag and reserved."""

    fields = (
        ('u64', '{}', 'entry'),
    )

    BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
    BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
    BME_TABLE_ENTRY_FLAG_ALL_ONES = 1

    def __init__(self, fd):
        super().__init__(fd=fd)
        self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
        self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
        all_ones = self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES
        # A non-zero offset combined with the all-ones flag is reported
        # as 'invalid'.
        if self.offset:
            self.type = 'invalid' if all_ones else 'serialized'
        elif all_ones:
            self.type = 'all-ones'
        else:
            self.type = 'all-zeroes'

    def to_json(self):
        return {'type': self.type, 'offset': self.offset,
                'reserved': self.reserved}


class Qcow2BitmapTable:
    """List of bitmap table entries read from a given file offset."""

    def __init__(self, fd, offset, nb_entries, cluster_size):
        self.cluster_size = cluster_size
        # Preserve the caller's file position across the read
        position = fd.tell()
        fd.seek(offset)
        self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
        fd.seek(position)

    def dump(self):
        print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
        for i, entry in enumerate(self.entries):
            # Only serialized entries are shown with a size (one cluster)
            if entry.type == 'serialized':
                size = self.cluster_size
            else:
                size = 0
            print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')

    def to_json(self):
        return self.entries


QCOW2_EXT_MAGIC_BITMAPS = 0x23852875


class QcowHeaderExtension(Qcow2Struct):
    """Header extension: magic, length and either raw data or parsed obj."""

    class Magic(Enum):
        mapping = {
            0xe2792aca: 'Backing format',
            0x6803f857: 'Feature table',
            0x0537be77: 'Crypto header',
            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
            0x44415441: 'Data file'
        }

        def to_json(self):
            return self.mapping.get(self.value, "<unknown>")

    fields = (
        ('u32', Magic, 'magic'),
        ('u32', '{}', 'length')
        # length bytes of data follows
        # then padding to next multiple of 8
    )

    def __init__(self, magic=None, length=None, data=None, fd=None,
                 cluster_size=None):
        """
        Support both loading from fd and creation from user data.
        For fd-based creation current position in a file will be used to read
        the data.
        The cluster_size value may be obtained by dependent structures.

        This should be somehow refactored and functionality should be moved to
        superclass (to allow creation of any qcow2 struct), but then, fields
        of variable length (data here) should be supported in base class
        somehow. Note also, that we probably want to parse different
        extensions. Should they be subclasses of this class, or how to do it
        better? Should it be something like QAPI union with discriminator field
        (magic here). So, it's a TODO. We'll see how to properly refactor this
        when we have more qcow2 structures.
        """
        if fd is None:
            assert all(v is not None for v in (magic, length, data))
            self.magic = magic
            self.length = length
            if length % 8 != 0:
                padding = 8 - (length % 8)
                data += b'\0' * padding
            self.data = data
            # User-created extensions carry raw data only. Without this
            # attribute dump() and to_json() fail with AttributeError.
            self.obj = None
        else:
            assert all(v is None for v in (magic, length, data))
            super().__init__(fd=fd)
            if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
                self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
                self.data = None
            else:
                padded = (self.length + 7) & ~7
                self.data = fd.read(padded)
                assert self.data is not None
                self.obj = None

        if self.data is not None:
            # Show the payload (without padding) as text when it is fully
            # printable ASCII, otherwise as an opaque marker.
            data_str = self.data[:self.length]
            if all(c in string.printable.encode(
                    'ascii') for c in data_str):
                data_str = f"'{ data_str.decode('ascii') }'"
            else:
                data_str = '<binary>'
            self.data_str = data_str

    def dump(self):
        super().dump()

        if self.obj is None:
            print(f'{"data":<25} {self.data_str}')
        else:
            self.obj.dump()

    def to_json(self):
        # Put the name ahead of the dict
        res = {'name': self.Magic(self.magic), **super().to_json()}
        if self.obj is not None:
            res['data'] = self.obj
        else:
            res['data_str'] = self.data_str

        return res

    @classmethod
    def create(cls, magic, data):
        """Build an extension from a magic and raw data bytes."""
        return QcowHeaderExtension(magic, len(data), data)


class QcowHeader(Qcow2Struct):
    """Image header with its header extensions and backing file name."""

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', Flags64, 'incompatible_features'),
        ('u64', Flags64, 'compatible_features'),
        ('u64', Flags64, 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        # Extensions start right after the header
        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """For version 2, overwrite the version 3 fields with fixed values."""
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read extensions from the current position of fd.

        Reading stops at an extension with magic 0, or when the position
        reaches the end of the first cluster or the backing file name,
        whichever comes first.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
            if ext.magic == 0:
                break
            self.extensions.append(ext)

    def update_extensions(self, fd):
        """Write extensions, a terminator and the backing file name to fd.

        Sets backing_file_offset to where the name was written. Raises
        Exception if the written data extends past the first cluster.

        NOTE(review): extensions parsed into `obj` (bitmaps) have data set
        to None, so writing them back fails in fd.write() - confirm whether
        callers need that case supported.
        """
        fd.seek(self.header_length)
        # Build a new list: the terminator must not be appended to
        # self.extensions itself, or it would appear in later dumps and
        # pile up on repeated updates.
        extensions = self.extensions + [QcowHeaderExtension(0, 0, b'')]
        for ex in extensions:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write extensions and then the header fields back to fd."""
        header_bytes = self.header_length

        # Must run first: it may change backing_file_offset, which is
        # packed into the header below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[name] for _, _, name in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes: for a version 2 image this
        # drops the version 3 fields without losing the last header byte.
        fd.write(buf[:header_bytes])

    def dump_extensions(self, is_json=False):
        """Print all loaded extensions, as JSON or in text form."""
        if is_json:
            print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
            return

        for ex in self.extensions:
            print('Header extension:')
            ex.dump()
            print()