cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

tdc.py (25984B)


      1#!/usr/bin/env python3
      2# SPDX-License-Identifier: GPL-2.0
      3
      4"""
      5tdc.py - Linux tc (Traffic Control) unit test driver
      6
      7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
      8"""
      9
     10import re
     11import os
     12import sys
     13import argparse
     14import importlib
     15import json
     16import subprocess
     17import time
     18import traceback
     19from collections import OrderedDict
     20from string import Template
     21
     22from tdc_config import *
     23from tdc_helper import *
     24
     25import TdcPlugin
     26from TdcResults import *
     27
     28class PluginDependencyException(Exception):
     29    def __init__(self, missing_pg):
     30        self.missing_pg = missing_pg
     31
     32class PluginMgrTestFail(Exception):
     33    def __init__(self, stage, output, message):
     34        self.stage = stage
     35        self.output = output
     36        self.message = message
     37
     38class PluginMgr:
     39    def __init__(self, argparser):
     40        super().__init__()
     41        self.plugins = {}
     42        self.plugin_instances = []
     43        self.failed_plugins = {}
     44        self.argparser = argparser
     45
     46        # TODO, put plugins in order
     47        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
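               # every *.py file under plugindir (other than __init__.py and
               # editor backup files) is imported and its SubPlugin instantiated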
     48        for dirpath, dirnames, filenames in os.walk(plugindir):
     49            for fn in filenames:
     50                if (fn.endswith('.py') and
     51                    not fn == '__init__.py' and
     52                    not fn.startswith('#') and
     53                    not fn.startswith('.#')):
     54                    mn = fn[0:-3]
     55                    foo = importlib.import_module('plugins.' + mn)
     56                    self.plugins[mn] = foo
     57                    self.plugin_instances.append(foo.SubPlugin())
     58
     59    def load_plugin(self, pgdir, pgname):
     60        pgname = pgname[0:-3]
     61        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
     62        self.plugins[pgname] = foo
     63        self.plugin_instances.append(foo.SubPlugin())
     64        self.plugin_instances[-1].check_args(self.args, None)
     65
     66    def get_required_plugins(self, testlist):
     67        '''
     68        Get all required plugins from the list of test cases and return
     69        all unique items.
     70        '''
     71        reqs = []
     72        for t in testlist:
     73            try:
     74                if 'requires' in t['plugins']:
     75                    if isinstance(t['plugins']['requires'], list):
     76                        reqs.extend(t['plugins']['requires'])
     77                    else:
     78                        reqs.append(t['plugins']['requires'])
     79            except KeyError:
     80                continue
     81        reqs = get_unique_item(reqs)
     82        return reqs
     83
     84    def load_required_plugins(self, reqs, parser, args, remaining):
     85        '''
     86        Get all required plugins from the list of test cases and load any plugin
     87        that is not already enabled.
     88        '''
     89        pgd = ['plugin-lib', 'plugin-lib-custom']
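               # look for each plugin in the stock plugin-lib directory and in
               # plugin-lib-custom; if copies exist in both, the first match
               # (plugin-lib) is used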
     90        pnf = []
     91
     92        for r in reqs:
     93            if r not in self.plugins:
     94                fname = '{}.py'.format(r)
     95                source_path = []
     96                for d in pgd:
     97                    pgpath = '{}/{}'.format(d, fname)
     98                    if os.path.isfile(pgpath):
     99                        source_path.append(pgpath)
    100                if len(source_path) == 0:
    101                    print('ERROR: unable to find required plugin {}'.format(r))
    102                    pnf.append(fname)
    103                    continue
    104                elif len(source_path) > 1:
    105                    print('WARNING: multiple copies of plugin {} found,'.format(r))
    106                    print('using version found at {}'.format(source_path[0]))
    107                pgdir = source_path[0]
    108                pgdir = pgdir.split('/')[0]
    109                self.load_plugin(pgdir, fname)
    110        if len(pnf) > 0:
    111            raise PluginDependencyException(pnf)
    112
    113        parser = self.call_add_args(parser)
    114        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
    115        return args
    116
    117    def call_pre_suite(self, testcount, testidlist):
    118        for pgn_inst in self.plugin_instances:
    119            pgn_inst.pre_suite(testcount, testidlist)
    120
    121    def call_post_suite(self, index):
    122        for pgn_inst in reversed(self.plugin_instances):
    123            pgn_inst.post_suite(index)
    124
    125    def call_pre_case(self, caseinfo, *, test_skip=False):
    126        for pgn_inst in self.plugin_instances:
    127            try:
    128                pgn_inst.pre_case(caseinfo, test_skip)
    129            except Exception as ee:
    130                print('exception {} in call to pre_case for {} plugin'.
    131                      format(ee, pgn_inst.__class__))
    132                print('test name is {}'.format(caseinfo['name']))
    133                print('testid is {}'.format(caseinfo['id']))
    134                raise
    135
    136    def call_post_case(self):
    137        for pgn_inst in reversed(self.plugin_instances):
    138            pgn_inst.post_case()
    139
    140    def call_pre_execute(self):
    141        for pgn_inst in self.plugin_instances:
    142            pgn_inst.pre_execute()
    143
    144    def call_post_execute(self):
    145        for pgn_inst in reversed(self.plugin_instances):
    146            pgn_inst.post_execute()
    147
    148    def call_add_args(self, parser):
    149        for pgn_inst in self.plugin_instances:
    150            parser = pgn_inst.add_args(parser)
    151        return parser
    152
    153    def call_check_args(self, args, remaining):
    154        for pgn_inst in self.plugin_instances:
    155            pgn_inst.check_args(args, remaining)
    156
    157    def call_adjust_command(self, stage, command):
    158        for pgn_inst in self.plugin_instances:
    159            command = pgn_inst.adjust_command(stage, command)
    160        return command
    161
    162    def set_args(self, args):
    163        self.args = args
    164
    165    @staticmethod
    166    def _make_argparser(args):
    167        return argparse.ArgumentParser(
    168            description='Linux TC unit tests')
    169
    170def replace_keywords(cmd):
    171    """
    172    For a given executable command, substitute any known
    173    variables contained within NAMES with the correct values
    174    """
    175    tcmd = Template(cmd)
    176    subcmd = tcmd.safe_substitute(NAMES)
    177    return subcmd
    178
    179
    180def exec_cmd(args, pm, stage, command):
    181    """
    182    Perform any required modifications on an executable command, then run
    183    it in a subprocess and return the results.
    184    """
    185    if len(command.strip()) == 0:
    186        return None, None
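           # $-prefixed placeholders (e.g. $TC, $DEV2, $TESTID) are substituted
           # from the NAMES dict provided by tdc_config.py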
    187    if '$' in command:
    188        command = replace_keywords(command)
    189
    190    command = pm.call_adjust_command(stage, command)
    191    if args.verbose > 0:
    192        print('command "{}"'.format(command))
    193    proc = subprocess.Popen(command,
    194        shell=True,
    195        stdout=subprocess.PIPE,
    196        stderr=subprocess.PIPE,
    197        env=ENVIR)
    198
    199    try:
    200        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
    201        if proc.returncode != 0 and len(serr) > 0:
    202            foutput = serr.decode("utf-8", errors="ignore")
    203        else:
    204            foutput = rawout.decode("utf-8", errors="ignore")
    205    except subprocess.TimeoutExpired:
    206        foutput = "Command \"{}\" timed out\n".format(command)
    207        proc.returncode = 255
    208
    209    proc.stdout.close()
    210    proc.stderr.close()
    211    return proc, foutput
    212
    213
    214def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    215    """
    216    Execute the setup/teardown commands for a test case.
    217    Optionally terminate test execution if the command fails.
    218    """
    219    if args.verbose > 0:
    220        print('{}'.format(prefix))
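           # each entry in cmdlist is either a plain command string or a list
           # whose first element is the command and whose remaining elements
           # are the acceptable exit codes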
    221    for cmdinfo in cmdlist:
    222        if isinstance(cmdinfo, list):
    223            exit_codes = cmdinfo[1:]
    224            cmd = cmdinfo[0]
    225        else:
    226            exit_codes = [0]
    227            cmd = cmdinfo
    228
    229        if not cmd:
    230            continue
    231
    232        (proc, foutput) = exec_cmd(args, pm, stage, cmd)
    233
    234        if proc and (proc.returncode not in exit_codes):
    235            print('', file=sys.stderr)
    236            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
    237                  file=sys.stderr)
    238            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
    239                  file=sys.stderr)
    240            print("returncode {}; expected {}".format(proc.returncode,
    241                                                      exit_codes))
    242            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
    243            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
    244            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
    245            raise PluginMgrTestFail(
    246                stage, output,
    247                '"{}" did not complete successfully'.format(prefix))
    248
    249def run_one_test(pm, args, index, tidx):
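           # drive a single test case through the setup -> execute -> verify ->
           # teardown stages and return a TestResult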
    250    global NAMES
    251    result = True
    252    tresult = ""
    253    tap = ""
    254    res = TestResult(tidx['id'], tidx['name'])
    255    if args.verbose > 0:
    256        print("\t====================\n=====> ", end="")
    257    print("Test " + tidx["id"] + ": " + tidx["name"])
    258
    259    if 'skip' in tidx:
    260        if tidx['skip'] == 'yes':
    261            res = TestResult(tidx['id'], tidx['name'])
    262            res.set_result(ResultState.skip)
    263            res.set_errormsg('Test case designated as skipped.')
    264            pm.call_pre_case(tidx, test_skip=True)
    265            pm.call_post_execute()
    266            return res
    267
    268    # populate NAMES with TESTID for this test
    269    NAMES['TESTID'] = tidx['id']
    270
    271    pm.call_pre_case(tidx)
    272    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
    273
    274    if (args.verbose > 0):
    275        print('-----> execute stage')
    276    pm.call_pre_execute()
    277    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    278    if p:
    279        exit_code = p.returncode
    280    else:
    281        exit_code = None
    282
    283    pm.call_post_execute()
    284
    285    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
    286        print("exit: {!r}".format(exit_code))
    287        print("exit: {}".format(int(tidx["expExitCode"])))
    288        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
    289        res.set_result(ResultState.fail)
    290        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
    291        print(procout)
    292    else:
    293        if args.verbose > 0:
    294            print('-----> verify stage')
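               # the test passes when the number of matchPattern hits in the
               # verify command output equals matchCount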
    295        match_pattern = re.compile(
    296            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
    297        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
    298        if procout:
    299            match_index = re.findall(match_pattern, procout)
    300            if len(match_index) != int(tidx["matchCount"]):
    301                res.set_result(ResultState.fail)
    302                res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
    303            else:
    304                res.set_result(ResultState.success)
    305        elif int(tidx["matchCount"]) != 0:
    306            res.set_result(ResultState.fail)
    307            res.set_failmsg('No output generated by verify command.')
    308        else:
    309            res.set_result(ResultState.success)
    310
    311    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    312    pm.call_post_case()
    313
    314    index += 1
    315
    316    # remove TESTID from NAMES
    317    del(NAMES['TESTID'])
    318    return res
    319
    320def test_runner(pm, args, filtered_tests):
    321    """
    322    Driver function for the unit tests.
    323
    324    Prints information about the tests being run, executes the setup and
    325    teardown commands and the command under test itself. Also determines
    326    success/failure based on the information in the test case and generates
    327    TAP output accordingly.
    328    """
    329    testlist = filtered_tests
    330    tcount = len(testlist)
    331    index = 1
    332    tap = ''
    333    badtest = None
    334    stage = None
    335    emergency_exit = False
    336    emergency_exit_message = ''
    337
    338    tsr = TestSuiteReport()
    339
    340    try:
    341        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    342    except Exception as ee:
    343        ex_type, ex, ex_tb = sys.exc_info()
    344        print('Exception {} {} (caught in pre_suite).'.
    345              format(ex_type, ex))
    346        traceback.print_tb(ex_tb)
    347        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
    348        emergency_exit = True
    349        stage = 'pre-SUITE'
    350
    351    if emergency_exit:
    352        pm.call_post_suite(index)
    353        return emergency_exit_message
    354    if args.verbose > 1:
    355        print('give test rig 2 seconds to stabilize')
    356    time.sleep(2)
    357    for tidx in testlist:
    358        if "flower" in tidx["category"] and args.device == None:
    359            errmsg = "Tests using the DEV2 variable must define the name of a "
    360            errmsg += "physical NIC with the -d option when running tdc.\n"
    361            errmsg += "Test has been skipped."
    362            if args.verbose > 1:
    363                print(errmsg)
    364            res = TestResult(tidx['id'], tidx['name'])
    365            res.set_result(ResultState.skip)
    366            res.set_errormsg(errmsg)
    367            tsr.add_resultdata(res)
    368            continue
    369        try:
    370            badtest = tidx  # in case it goes bad
    371            res = run_one_test(pm, args, index, tidx)
    372            tsr.add_resultdata(res)
    373        except PluginMgrTestFail as pmtf:
    374            ex_type, ex, ex_tb = sys.exc_info()
    375            stage = pmtf.stage
    376            message = pmtf.message
    377            output = pmtf.output
    378            res = TestResult(tidx['id'], tidx['name'])
    379            res.set_result(ResultState.skip)
    380            res.set_errormsg(pmtf.message)
    381            res.set_failmsg(pmtf.output)
    382            tsr.add_resultdata(res)
    383            index += 1
    384            print(message)
    385            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
    386                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
    387            print('---------------')
    388            print('traceback')
    389            traceback.print_tb(ex_tb)
    390            print('---------------')
    391            if stage == 'teardown':
    392                print('accumulated output for this test:')
    393                if pmtf.output:
    394                    print(pmtf.output)
    395            print('---------------')
    396            break
    397        index += 1
    398
    399    # if we failed in setup or teardown,
    400    # fill in the remaining tests with ok-skipped
    401    count = index
    402
    403    if tcount + 1 != count:
    404        for tidx in testlist[count - 1:]:
    405            res = TestResult(tidx['id'], tidx['name'])
    406            res.set_result(ResultState.skip)
    407            msg = 'skipped - previous {} failed {} {}'.format(stage,
    408                index, badtest.get('id', '--Unknown--'))
    409            res.set_errormsg(msg)
    410            tsr.add_resultdata(res)
    411            count += 1
    412
    413    if args.pause:
    414        print('Want to pause\nPress enter to continue ...')
    415        if input():
    416            print('got something on stdin')
    417
    418    pm.call_post_suite(index)
    419
    420    return tsr
    421
    422def has_blank_ids(idlist):
    423    """
    424    Search the list for empty ID fields and return true/false accordingly.
    425    """
    426    return not(all(k for k in idlist))
    427
    428
    429def load_from_file(filename):
    430    """
    431    Open the JSON file containing the test cases and return them
    432    as list of ordered dictionary objects.
    433    """
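           # object_pairs_hook=OrderedDict preserves the on-disk key order so
           # files rewritten later (e.g. by the -i option) keep their layout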
    434    try:
    435        with open(filename) as test_data:
    436            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    437    except json.JSONDecodeError as jde:
    438        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
    439        testlist = list()
    440    else:
    441        idlist = get_id_list(testlist)
    442        if (has_blank_ids(idlist)):
    443            for k in testlist:
    444                k['filename'] = filename
    445    return testlist
    446
    447
    448def args_parse():
    449    """
    450    Create the argument parser.
    451    """
    452    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    453    return parser
    454
    455
    456def set_args(parser):
    457    """
    458    Set the command line arguments for tdc.
    459    """
    460    parser.add_argument(
    461        '--outfile', type=str,
    462        help='Path to the file in which results should be saved. ' +
    463        'Default target is the current directory.')
    464    parser.add_argument(
    465        '-p', '--path', type=str,
    466        help='The full path to the tc executable to use')
    467    sg = parser.add_argument_group(
    468        'selection', 'select which test cases: ' +
    469        'files plus directories; filtered by categories plus testids')
    470    ag = parser.add_argument_group(
    471        'action', 'select action to perform on selected test cases')
    472
    473    sg.add_argument(
    474        '-D', '--directory', nargs='+', metavar='DIR',
    475        help='Collect tests from the specified directory(ies) ' +
    476        '(default [tc-tests])')
    477    sg.add_argument(
    478        '-f', '--file', nargs='+', metavar='FILE',
    479        help='Run tests from the specified file(s)')
    480    sg.add_argument(
    481        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
    482        help='Run tests only from the specified category/ies, ' +
    483        'or if no category/ies is/are specified, list known categories.')
    484    sg.add_argument(
    485        '-e', '--execute', nargs='+', metavar='ID',
    486        help='Execute the specified test cases with specified IDs')
    487    ag.add_argument(
    488        '-l', '--list', action='store_true',
    489        help='List all test cases, or those only within the specified category')
    490    ag.add_argument(
    491        '-s', '--show', action='store_true', dest='showID',
    492        help='Display the selected test cases')
    493    ag.add_argument(
    494        '-i', '--id', action='store_true', dest='gen_id',
    495        help='Generate ID numbers for new test cases')
    496    parser.add_argument(
    497        '-v', '--verbose', action='count', default=0,
    498        help='Show the commands that are being run')
    499    parser.add_argument(
    500        '--format', default='tap', const='tap', nargs='?',
    501        choices=['none', 'xunit', 'tap'],
    502        help='Specify the format for test results. (Default: TAP)')
    503    parser.add_argument('-d', '--device',
    504                        help='Execute test cases that use a physical device, ' +
    505                        'where DEVICE is its name. (If not defined, tests ' +
    506                        'that require a physical device will be skipped)')
    507    parser.add_argument(
    508        '-P', '--pause', action='store_true',
    509        help='Pause execution just before post-suite stage')
    510    return parser
    511
    512
    513def check_default_settings(args, remaining, pm):
    514    """
    515    Process any arguments overriding the default settings,
    516    and ensure the settings are correct.
    517    """
    518    # Allow for overriding specific settings
    519    global NAMES
    520
    521    if args.path != None:
    522        NAMES['TC'] = args.path
    523    if args.device != None:
    524        NAMES['DEV2'] = args.device
    525    if 'TIMEOUT' not in NAMES:
    526        NAMES['TIMEOUT'] = None
    527    if not os.path.isfile(NAMES['TC']):
    528        print("The specified tc path " + NAMES['TC'] + " does not exist.")
    529        exit(1)
    530
    531    pm.call_check_args(args, remaining)
    532
    533
    534def get_id_list(alltests):
    535    """
    536    Generate a list of all IDs in the test cases.
    537    """
    538    return [x["id"] for x in alltests]
    539
    540
    541def check_case_id(alltests):
    542    """
    543    Check for duplicate test case IDs.
    544    """
    545    idl = get_id_list(alltests)
    546    return [x for x in idl if idl.count(x) > 1]
    547
    548
    549def does_id_exist(alltests, newid):
    550    """
    551    Check if a given ID already exists in the list of test cases.
    552    """
    553    idl = get_id_list(alltests)
    554    return (any(newid == x for x in idl))
    555
    556
    557def generate_case_ids(alltests):
    558    """
    559    If a test case has a blank ID field, generate a random hex ID for it
    560    and then write the test cases back to disk.
    561    """
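           # new IDs are four lowercase hex digits; keep drawing until one that
           # is not already in use is found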
    562    import random
    563    for c in alltests:
    564        if (c["id"] == ""):
    565            while True:
    566                newid = str('{:04x}'.format(random.randrange(16**4)))
    567                if (does_id_exist(alltests, newid)):
    568                    continue
    569                else:
    570                    c['id'] = newid
    571                    break
    572
    573    ufilename = []
    574    for c in alltests:
    575        if ('filename' in c):
    576            ufilename.append(c['filename'])
    577    ufilename = get_unique_item(ufilename)
    578    for f in ufilename:
    579        testlist = []
    580        for t in alltests:
    581            if 'filename' in t:
    582                if t['filename'] == f:
    583                    del t['filename']
    584                    testlist.append(t)
    585        outfile = open(f, "w")
    586        json.dump(testlist, outfile, indent=4)
    587        outfile.write("\n")
    588        outfile.close()
    589
    590def filter_tests_by_id(args, testlist):
    591    '''
    592    Remove tests from testlist that are not in the named id list.
    593    If id list is empty, return empty list.
    594    '''
    595    newlist = list()
    596    if testlist and args.execute:
    597        target_ids = args.execute
    598
    599        if isinstance(target_ids, list) and (len(target_ids) > 0):
    600            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    601    return newlist
    602
    603def filter_tests_by_category(args, testlist):
    604    '''
    605    Remove tests from testlist that are not in a named category.
    606    '''
    607    answer = list()
    608    if args.category and testlist:
    609        test_ids = list()
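               # '+c' is the default placeholder set in set_args(); it means no
               # explicit category filter was requested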
    610        for catg in set(args.category):
    611            if catg == '+c':
    612                continue
    613            print('considering category {}'.format(catg))
    614            for tc in testlist:
    615                if catg in tc['category'] and tc['id'] not in test_ids:
    616                    answer.append(tc)
    617                    test_ids.append(tc['id'])
    618
    619    return answer
    620
    621
    622def get_test_cases(args):
    623    """
    624    If a test case file is specified, retrieve tests from that file.
    625    Otherwise, glob for all json files in subdirectories and load from
    626    each one.
    627    Also, if requested, filter by category, and add tests matching
    628    certain ids.
    629    """
    630    import fnmatch
    631
    632    flist = []
    633    testdirs = ['tc-tests']
    634
    635    if args.file:
    636        # at least one file was specified - remove the default directory
    637        testdirs = []
    638
    639        for ff in args.file:
    640            if not os.path.isfile(ff):
    641                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
    642            else:
    643                flist.append(os.path.abspath(ff))
    644
    645    if args.directory:
    646        testdirs = args.directory
    647
    648    for testdir in testdirs:
    649        for root, dirnames, filenames in os.walk(testdir):
    650            for filename in fnmatch.filter(filenames, '*.json'):
    651                candidate = os.path.abspath(os.path.join(root, filename))
    652                if candidate not in testdirs:
    653                    flist.append(candidate)
    654
    655    alltestcases = list()
    656    for casefile in flist:
    657        alltestcases = alltestcases + (load_from_file(casefile))
    658
    659    allcatlist = get_test_categories(alltestcases)
    660    allidlist = get_id_list(alltestcases)
    661
    662    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    663    idtestcases = filter_tests_by_id(args, alltestcases)
    664    cattestcases = filter_tests_by_category(args, alltestcases)
    665
    666    cat_ids = [x['id'] for x in cattestcases]
    667    if args.execute:
    668        if args.category:
    669            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
    670        else:
    671            alltestcases = idtestcases
    672    else:
    673        if cat_ids:
    674            alltestcases = cattestcases
    675        else:
    676            # just accept the existing value of alltestcases,
    677            # which has been filtered by file/directory
    678            pass
    679
    680    return allcatlist, allidlist, testcases_by_cats, alltestcases
    681
    682
    683def set_operation_mode(pm, parser, args, remaining):
    684    """
    685    Load the test case data and process remaining arguments to determine
    686    what the script should do for this run, and call the appropriate
    687    function.
    688    """
    689    ucat, idlist, testcases, alltests = get_test_cases(args)
    690
    691    if args.gen_id:
    692        if (has_blank_ids(idlist)):
    693            generate_case_ids(alltests)
    694        else:
    695            print("No empty ID fields found in test files.")
    696        exit(0)
    697
    698    duplicate_ids = check_case_id(alltests)
    699    if (len(duplicate_ids) > 0):
    700        print("The following test case IDs are not unique:")
    701        print(str(set(duplicate_ids)))
    702        print("Please correct them before continuing.")
    703        exit(1)
    704
    705    if args.showID:
    706        for atest in alltests:
    707            print_test_case(atest)
    708        exit(0)
    709
    710    if isinstance(args.category, list) and (len(args.category) == 0):
    711        print("Available categories:")
    712        print_sll(ucat)
    713        exit(0)
    714
    715    if args.list:
    716        list_test_cases(alltests)
    717        exit(0)
    718
    719    exit_code = 0 # KSFT_PASS
    720    if len(alltests):
    721        req_plugins = pm.get_required_plugins(alltests)
    722        try:
    723            args = pm.load_required_plugins(req_plugins, parser, args, remaining)
    724        except PluginDependencyException as pde:
    725            print('The following plugins were not found:')
    726            print('{}'.format(pde.missing_pg))
    727        catresults = test_runner(pm, args, alltests)
    728        if catresults.count_failures() != 0:
    729            exit_code = 1 # KSFT_FAIL
    730        if args.format == 'none':
    731            print('Test results output suppression requested\n')
    732        else:
    733            print('\nAll test results: \n')
    734            if args.format == 'xunit':
    735                suffix = 'xml'
    736                res = catresults.format_xunit()
    737            elif args.format == 'tap':
    738                suffix = 'tap'
    739                res = catresults.format_tap()
    740            print(res)
    741            print('\n\n')
    742            if not args.outfile:
    743                fname = 'test-results.{}'.format(suffix)
    744            else:
    745                fname = args.outfile
    746            with open(fname, 'w') as fh:
    747                fh.write(res)
    748                fh.close()
    749                if os.getenv('SUDO_UID') is not None:
    750                    os.chown(fname, uid=int(os.getenv('SUDO_UID')),
    751                        gid=int(os.getenv('SUDO_GID')))
    752    else:
    753        print('No tests found\n')
    754        exit_code = 4 # KSFT_SKIP
    755    exit(exit_code)
    756
    757def main():
    758    """
    759    Start of execution; set up argument parser and get the arguments,
    760    and start operations.
    761    """
    762    parser = args_parse()
    763    parser = set_args(parser)
    764    pm = PluginMgr(parser)
    765    parser = pm.call_add_args(parser)
    766    (args, remaining) = parser.parse_known_args()
    767    args.NAMES = NAMES
    768    pm.set_args(args)
    769    check_default_settings(args, remaining, pm)
    770    if args.verbose > 2:
    771        print('args is {}'.format(args))
    772
    773    set_operation_mode(pm, parser, args, remaining)
    774
    775if __name__ == "__main__":
    776    main()