cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

kunit.py (17870B)


      1#!/usr/bin/env python3
      2# SPDX-License-Identifier: GPL-2.0
      3#
      4# A thin wrapper on top of the KUnit Kernel
      5#
      6# Copyright (C) 2019, Google LLC.
      7# Author: Felix Guo <felixguoxiuping@gmail.com>
      8# Author: Brendan Higgins <brendanhiggins@google.com>
      9
     10import argparse
     11import os
     12import re
     13import sys
     14import time
     15
     16assert sys.version_info >= (3, 7), "Python version is too old"
     17
     18from dataclasses import dataclass
     19from enum import Enum, auto
     20from typing import Iterable, List, Optional, Sequence, Tuple
     21
     22import kunit_json
     23import kunit_kernel
     24import kunit_parser
     25
     26class KunitStatus(Enum):
     27	SUCCESS = auto()
     28	CONFIG_FAILURE = auto()
     29	BUILD_FAILURE = auto()
     30	TEST_FAILURE = auto()
     31
     32@dataclass
     33class KunitResult:
     34	status: KunitStatus
     35	elapsed_time: float
     36
     37@dataclass
     38class KunitConfigRequest:
     39	build_dir: str
     40	make_options: Optional[List[str]]
     41
     42@dataclass
     43class KunitBuildRequest(KunitConfigRequest):
     44	jobs: int
     45	alltests: bool
     46
     47@dataclass
     48class KunitParseRequest:
     49	raw_output: Optional[str]
     50	json: Optional[str]
     51
     52@dataclass
     53class KunitExecRequest(KunitParseRequest):
     54	build_dir: str
     55	timeout: int
     56	alltests: bool
     57	filter_glob: str
     58	kernel_args: Optional[List[str]]
     59	run_isolated: Optional[str]
     60
     61@dataclass
     62class KunitRequest(KunitExecRequest, KunitBuildRequest):
     63	pass
     64
     65
     66def get_kernel_root_path() -> str:
     67	path = sys.argv[0] if not __file__ else __file__
     68	parts = os.path.realpath(path).split('tools/testing/kunit')
     69	if len(parts) != 2:
     70		sys.exit(1)
     71	return parts[0]
     72
     73def config_tests(linux: kunit_kernel.LinuxSourceTree,
     74		 request: KunitConfigRequest) -> KunitResult:
     75	kunit_parser.print_with_timestamp('Configuring KUnit Kernel ...')
     76
     77	config_start = time.time()
     78	success = linux.build_reconfig(request.build_dir, request.make_options)
     79	config_end = time.time()
     80	if not success:
     81		return KunitResult(KunitStatus.CONFIG_FAILURE,
     82				   config_end - config_start)
     83	return KunitResult(KunitStatus.SUCCESS,
     84			   config_end - config_start)
     85
     86def build_tests(linux: kunit_kernel.LinuxSourceTree,
     87		request: KunitBuildRequest) -> KunitResult:
     88	kunit_parser.print_with_timestamp('Building KUnit Kernel ...')
     89
     90	build_start = time.time()
     91	success = linux.build_kernel(request.alltests,
     92				     request.jobs,
     93				     request.build_dir,
     94				     request.make_options)
     95	build_end = time.time()
     96	if not success:
     97		return KunitResult(KunitStatus.BUILD_FAILURE,
     98				   build_end - build_start)
     99	if not success:
    100		return KunitResult(KunitStatus.BUILD_FAILURE,
    101				   build_end - build_start)
    102	return KunitResult(KunitStatus.SUCCESS,
    103			   build_end - build_start)
    104
    105def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
    106			   request: KunitBuildRequest) -> KunitResult:
    107	config_result = config_tests(linux, request)
    108	if config_result.status != KunitStatus.SUCCESS:
    109		return config_result
    110
    111	return build_tests(linux, request)
    112
    113def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
    114	args = ['kunit.action=list']
    115	if request.kernel_args:
    116		args.extend(request.kernel_args)
    117
    118	output = linux.run_kernel(args=args,
    119			   timeout=None if request.alltests else request.timeout,
    120			   filter_glob=request.filter_glob,
    121			   build_dir=request.build_dir)
    122	lines = kunit_parser.extract_tap_lines(output)
    123	# Hack! Drop the dummy TAP version header that the executor prints out.
    124	lines.pop()
    125
    126	# Filter out any extraneous non-test output that might have gotten mixed in.
    127	return [l for l in lines if re.match(r'^[^\s.]+\.[^\s.]+$', l)]
    128
    129def _suites_from_test_list(tests: List[str]) -> List[str]:
    130	"""Extracts all the suites from an ordered list of tests."""
    131	suites = []  # type: List[str]
    132	for t in tests:
    133		parts = t.split('.', maxsplit=2)
    134		if len(parts) != 2:
    135			raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
    136		suite, case = parts
    137		if not suites or suites[-1] != suite:
    138			suites.append(suite)
    139	return suites
    140
    141
    142
    143def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
    144	filter_globs = [request.filter_glob]
    145	if request.run_isolated:
    146		tests = _list_tests(linux, request)
    147		if request.run_isolated == 'test':
    148			filter_globs = tests
    149		if request.run_isolated == 'suite':
    150			filter_globs = _suites_from_test_list(tests)
    151			# Apply the test-part of the user's glob, if present.
    152			if '.' in request.filter_glob:
    153				test_glob = request.filter_glob.split('.', maxsplit=2)[1]
    154				filter_globs = [g + '.'+ test_glob for g in filter_globs]
    155
    156	metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig')
    157
    158	test_counts = kunit_parser.TestCounts()
    159	exec_time = 0.0
    160	for i, filter_glob in enumerate(filter_globs):
    161		kunit_parser.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs)))
    162
    163		test_start = time.time()
    164		run_result = linux.run_kernel(
    165			args=request.kernel_args,
    166			timeout=None if request.alltests else request.timeout,
    167			filter_glob=filter_glob,
    168			build_dir=request.build_dir)
    169
    170		_, test_result = parse_tests(request, metadata, run_result)
    171		# run_kernel() doesn't block on the kernel exiting.
    172		# That only happens after we get the last line of output from `run_result`.
    173		# So exec_time here actually contains parsing + execution time, which is fine.
    174		test_end = time.time()
    175		exec_time += test_end - test_start
    176
    177		test_counts.add_subtest_counts(test_result.counts)
    178
    179	if len(filter_globs) == 1 and test_counts.crashed > 0:
    180		bd = request.build_dir
    181		print('The kernel seems to have crashed; you can decode the stack traces with:')
    182		print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format(
    183				bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0]))
    184
    185	kunit_status = _map_to_overall_status(test_counts.get_status())
    186	return KunitResult(status=kunit_status, elapsed_time=exec_time)
    187
    188def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
    189	if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED):
    190		return KunitStatus.SUCCESS
    191	return KunitStatus.TEST_FAILURE
    192
    193def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]:
    194	parse_start = time.time()
    195
    196	test_result = kunit_parser.Test()
    197
    198	if request.raw_output:
    199		# Treat unparsed results as one passing test.
    200		test_result.status = kunit_parser.TestStatus.SUCCESS
    201		test_result.counts.passed = 1
    202
    203		output: Iterable[str] = input_data
    204		if request.raw_output == 'all':
    205			pass
    206		elif request.raw_output == 'kunit':
    207			output = kunit_parser.extract_tap_lines(output)
    208		for line in output:
    209			print(line.rstrip())
    210
    211	else:
    212		test_result = kunit_parser.parse_run_tests(input_data)
    213	parse_end = time.time()
    214
    215	if request.json:
    216		json_str = kunit_json.get_json_result(
    217					test=test_result,
    218					metadata=metadata)
    219		if request.json == 'stdout':
    220			print(json_str)
    221		else:
    222			with open(request.json, 'w') as f:
    223				f.write(json_str)
    224			kunit_parser.print_with_timestamp("Test results stored in %s" %
    225				os.path.abspath(request.json))
    226
    227	if test_result.status != kunit_parser.TestStatus.SUCCESS:
    228		return KunitResult(KunitStatus.TEST_FAILURE, parse_end - parse_start), test_result
    229
    230	return KunitResult(KunitStatus.SUCCESS, parse_end - parse_start), test_result
    231
    232def run_tests(linux: kunit_kernel.LinuxSourceTree,
    233	      request: KunitRequest) -> KunitResult:
    234	run_start = time.time()
    235
    236	config_result = config_tests(linux, request)
    237	if config_result.status != KunitStatus.SUCCESS:
    238		return config_result
    239
    240	build_result = build_tests(linux, request)
    241	if build_result.status != KunitStatus.SUCCESS:
    242		return build_result
    243
    244	exec_result = exec_tests(linux, request)
    245
    246	run_end = time.time()
    247
    248	kunit_parser.print_with_timestamp((
    249		'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' +
    250		'building, %.3fs running\n') % (
    251				run_end - run_start,
    252				config_result.elapsed_time,
    253				build_result.elapsed_time,
    254				exec_result.elapsed_time))
    255	return exec_result
    256
    257# Problem:
    258# $ kunit.py run --json
    259# works as one would expect and prints the parsed test results as JSON.
    260# $ kunit.py run --json suite_name
    261# would *not* pass suite_name as the filter_glob and print as json.
    262# argparse will consider it to be another way of writing
    263# $ kunit.py run --json=suite_name
    264# i.e. it would run all tests, and dump the json to a `suite_name` file.
    265# So we hackily automatically rewrite --json => --json=stdout
    266pseudo_bool_flag_defaults = {
    267		'--json': 'stdout',
    268		'--raw_output': 'kunit',
    269}
    270def massage_argv(argv: Sequence[str]) -> Sequence[str]:
    271	def massage_arg(arg: str) -> str:
    272		if arg not in pseudo_bool_flag_defaults:
    273			return arg
    274		return  f'{arg}={pseudo_bool_flag_defaults[arg]}'
    275	return list(map(massage_arg, argv))
    276
    277def get_default_jobs() -> int:
    278	return len(os.sched_getaffinity(0))
    279
    280def add_common_opts(parser) -> None:
    281	parser.add_argument('--build_dir',
    282			    help='As in the make command, it specifies the build '
    283			    'directory.',
    284			    type=str, default='.kunit', metavar='DIR')
    285	parser.add_argument('--make_options',
    286			    help='X=Y make option, can be repeated.',
    287			    action='append', metavar='X=Y')
    288	parser.add_argument('--alltests',
    289			    help='Run all KUnit tests through allyesconfig',
    290			    action='store_true')
    291	parser.add_argument('--kunitconfig',
    292			     help='Path to Kconfig fragment that enables KUnit tests.'
    293			     ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
    294			     'will get  automatically appended.',
    295			     metavar='PATH')
    296	parser.add_argument('--kconfig_add',
    297			     help='Additional Kconfig options to append to the '
    298			     '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
    299			    action='append', metavar='CONFIG_X=Y')
    300
    301	parser.add_argument('--arch',
    302			    help=('Specifies the architecture to run tests under. '
    303				  'The architecture specified here must match the '
    304				  'string passed to the ARCH make param, '
    305				  'e.g. i386, x86_64, arm, um, etc. Non-UML '
    306				  'architectures run on QEMU.'),
    307			    type=str, default='um', metavar='ARCH')
    308
    309	parser.add_argument('--cross_compile',
    310			    help=('Sets make\'s CROSS_COMPILE variable; it should '
    311				  'be set to a toolchain path prefix (the prefix '
    312				  'of gcc and other tools in your toolchain, for '
    313				  'example `sparc64-linux-gnu-` if you have the '
    314				  'sparc toolchain installed on your system, or '
    315				  '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
    316				  'if you have downloaded the microblaze toolchain '
    317				  'from the 0-day website to a directory in your '
    318				  'home directory called `toolchains`).'),
    319			    metavar='PREFIX')
    320
    321	parser.add_argument('--qemu_config',
    322			    help=('Takes a path to a path to a file containing '
    323				  'a QemuArchParams object.'),
    324			    type=str, metavar='FILE')
    325
    326def add_build_opts(parser) -> None:
    327	parser.add_argument('--jobs',
    328			    help='As in the make command, "Specifies  the number of '
    329			    'jobs (commands) to run simultaneously."',
    330			    type=int, default=get_default_jobs(), metavar='N')
    331
    332def add_exec_opts(parser) -> None:
    333	parser.add_argument('--timeout',
    334			    help='maximum number of seconds to allow for all tests '
    335			    'to run. This does not include time taken to build the '
    336			    'tests.',
    337			    type=int,
    338			    default=300,
    339			    metavar='SECONDS')
    340	parser.add_argument('filter_glob',
    341			    help='Filter which KUnit test suites/tests run at '
    342			    'boot-time, e.g. list* or list*.*del_test',
    343			    type=str,
    344			    nargs='?',
    345			    default='',
    346			    metavar='filter_glob')
    347	parser.add_argument('--kernel_args',
    348			    help='Kernel command-line parameters. Maybe be repeated',
    349			     action='append', metavar='')
    350	parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
    351			    'individual suite/test. This is can be useful for debugging '
    352			    'a non-hermetic test, one that might pass/fail based on '
    353			    'what ran before it.',
    354			    type=str,
    355			    choices=['suite', 'test'])
    356
    357def add_parse_opts(parser) -> None:
    358	parser.add_argument('--raw_output', help='If set don\'t format output from kernel. '
    359			    'If set to --raw_output=kunit, filters to just KUnit output.',
    360			     type=str, nargs='?', const='all', default=None, choices=['all', 'kunit'])
    361	parser.add_argument('--json',
    362			    nargs='?',
    363			    help='Stores test results in a JSON, and either '
    364			    'prints to stdout or saves to file if a '
    365			    'filename is specified',
    366			    type=str, const='stdout', default=None, metavar='FILE')
    367
    368def main(argv, linux=None):
    369	parser = argparse.ArgumentParser(
    370			description='Helps writing and running KUnit tests.')
    371	subparser = parser.add_subparsers(dest='subcommand')
    372
    373	# The 'run' command will config, build, exec, and parse in one go.
    374	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
    375	add_common_opts(run_parser)
    376	add_build_opts(run_parser)
    377	add_exec_opts(run_parser)
    378	add_parse_opts(run_parser)
    379
    380	config_parser = subparser.add_parser('config',
    381						help='Ensures that .config contains all of '
    382						'the options in .kunitconfig')
    383	add_common_opts(config_parser)
    384
    385	build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
    386	add_common_opts(build_parser)
    387	add_build_opts(build_parser)
    388
    389	exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
    390	add_common_opts(exec_parser)
    391	add_exec_opts(exec_parser)
    392	add_parse_opts(exec_parser)
    393
    394	# The 'parse' option is special, as it doesn't need the kernel source
    395	# (therefore there is no need for a build_dir, hence no add_common_opts)
    396	# and the '--file' argument is not relevant to 'run', so isn't in
    397	# add_parse_opts()
    398	parse_parser = subparser.add_parser('parse',
    399					    help='Parses KUnit results from a file, '
    400					    'and parses formatted results.')
    401	add_parse_opts(parse_parser)
    402	parse_parser.add_argument('file',
    403				  help='Specifies the file to read results from.',
    404				  type=str, nargs='?', metavar='input_file')
    405
    406	cli_args = parser.parse_args(massage_argv(argv))
    407
    408	if get_kernel_root_path():
    409		os.chdir(get_kernel_root_path())
    410
    411	if cli_args.subcommand == 'run':
    412		if not os.path.exists(cli_args.build_dir):
    413			os.mkdir(cli_args.build_dir)
    414
    415		if not linux:
    416			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
    417					kunitconfig_path=cli_args.kunitconfig,
    418					kconfig_add=cli_args.kconfig_add,
    419					arch=cli_args.arch,
    420					cross_compile=cli_args.cross_compile,
    421					qemu_config_path=cli_args.qemu_config)
    422
    423		request = KunitRequest(build_dir=cli_args.build_dir,
    424				       make_options=cli_args.make_options,
    425				       jobs=cli_args.jobs,
    426				       alltests=cli_args.alltests,
    427				       raw_output=cli_args.raw_output,
    428				       json=cli_args.json,
    429				       timeout=cli_args.timeout,
    430				       filter_glob=cli_args.filter_glob,
    431				       kernel_args=cli_args.kernel_args,
    432				       run_isolated=cli_args.run_isolated)
    433		result = run_tests(linux, request)
    434		if result.status != KunitStatus.SUCCESS:
    435			sys.exit(1)
    436	elif cli_args.subcommand == 'config':
    437		if cli_args.build_dir and (
    438				not os.path.exists(cli_args.build_dir)):
    439			os.mkdir(cli_args.build_dir)
    440
    441		if not linux:
    442			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
    443					kunitconfig_path=cli_args.kunitconfig,
    444					kconfig_add=cli_args.kconfig_add,
    445					arch=cli_args.arch,
    446					cross_compile=cli_args.cross_compile,
    447					qemu_config_path=cli_args.qemu_config)
    448
    449		request = KunitConfigRequest(build_dir=cli_args.build_dir,
    450					     make_options=cli_args.make_options)
    451		result = config_tests(linux, request)
    452		kunit_parser.print_with_timestamp((
    453			'Elapsed time: %.3fs\n') % (
    454				result.elapsed_time))
    455		if result.status != KunitStatus.SUCCESS:
    456			sys.exit(1)
    457	elif cli_args.subcommand == 'build':
    458		if not linux:
    459			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
    460					kunitconfig_path=cli_args.kunitconfig,
    461					kconfig_add=cli_args.kconfig_add,
    462					arch=cli_args.arch,
    463					cross_compile=cli_args.cross_compile,
    464					qemu_config_path=cli_args.qemu_config)
    465
    466		request = KunitBuildRequest(build_dir=cli_args.build_dir,
    467					    make_options=cli_args.make_options,
    468					    jobs=cli_args.jobs,
    469					    alltests=cli_args.alltests)
    470		result = config_and_build_tests(linux, request)
    471		kunit_parser.print_with_timestamp((
    472			'Elapsed time: %.3fs\n') % (
    473				result.elapsed_time))
    474		if result.status != KunitStatus.SUCCESS:
    475			sys.exit(1)
    476	elif cli_args.subcommand == 'exec':
    477		if not linux:
    478			linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir,
    479					kunitconfig_path=cli_args.kunitconfig,
    480					kconfig_add=cli_args.kconfig_add,
    481					arch=cli_args.arch,
    482					cross_compile=cli_args.cross_compile,
    483					qemu_config_path=cli_args.qemu_config)
    484
    485		exec_request = KunitExecRequest(raw_output=cli_args.raw_output,
    486						build_dir=cli_args.build_dir,
    487						json=cli_args.json,
    488						timeout=cli_args.timeout,
    489						alltests=cli_args.alltests,
    490						filter_glob=cli_args.filter_glob,
    491						kernel_args=cli_args.kernel_args,
    492						run_isolated=cli_args.run_isolated)
    493		result = exec_tests(linux, exec_request)
    494		kunit_parser.print_with_timestamp((
    495			'Elapsed time: %.3fs\n') % (result.elapsed_time))
    496		if result.status != KunitStatus.SUCCESS:
    497			sys.exit(1)
    498	elif cli_args.subcommand == 'parse':
    499		if cli_args.file is None:
    500			sys.stdin.reconfigure(errors='backslashreplace')  # pytype: disable=attribute-error
    501			kunit_output = sys.stdin
    502		else:
    503			with open(cli_args.file, 'r', errors='backslashreplace') as f:
    504				kunit_output = f.read().splitlines()
    505		# We know nothing about how the result was created!
    506		metadata = kunit_json.Metadata()
    507		request = KunitParseRequest(raw_output=cli_args.raw_output,
    508					    json=cli_args.json)
    509		result, _ = parse_tests(request, metadata, kunit_output)
    510		if result.status != KunitStatus.SUCCESS:
    511			sys.exit(1)
    512	else:
    513		parser.print_help()
    514
# Script entry point: forward the arguments, minus the program name, to main().
if __name__ == '__main__':
	main(sys.argv[1:])