qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

qcow2_format.py (14526B)


      1 # Library for manipulations with qcow2 image
      2 #
      3 # Copyright (c) 2020 Virtuozzo International GmbH.
      4 # Copyright (C) 2012 Red Hat, Inc.
      5 #
      6 # This program is free software; you can redistribute it and/or modify
      7 # it under the terms of the GNU General Public License as published by
      8 # the Free Software Foundation; either version 2 of the License, or
      9 # (at your option) any later version.
     10 #
     11 # This program is distributed in the hope that it will be useful,
     12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14 # GNU General Public License for more details.
     15 #
     16 # You should have received a copy of the GNU General Public License
     17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     18 #
     19 
     20 import struct
     21 import string
     22 import json
     23 
     24 
     25 class ComplexEncoder(json.JSONEncoder):
     26     def default(self, obj):
     27         if hasattr(obj, 'to_json'):
     28             return obj.to_json()
     29         else:
     30             return json.JSONEncoder.default(self, obj)
     31 
     32 
     33 class Qcow2Field:
     34 
     35     def __init__(self, value):
     36         self.value = value
     37 
     38     def __str__(self):
     39         return str(self.value)
     40 
     41 
     42 class Flags64(Qcow2Field):
     43 
     44     def __str__(self):
     45         bits = []
     46         for bit in range(64):
     47             if self.value & (1 << bit):
     48                 bits.append(bit)
     49         return str(bits)
     50 
     51 
     52 class BitmapFlags(Qcow2Field):
     53 
     54     flags = {
     55         0x1: 'in-use',
     56         0x2: 'auto'
     57     }
     58 
     59     def __str__(self):
     60         bits = []
     61         for bit in range(64):
     62             flag = self.value & (1 << bit)
     63             if flag:
     64                 bits.append(self.flags.get(flag, f'bit-{bit}'))
     65         return f'{self.value:#x} ({bits})'
     66 
     67 
     68 class Enum(Qcow2Field):
     69 
     70     def __str__(self):
     71         return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
     72 
     73 
     74 class Qcow2StructMeta(type):
     75 
     76     # Mapping from c types to python struct format
     77     ctypes = {
     78         'u8': 'B',
     79         'u16': 'H',
     80         'u32': 'I',
     81         'u64': 'Q'
     82     }
     83 
     84     def __init__(self, name, bases, attrs):
     85         if 'fields' in attrs:
     86             self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
     87 
     88 
     89 class Qcow2Struct(metaclass=Qcow2StructMeta):
     90 
     91     """Qcow2Struct: base class for qcow2 data structures
     92 
     93     Successors should define fields class variable, which is: list of tuples,
     94     each of three elements:
     95         - c-type (one of 'u8', 'u16', 'u32', 'u64')
     96         - format (format_spec to use with .format() when dump or 'mask' to dump
     97                   bitmasks)
     98         - field name
     99     """
    100 
    101     def __init__(self, fd=None, offset=None, data=None):
    102         """
    103         Two variants:
    104             1. Specify data. fd and offset must be None.
    105             2. Specify fd and offset, data must be None. offset may be omitted
    106                in this case, than current position of fd is used.
    107         """
    108         if data is None:
    109             assert fd is not None
    110             buf_size = struct.calcsize(self.fmt)
    111             if offset is not None:
    112                 fd.seek(offset)
    113             data = fd.read(buf_size)
    114         else:
    115             assert fd is None and offset is None
    116 
    117         values = struct.unpack(self.fmt, data)
    118         self.__dict__ = dict((field[2], values[i])
    119                              for i, field in enumerate(self.fields))
    120 
    121     def dump(self, is_json=False):
    122         if is_json:
    123             print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
    124             return
    125 
    126         for f in self.fields:
    127             value = self.__dict__[f[2]]
    128             if isinstance(f[1], str):
    129                 value_str = f[1].format(value)
    130             else:
    131                 value_str = str(f[1](value))
    132 
    133             print('{:<25} {}'.format(f[2], value_str))
    134 
    135     def to_json(self):
    136         return dict((f[2], self.__dict__[f[2]]) for f in self.fields)
    137 
    138 
    139 class Qcow2BitmapExt(Qcow2Struct):
    140 
    141     fields = (
    142         ('u32', '{}', 'nb_bitmaps'),
    143         ('u32', '{}', 'reserved32'),
    144         ('u64', '{:#x}', 'bitmap_directory_size'),
    145         ('u64', '{:#x}', 'bitmap_directory_offset')
    146     )
    147 
    148     def __init__(self, fd, cluster_size):
    149         super().__init__(fd=fd)
    150         tail = struct.calcsize(self.fmt) % 8
    151         if tail:
    152             fd.seek(8 - tail, 1)
    153         position = fd.tell()
    154         self.cluster_size = cluster_size
    155         self.read_bitmap_directory(fd)
    156         fd.seek(position)
    157 
    158     def read_bitmap_directory(self, fd):
    159         fd.seek(self.bitmap_directory_offset)
    160         self.bitmap_directory = \
    161             [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
    162              for _ in range(self.nb_bitmaps)]
    163 
    164     def dump(self):
    165         super().dump()
    166         for entry in self.bitmap_directory:
    167             print()
    168             entry.dump()
    169 
    170     def to_json(self):
    171         fields_dict = super().to_json()
    172         fields_dict['bitmap_directory'] = self.bitmap_directory
    173         return fields_dict
    174 
    175 
    176 class Qcow2BitmapDirEntry(Qcow2Struct):
    177 
    178     fields = (
    179         ('u64', '{:#x}', 'bitmap_table_offset'),
    180         ('u32', '{}', 'bitmap_table_size'),
    181         ('u32', BitmapFlags, 'flags'),
    182         ('u8',  '{}', 'type'),
    183         ('u8',  '{}', 'granularity_bits'),
    184         ('u16', '{}', 'name_size'),
    185         ('u32', '{}', 'extra_data_size')
    186     )
    187 
    188     def __init__(self, fd, cluster_size):
    189         super().__init__(fd=fd)
    190         self.cluster_size = cluster_size
    191         # Seek relative to the current position in the file
    192         fd.seek(self.extra_data_size, 1)
    193         bitmap_name = fd.read(self.name_size)
    194         self.name = bitmap_name.decode('ascii')
    195         # Move position to the end of the entry in the directory
    196         entry_raw_size = self.bitmap_dir_entry_raw_size()
    197         padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
    198         fd.seek(padding, 1)
    199         self.bitmap_table = Qcow2BitmapTable(fd=fd,
    200                                              offset=self.bitmap_table_offset,
    201                                              nb_entries=self.bitmap_table_size,
    202                                              cluster_size=self.cluster_size)
    203 
    204     def bitmap_dir_entry_raw_size(self):
    205         return struct.calcsize(self.fmt) + self.name_size + \
    206             self.extra_data_size
    207 
    208     def dump(self):
    209         print(f'{"Bitmap name":<25} {self.name}')
    210         super(Qcow2BitmapDirEntry, self).dump()
    211         self.bitmap_table.dump()
    212 
    213     def to_json(self):
    214         # Put the name ahead of the dict
    215         return {
    216             'name': self.name,
    217             **super().to_json(),
    218             'bitmap_table': self.bitmap_table
    219         }
    220 
    221 
    222 class Qcow2BitmapTableEntry(Qcow2Struct):
    223 
    224     fields = (
    225         ('u64',  '{}', 'entry'),
    226     )
    227 
    228     BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
    229     BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
    230     BME_TABLE_ENTRY_FLAG_ALL_ONES = 1
    231 
    232     def __init__(self, fd):
    233         super().__init__(fd=fd)
    234         self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
    235         self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
    236         if self.offset:
    237             if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
    238                 self.type = 'invalid'
    239             else:
    240                 self.type = 'serialized'
    241         elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
    242             self.type = 'all-ones'
    243         else:
    244             self.type = 'all-zeroes'
    245 
    246     def to_json(self):
    247         return {'type': self.type, 'offset': self.offset,
    248                 'reserved': self.reserved}
    249 
    250 
    251 class Qcow2BitmapTable:
    252 
    253     def __init__(self, fd, offset, nb_entries, cluster_size):
    254         self.cluster_size = cluster_size
    255         position = fd.tell()
    256         fd.seek(offset)
    257         self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
    258         fd.seek(position)
    259 
    260     def dump(self):
    261         bitmap_table = enumerate(self.entries)
    262         print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
    263         for i, entry in bitmap_table:
    264             if entry.type == 'serialized':
    265                 size = self.cluster_size
    266             else:
    267                 size = 0
    268             print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
    269 
    270     def to_json(self):
    271         return self.entries
    272 
    273 
# Magic number of the header extension that QcowHeaderExtension.Magic
# lists as 'Bitmaps'
QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
    275 
    276 
    277 class QcowHeaderExtension(Qcow2Struct):
    278 
    279     class Magic(Enum):
    280         mapping = {
    281             0xe2792aca: 'Backing format',
    282             0x6803f857: 'Feature table',
    283             0x0537be77: 'Crypto header',
    284             QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
    285             0x44415441: 'Data file'
    286         }
    287 
    288         def to_json(self):
    289             return self.mapping.get(self.value, "<unknown>")
    290 
    291     fields = (
    292         ('u32', Magic, 'magic'),
    293         ('u32', '{}', 'length')
    294         # length bytes of data follows
    295         # then padding to next multiply of 8
    296     )
    297 
    298     def __init__(self, magic=None, length=None, data=None, fd=None,
    299                  cluster_size=None):
    300         """
    301         Support both loading from fd and creation from user data.
    302         For fd-based creation current position in a file will be used to read
    303         the data.
    304         The cluster_size value may be obtained by dependent structures.
    305 
    306         This should be somehow refactored and functionality should be moved to
    307         superclass (to allow creation of any qcow2 struct), but then, fields
    308         of variable length (data here) should be supported in base class
    309         somehow. Note also, that we probably want to parse different
    310         extensions. Should they be subclasses of this class, or how to do it
    311         better? Should it be something like QAPI union with discriminator field
    312         (magic here). So, it's a TODO. We'll see how to properly refactor this
    313         when we have more qcow2 structures.
    314         """
    315         if fd is None:
    316             assert all(v is not None for v in (magic, length, data))
    317             self.magic = magic
    318             self.length = length
    319             if length % 8 != 0:
    320                 padding = 8 - (length % 8)
    321                 data += b'\0' * padding
    322             self.data = data
    323         else:
    324             assert all(v is None for v in (magic, length, data))
    325             super().__init__(fd=fd)
    326             if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
    327                 self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
    328                 self.data = None
    329             else:
    330                 padded = (self.length + 7) & ~7
    331                 self.data = fd.read(padded)
    332                 assert self.data is not None
    333                 self.obj = None
    334 
    335         if self.data is not None:
    336             data_str = self.data[:self.length]
    337             if all(c in string.printable.encode(
    338                 'ascii') for c in data_str):
    339                 data_str = f"'{ data_str.decode('ascii') }'"
    340             else:
    341                 data_str = '<binary>'
    342             self.data_str = data_str
    343 
    344 
    345     def dump(self):
    346         super().dump()
    347 
    348         if self.obj is None:
    349             print(f'{"data":<25} {self.data_str}')
    350         else:
    351             self.obj.dump()
    352 
    353     def to_json(self):
    354         # Put the name ahead of the dict
    355         res = {'name': self.Magic(self.magic), **super().to_json()}
    356         if self.obj is not None:
    357             res['data'] = self.obj
    358         else:
    359             res['data_str'] = self.data_str
    360 
    361         return res
    362 
    363     @classmethod
    364     def create(cls, magic, data):
    365         return QcowHeaderExtension(magic, len(data), data)
    366 
    367 
    368 class QcowHeader(Qcow2Struct):
    369 
    370     fields = (
    371         # Version 2 header fields
    372         ('u32', '{:#x}', 'magic'),
    373         ('u32', '{}', 'version'),
    374         ('u64', '{:#x}', 'backing_file_offset'),
    375         ('u32', '{:#x}', 'backing_file_size'),
    376         ('u32', '{}', 'cluster_bits'),
    377         ('u64', '{}', 'size'),
    378         ('u32', '{}', 'crypt_method'),
    379         ('u32', '{}', 'l1_size'),
    380         ('u64', '{:#x}', 'l1_table_offset'),
    381         ('u64', '{:#x}', 'refcount_table_offset'),
    382         ('u32', '{}', 'refcount_table_clusters'),
    383         ('u32', '{}', 'nb_snapshots'),
    384         ('u64', '{:#x}', 'snapshot_offset'),
    385 
    386         # Version 3 header fields
    387         ('u64', Flags64, 'incompatible_features'),
    388         ('u64', Flags64, 'compatible_features'),
    389         ('u64', Flags64, 'autoclear_features'),
    390         ('u32', '{}', 'refcount_order'),
    391         ('u32', '{}', 'header_length'),
    392     )
    393 
    394     def __init__(self, fd):
    395         super().__init__(fd=fd, offset=0)
    396 
    397         self.set_defaults()
    398         self.cluster_size = 1 << self.cluster_bits
    399 
    400         fd.seek(self.header_length)
    401         self.load_extensions(fd)
    402 
    403         if self.backing_file_offset:
    404             fd.seek(self.backing_file_offset)
    405             self.backing_file = fd.read(self.backing_file_size)
    406         else:
    407             self.backing_file = None
    408 
    409     def set_defaults(self):
    410         if self.version == 2:
    411             self.incompatible_features = 0
    412             self.compatible_features = 0
    413             self.autoclear_features = 0
    414             self.refcount_order = 4
    415             self.header_length = 72
    416 
    417     def load_extensions(self, fd):
    418         self.extensions = []
    419 
    420         if self.backing_file_offset != 0:
    421             end = min(self.cluster_size, self.backing_file_offset)
    422         else:
    423             end = self.cluster_size
    424 
    425         while fd.tell() < end:
    426             ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
    427             if ext.magic == 0:
    428                 break
    429             else:
    430                 self.extensions.append(ext)
    431 
    432     def update_extensions(self, fd):
    433 
    434         fd.seek(self.header_length)
    435         extensions = self.extensions
    436         extensions.append(QcowHeaderExtension(0, 0, b''))
    437         for ex in extensions:
    438             buf = struct.pack('>II', ex.magic, ex.length)
    439             fd.write(buf)
    440             fd.write(ex.data)
    441 
    442         if self.backing_file is not None:
    443             self.backing_file_offset = fd.tell()
    444             fd.write(self.backing_file)
    445 
    446         if fd.tell() > self.cluster_size:
    447             raise Exception('I think I just broke the image...')
    448 
    449     def update(self, fd):
    450         header_bytes = self.header_length
    451 
    452         self.update_extensions(fd)
    453 
    454         fd.seek(0)
    455         header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
    456         buf = struct.pack(QcowHeader.fmt, *header)
    457         buf = buf[0:header_bytes-1]
    458         fd.write(buf)
    459 
    460     def dump_extensions(self, is_json=False):
    461         if is_json:
    462             print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
    463             return
    464 
    465         for ex in self.extensions:
    466             print('Header extension:')
    467             ex.dump()
    468             print()