qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

runner.py (16343B)


      1 #!/usr/bin/env python3
      2 
      3 # Tool for running fuzz tests
      4 #
      5 # Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
      6 #
      7 # This program is free software: you can redistribute it and/or modify
      8 # it under the terms of the GNU General Public License as published by
      9 # the Free Software Foundation, either version 2 of the License, or
     10 # (at your option) any later version.
     11 #
     12 # This program is distributed in the hope that it will be useful,
     13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     15 # GNU General Public License for more details.
     16 #
     17 # You should have received a copy of the GNU General Public License
     18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     19 #
     20 
     21 import sys
     22 import os
     23 import signal
     24 import subprocess
     25 import random
     26 import shutil
     27 from itertools import count
     28 import time
     29 import getopt
     30 import io
     31 import resource
     32 
     33 try:
     34     import json
     35 except ImportError:
     36     try:
     37         import simplejson as json
     38     except ImportError:
     39         print("Warning: Module for JSON processing is not found.\n" \
     40             "'--config' and '--command' options are not supported.", file=sys.stderr)
     41 
     42 # Backing file sizes in MB
     43 MAX_BACKING_FILE_SIZE = 10
     44 MIN_BACKING_FILE_SIZE = 1
     45 
     46 
     47 def multilog(msg, *output):
     48     """ Write an object to all of specified file descriptors."""
     49     for fd in output:
     50         fd.write(msg)
     51         fd.flush()
     52 
     53 
     54 def str_signal(sig):
     55     """ Convert a numeric value of a system signal to the string one
     56     defined by the current operational system.
     57     """
     58     for k, v in signal.__dict__.items():
     59         if v == sig:
     60             return k
     61 
     62 
     63 def run_app(fd, q_args):
     64     """Start an application with specified arguments and return its exit code
     65     or kill signal depending on the result of execution.
     66     """
     67 
     68     class Alarm(Exception):
     69         """Exception for signal.alarm events."""
     70         pass
     71 
     72     def handler(*args):
     73         """Notify that an alarm event occurred."""
     74         raise Alarm
     75 
     76     signal.signal(signal.SIGALRM, handler)
     77     signal.alarm(600)
     78     term_signal = signal.SIGKILL
     79     devnull = open('/dev/null', 'r+')
     80     process = subprocess.Popen(q_args, stdin=devnull,
     81                                stdout=subprocess.PIPE,
     82                                stderr=subprocess.PIPE,
     83                                errors='replace')
     84     try:
     85         out, err = process.communicate()
     86         signal.alarm(0)
     87         fd.write(out)
     88         fd.write(err)
     89         fd.flush()
     90         return process.returncode
     91 
     92     except Alarm:
     93         os.kill(process.pid, term_signal)
     94         fd.write('The command was terminated by timeout.\n')
     95         fd.flush()
     96         return -term_signal
     97 
     98 
     99 class TestException(Exception):
    100     """Exception for errors risen by TestEnv objects."""
    101     pass
    102 
    103 
    104 class TestEnv(object):
    105 
    106     """Test object.
    107 
    108     The class sets up test environment, generates backing and test images
    109     and executes application under tests with specified arguments and a test
    110     image provided.
    111 
    112     All logs are collected.
    113 
    114     The summary log will contain short descriptions and statuses of tests in
    115     a run.
    116 
    117     The test log will include application (e.g. 'qemu-img') logs besides info
    118     sent to the summary log.
    119     """
    120 
    121     def __init__(self, test_id, seed, work_dir, run_log,
    122                  cleanup=True, log_all=False):
    123         """Set test environment in a specified work directory.
    124 
    125         Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
    126         'QEMU_IO' environment variables.
    127         """
    128         if seed is not None:
    129             self.seed = seed
    130         else:
    131             self.seed = str(random.randint(0, sys.maxsize))
    132         random.seed(self.seed)
    133 
    134         self.init_path = os.getcwd()
    135         self.work_dir = work_dir
    136         self.current_dir = os.path.join(work_dir, 'test-' + test_id)
    137         self.qemu_img = \
    138             os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
    139         self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
    140         self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
    141                          ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
    142                          ['qemu-io', '$test_img', '-c', 'read $off $len'],
    143                          ['qemu-io', '$test_img', '-c', 'write $off $len'],
    144                          ['qemu-io', '$test_img', '-c',
    145                           'aio_read $off $len'],
    146                          ['qemu-io', '$test_img', '-c',
    147                           'aio_write $off $len'],
    148                          ['qemu-io', '$test_img', '-c', 'flush'],
    149                          ['qemu-io', '$test_img', '-c',
    150                           'discard $off $len'],
    151                          ['qemu-io', '$test_img', '-c',
    152                           'truncate $off']]
    153         for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
    154             self.commands.append(
    155                 ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
    156                  '$test_img', 'converted_image.' + fmt])
    157 
    158         try:
    159             os.makedirs(self.current_dir)
    160         except OSError as e:
    161             print("Error: The working directory '%s' cannot be used. Reason: %s"\
    162                 % (self.work_dir, e.strerror), file=sys.stderr)
    163             raise TestException
    164         self.log = open(os.path.join(self.current_dir, "test.log"), "w")
    165         self.parent_log = open(run_log, "a")
    166         self.failed = False
    167         self.cleanup = cleanup
    168         self.log_all = log_all
    169 
    170     def _create_backing_file(self):
    171         """Create a backing file in the current directory.
    172 
    173         Return a tuple of a backing file name and format.
    174 
    175         Format of a backing file is randomly chosen from all formats supported
    176         by 'qemu-img create'.
    177         """
    178         # All formats supported by the 'qemu-img create' command.
    179         backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
    180                                           'file', 'qed', 'vpc'])
    181         backing_file_name = 'backing_img.' + backing_file_fmt
    182         backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
    183                                            MAX_BACKING_FILE_SIZE) * (1 << 20)
    184         cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
    185                                backing_file_name, str(backing_file_size)]
    186         temp_log = io.StringIO()
    187         retcode = run_app(temp_log, cmd)
    188         if retcode == 0:
    189             temp_log.close()
    190             return (backing_file_name, backing_file_fmt)
    191         else:
    192             multilog("Warning: The %s backing file was not created.\n\n"
    193                      % backing_file_fmt, sys.stderr, self.log, self.parent_log)
    194             self.log.write("Log for the failure:\n" + temp_log.getvalue() +
    195                            '\n\n')
    196             temp_log.close()
    197             return (None, None)
    198 
    199     def execute(self, input_commands=None, fuzz_config=None):
    200         """ Execute a test.
    201 
    202         The method creates backing and test images, runs test app and analyzes
    203         its exit status. If the application was killed by a signal, the test
    204         is marked as failed.
    205         """
    206         if input_commands is None:
    207             commands = self.commands
    208         else:
    209             commands = input_commands
    210 
    211         os.chdir(self.current_dir)
    212         backing_file_name, backing_file_fmt = self._create_backing_file()
    213         img_size = image_generator.create_image(
    214             'test.img', backing_file_name, backing_file_fmt, fuzz_config)
    215         for item in commands:
    216             shutil.copy('test.img', 'copy.img')
    217             # 'off' and 'len' are multiple of the sector size
    218             sector_size = 512
    219             start = random.randrange(0, img_size + 1, sector_size)
    220             end = random.randrange(start, img_size + 1, sector_size)
    221 
    222             if item[0] == 'qemu-img':
    223                 current_cmd = list(self.qemu_img)
    224             elif item[0] == 'qemu-io':
    225                 current_cmd = list(self.qemu_io)
    226             else:
    227                 multilog("Warning: test command '%s' is not defined.\n"
    228                          % item[0], sys.stderr, self.log, self.parent_log)
    229                 continue
    230             # Replace all placeholders with their real values
    231             for v in item[1:]:
    232                 c = (v
    233                      .replace('$test_img', 'copy.img')
    234                      .replace('$off', str(start))
    235                      .replace('$len', str(end - start)))
    236                 current_cmd.append(c)
    237 
    238             # Log string with the test header
    239             test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
    240                            "Backing file: %s\n" \
    241                            % (self.seed, " ".join(current_cmd),
    242                               self.current_dir, backing_file_name)
    243             temp_log = io.StringIO()
    244             try:
    245                 retcode = run_app(temp_log, current_cmd)
    246             except OSError as e:
    247                 multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
    248                          % (test_summary, os.path.basename(current_cmd[0]),
    249                             e.strerror),
    250                          sys.stderr, self.log, self.parent_log)
    251                 raise TestException
    252 
    253             if retcode < 0:
    254                 self.log.write(temp_log.getvalue())
    255                 multilog("%sFAIL: Test terminated by signal %s\n\n"
    256                          % (test_summary, str_signal(-retcode)),
    257                          sys.stderr, self.log, self.parent_log)
    258                 self.failed = True
    259             else:
    260                 if self.log_all:
    261                     self.log.write(temp_log.getvalue())
    262                     multilog("%sPASS: Application exited with the code " \
    263                              "'%d'\n\n" % (test_summary, retcode),
    264                              sys.stdout, self.log, self.parent_log)
    265             temp_log.close()
    266             os.remove('copy.img')
    267 
    268     def finish(self):
    269         """Restore the test environment after a test execution."""
    270         self.log.close()
    271         self.parent_log.close()
    272         os.chdir(self.init_path)
    273         if self.cleanup and not self.failed:
    274             shutil.rmtree(self.current_dir)
    275 
    276 if __name__ == '__main__':
    277 
    278     def usage():
    279         print("""
    280         Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR
    281 
    282         Set up test environment in TEST_DIR and run a test in it. A module for
    283         test image generation should be specified via IMG_GENERATOR.
    284 
    285         Example:
    286           runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2
    287 
    288         Optional arguments:
    289           -h, --help                    display this help and exit
    290           -d, --duration=NUMBER         finish tests after NUMBER of seconds
    291           -c, --command=JSON            run tests for all commands specified in
    292                                         the JSON array
    293           -s, --seed=STRING             seed for a test image generation,
    294                                         by default will be generated randomly
    295           --config=JSON                 take fuzzer configuration from the JSON
    296                                         array
    297           -k, --keep_passed             don't remove folders of passed tests
    298           -v, --verbose                 log information about passed tests
    299 
    300         JSON:
    301 
    302         '--command' accepts a JSON array of commands. Each command presents
    303         an application under test with all its parameters as a list of strings,
    304         e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].
    305 
    306         Supported application aliases: 'qemu-img' and 'qemu-io'.
    307 
    308         Supported argument aliases: $test_img for the fuzzed image, $off
    309         for an offset, $len for length.
    310 
    311         Values for $off and $len will be generated based on the virtual disk
    312         size of the fuzzed image.
    313 
    314         Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
    315         'QEMU_IO' environment variables.
    316 
    317         '--config' accepts a JSON array of fields to be fuzzed, e.g.
    318         '[["header"], ["header", "version"]]'.
    319 
    320         Each of the list elements can consist of a complex image element only
    321         as ["header"] or ["feature_name_table"] or an exact field as
    322         ["header", "version"]. In the first case random portion of the element
    323         fields will be fuzzed, in the second one the specified field will be
    324         fuzzed always.
    325 
    326         If '--config' argument is specified, fields not listed in
    327         the configuration array will not be fuzzed.
    328         """)
    329 
    330     def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
    331                  command, fuzz_config):
    332         """Setup environment for one test and execute this test."""
    333         try:
    334             test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
    335                            log_all)
    336         except TestException:
    337             sys.exit(1)
    338 
    339         # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
    340         # block
    341         try:
    342             try:
    343                 test.execute(command, fuzz_config)
    344             except TestException:
    345                 sys.exit(1)
    346         finally:
    347             test.finish()
    348 
    349     def should_continue(duration, start_time):
    350         """Return True if a new test can be started and False otherwise."""
    351         current_time = int(time.time())
    352         return (duration is None) or (current_time - start_time < duration)
    353 
    354     try:
    355         opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
    356                                        ['command=', 'help', 'seed=', 'config=',
    357                                         'keep_passed', 'verbose', 'duration='])
    358     except getopt.error as e:
    359         print("Error: %s\n\nTry 'runner.py --help' for more information" % e, file=sys.stderr)
    360         sys.exit(1)
    361 
    362     command = None
    363     cleanup = True
    364     log_all = False
    365     seed = None
    366     config = None
    367     duration = None
    368     for opt, arg in opts:
    369         if opt in ('-h', '--help'):
    370             usage()
    371             sys.exit()
    372         elif opt in ('-c', '--command'):
    373             try:
    374                 command = json.loads(arg)
    375             except (TypeError, ValueError, NameError) as e:
    376                 print("Error: JSON array of test commands cannot be loaded.\n" \
    377                     "Reason: %s" % e, file=sys.stderr)
    378                 sys.exit(1)
    379         elif opt in ('-k', '--keep_passed'):
    380             cleanup = False
    381         elif opt in ('-v', '--verbose'):
    382             log_all = True
    383         elif opt in ('-s', '--seed'):
    384             seed = arg
    385         elif opt in ('-d', '--duration'):
    386             duration = int(arg)
    387         elif opt == '--config':
    388             try:
    389                 config = json.loads(arg)
    390             except (TypeError, ValueError, NameError) as e:
    391                 print("Error: JSON array with the fuzzer configuration cannot" \
    392                     " be loaded\nReason: %s" % e, file=sys.stderr)
    393                 sys.exit(1)
    394 
    395     if not len(args) == 2:
    396         print("Expected two parameters\nTry 'runner.py --help'" \
    397             " for more information.", file=sys.stderr)
    398         sys.exit(1)
    399 
    400     work_dir = os.path.realpath(args[0])
    401     # run_log is created in 'main', because multiple tests are expected to
    402     # log in it
    403     run_log = os.path.join(work_dir, 'run.log')
    404 
    405     # Add the path to the image generator module to sys.path
    406     sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    407     # Remove a script extension from image generator module if any
    408     generator_name = os.path.splitext(os.path.basename(args[1]))[0]
    409 
    410     try:
    411         image_generator = __import__(generator_name)
    412     except ImportError as e:
    413         print("Error: The image generator '%s' cannot be imported.\n" \
    414             "Reason: %s" % (generator_name, e), file=sys.stderr)
    415         sys.exit(1)
    416 
    417     # Enable core dumps
    418     resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    419     # If a seed is specified, only one test will be executed.
    420     # Otherwise runner will terminate after a keyboard interruption
    421     start_time = int(time.time())
    422     test_id = count(1)
    423     while should_continue(duration, start_time):
    424         try:
    425             run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
    426                      log_all, command, config)
    427         except (KeyboardInterrupt, SystemExit):
    428             sys.exit(1)
    429 
    430         if seed is not None:
    431             break