qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

iotests.py (59988B)


      1 # Common utilities and Python wrappers for qemu-iotests
      2 #
      3 # Copyright (C) 2012 IBM Corp.
      4 #
      5 # This program is free software; you can redistribute it and/or modify
      6 # it under the terms of the GNU General Public License as published by
      7 # the Free Software Foundation; either version 2 of the License, or
      8 # (at your option) any later version.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 #
     18 
     19 import argparse
     20 import atexit
     21 import bz2
     22 from collections import OrderedDict
     23 import faulthandler
     24 import json
     25 import logging
     26 import os
     27 import re
     28 import shutil
     29 import signal
     30 import struct
     31 import subprocess
     32 import sys
     33 import time
     34 from typing import (Any, Callable, Dict, Iterable, Iterator,
     35                     List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
     36 import unittest
     37 
     38 from contextlib import contextmanager
     39 
     40 from qemu.machine import qtest
     41 from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
     42 from qemu.utils import VerboseProcessError
     43 
     44 # Use this logger for logging messages directly from the iotests module
     45 logger = logging.getLogger('qemu.iotests')
     46 logger.addHandler(logging.NullHandler())
     47 
     48 # Use this logger for messages that ought to be used for diff output.
     49 test_logger = logging.getLogger('qemu.iotests.diff_io')
     50 
     51 
     52 faulthandler.enable()
     53 
     54 # This will not work if arguments contain spaces but is necessary if we
     55 # want to support the override options that ./check supports.
     56 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
     57 if os.environ.get('QEMU_IMG_OPTIONS'):
     58     qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
     59 
     60 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
     61 if os.environ.get('QEMU_IO_OPTIONS'):
     62     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
     63 
     64 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
     65 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
     66     qemu_io_args_no_fmt += \
     67         os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
     68 
     69 qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
     70 qemu_nbd_args = [qemu_nbd_prog]
     71 if os.environ.get('QEMU_NBD_OPTIONS'):
     72     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
     73 
     74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
     75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
     76 
     77 qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
     78 
     79 gdb_qemu_env = os.environ.get('GDB_OPTIONS')
     80 qemu_gdb = []
     81 if gdb_qemu_env:
     82     qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
     83 
     84 qemu_print = os.environ.get('PRINT_QEMU', False)
     85 
     86 imgfmt = os.environ.get('IMGFMT', 'raw')
     87 imgproto = os.environ.get('IMGPROTO', 'file')
     88 
     89 try:
     90     test_dir = os.environ['TEST_DIR']
     91     sock_dir = os.environ['SOCK_DIR']
     92     cachemode = os.environ['CACHEMODE']
     93     aiomode = os.environ['AIOMODE']
     94     qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
     95 except KeyError:
     96     # We are using these variables as proxies to indicate that we're
     97     # not being run via "check". There may be other things set up by
     98     # "check" that individual test cases rely on.
     99     sys.stderr.write('Please run this test via the "check" script\n')
    100     sys.exit(os.EX_USAGE)
    101 
    102 qemu_valgrind = []
    103 if os.environ.get('VALGRIND_QEMU') == "y" and \
    104     os.environ.get('NO_VALGRIND') != "y":
    105     valgrind_logfile = "--log-file=" + test_dir
    106     # %p allows to put the valgrind process PID, since
    107     # we don't know it a priori (subprocess.Popen is
    108     # not yet invoked)
    109     valgrind_logfile += "/%p.valgrind"
    110 
    111     qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
    112 
    113 luks_default_secret_object = 'secret,id=keysec0,data=' + \
    114                              os.environ.get('IMGKEYSECRET', '')
    115 luks_default_key_secret_opt = 'key-secret=keysec0'
    116 
    117 sample_img_dir = os.environ['SAMPLE_IMG_DIR']
    118 
    119 
    120 @contextmanager
    121 def change_log_level(
    122         logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    123     """
    124     Utility function for temporarily changing the log level of a logger.
    125 
    126     This can be used to silence errors that are expected or uninteresting.
    127     """
    128     _logger = logging.getLogger(logger_name)
    129     current_level = _logger.level
    130     _logger.setLevel(level)
    131 
    132     try:
    133         yield
    134     finally:
    135         _logger.setLevel(current_level)
    136 
    137 
    138 def unarchive_sample_image(sample, fname):
    139     sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    140     with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
    141         shutil.copyfileobj(f_in, f_out)
    142 
    143 
    144 def qemu_tool_popen(args: Sequence[str],
    145                     connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    146     stderr = subprocess.STDOUT if connect_stderr else None
    147     # pylint: disable=consider-using-with
    148     return subprocess.Popen(args,
    149                             stdout=subprocess.PIPE,
    150                             stderr=stderr,
    151                             universal_newlines=True)
    152 
    153 
    154 def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
    155                               connect_stderr: bool = True,
    156                               drop_successful_output: bool = False) \
    157         -> Tuple[str, int]:
    158     """
    159     Run a tool and return both its output and its exit code
    160     """
    161     with qemu_tool_popen(args, connect_stderr) as subp:
    162         output = subp.communicate()[0]
    163         if subp.returncode < 0:
    164             cmd = ' '.join(args)
    165             sys.stderr.write(f'{tool} received signal \
    166                                {-subp.returncode}: {cmd}\n')
    167         if drop_successful_output and subp.returncode == 0:
    168             output = ''
    169         return (output, subp.returncode)
    170 
    171 def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    172     if not args or args[0] != 'create':
    173         return list(args)
    174     args = args[1:]
    175 
    176     p = argparse.ArgumentParser(allow_abbrev=False)
    177     # -o option may be specified several times
    178     p.add_argument('-o', action='append', default=[])
    179     p.add_argument('-f')
    180     parsed, remaining = p.parse_known_args(args)
    181 
    182     opts_list = parsed.o
    183 
    184     result = ['create']
    185     if parsed.f is not None:
    186         result += ['-f', parsed.f]
    187 
    188     # IMGOPTS most probably contain options specific for the selected format,
    189     # like extended_l2 or compression_type for qcow2. Test may want to create
    190     # additional images in other formats that doesn't support these options.
    191     # So, use IMGOPTS only for images created in imgfmt format.
    192     imgopts = os.environ.get('IMGOPTS')
    193     if imgopts and parsed.f == imgfmt:
    194         opts_list.insert(0, imgopts)
    195 
    196     # default luks support
    197     if parsed.f == 'luks' and \
    198             all('key-secret' not in opts for opts in opts_list):
    199         result += ['--object', luks_default_secret_object]
    200         opts_list.append(luks_default_key_secret_opt)
    201 
    202     for opts in opts_list:
    203         result += ['-o', opts]
    204 
    205     result += remaining
    206 
    207     return result
    208 
    209 
    210 def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
    211               ) -> 'subprocess.CompletedProcess[str]':
    212     """
    213     Run a qemu tool and return its status code and console output.
    214 
    215     :param args: full command line to run.
    216     :param check: Enforce a return code of zero.
    217     :param combine_stdio: set to False to keep stdout/stderr separated.
    218 
    219     :raise VerboseProcessError:
    220         When the return code is negative, or on any non-zero exit code
    221         when 'check=True' was provided (the default). This exception has
    222         'stdout', 'stderr', and 'returncode' properties that may be
    223         inspected to show greater detail. If this exception is not
    224         handled, the command-line, return code, and all console output
    225         will be included at the bottom of the stack trace.
    226 
    227     :return:
    228         a CompletedProcess. This object has args, returncode, and stdout
    229         properties. If streams are not combined, it will also have a
    230         stderr property.
    231     """
    232     subp = subprocess.run(
    233         args,
    234         stdout=subprocess.PIPE,
    235         stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
    236         universal_newlines=True,
    237         check=False
    238     )
    239 
    240     if check and subp.returncode or (subp.returncode < 0):
    241         raise VerboseProcessError(
    242             subp.returncode, args,
    243             output=subp.stdout,
    244             stderr=subp.stderr,
    245         )
    246 
    247     return subp
    248 
    249 
    250 def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
    251              ) -> 'subprocess.CompletedProcess[str]':
    252     """
    253     Run QEMU_IMG_PROG and return its status code and console output.
    254 
    255     This function always prepends QEMU_IMG_OPTIONS and may further alter
    256     the args for 'create' commands.
    257 
    258     See `qemu_tool()` for greater detail.
    259     """
    260     full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
    261     return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)
    262 
    263 
    264 def ordered_qmp(qmsg, conv_keys=True):
    265     # Dictionaries are not ordered prior to 3.6, therefore:
    266     if isinstance(qmsg, list):
    267         return [ordered_qmp(atom) for atom in qmsg]
    268     if isinstance(qmsg, dict):
    269         od = OrderedDict()
    270         for k, v in sorted(qmsg.items()):
    271             if conv_keys:
    272                 k = k.replace('_', '-')
    273             od[k] = ordered_qmp(v, conv_keys=False)
    274         return od
    275     return qmsg
    276 
    277 def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    278     return qemu_img('create', *args)
    279 
    280 def qemu_img_json(*args: str) -> Any:
    281     """
    282     Run qemu-img and return its output as deserialized JSON.
    283 
    284     :raise CalledProcessError:
    285         When qemu-img crashes, or returns a non-zero exit code without
    286         producing a valid JSON document to stdout.
    287     :raise JSONDecoderError:
    288         When qemu-img returns 0, but failed to produce a valid JSON document.
    289 
    290     :return: A deserialized JSON object; probably a dict[str, Any].
    291     """
    292     try:
    293         res = qemu_img(*args, combine_stdio=False)
    294     except subprocess.CalledProcessError as exc:
    295         # Terminated due to signal. Don't bother.
    296         if exc.returncode < 0:
    297             raise
    298 
    299         # Commands like 'check' can return failure (exit codes 2 and 3)
    300         # to indicate command completion, but with errors found. For
    301         # multi-command flexibility, ignore the exact error codes and
    302         # *try* to load JSON.
    303         try:
    304             return json.loads(exc.stdout)
    305         except json.JSONDecodeError:
    306             # Nope. This thing is toast. Raise the /process/ error.
    307             pass
    308         raise
    309 
    310     return json.loads(res.stdout)
    311 
    312 def qemu_img_measure(*args: str) -> Any:
    313     return qemu_img_json("measure", "--output", "json", *args)
    314 
    315 def qemu_img_check(*args: str) -> Any:
    316     return qemu_img_json("check", "--output", "json", *args)
    317 
    318 def qemu_img_info(*args: str) -> Any:
    319     return qemu_img_json('info', "--output", "json", *args)
    320 
    321 def qemu_img_map(*args: str) -> Any:
    322     return qemu_img_json('map', "--output", "json", *args)
    323 
    324 def qemu_img_log(*args: str, check: bool = True
    325                  ) -> 'subprocess.CompletedProcess[str]':
    326     result = qemu_img(*args, check=check)
    327     log(result.stdout, filters=[filter_testfiles])
    328     return result
    329 
    330 def img_info_log(filename: str, filter_path: Optional[str] = None,
    331                  use_image_opts: bool = False, extra_args: Sequence[str] = (),
    332                  check: bool = True,
    333                  ) -> None:
    334     args = ['info']
    335     if use_image_opts:
    336         args.append('--image-opts')
    337     else:
    338         args += ['-f', imgfmt]
    339     args += extra_args
    340     args.append(filename)
    341 
    342     output = qemu_img(*args, check=check).stdout
    343     if not filter_path:
    344         filter_path = filename
    345     log(filter_img_info(output, filter_path))
    346 
    347 def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    348     if '-f' in args or '--image-opts' in args:
    349         return qemu_io_args_no_fmt + list(args)
    350     else:
    351         return qemu_io_args + list(args)
    352 
    353 def qemu_io_popen(*args):
    354     return qemu_tool_popen(qemu_io_wrap_args(args))
    355 
    356 def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
    357             ) -> 'subprocess.CompletedProcess[str]':
    358     """
    359     Run QEMU_IO_PROG and return the status code and console output.
    360 
    361     This function always prepends either QEMU_IO_OPTIONS or
    362     QEMU_IO_OPTIONS_NO_FMT.
    363     """
    364     return qemu_tool(*qemu_io_wrap_args(args),
    365                      check=check, combine_stdio=combine_stdio)
    366 
    367 def qemu_io_log(*args: str, check: bool = True
    368                 ) -> 'subprocess.CompletedProcess[str]':
    369     result = qemu_io(*args, check=check)
    370     log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
    371     return result
    372 
    373 class QemuIoInteractive:
    374     def __init__(self, *args):
    375         self.args = qemu_io_wrap_args(args)
    376         # We need to keep the Popen objext around, and not
    377         # close it immediately. Therefore, disable the pylint check:
    378         # pylint: disable=consider-using-with
    379         self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
    380                                    stdout=subprocess.PIPE,
    381                                    stderr=subprocess.STDOUT,
    382                                    universal_newlines=True)
    383         out = self._p.stdout.read(9)
    384         if out != 'qemu-io> ':
    385             # Most probably qemu-io just failed to start.
    386             # Let's collect the whole output and exit.
    387             out += self._p.stdout.read()
    388             self._p.wait(timeout=1)
    389             raise ValueError(out)
    390 
    391     def close(self):
    392         self._p.communicate('q\n')
    393 
    394     def _read_output(self):
    395         pattern = 'qemu-io> '
    396         n = len(pattern)
    397         pos = 0
    398         s = []
    399         while pos != n:
    400             c = self._p.stdout.read(1)
    401             # check unexpected EOF
    402             assert c != ''
    403             s.append(c)
    404             if c == pattern[pos]:
    405                 pos += 1
    406             else:
    407                 pos = 0
    408 
    409         return ''.join(s[:-n])
    410 
    411     def cmd(self, cmd):
    412         # quit command is in close(), '\n' is added automatically
    413         assert '\n' not in cmd
    414         cmd = cmd.strip()
    415         assert cmd not in ('q', 'quit')
    416         self._p.stdin.write(cmd + '\n')
    417         self._p.stdin.flush()
    418         return self._read_output()
    419 
    420 
    421 class QemuStorageDaemon:
    422     _qmp: Optional[QEMUMonitorProtocol] = None
    423     _qmpsock: Optional[str] = None
    424     # Python < 3.8 would complain if this type were not a string literal
    425     # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    426     _p: 'Optional[subprocess.Popen[bytes]]' = None
    427 
    428     def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
    429         assert '--pidfile' not in args
    430         self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
    431         all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
    432 
    433         if qmp:
    434             self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
    435             all_args += ['--chardev',
    436                          f'socket,id=qmp-sock,path={self._qmpsock}',
    437                          '--monitor', 'qmp-sock']
    438 
    439             self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)
    440 
    441         # Cannot use with here, we want the subprocess to stay around
    442         # pylint: disable=consider-using-with
    443         self._p = subprocess.Popen(all_args)
    444         if self._qmp is not None:
    445             self._qmp.accept()
    446         while not os.path.exists(self.pidfile):
    447             if self._p.poll() is not None:
    448                 cmd = ' '.join(all_args)
    449                 raise RuntimeError(
    450                     'qemu-storage-daemon terminated with exit code ' +
    451                     f'{self._p.returncode}: {cmd}')
    452 
    453             time.sleep(0.01)
    454 
    455         with open(self.pidfile, encoding='utf-8') as f:
    456             self._pid = int(f.read().strip())
    457 
    458         assert self._pid == self._p.pid
    459 
    460     def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
    461             -> QMPMessage:
    462         assert self._qmp is not None
    463         return self._qmp.cmd(cmd, args)
    464 
    465     def stop(self, kill_signal=15):
    466         self._p.send_signal(kill_signal)
    467         self._p.wait()
    468         self._p = None
    469 
    470         if self._qmp:
    471             self._qmp.close()
    472 
    473         if self._qmpsock is not None:
    474             try:
    475                 os.remove(self._qmpsock)
    476             except OSError:
    477                 pass
    478         try:
    479             os.remove(self.pidfile)
    480         except OSError:
    481             pass
    482 
    483     def __del__(self):
    484         if self._p is not None:
    485             self.stop(kill_signal=9)
    486 
    487 
    488 def qemu_nbd(*args):
    489     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    490     return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
    491 
    492 def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    493     '''Run qemu-nbd in daemon mode and return both the parent's exit code
    494        and its output in case of an error'''
    495     full_args = qemu_nbd_args + ['--fork'] + list(args)
    496     output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
    497                                                    connect_stderr=False)
    498     return returncode, output if returncode else ''
    499 
    500 def qemu_nbd_list_log(*args: str) -> str:
    501     '''Run qemu-nbd to list remote exports'''
    502     full_args = [qemu_nbd_prog, '-L'] + list(args)
    503     output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    504     log(output, filters=[filter_testfiles, filter_nbd_exports])
    505     return output
    506 
    507 @contextmanager
    508 def qemu_nbd_popen(*args):
    509     '''Context manager running qemu-nbd within the context'''
    510     pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
    511 
    512     assert not os.path.exists(pid_file)
    513 
    514     cmd = list(qemu_nbd_args)
    515     cmd.extend(('--persistent', '--pid-file', pid_file))
    516     cmd.extend(args)
    517 
    518     log('Start NBD server')
    519     with subprocess.Popen(cmd) as p:
    520         try:
    521             while not os.path.exists(pid_file):
    522                 if p.poll() is not None:
    523                     raise RuntimeError(
    524                         "qemu-nbd terminated with exit code {}: {}"
    525                         .format(p.returncode, ' '.join(cmd)))
    526 
    527                 time.sleep(0.01)
    528             yield
    529         finally:
    530             if os.path.exists(pid_file):
    531                 os.remove(pid_file)
    532             log('Kill NBD server')
    533             p.kill()
    534             p.wait()
    535 
    536 def compare_images(img1: str, img2: str,
    537                    fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    538     """
    539     Compare two images with QEMU_IMG; return True if they are identical.
    540 
    541     :raise CalledProcessError:
    542         when qemu-img crashes or returns a status code of anything other
    543         than 0 (identical) or 1 (different).
    544     """
    545     try:
    546         qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    547         return True
    548     except subprocess.CalledProcessError as exc:
    549         if exc.returncode == 1:
    550             return False
    551         raise
    552 
    553 def create_image(name, size):
    554     '''Create a fully-allocated raw image with sector markers'''
    555     with open(name, 'wb') as file:
    556         i = 0
    557         while i < size:
    558             sector = struct.pack('>l504xl', i // 512, i // 512)
    559             file.write(sector)
    560             i = i + 512
    561 
    562 def image_size(img: str) -> int:
    563     """Return image's virtual size"""
    564     value = qemu_img_info('-f', imgfmt, img)['virtual-size']
    565     if not isinstance(value, int):
    566         type_name = type(value).__name__
    567         raise TypeError("Expected 'int' for 'virtual-size', "
    568                         f"got '{value}' of type '{type_name}'")
    569     return value
    570 
    571 def is_str(val):
    572     return isinstance(val, str)
    573 
    574 test_dir_re = re.compile(r"%s" % test_dir)
    575 def filter_test_dir(msg):
    576     return test_dir_re.sub("TEST_DIR", msg)
    577 
    578 win32_re = re.compile(r"\r")
    579 def filter_win32(msg):
    580     return win32_re.sub("", msg)
    581 
    582 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
    583                         r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
    584                         r"and [0-9\/.inf]* ops\/sec\)")
    585 def filter_qemu_io(msg):
    586     msg = filter_win32(msg)
    587     return qemu_io_re.sub("X ops; XX:XX:XX.X "
    588                           "(XXX YYY/sec and XXX ops/sec)", msg)
    589 
    590 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
    591 def filter_chown(msg):
    592     return chown_re.sub("chown UID:GID", msg)
    593 
    594 def filter_qmp_event(event):
    595     '''Filter a QMP event dict'''
    596     event = dict(event)
    597     if 'timestamp' in event:
    598         event['timestamp']['seconds'] = 'SECS'
    599         event['timestamp']['microseconds'] = 'USECS'
    600     return event
    601 
    602 def filter_qmp(qmsg, filter_fn):
    603     '''Given a string filter, filter a QMP object's values.
    604     filter_fn takes a (key, value) pair.'''
    605     # Iterate through either lists or dicts;
    606     if isinstance(qmsg, list):
    607         items = enumerate(qmsg)
    608     elif isinstance(qmsg, dict):
    609         items = qmsg.items()
    610     else:
    611         return filter_fn(None, qmsg)
    612 
    613     for k, v in items:
    614         if isinstance(v, (dict, list)):
    615             qmsg[k] = filter_qmp(v, filter_fn)
    616         else:
    617             qmsg[k] = filter_fn(k, v)
    618     return qmsg
    619 
    620 def filter_testfiles(msg):
    621     pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
    622     pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
    623     return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
    624 
    625 def filter_qmp_testfiles(qmsg):
    626     def _filter(_key, value):
    627         if is_str(value):
    628             return filter_testfiles(value)
    629         return value
    630     return filter_qmp(qmsg, _filter)
    631 
    632 def filter_virtio_scsi(output: str) -> str:
    633     return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
    634 
    635 def filter_qmp_virtio_scsi(qmsg):
    636     def _filter(_key, value):
    637         if is_str(value):
    638             return filter_virtio_scsi(value)
    639         return value
    640     return filter_qmp(qmsg, _filter)
    641 
    642 def filter_generated_node_ids(msg):
    643     return re.sub("#block[0-9]+", "NODE_NAME", msg)
    644 
    645 def filter_img_info(output, filename):
    646     lines = []
    647     for line in output.split('\n'):
    648         if 'disk size' in line or 'actual-size' in line:
    649             continue
    650         line = line.replace(filename, 'TEST_IMG')
    651         line = filter_testfiles(line)
    652         line = line.replace(imgfmt, 'IMGFMT')
    653         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
    654         line = re.sub('uuid: [-a-f0-9]+',
    655                       'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
    656                       line)
    657         line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
    658         line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
    659                       line)
    660         lines.append(line)
    661     return '\n'.join(lines)
    662 
    663 def filter_imgfmt(msg):
    664     return msg.replace(imgfmt, 'IMGFMT')
    665 
    666 def filter_qmp_imgfmt(qmsg):
    667     def _filter(_key, value):
    668         if is_str(value):
    669             return filter_imgfmt(value)
    670         return value
    671     return filter_qmp(qmsg, _filter)
    672 
    673 def filter_nbd_exports(output: str) -> str:
    674     return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
    675 
    676 
    677 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
    678 
    679 def log(msg: Msg,
    680         filters: Iterable[Callable[[Msg], Msg]] = (),
    681         indent: Optional[int] = None) -> None:
    682     """
    683     Logs either a string message or a JSON serializable message (like QMP).
    684     If indent is provided, JSON serializable messages are pretty-printed.
    685     """
    686     for flt in filters:
    687         msg = flt(msg)
    688     if isinstance(msg, (dict, list)):
    689         # Don't sort if it's already sorted
    690         do_sort = not isinstance(msg, OrderedDict)
    691         test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
    692     else:
    693         test_logger.info(msg)
    694 
    695 class Timeout:
    696     def __init__(self, seconds, errmsg="Timeout"):
    697         self.seconds = seconds
    698         self.errmsg = errmsg
    699     def __enter__(self):
    700         if qemu_gdb or qemu_valgrind:
    701             return self
    702         signal.signal(signal.SIGALRM, self.timeout)
    703         signal.setitimer(signal.ITIMER_REAL, self.seconds)
    704         return self
    705     def __exit__(self, exc_type, value, traceback):
    706         if qemu_gdb or qemu_valgrind:
    707             return False
    708         signal.setitimer(signal.ITIMER_REAL, 0)
    709         return False
    710     def timeout(self, signum, frame):
    711         raise Exception(self.errmsg)
    712 
    713 def file_pattern(name):
    714     return "{0}-{1}".format(os.getpid(), name)
    715 
    716 class FilePath:
    717     """
    718     Context manager generating multiple file names. The generated files are
    719     removed when exiting the context.
    720 
    721     Example usage:
    722 
    723         with FilePath('a.img', 'b.img') as (img_a, img_b):
    724             # Use img_a and img_b here...
    725 
    726         # a.img and b.img are automatically removed here.
    727 
    728     By default images are created in iotests.test_dir. To create sockets use
    729     iotests.sock_dir:
    730 
    731        with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
    732 
    733     For convenience, calling with one argument yields a single file instead of
    734     a tuple with one item.
    735 
    736     """
    737     def __init__(self, *names, base_dir=test_dir):
    738         self.paths = [os.path.join(base_dir, file_pattern(name))
    739                       for name in names]
    740 
    741     def __enter__(self):
    742         if len(self.paths) == 1:
    743             return self.paths[0]
    744         else:
    745             return self.paths
    746 
    747     def __exit__(self, exc_type, exc_val, exc_tb):
    748         for path in self.paths:
    749             try:
    750                 os.remove(path)
    751             except OSError:
    752                 pass
    753         return False
    754 
    755 
    756 def try_remove(img):
    757     try:
    758         os.remove(img)
    759     except OSError:
    760         pass
    761 
    762 def file_path_remover():
    763     for path in reversed(file_path_remover.paths):
    764         try_remove(path)
    765 
    766 
    767 def file_path(*names, base_dir=test_dir):
    768     ''' Another way to get auto-generated filename that cleans itself up.
    769 
    770     Use is as simple as:
    771 
    772     img_a, img_b = file_path('a.img', 'b.img')
    773     sock = file_path('socket')
    774     '''
    775 
    776     if not hasattr(file_path_remover, 'paths'):
    777         file_path_remover.paths = []
    778         atexit.register(file_path_remover)
    779 
    780     paths = []
    781     for name in names:
    782         filename = file_pattern(name)
    783         path = os.path.join(base_dir, filename)
    784         file_path_remover.paths.append(path)
    785         paths.append(path)
    786 
    787     return paths[0] if len(paths) == 1 else paths
    788 
    789 def remote_filename(path):
    790     if imgproto == 'file':
    791         return path
    792     elif imgproto == 'ssh':
    793         return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    794     else:
    795         raise Exception("Protocol %s not supported" % (imgproto))
    796 
    797 class VM(qtest.QEMUQtestMachine):
    798     '''A QEMU VM'''
    799 
    800     def __init__(self, path_suffix=''):
    801         name = "qemu%s-%d" % (path_suffix, os.getpid())
    802         timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
    803         if qemu_gdb and qemu_valgrind:
    804             sys.stderr.write('gdb and valgrind are mutually exclusive\n')
    805             sys.exit(1)
    806         wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
    807         super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
    808                          name=name,
    809                          base_temp_dir=test_dir,
    810                          sock_dir=sock_dir, qmp_timer=timer)
    811         self._num_drives = 0
    812 
    813     def _post_shutdown(self) -> None:
    814         super()._post_shutdown()
    815         if not qemu_valgrind or not self._popen:
    816             return
    817         valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
    818         if self.exitcode() == 99:
    819             with open(valgrind_filename, encoding='utf-8') as f:
    820                 print(f.read())
    821         else:
    822             os.remove(valgrind_filename)
    823 
    824     def _pre_launch(self) -> None:
    825         super()._pre_launch()
    826         if qemu_print:
    827             # set QEMU binary output to stdout
    828             self._close_qemu_log_file()
    829 
    830     def add_object(self, opts):
    831         self._args.append('-object')
    832         self._args.append(opts)
    833         return self
    834 
    835     def add_device(self, opts):
    836         self._args.append('-device')
    837         self._args.append(opts)
    838         return self
    839 
    840     def add_drive_raw(self, opts):
    841         self._args.append('-drive')
    842         self._args.append(opts)
    843         return self
    844 
    845     def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
    846         '''Add a virtio-blk drive to the VM'''
    847         options = ['if=%s' % interface,
    848                    'id=drive%d' % self._num_drives]
    849 
    850         if path is not None:
    851             options.append('file=%s' % path)
    852             options.append('format=%s' % img_format)
    853             options.append('cache=%s' % cachemode)
    854             options.append('aio=%s' % aiomode)
    855 
    856         if opts:
    857             options.append(opts)
    858 
    859         if img_format == 'luks' and 'key-secret' not in opts:
    860             # default luks support
    861             if luks_default_secret_object not in self._args:
    862                 self.add_object(luks_default_secret_object)
    863 
    864             options.append(luks_default_key_secret_opt)
    865 
    866         self._args.append('-drive')
    867         self._args.append(','.join(options))
    868         self._num_drives += 1
    869         return self
    870 
    871     def add_blockdev(self, opts):
    872         self._args.append('-blockdev')
    873         if isinstance(opts, str):
    874             self._args.append(opts)
    875         else:
    876             self._args.append(','.join(opts))
    877         return self
    878 
    879     def add_incoming(self, addr):
    880         self._args.append('-incoming')
    881         self._args.append(addr)
    882         return self
    883 
    884     def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
    885         cmd = 'human-monitor-command'
    886         kwargs: Dict[str, Any] = {'command-line': command_line}
    887         if use_log:
    888             return self.qmp_log(cmd, **kwargs)
    889         else:
    890             return self.qmp(cmd, **kwargs)
    891 
    892     def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
    893         """Pause drive r/w operations"""
    894         if not event:
    895             self.pause_drive(drive, "read_aio")
    896             self.pause_drive(drive, "write_aio")
    897             return
    898         self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
    899 
    900     def resume_drive(self, drive: str) -> None:
    901         """Resume drive r/w operations"""
    902         self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
    903 
    904     def hmp_qemu_io(self, drive: str, cmd: str,
    905                     use_log: bool = False, qdev: bool = False) -> QMPMessage:
    906         """Write to a given drive using an HMP command"""
    907         d = '-d ' if qdev else ''
    908         return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
    909 
    910     def flatten_qmp_object(self, obj, output=None, basestr=''):
    911         if output is None:
    912             output = {}
    913         if isinstance(obj, list):
    914             for i, item in enumerate(obj):
    915                 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
    916         elif isinstance(obj, dict):
    917             for key in obj:
    918                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
    919         else:
    920             output[basestr[:-1]] = obj # Strip trailing '.'
    921         return output
    922 
    923     def qmp_to_opts(self, obj):
    924         obj = self.flatten_qmp_object(obj)
    925         output_list = []
    926         for key in obj:
    927             output_list += [key + '=' + obj[key]]
    928         return ','.join(output_list)
    929 
    930     def get_qmp_events_filtered(self, wait=60.0):
    931         result = []
    932         for ev in self.get_qmp_events(wait=wait):
    933             result.append(filter_qmp_event(ev))
    934         return result
    935 
    936     def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
    937         full_cmd = OrderedDict((
    938             ("execute", cmd),
    939             ("arguments", ordered_qmp(kwargs))
    940         ))
    941         log(full_cmd, filters, indent=indent)
    942         result = self.qmp(cmd, **kwargs)
    943         log(result, filters, indent=indent)
    944         return result
    945 
    946     # Returns None on success, and an error string on failure
    947     def run_job(self, job: str, auto_finalize: bool = True,
    948                 auto_dismiss: bool = False,
    949                 pre_finalize: Optional[Callable[[], None]] = None,
    950                 cancel: bool = False, wait: float = 60.0,
    951                 filters: Iterable[Callable[[Any], Any]] = (),
    952                 ) -> Optional[str]:
    953         """
    954         run_job moves a job from creation through to dismissal.
    955 
    956         :param job: String. ID of recently-launched job
    957         :param auto_finalize: Bool. True if the job was launched with
    958                               auto_finalize. Defaults to True.
    959         :param auto_dismiss: Bool. True if the job was launched with
    960                              auto_dismiss=True. Defaults to False.
    961         :param pre_finalize: Callback. A callable that takes no arguments to be
    962                              invoked prior to issuing job-finalize, if any.
    963         :param cancel: Bool. When true, cancels the job after the pre_finalize
    964                        callback.
    965         :param wait: Float. Timeout value specifying how long to wait for any
    966                      event, in seconds. Defaults to 60.0.
    967         """
    968         match_device = {'data': {'device': job}}
    969         match_id = {'data': {'id': job}}
    970         events = [
    971             ('BLOCK_JOB_COMPLETED', match_device),
    972             ('BLOCK_JOB_CANCELLED', match_device),
    973             ('BLOCK_JOB_ERROR', match_device),
    974             ('BLOCK_JOB_READY', match_device),
    975             ('BLOCK_JOB_PENDING', match_id),
    976             ('JOB_STATUS_CHANGE', match_id)
    977         ]
    978         error = None
    979         while True:
    980             ev = filter_qmp_event(self.events_wait(events, timeout=wait))
    981             if ev['event'] != 'JOB_STATUS_CHANGE':
    982                 log(ev, filters=filters)
    983                 continue
    984             status = ev['data']['status']
    985             if status == 'aborting':
    986                 result = self.qmp('query-jobs')
    987                 for j in result['return']:
    988                     if j['id'] == job:
    989                         error = j['error']
    990                         log('Job failed: %s' % (j['error']), filters=filters)
    991             elif status == 'ready':
    992                 self.qmp_log('job-complete', id=job, filters=filters)
    993             elif status == 'pending' and not auto_finalize:
    994                 if pre_finalize:
    995                     pre_finalize()
    996                 if cancel:
    997                     self.qmp_log('job-cancel', id=job, filters=filters)
    998                 else:
    999                     self.qmp_log('job-finalize', id=job, filters=filters)
   1000             elif status == 'concluded' and not auto_dismiss:
   1001                 self.qmp_log('job-dismiss', id=job, filters=filters)
   1002             elif status == 'null':
   1003                 return error
   1004 
   1005     # Returns None on success, and an error string on failure
   1006     def blockdev_create(self, options, job_id='job0', filters=None):
   1007         if filters is None:
   1008             filters = [filter_qmp_testfiles]
   1009         result = self.qmp_log('blockdev-create', filters=filters,
   1010                               job_id=job_id, options=options)
   1011 
   1012         if 'return' in result:
   1013             assert result['return'] == {}
   1014             job_result = self.run_job(job_id, filters=filters)
   1015         else:
   1016             job_result = result['error']
   1017 
   1018         log("")
   1019         return job_result
   1020 
   1021     def enable_migration_events(self, name):
   1022         log('Enabling migration QMP events on %s...' % name)
   1023         log(self.qmp('migrate-set-capabilities', capabilities=[
   1024             {
   1025                 'capability': 'events',
   1026                 'state': True
   1027             }
   1028         ]))
   1029 
   1030     def wait_migration(self, expect_runstate: Optional[str]) -> bool:
   1031         while True:
   1032             event = self.event_wait('MIGRATION')
   1033             # We use the default timeout, and with a timeout, event_wait()
   1034             # never returns None
   1035             assert event
   1036 
   1037             log(event, filters=[filter_qmp_event])
   1038             if event['data']['status'] in ('completed', 'failed'):
   1039                 break
   1040 
   1041         if event['data']['status'] == 'completed':
   1042             # The event may occur in finish-migrate, so wait for the expected
   1043             # post-migration runstate
   1044             runstate = None
   1045             while runstate != expect_runstate:
   1046                 runstate = self.qmp('query-status')['return']['status']
   1047             return True
   1048         else:
   1049             return False
   1050 
   1051     def node_info(self, node_name):
   1052         nodes = self.qmp('query-named-block-nodes')
   1053         for x in nodes['return']:
   1054             if x['node-name'] == node_name:
   1055                 return x
   1056         return None
   1057 
   1058     def query_bitmaps(self):
   1059         res = self.qmp("query-named-block-nodes")
   1060         return {device['node-name']: device['dirty-bitmaps']
   1061                 for device in res['return'] if 'dirty-bitmaps' in device}
   1062 
   1063     def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
   1064         """
   1065         get a specific bitmap from the object returned by query_bitmaps.
   1066         :param recording: If specified, filter results by the specified value.
   1067         :param bitmaps: If specified, use it instead of call query_bitmaps()
   1068         """
   1069         if bitmaps is None:
   1070             bitmaps = self.query_bitmaps()
   1071 
   1072         for bitmap in bitmaps[node_name]:
   1073             if bitmap.get('name', '') == bitmap_name:
   1074                 if recording is None or bitmap.get('recording') == recording:
   1075                     return bitmap
   1076         return None
   1077 
   1078     def check_bitmap_status(self, node_name, bitmap_name, fields):
   1079         ret = self.get_bitmap(node_name, bitmap_name)
   1080 
   1081         return fields.items() <= ret.items()
   1082 
   1083     def assert_block_path(self, root, path, expected_node, graph=None):
   1084         """
   1085         Check whether the node under the given path in the block graph
   1086         is @expected_node.
   1087 
   1088         @root is the node name of the node where the @path is rooted.
   1089 
   1090         @path is a string that consists of child names separated by
   1091         slashes.  It must begin with a slash.
   1092 
   1093         Examples for @root + @path:
   1094           - root="qcow2-node", path="/backing/file"
   1095           - root="quorum-node", path="/children.2/file"
   1096 
   1097         Hypothetically, @path could be empty, in which case it would
   1098         point to @root.  However, in practice this case is not useful
   1099         and hence not allowed.
   1100 
   1101         @expected_node may be None.  (All elements of the path but the
   1102         leaf must still exist.)
   1103 
   1104         @graph may be None or the result of an x-debug-query-block-graph
   1105         call that has already been performed.
   1106         """
   1107         if graph is None:
   1108             graph = self.qmp('x-debug-query-block-graph')['return']
   1109 
   1110         iter_path = iter(path.split('/'))
   1111 
   1112         # Must start with a /
   1113         assert next(iter_path) == ''
   1114 
   1115         node = next((node for node in graph['nodes'] if node['name'] == root),
   1116                     None)
   1117 
   1118         # An empty @path is not allowed, so the root node must be present
   1119         assert node is not None, 'Root node %s not found' % root
   1120 
   1121         for child_name in iter_path:
   1122             assert node is not None, 'Cannot follow path %s%s' % (root, path)
   1123 
   1124             try:
   1125                 node_id = next(edge['child'] for edge in graph['edges']
   1126                                if (edge['parent'] == node['id'] and
   1127                                    edge['name'] == child_name))
   1128 
   1129                 node = next(node for node in graph['nodes']
   1130                             if node['id'] == node_id)
   1131 
   1132             except StopIteration:
   1133                 node = None
   1134 
   1135         if node is None:
   1136             assert expected_node is None, \
   1137                    'No node found under %s (but expected %s)' % \
   1138                    (path, expected_node)
   1139         else:
   1140             assert node['name'] == expected_node, \
   1141                    'Found node %s under %s (but expected %s)' % \
   1142                    (node['name'], path, expected_node)
   1143 
   1144 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
   1145 
   1146 class QMPTestCase(unittest.TestCase):
   1147     '''Abstract base class for QMP test cases'''
   1148 
   1149     def __init__(self, *args, **kwargs):
   1150         super().__init__(*args, **kwargs)
   1151         # Many users of this class set a VM property we rely on heavily
   1152         # in the methods below.
   1153         self.vm = None
   1154 
   1155     def dictpath(self, d, path):
   1156         '''Traverse a path in a nested dict'''
   1157         for component in path.split('/'):
   1158             m = index_re.match(component)
   1159             if m:
   1160                 component, idx = m.groups()
   1161                 idx = int(idx)
   1162 
   1163             if not isinstance(d, dict) or component not in d:
   1164                 self.fail(f'failed path traversal for "{path}" in "{d}"')
   1165             d = d[component]
   1166 
   1167             if m:
   1168                 if not isinstance(d, list):
   1169                     self.fail(f'path component "{component}" in "{path}" '
   1170                               f'is not a list in "{d}"')
   1171                 try:
   1172                     d = d[idx]
   1173                 except IndexError:
   1174                     self.fail(f'invalid index "{idx}" in path "{path}" '
   1175                               f'in "{d}"')
   1176         return d
   1177 
   1178     def assert_qmp_absent(self, d, path):
   1179         try:
   1180             result = self.dictpath(d, path)
   1181         except AssertionError:
   1182             return
   1183         self.fail('path "%s" has value "%s"' % (path, str(result)))
   1184 
   1185     def assert_qmp(self, d, path, value):
   1186         '''Assert that the value for a specific path in a QMP dict
   1187            matches.  When given a list of values, assert that any of
   1188            them matches.'''
   1189 
   1190         result = self.dictpath(d, path)
   1191 
   1192         # [] makes no sense as a list of valid values, so treat it as
   1193         # an actual single value.
   1194         if isinstance(value, list) and value != []:
   1195             for v in value:
   1196                 if result == v:
   1197                     return
   1198             self.fail('no match for "%s" in %s' % (str(result), str(value)))
   1199         else:
   1200             self.assertEqual(result, value,
   1201                              '"%s" is "%s", expected "%s"'
   1202                              % (path, str(result), str(value)))
   1203 
   1204     def assert_no_active_block_jobs(self):
   1205         result = self.vm.qmp('query-block-jobs')
   1206         self.assert_qmp(result, 'return', [])
   1207 
   1208     def assert_has_block_node(self, node_name=None, file_name=None):
   1209         """Issue a query-named-block-nodes and assert node_name and/or
   1210         file_name is present in the result"""
   1211         def check_equal_or_none(a, b):
   1212             return a is None or b is None or a == b
   1213         assert node_name or file_name
   1214         result = self.vm.qmp('query-named-block-nodes')
   1215         for x in result["return"]:
   1216             if check_equal_or_none(x.get("node-name"), node_name) and \
   1217                     check_equal_or_none(x.get("file"), file_name):
   1218                 return
   1219         self.fail("Cannot find %s %s in result:\n%s" %
   1220                   (node_name, file_name, result))
   1221 
   1222     def assert_json_filename_equal(self, json_filename, reference):
   1223         '''Asserts that the given filename is a json: filename and that its
   1224            content is equal to the given reference object'''
   1225         self.assertEqual(json_filename[:5], 'json:')
   1226         self.assertEqual(
   1227             self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
   1228             self.vm.flatten_qmp_object(reference)
   1229         )
   1230 
   1231     def cancel_and_wait(self, drive='drive0', force=False,
   1232                         resume=False, wait=60.0):
   1233         '''Cancel a block job and wait for it to finish, returning the event'''
   1234         result = self.vm.qmp('block-job-cancel', device=drive, force=force)
   1235         self.assert_qmp(result, 'return', {})
   1236 
   1237         if resume:
   1238             self.vm.resume_drive(drive)
   1239 
   1240         cancelled = False
   1241         result = None
   1242         while not cancelled:
   1243             for event in self.vm.get_qmp_events(wait=wait):
   1244                 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
   1245                    event['event'] == 'BLOCK_JOB_CANCELLED':
   1246                     self.assert_qmp(event, 'data/device', drive)
   1247                     result = event
   1248                     cancelled = True
   1249                 elif event['event'] == 'JOB_STATUS_CHANGE':
   1250                     self.assert_qmp(event, 'data/id', drive)
   1251 
   1252 
   1253         self.assert_no_active_block_jobs()
   1254         return result
   1255 
   1256     def wait_until_completed(self, drive='drive0', check_offset=True,
   1257                              wait=60.0, error=None):
   1258         '''Wait for a block job to finish, returning the event'''
   1259         while True:
   1260             for event in self.vm.get_qmp_events(wait=wait):
   1261                 if event['event'] == 'BLOCK_JOB_COMPLETED':
   1262                     self.assert_qmp(event, 'data/device', drive)
   1263                     if error is None:
   1264                         self.assert_qmp_absent(event, 'data/error')
   1265                         if check_offset:
   1266                             self.assert_qmp(event, 'data/offset',
   1267                                             event['data']['len'])
   1268                     else:
   1269                         self.assert_qmp(event, 'data/error', error)
   1270                     self.assert_no_active_block_jobs()
   1271                     return event
   1272                 if event['event'] == 'JOB_STATUS_CHANGE':
   1273                     self.assert_qmp(event, 'data/id', drive)
   1274 
   1275     def wait_ready(self, drive='drive0'):
   1276         """Wait until a BLOCK_JOB_READY event, and return the event."""
   1277         return self.vm.events_wait([
   1278             ('BLOCK_JOB_READY',
   1279              {'data': {'type': 'mirror', 'device': drive}}),
   1280             ('BLOCK_JOB_READY',
   1281              {'data': {'type': 'commit', 'device': drive}})
   1282         ])
   1283 
   1284     def wait_ready_and_cancel(self, drive='drive0'):
   1285         self.wait_ready(drive=drive)
   1286         event = self.cancel_and_wait(drive=drive)
   1287         self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
   1288         self.assert_qmp(event, 'data/type', 'mirror')
   1289         self.assert_qmp(event, 'data/offset', event['data']['len'])
   1290 
   1291     def complete_and_wait(self, drive='drive0', wait_ready=True,
   1292                           completion_error=None):
   1293         '''Complete a block job and wait for it to finish'''
   1294         if wait_ready:
   1295             self.wait_ready(drive=drive)
   1296 
   1297         result = self.vm.qmp('block-job-complete', device=drive)
   1298         self.assert_qmp(result, 'return', {})
   1299 
   1300         event = self.wait_until_completed(drive=drive, error=completion_error)
   1301         self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
   1302 
   1303     def pause_wait(self, job_id='job0'):
   1304         with Timeout(3, "Timeout waiting for job to pause"):
   1305             while True:
   1306                 result = self.vm.qmp('query-block-jobs')
   1307                 found = False
   1308                 for job in result['return']:
   1309                     if job['device'] == job_id:
   1310                         found = True
   1311                         if job['paused'] and not job['busy']:
   1312                             return job
   1313                         break
   1314                 assert found
   1315 
   1316     def pause_job(self, job_id='job0', wait=True):
   1317         result = self.vm.qmp('block-job-pause', device=job_id)
   1318         self.assert_qmp(result, 'return', {})
   1319         if wait:
   1320             return self.pause_wait(job_id)
   1321         return result
   1322 
   1323     def case_skip(self, reason):
   1324         '''Skip this test case'''
   1325         case_notrun(reason)
   1326         self.skipTest(reason)
   1327 
   1328 
   1329 def notrun(reason):
   1330     '''Skip this test suite'''
   1331     # Each test in qemu-iotests has a number ("seq")
   1332     seq = os.path.basename(sys.argv[0])
   1333 
   1334     with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
   1335             as outfile:
   1336         outfile.write(reason + '\n')
   1337     logger.warning("%s not run: %s", seq, reason)
   1338     sys.exit(0)
   1339 
   1340 def case_notrun(reason):
   1341     '''Mark this test case as not having been run (without actually
   1342     skipping it, that is left to the caller).  See
   1343     QMPTestCase.case_skip() for a variant that actually skips the
   1344     current test case.'''
   1345 
   1346     # Each test in qemu-iotests has a number ("seq")
   1347     seq = os.path.basename(sys.argv[0])
   1348 
   1349     with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
   1350             as outfile:
   1351         outfile.write('    [case not run] ' + reason + '\n')
   1352 
   1353 def _verify_image_format(supported_fmts: Sequence[str] = (),
   1354                          unsupported_fmts: Sequence[str] = ()) -> None:
   1355     if 'generic' in supported_fmts and \
   1356             os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
   1357         # similar to
   1358         #   _supported_fmt generic
   1359         # for bash tests
   1360         supported_fmts = ()
   1361 
   1362     not_sup = supported_fmts and (imgfmt not in supported_fmts)
   1363     if not_sup or (imgfmt in unsupported_fmts):
   1364         notrun('not suitable for this image format: %s' % imgfmt)
   1365 
   1366     if imgfmt == 'luks':
   1367         verify_working_luks()
   1368 
   1369 def _verify_protocol(supported: Sequence[str] = (),
   1370                      unsupported: Sequence[str] = ()) -> None:
   1371     assert not (supported and unsupported)
   1372 
   1373     if 'generic' in supported:
   1374         return
   1375 
   1376     not_sup = supported and (imgproto not in supported)
   1377     if not_sup or (imgproto in unsupported):
   1378         notrun('not suitable for this protocol: %s' % imgproto)
   1379 
   1380 def _verify_platform(supported: Sequence[str] = (),
   1381                      unsupported: Sequence[str] = ()) -> None:
   1382     if any((sys.platform.startswith(x) for x in unsupported)):
   1383         notrun('not suitable for this OS: %s' % sys.platform)
   1384 
   1385     if supported:
   1386         if not any((sys.platform.startswith(x) for x in supported)):
   1387             notrun('not suitable for this OS: %s' % sys.platform)
   1388 
   1389 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
   1390     if supported_cache_modes and (cachemode not in supported_cache_modes):
   1391         notrun('not suitable for this cache mode: %s' % cachemode)
   1392 
   1393 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
   1394     if supported_aio_modes and (aiomode not in supported_aio_modes):
   1395         notrun('not suitable for this aio mode: %s' % aiomode)
   1396 
   1397 def _verify_formats(required_formats: Sequence[str] = ()) -> None:
   1398     usf_list = list(set(required_formats) - set(supported_formats()))
   1399     if usf_list:
   1400         notrun(f'formats {usf_list} are not whitelisted')
   1401 
   1402 
   1403 def _verify_virtio_blk() -> None:
   1404     out = qemu_pipe('-M', 'none', '-device', 'help')
   1405     if 'virtio-blk' not in out:
   1406         notrun('Missing virtio-blk in QEMU binary')
   1407 
   1408 def _verify_virtio_scsi_pci_or_ccw() -> None:
   1409     out = qemu_pipe('-M', 'none', '-device', 'help')
   1410     if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
   1411         notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
   1412 
   1413 
   1414 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
   1415     imgopts = os.environ.get('IMGOPTS')
   1416     # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
   1417     # but it supported only for bash tests. We don't have a concept of global
   1418     # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
   1419     # So, for simplicity let's just not support any IMGOPTS with '$' inside.
   1420     unsup = list(unsupported) + ['$']
   1421     if imgopts and any(x in imgopts for x in unsup):
   1422         notrun(f'not suitable for this imgopts: {imgopts}')
   1423 
   1424 
   1425 def supports_quorum() -> bool:
   1426     return 'quorum' in qemu_img('--help').stdout
   1427 
   1428 def verify_quorum():
   1429     '''Skip test suite if quorum support is not available'''
   1430     if not supports_quorum():
   1431         notrun('quorum support missing')
   1432 
   1433 def has_working_luks() -> Tuple[bool, str]:
   1434     """
   1435     Check whether our LUKS driver can actually create images
   1436     (this extends to LUKS encryption for qcow2).
   1437 
   1438     If not, return the reason why.
   1439     """
   1440 
   1441     img_file = f'{test_dir}/luks-test.luks'
   1442     res = qemu_img('create', '-f', 'luks',
   1443                    '--object', luks_default_secret_object,
   1444                    '-o', luks_default_key_secret_opt,
   1445                    '-o', 'iter-time=10',
   1446                    img_file, '1G',
   1447                    check=False)
   1448     try:
   1449         os.remove(img_file)
   1450     except OSError:
   1451         pass
   1452 
   1453     if res.returncode:
   1454         reason = res.stdout
   1455         for line in res.stdout.splitlines():
   1456             if img_file + ':' in line:
   1457                 reason = line.split(img_file + ':', 1)[1].strip()
   1458                 break
   1459 
   1460         return (False, reason)
   1461     else:
   1462         return (True, '')
   1463 
   1464 def verify_working_luks():
   1465     """
   1466     Skip test suite if LUKS does not work
   1467     """
   1468     (working, reason) = has_working_luks()
   1469     if not working:
   1470         notrun(reason)
   1471 
   1472 def supports_qcow2_zstd_compression() -> bool:
   1473     img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
   1474     res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
   1475                    img_file, '0',
   1476                    check=False)
   1477     try:
   1478         os.remove(img_file)
   1479     except OSError:
   1480         pass
   1481 
   1482     if res.returncode == 1 and \
   1483             "'compression-type' does not accept value 'zstd'" in res.stdout:
   1484         return False
   1485     else:
   1486         return True
   1487 
   1488 def verify_qcow2_zstd_compression():
   1489     if not supports_qcow2_zstd_compression():
   1490         notrun('zstd compression not supported')
   1491 
   1492 def qemu_pipe(*args: str) -> str:
   1493     """
   1494     Run qemu with an option to print something and exit (e.g. a help option).
   1495 
   1496     :return: QEMU's stdout output.
   1497     """
   1498     full_args = [qemu_prog] + qemu_opts + list(args)
   1499     output, _ = qemu_tool_pipe_and_status('qemu', full_args)
   1500     return output
   1501 
   1502 def supported_formats(read_only=False):
   1503     '''Set 'read_only' to True to check ro-whitelist
   1504        Otherwise, rw-whitelist is checked'''
   1505 
   1506     if not hasattr(supported_formats, "formats"):
   1507         supported_formats.formats = {}
   1508 
   1509     if read_only not in supported_formats.formats:
   1510         format_message = qemu_pipe("-drive", "format=help")
   1511         line = 1 if read_only else 0
   1512         supported_formats.formats[read_only] = \
   1513             format_message.splitlines()[line].split(":")[1].split()
   1514 
   1515     return supported_formats.formats[read_only]
   1516 
   1517 def skip_if_unsupported(required_formats=(), read_only=False):
   1518     '''Skip Test Decorator
   1519        Runs the test if all the required formats are whitelisted'''
   1520     def skip_test_decorator(func):
   1521         def func_wrapper(test_case: QMPTestCase, *args: List[Any],
   1522                          **kwargs: Dict[str, Any]) -> None:
   1523             if callable(required_formats):
   1524                 fmts = required_formats(test_case)
   1525             else:
   1526                 fmts = required_formats
   1527 
   1528             usf_list = list(set(fmts) - set(supported_formats(read_only)))
   1529             if usf_list:
   1530                 msg = f'{test_case}: formats {usf_list} are not whitelisted'
   1531                 test_case.case_skip(msg)
   1532             else:
   1533                 func(test_case, *args, **kwargs)
   1534         return func_wrapper
   1535     return skip_test_decorator
   1536 
   1537 def skip_for_formats(formats: Sequence[str] = ()) \
   1538     -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
   1539                 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
   1540     '''Skip Test Decorator
   1541        Skips the test for the given formats'''
   1542     def skip_test_decorator(func):
   1543         def func_wrapper(test_case: QMPTestCase, *args: List[Any],
   1544                          **kwargs: Dict[str, Any]) -> None:
   1545             if imgfmt in formats:
   1546                 msg = f'{test_case}: Skipped for format {imgfmt}'
   1547                 test_case.case_skip(msg)
   1548             else:
   1549                 func(test_case, *args, **kwargs)
   1550         return func_wrapper
   1551     return skip_test_decorator
   1552 
   1553 def skip_if_user_is_root(func):
   1554     '''Skip Test Decorator
   1555        Runs the test only without root permissions'''
   1556     def func_wrapper(*args, **kwargs):
   1557         if os.getuid() == 0:
   1558             case_notrun('{}: cannot be run as root'.format(args[0]))
   1559             return None
   1560         else:
   1561             return func(*args, **kwargs)
   1562     return func_wrapper
   1563 
   1564 # We need to filter out the time taken from the output so that
   1565 # qemu-iotest can reliably diff the results against master output,
   1566 # and hide skipped tests from the reference output.
   1567 
   1568 class ReproducibleTestResult(unittest.TextTestResult):
   1569     def addSkip(self, test, reason):
   1570         # Same as TextTestResult, but print dot instead of "s"
   1571         unittest.TestResult.addSkip(self, test, reason)
   1572         if self.showAll:
   1573             self.stream.writeln("skipped {0!r}".format(reason))
   1574         elif self.dots:
   1575             self.stream.write(".")
   1576             self.stream.flush()
   1577 
   1578 class ReproducibleStreamWrapper:
   1579     def __init__(self, stream: TextIO):
   1580         self.stream = stream
   1581 
   1582     def __getattr__(self, attr):
   1583         if attr in ('stream', '__getstate__'):
   1584             raise AttributeError(attr)
   1585         return getattr(self.stream, attr)
   1586 
   1587     def write(self, arg=None):
   1588         arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
   1589         arg = re.sub(r' \(skipped=\d+\)', r'', arg)
   1590         self.stream.write(arg)
   1591 
   1592 class ReproducibleTestRunner(unittest.TextTestRunner):
   1593     def __init__(self, stream: Optional[TextIO] = None,
   1594                  resultclass: Type[unittest.TestResult] =
   1595                  ReproducibleTestResult,
   1596                  **kwargs: Any) -> None:
   1597         rstream = ReproducibleStreamWrapper(stream or sys.stdout)
   1598         super().__init__(stream=rstream,           # type: ignore
   1599                          descriptions=True,
   1600                          resultclass=resultclass,
   1601                          **kwargs)
   1602 
   1603 def execute_unittest(argv: List[str], debug: bool = False) -> None:
   1604     """Executes unittests within the calling module."""
   1605 
   1606     # Some tests have warnings, especially ResourceWarnings for unclosed
   1607     # files and sockets.  Ignore them for now to ensure reproducibility of
   1608     # the test output.
   1609     unittest.main(argv=argv,
   1610                   testRunner=ReproducibleTestRunner,
   1611                   verbosity=2 if debug else 1,
   1612                   warnings=None if sys.warnoptions else 'ignore')
   1613 
   1614 def execute_setup_common(supported_fmts: Sequence[str] = (),
   1615                          supported_platforms: Sequence[str] = (),
   1616                          supported_cache_modes: Sequence[str] = (),
   1617                          supported_aio_modes: Sequence[str] = (),
   1618                          unsupported_fmts: Sequence[str] = (),
   1619                          supported_protocols: Sequence[str] = (),
   1620                          unsupported_protocols: Sequence[str] = (),
   1621                          required_fmts: Sequence[str] = (),
   1622                          unsupported_imgopts: Sequence[str] = ()) -> bool:
   1623     """
   1624     Perform necessary setup for either script-style or unittest-style tests.
   1625 
   1626     :return: Bool; Whether or not debug mode has been requested via the CLI.
   1627     """
   1628     # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
   1629 
   1630     debug = '-d' in sys.argv
   1631     if debug:
   1632         sys.argv.remove('-d')
   1633     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
   1634 
   1635     _verify_image_format(supported_fmts, unsupported_fmts)
   1636     _verify_protocol(supported_protocols, unsupported_protocols)
   1637     _verify_platform(supported=supported_platforms)
   1638     _verify_cache_mode(supported_cache_modes)
   1639     _verify_aio_mode(supported_aio_modes)
   1640     _verify_formats(required_fmts)
   1641     _verify_virtio_blk()
   1642     _verify_imgopts(unsupported_imgopts)
   1643 
   1644     return debug
   1645 
   1646 def execute_test(*args, test_function=None, **kwargs):
   1647     """Run either unittest or script-style tests."""
   1648 
   1649     debug = execute_setup_common(*args, **kwargs)
   1650     if not test_function:
   1651         execute_unittest(sys.argv, debug)
   1652     else:
   1653         test_function()
   1654 
   1655 def activate_logging():
   1656     """Activate iotests.log() output to stdout for script-style tests."""
   1657     handler = logging.StreamHandler(stream=sys.stdout)
   1658     formatter = logging.Formatter('%(message)s')
   1659     handler.setFormatter(formatter)
   1660     test_logger.addHandler(handler)
   1661     test_logger.setLevel(logging.INFO)
   1662     test_logger.propagate = False
   1663 
   1664 # This is called from script-style iotests without a single point of entry
   1665 def script_initialize(*args, **kwargs):
   1666     """Initialize script-style tests without running any tests."""
   1667     activate_logging()
   1668     execute_setup_common(*args, **kwargs)
   1669 
   1670 # This is called from script-style iotests with a single point of entry
   1671 def script_main(test_function, *args, **kwargs):
   1672     """Run script-style tests outside of the unittest framework"""
   1673     activate_logging()
   1674     execute_test(*args, test_function=test_function, **kwargs)
   1675 
   1676 # This is called from unittest style iotests
   1677 def main(*args, **kwargs):
   1678     """Run tests using the unittest framework"""
   1679     execute_test(*args, **kwargs)