runner.py (16343B)
#!/usr/bin/env python3

# Tool for running fuzz tests
#
# Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import os
import signal
import subprocess
import random
import shutil
from itertools import count
import time
import getopt
import io
import resource

try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print("Warning: Module for JSON processing is not found.\n"
              "'--config' and '--command' options are not supported.",
              file=sys.stderr)

# Backing file sizes in MB
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1


def multilog(msg, *output):
    """Write a message to all of the specified file objects.

    Every object passed in 'output' must provide write() and flush(); each
    one is flushed right after the message is written to it.
    """
    for fd in output:
        fd.write(msg)
        fd.flush()


def str_signal(sig):
    """Convert a numeric value of a system signal to its symbolic name.

    Return the name (e.g. 'SIGKILL') defined by the current operating system,
    or None if the number does not correspond to any known signal.
    """
    # signal.Signals contains only real signals, so unrelated constants of
    # the 'signal' module that happen to share a numeric value (SIG_IGN,
    # ITIMER_*, SIG_BLOCK, ...) can never be returned by mistake.
    try:
        return signal.Signals(sig).name
    except ValueError:
        return None


def run_app(fd, q_args, timeout=600):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    Arguments:
    fd -- file object receiving the captured stdout and stderr of the
          application (text, undecodable bytes are replaced)
    q_args -- list with the executable and its arguments
    timeout -- number of whole seconds the application is allowed to run
               before it is killed (600 by default)

    Return the exit code of the application, or the negated number of the
    kill signal if the application was terminated by timeout.

    OSError raised by subprocess.Popen (e.g. a missing executable) is
    propagated to the caller.

    The function installs its own SIGALRM handler, so it has to be called
    from the main thread.
    """

    class Alarm(Exception):
        """Exception for signal.alarm events."""
        pass

    def handler(*args):
        """Notify that an alarm event occurred."""
        raise Alarm

    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)
    term_signal = signal.SIGKILL
    try:
        # The child gets its own copy of the descriptor, so the parent's one
        # can be closed as soon as the process is started.
        with open(os.devnull, 'r+') as devnull:
            process = subprocess.Popen(q_args, stdin=devnull,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       errors='replace')
        try:
            out, err = process.communicate()
        except Alarm:
            process.send_signal(term_signal)
            # Release the pipes and reap the killed child, otherwise every
            # timeout leaves a zombie process and two open descriptors.
            process.stdout.close()
            process.stderr.close()
            process.wait()
            fd.write('The command was terminated by timeout.\n')
            fd.flush()
            return -term_signal

        signal.alarm(0)
        fd.write(out)
        fd.write(err)
        fd.flush()
        return process.returncode
    finally:
        # Never leave a pending alarm behind, whatever the exit path is.
        signal.alarm(0)


class TestException(Exception):
    """Exception for errors risen by TestEnv objects."""
    pass


class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        Arguments:
        test_id -- string used to name the test directory ('test-<id>')
        seed -- seed for the random generator; if None, a random one is
                generated
        work_dir -- directory in which the test directory is created
        run_log -- path to the summary log, opened in append mode
        cleanup -- remove the test directory in finish() if the test passed
        log_all -- write application output to the test log for passed
                   commands too

        Raise TestException if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
        self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. "
                  "Reason: %s" % (self.work_dir, e.strerror),
                  file=sys.stderr)
            raise TestException from e
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        try:
            self.parent_log = open(run_log, "a")
        except OSError:
            # Do not leak the test log if the summary log cannot be opened
            self.log.close()
            raise
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format, or (None, None) if
        'qemu-img create' did not exit with the code 0.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        # All formats supported by the 'qemu-img create' command.
        backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
                                          'file', 'qed', 'vpc'])
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = io.StringIO()
        try:
            retcode = run_app(temp_log, cmd)
            if retcode == 0:
                return (backing_file_name, backing_file_fmt)

            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log,
                     self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            return (None, None)
        finally:
            temp_log.close()

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        Arguments:
        input_commands -- list of commands to run instead of the default
                          ones; each command is a list of strings starting
                          with the 'qemu-img' or 'qemu-io' alias
        fuzz_config -- fuzzer configuration passed to the image generator

        Raise TestException if an application cannot be started.

        The image generator is taken from the module-level name
        'image_generator', which is bound in the '__main__' section.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        # 'off' and 'len' are multiple of the sector size
        sector_size = 512
        for item in commands:
            shutil.copy('test.img', 'copy.img')
            # The random values are drawn before the command is validated to
            # keep the random sequence for a given seed unchanged.
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            temp_log = io.StringIO()
            try:
                try:
                    retcode = run_app(temp_log, current_cmd)
                except OSError as e:
                    multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                             % (test_summary,
                                os.path.basename(current_cmd[0]),
                                e.strerror),
                             sys.stderr, self.log, self.parent_log)
                    raise TestException from e

                if retcode < 0:
                    self.log.write(temp_log.getvalue())
                    multilog("%sFAIL: Test terminated by signal %s\n\n"
                             % (test_summary, str_signal(-retcode)),
                             sys.stderr, self.log, self.parent_log)
                    self.failed = True
                else:
                    if self.log_all:
                        self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code "
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)
            finally:
                temp_log.close()
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution.

        Both logs are closed, the initial working directory is restored and
        the test directory is removed if cleanup was requested and the test
        has not failed.
        """
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)

if __name__ == '__main__':

    def usage():
        """Print the help message to stdout."""
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        The runner exits with the code 1 if the environment cannot be set up
        or an application under test cannot be started.
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise."""
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e,
              file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            try:
                command = json.loads(arg)
            # NameError is raised if no JSON module could be imported
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n"
                      "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            try:
                duration = int(arg)
            except ValueError:
                print("Error: Duration must be an integer number of seconds, "
                      "got '%s'" % arg, file=sys.stderr)
                sys.exit(1)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            # NameError is raised if no JSON module could be imported
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot"
                      " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if len(args) != 2:
        print("Expected two parameters\nTry 'runner.py --help'"
              " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n"
              "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break