runner.py (16343B)
#!/usr/bin/env python3

# Tool for running fuzz tests
#
# Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import os
import signal
import subprocess
import random
import shutil
from itertools import count
import time
import getopt
import io
import resource

try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print("Warning: Module for JSON processing is not found.\n"
              "'--config' and '--command' options are not supported.",
              file=sys.stderr)

# Backing file sizes in MB
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1


def multilog(msg, *output):
    """Write a message to every given file object and flush each of them.

    msg    -- the string to write
    output -- any number of objects providing write() and flush()
    """
    for fd in output:
        fd.write(msg)
        fd.flush()


def str_signal(sig):
    """Convert a numeric value of a system signal to its symbolic name.

    Only real signal numbers known to the current platform are considered,
    so other constants of the 'signal' module that happen to share the same
    numeric value (e.g. SIG_IGN, SIG_SETMASK, ITIMER_PROF) are never
    returned.

    Return the name (e.g. 'SIGKILL') or None if 'sig' is not a valid signal
    number.
    """
    try:
        return signal.Signals(sig).name
    except ValueError:
        return None


def run_app(fd, q_args):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    fd     -- writable text file object receiving the captured stdout and
              stderr of the application (or a timeout notice)
    q_args -- the command line as a list of strings

    The application gets 600 seconds to finish. On timeout it is killed with
    SIGKILL and the negated signal number is returned. Otherwise the return
    code reported by subprocess is returned (negative if the application was
    terminated by a signal).

    OSError raised while starting the application is propagated to the
    caller.
    """

    class Alarm(Exception):
        """Exception for signal.alarm events."""
        pass

    def handler(*args):
        """Notify that an alarm event occurred."""
        raise Alarm

    term_signal = signal.SIGKILL
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(600)
    try:
        # subprocess.DEVNULL avoids opening (and leaking) a '/dev/null'
        # file object on every call
        process = subprocess.Popen(q_args, stdin=subprocess.DEVNULL,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   errors='replace')
        try:
            out, err = process.communicate()
        except Alarm:
            process.send_signal(term_signal)
            # Reap the killed child and release its pipes, so neither a
            # zombie process nor open descriptors are left behind
            process.wait()
            for pipe in (process.stdout, process.stderr):
                if pipe is not None:
                    pipe.close()
            fd.write('The command was terminated by timeout.\n')
            fd.flush()
            return -term_signal
    finally:
        # Never leave the alarm armed, whatever the outcome was
        signal.alarm(0)

    fd.write(out)
    fd.write(err)
    fd.flush()
    return process.returncode


class TestException(Exception):
    """Exception for errors risen by TestEnv objects."""
    pass


class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        test_id  -- string appended to 'test-' to name the test directory
        seed     -- seed for the random generator; if None, a random one is
                    generated
        work_dir -- directory in which the test directory is created
        run_log  -- path to the summary log, opened in append mode
        cleanup  -- if True, finish() removes the directory of a passed test
        log_all  -- if True, application output of passed commands is also
                    written to the test log

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        Raise TestException if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        # The environment variables may hold a command with arguments,
        # therefore they are split into lists
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
        self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
        # Default command set; the first item of each command is an
        # application alias, '$...' items are placeholders filled in by
        # execute()
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. Reason: %s"
                  % (self.work_dir, e.strerror), file=sys.stderr)
            raise TestException
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format, or (None, None)
        if 'qemu-img create' did not exit with 0 (a warning and the output
        of the command are logged in this case).

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        # All formats supported by the 'qemu-img create' command.
        backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
                                          'file', 'qed', 'vpc'])
        backing_file_name = 'backing_img.' + backing_file_fmt
        # The size is chosen in MB and passed to qemu-img in bytes
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = io.StringIO()
        retcode = run_app(temp_log, cmd)
        if retcode == 0:
            temp_log.close()
            return (backing_file_name, backing_file_fmt)
        else:
            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            temp_log.close()
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        input_commands -- list of commands to run instead of the default
                          set; each command is a list of strings starting
                          with the 'qemu-img' or 'qemu-io' alias
        fuzz_config    -- fuzzer configuration passed through to the image
                          generator

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        Raise TestException if an application cannot be started.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        # 'image_generator' is the module imported by the script entry point
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            # Every command works on its own fresh copy of the test image
            shutil.copy('test.img', 'copy.img')
            # 'off' and 'len' are multiple of the sector size
            sector_size = 512
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            temp_log = io.StringIO()
            try:
                retcode = run_app(temp_log, current_cmd)
            except OSError as e:
                multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                         % (test_summary, os.path.basename(current_cmd[0]),
                            e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException

            # A negative return code means termination by a signal
            if retcode < 0:
                self.log.write(temp_log.getvalue())
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            else:
                if self.log_all:
                    self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code "
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)
            temp_log.close()
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution.

        Close both logs, return to the initial working directory and remove
        the test directory unless the test failed or cleanup is disabled.
        """
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)


if __name__ == '__main__':

    def usage():
        """Print the help message."""
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        Exit with status 1 if the environment cannot be set up or the test
        raises TestException; the environment is always restored once it
        has been created.
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise."""
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e,
              file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            # NameError covers the case when no JSON module was imported
            try:
                command = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n"
                      "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            # Report a malformed number like any other option error instead
            # of dying with a traceback
            try:
                duration = int(arg)
            except ValueError as e:
                print("Error: Duration cannot be parsed as a number of "
                      "seconds.\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot"
                      " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if len(args) != 2:
        print("Expected two parameters\nTry 'runner.py --help'"
              " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n"
              "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break