qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

testenv.py (11361B)


      1 # TestEnv class to manage test environment variables.
      2 #
      3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
      4 #
      5 # This program is free software; you can redistribute it and/or modify
      6 # it under the terms of the GNU General Public License as published by
      7 # the Free Software Foundation; either version 2 of the License, or
      8 # (at your option) any later version.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 #
     18 
     19 import os
     20 import sys
     21 import tempfile
     22 from pathlib import Path
     23 import shutil
     24 import collections
     25 import random
     26 import subprocess
     27 import glob
     28 from typing import List, Dict, Any, Optional, ContextManager
     29 
     30 DEF_GDB_OPTIONS = 'localhost:12345'
     31 
     32 def isxfile(path: str) -> bool:
     33     return os.path.isfile(path) and os.access(path, os.X_OK)
     34 
     35 
     36 def get_default_machine(qemu_prog: str) -> str:
     37     outp = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
     38                           universal_newlines=True,
     39                           stdout=subprocess.PIPE).stdout
     40 
     41     machines = outp.split('\n')
     42     try:
     43         default_machine = next(m for m in machines if m.endswith(' (default)'))
     44     except StopIteration:
     45         return ''
     46     default_machine = default_machine.split(' ', 1)[0]
     47 
     48     alias_suf = ' (alias of {})'.format(default_machine)
     49     alias = next((m for m in machines if m.endswith(alias_suf)), None)
     50     if alias is not None:
     51         default_machine = alias.split(' ', 1)[0]
     52 
     53     return default_machine
     54 
     55 
     56 class TestEnv(ContextManager['TestEnv']):
     57     """
     58     Manage system environment for running tests
     59 
     60     The following variables are supported/provided. They are represented by
     61     lower-cased TestEnv attributes.
     62     """
     63 
     64     # We store environment variables as instance attributes, and there are a
     65     # lot of them. Silence pylint:
     66     # pylint: disable=too-many-instance-attributes
     67 
     68     env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
     69                      'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
     70                      'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
     71                      'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
     72                      'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
     73                      'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
     74                      'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
     75                      'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
     76                      'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_',
     77                      'GDB_OPTIONS', 'PRINT_QEMU']
     78 
     79     def prepare_subprocess(self, args: List[str]) -> Dict[str, str]:
     80         if self.debug:
     81             args.append('-d')
     82 
     83         with open(args[0], encoding="utf-8") as f:
     84             try:
     85                 if f.readline().rstrip() == '#!/usr/bin/env python3':
     86                     args.insert(0, self.python)
     87             except UnicodeDecodeError:  # binary test? for future.
     88                 pass
     89 
     90         os_env = os.environ.copy()
     91         os_env.update(self.get_env())
     92         return os_env
     93 
     94     def get_env(self) -> Dict[str, str]:
     95         env = {}
     96         for v in self.env_variables:
     97             val = getattr(self, v.lower(), None)
     98             if val is not None:
     99                 env[v] = val
    100 
    101         return env
    102 
    103     def init_directories(self) -> None:
    104         """Init directory variables:
    105              PYTHONPATH
    106              TEST_DIR
    107              SOCK_DIR
    108              SAMPLE_IMG_DIR
    109         """
    110 
    111         # Path where qemu goodies live in this source tree.
    112         qemu_srctree_path = Path(__file__, '../../../python').resolve()
    113 
    114         self.pythonpath = os.pathsep.join(filter(None, (
    115             self.source_iotests,
    116             str(qemu_srctree_path),
    117             os.getenv('PYTHONPATH'),
    118         )))
    119 
    120         self.test_dir = os.getenv('TEST_DIR',
    121                                   os.path.join(os.getcwd(), 'scratch'))
    122         Path(self.test_dir).mkdir(parents=True, exist_ok=True)
    123 
    124         try:
    125             self.sock_dir = os.environ['SOCK_DIR']
    126             self.tmp_sock_dir = False
    127             Path(self.sock_dir).mkdir(parents=True, exist_ok=True)
    128         except KeyError:
    129             self.sock_dir = tempfile.mkdtemp()
    130             self.tmp_sock_dir = True
    131 
    132         self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
    133                                         os.path.join(self.source_iotests,
    134                                                      'sample_images'))
    135 
    136     def init_binaries(self) -> None:
    137         """Init binary path variables:
    138              PYTHON (for bash tests)
    139              QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
    140         """
    141         self.python = sys.executable
    142 
    143         def root(*names: str) -> str:
    144             return os.path.join(self.build_root, *names)
    145 
    146         arch = os.uname().machine
    147         if 'ppc64' in arch:
    148             arch = 'ppc64'
    149 
    150         self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
    151         if not os.path.exists(self.qemu_prog):
    152             pattern = root('qemu-system-*')
    153             try:
    154                 progs = sorted(glob.iglob(pattern))
    155                 self.qemu_prog = next(p for p in progs if isxfile(p))
    156             except StopIteration:
    157                 sys.exit("Not found any Qemu executable binary by pattern "
    158                          f"'{pattern}'")
    159 
    160         self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
    161         self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
    162         self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
    163         self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
    164                                                    'qemu-storage-daemon'))
    165 
    166         for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
    167                   self.qemu_prog, self.qsd_prog]:
    168             if not os.path.exists(b):
    169                 sys.exit('No such file: ' + b)
    170             if not isxfile(b):
    171                 sys.exit('Not executable: ' + b)
    172 
    173     def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
    174                  cachemode: Optional[str] = None,
    175                  imgopts: Optional[str] = None,
    176                  misalign: bool = False,
    177                  debug: bool = False,
    178                  valgrind: bool = False,
    179                  gdb: bool = False,
    180                  qprint: bool = False) -> None:
    181         self.imgfmt = imgfmt
    182         self.imgproto = imgproto
    183         self.aiomode = aiomode
    184         self.imgopts = imgopts
    185         self.misalign = misalign
    186         self.debug = debug
    187 
    188         if qprint:
    189             self.print_qemu = 'y'
    190 
    191         if gdb:
    192             self.gdb_options = os.getenv('GDB_OPTIONS', DEF_GDB_OPTIONS)
    193             if not self.gdb_options:
    194                 # cover the case 'export GDB_OPTIONS='
    195                 self.gdb_options = DEF_GDB_OPTIONS
    196         elif 'GDB_OPTIONS' in os.environ:
    197             # to not propagate it in prepare_subprocess()
    198             del os.environ['GDB_OPTIONS']
    199 
    200         if valgrind:
    201             self.valgrind_qemu = 'y'
    202 
    203         if cachemode is None:
    204             self.cachemode_is_default = 'true'
    205             self.cachemode = 'writeback'
    206         else:
    207             self.cachemode_is_default = 'false'
    208             self.cachemode = cachemode
    209 
    210         # Initialize generic paths: build_root, build_iotests, source_iotests,
    211         # which are needed to initialize some environment variables. They are
    212         # used by init_*() functions as well.
    213 
    214         if os.path.islink(sys.argv[0]):
    215             # called from the build tree
    216             self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
    217             self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
    218         else:
    219             # called from the source tree
    220             self.source_iotests = os.getcwd()
    221             self.build_iotests = self.source_iotests
    222 
    223         self.build_root = os.path.join(self.build_iotests, '..', '..')
    224 
    225         self.init_directories()
    226         self.init_binaries()
    227 
    228         self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
    229                                          str(random.randrange(1, 255)))
    230 
    231         # QEMU_OPTIONS
    232         self.qemu_options = '-nodefaults -display none -accel qtest'
    233         machine_map = (
    234             ('arm', 'virt'),
    235             ('aarch64', 'virt'),
    236             ('avr', 'mega2560'),
    237             ('m68k', 'virt'),
    238             ('riscv32', 'virt'),
    239             ('riscv64', 'virt'),
    240             ('rx', 'gdbsim-r5f562n8'),
    241             ('tricore', 'tricore_testboard')
    242         )
    243         for suffix, machine in machine_map:
    244             if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
    245                 self.qemu_options += f' -machine {machine}'
    246 
    247         # QEMU_DEFAULT_MACHINE
    248         self.qemu_default_machine = get_default_machine(self.qemu_prog)
    249 
    250         self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
    251         self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
    252 
    253         is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
    254         self.imgfmt_generic = 'true' if is_generic else 'false'
    255 
    256         self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
    257         if self.misalign:
    258             self.qemu_io_options += ' --misalign'
    259 
    260         self.qemu_io_options_no_fmt = self.qemu_io_options
    261 
    262         if self.imgfmt == 'luks':
    263             self.imgoptssyntax = 'true'
    264             self.imgkeysecret = '123456'
    265             if not self.imgopts:
    266                 self.imgopts = 'iter-time=10'
    267             elif 'iter-time=' not in self.imgopts:
    268                 self.imgopts += ',iter-time=10'
    269         else:
    270             self.imgoptssyntax = 'false'
    271             self.qemu_io_options += ' -f ' + self.imgfmt
    272 
    273         if self.imgfmt == 'vmdk':
    274             if not self.imgopts:
    275                 self.imgopts = 'zeroed_grain=on'
    276             elif 'zeroed_grain=' not in self.imgopts:
    277                 self.imgopts += ',zeroed_grain=on'
    278 
    279     def close(self) -> None:
    280         if self.tmp_sock_dir:
    281             shutil.rmtree(self.sock_dir)
    282 
    283     def __enter__(self) -> 'TestEnv':
    284         return self
    285 
    286     def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    287         self.close()
    288 
    289     def print_env(self, prefix: str = '') -> None:
    290         template = """\
    291 {prefix}QEMU          -- "{QEMU_PROG}" {QEMU_OPTIONS}
    292 {prefix}QEMU_IMG      -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
    293 {prefix}QEMU_IO       -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
    294 {prefix}QEMU_NBD      -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
    295 {prefix}IMGFMT        -- {IMGFMT}{imgopts}
    296 {prefix}IMGPROTO      -- {IMGPROTO}
    297 {prefix}PLATFORM      -- {platform}
    298 {prefix}TEST_DIR      -- {TEST_DIR}
    299 {prefix}SOCK_DIR      -- {SOCK_DIR}
    300 {prefix}GDB_OPTIONS   -- {GDB_OPTIONS}
    301 {prefix}VALGRIND_QEMU -- {VALGRIND_QEMU}
    302 {prefix}PRINT_QEMU_OUTPUT -- {PRINT_QEMU}
    303 {prefix}"""
    304 
    305         args = collections.defaultdict(str, self.get_env())
    306 
    307         if 'IMGOPTS' in args:
    308             args['imgopts'] = f" ({args['IMGOPTS']})"
    309 
    310         u = os.uname()
    311         args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'
    312         args['prefix'] = prefix
    313         print(template.format_map(args))