cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

vmbus_testing (16460B)


      1#!/usr/bin/env python3
      2# SPDX-License-Identifier: GPL-2.0
      3#
      4# Program to allow users to fuzz test Hyper-V drivers
      5# by interfacing with Hyper-V debugfs attributes.
      6# Current test methods available:
      7#       1. delay testing
      8#
      9# Current file/directory structure of hyper-V debugfs:
     10#       /sys/kernel/debug/hyperv/UUID
     11#       /sys/kernel/debug/hyperv/UUID/<test-state filename>
     12#       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
     13#
     14# author: Branden Bonaby <brandonbonaby94@gmail.com>
     15
     16import os
     17import cmd
     18import argparse
     19import glob
     20from argparse import RawDescriptionHelpFormatter
     21from argparse import RawTextHelpFormatter
     22from enum import Enum
     23
     24# Do not change unless, you change the debugfs attributes
     25# in /drivers/hv/debugfs.c. All fuzz testing
     26# attributes will start with "fuzz_test".
     27
     28# debugfs path for hyperv must exist before proceeding
     29debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
     30if not os.path.isdir(debugfs_hyperv_path):
     31        print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
     32        exit(-1)
     33
     34class dev_state(Enum):
     35        off = 0
     36        on = 1
     37
     38# File names, that correspond to the files created in
     39# /drivers/hv/debugfs.c
     40class f_names(Enum):
     41        state_f = "fuzz_test_state"
     42        buff_f =  "fuzz_test_buffer_interrupt_delay"
     43        mess_f =  "fuzz_test_message_delay"
     44
     45# Both single_actions and all_actions are used
     46# for error checking and to allow for some subparser
     47# names to be abbreviated. Do not abbreviate the
     48# test method names, as it will become less intuitive
     49# as to what the user can do. If you do decide to
     50# abbreviate the test method name, make sure the main
     51# function reflects this change.
     52
     53all_actions = [
     54        "disable_all",
     55        "D",
     56        "enable_all",
     57        "view_all",
     58        "V"
     59]
     60
     61single_actions = [
     62        "disable_single",
     63        "d",
     64        "enable_single",
     65        "view_single",
     66        "v"
     67]
     68
     69def main():
     70
     71        file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
     72        args = parse_args()
     73        if (not args.action):
     74                print ("Error, no options selected...exiting")
     75                exit(-1)
     76        arg_set = { k for (k,v) in vars(args).items() if v and k != "action" }
     77        arg_set.add(args.action)
     78        path = args.path if "path" in arg_set else None
     79        if (path and path[-1] == "/"):
     80                path = path[:-1]
     81        validate_args_path(path, arg_set, file_map)
     82        if (path and "enable_single" in arg_set):
     83            state_path = locate_state(path, file_map)
     84            set_test_state(state_path, dev_state.on.value, args.quiet)
     85
     86        # Use subparsers as the key for different actions
     87        if ("delay" in arg_set):
     88                validate_delay_values(args.delay_time)
     89                if (args.enable_all):
     90                        set_delay_all_devices(file_map, args.delay_time,
     91                                              args.quiet)
     92                else:
     93                        set_delay_values(path, file_map, args.delay_time,
     94                                         args.quiet)
     95        elif ("disable_all" in arg_set or "D" in arg_set):
     96                disable_all_testing(file_map)
     97        elif ("disable_single" in arg_set or "d" in arg_set):
     98                disable_testing_single_device(path, file_map)
     99        elif ("view_all" in arg_set or "V" in arg_set):
    100                get_all_devices_test_status(file_map)
    101        elif ("view_single" in arg_set or  "v" in arg_set):
    102                get_device_test_values(path, file_map)
    103
    104# Get the state location
    105def locate_state(device, file_map):
    106        return file_map[device][f_names.state_f.value]
    107
    108# Validate delay values to make sure they are acceptable to
    109# enable delays on a device
    110def validate_delay_values(delay):
    111
    112        if (delay[0]  == -1 and delay[1] == -1):
    113                print("\nError, At least 1 value must be greater than 0")
    114                exit(-1)
    115        for i in delay:
    116                if (i < -1 or i == 0 or i > 1000):
    117                        print("\nError, Values must be  equal to -1 "
    118                              "or be > 0 and <= 1000")
    119                        exit(-1)
    120
    121# Validate argument path
    122def validate_args_path(path, arg_set, file_map):
    123
    124        if (not path and any(element in arg_set for element in single_actions)):
    125                print("Error, path (-p) REQUIRED for the specified option. "
    126                      "Use (-h) to check usage.")
    127                exit(-1)
    128        elif (path and any(item in arg_set for item in all_actions)):
    129                print("Error, path (-p) NOT REQUIRED for the specified option. "
    130                      "Use (-h) to check usage." )
    131                exit(-1)
    132        elif (path not in file_map and any(item in arg_set
    133                                           for item in single_actions)):
    134                print("Error, path '{}' not a valid vmbus device".format(path))
    135                exit(-1)
    136
    137# display Testing status of single device
    138def get_device_test_values(path, file_map):
    139
    140        for name in file_map[path]:
    141                file_location = file_map[path][name]
    142                print( name + " = " + str(read_test_files(file_location)))
    143
    144# Create a map of the vmbus devices and their associated files
    145# [key=device, value = [key = filename, value = file path]]
    146def recursive_file_lookup(path, file_map):
    147
    148        for f_path in glob.iglob(path + '**/*'):
    149                if (os.path.isfile(f_path)):
    150                        if (f_path.rsplit("/",2)[0] == debugfs_hyperv_path):
    151                                directory = f_path.rsplit("/",1)[0]
    152                        else:
    153                                directory = f_path.rsplit("/",2)[0]
    154                        f_name = f_path.split("/")[-1]
    155                        if (file_map.get(directory)):
    156                                file_map[directory].update({f_name:f_path})
    157                        else:
    158                                file_map[directory] = {f_name:f_path}
    159                elif (os.path.isdir(f_path)):
    160                        recursive_file_lookup(f_path,file_map)
    161        return file_map
    162
    163# display Testing state of devices
    164def get_all_devices_test_status(file_map):
    165
    166        for device in file_map:
    167                if (get_test_state(locate_state(device, file_map)) is 1):
    168                        print("Testing = ON for: {}"
    169                              .format(device.split("/")[5]))
    170                else:
    171                        print("Testing = OFF for: {}"
    172                              .format(device.split("/")[5]))
    173
    174# read the vmbus device files, path must be absolute path before calling
    175def read_test_files(path):
    176        try:
    177                with open(path,"r") as f:
    178                        file_value = f.readline().strip()
    179                return int(file_value)
    180
    181        except IOError as e:
    182                errno, strerror = e.args
    183                print("I/O error({0}): {1} on file {2}"
    184                      .format(errno, strerror, path))
    185                exit(-1)
    186        except ValueError:
    187                print ("Element to int conversion error in: \n{}".format(path))
    188                exit(-1)
    189
    190# writing to vmbus device files, path must be absolute path before calling
    191def write_test_files(path, value):
    192
    193        try:
    194                with open(path,"w") as f:
    195                        f.write("{}".format(value))
    196        except IOError as e:
    197                errno, strerror = e.args
    198                print("I/O error({0}): {1} on file {2}"
    199                      .format(errno, strerror, path))
    200                exit(-1)
    201
    202# set testing state of device
    203def set_test_state(state_path, state_value, quiet):
    204
    205        write_test_files(state_path, state_value)
    206        if (get_test_state(state_path) is 1):
    207                if (not quiet):
    208                        print("Testing = ON for device: {}"
    209                              .format(state_path.split("/")[5]))
    210        else:
    211                if (not quiet):
    212                        print("Testing = OFF for device: {}"
    213                              .format(state_path.split("/")[5]))
    214
    215# get testing state of device
    216def get_test_state(state_path):
    217        #state == 1 - test = ON
    218        #state == 0 - test = OFF
    219        return  read_test_files(state_path)
    220
    221# write 1 - 1000 microseconds, into a single device using the
    222# fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
    223# debugfs attributes
    224def set_delay_values(device, file_map, delay_length, quiet):
    225
    226        try:
    227                interrupt = file_map[device][f_names.buff_f.value]
    228                message = file_map[device][f_names.mess_f.value]
    229
    230                # delay[0]- buffer interrupt delay, delay[1]- message delay
    231                if (delay_length[0] >= 0 and delay_length[0] <= 1000):
    232                        write_test_files(interrupt, delay_length[0])
    233                if (delay_length[1] >= 0 and delay_length[1] <= 1000):
    234                        write_test_files(message, delay_length[1])
    235                if (not quiet):
    236                        print("Buffer delay testing = {} for: {}"
    237                              .format(read_test_files(interrupt),
    238                                      interrupt.split("/")[5]))
    239                        print("Message delay testing = {} for: {}"
    240                              .format(read_test_files(message),
    241                                      message.split("/")[5]))
    242        except IOError as e:
    243                errno, strerror = e.args
    244                print("I/O error({0}): {1} on files {2}{3}"
    245                      .format(errno, strerror, interrupt, message))
    246                exit(-1)
    247
    248# enabling delay testing on all devices
    249def set_delay_all_devices(file_map, delay, quiet):
    250
    251        for device in (file_map):
    252                set_test_state(locate_state(device, file_map),
    253                               dev_state.on.value,
    254                               quiet)
    255                set_delay_values(device, file_map, delay, quiet)
    256
    257# disable all testing on a SINGLE device.
    258def disable_testing_single_device(device, file_map):
    259
    260        for name in file_map[device]:
    261                file_location = file_map[device][name]
    262                write_test_files(file_location, dev_state.off.value)
    263        print("ALL testing now OFF for {}".format(device.split("/")[-1]))
    264
    265# disable all testing on ALL devices
    266def disable_all_testing(file_map):
    267
    268        for device in file_map:
    269                disable_testing_single_device(device, file_map)
    270
    271def parse_args():
    272        parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n"
    273                "%(prog)s [delay]   [-h] [-e|-E] -t [-p]\n"
    274                "%(prog)s [view_all       | V]      [-h]\n"
    275                "%(prog)s [disable_all    | D]      [-h]\n"
    276                "%(prog)s [disable_single | d]      [-h|-p]\n"
    277                "%(prog)s [view_single    | v]      [-h|-p]\n"
    278                "%(prog)s --version\n",
    279                description = "\nUse lsvmbus to get vmbus device type "
    280                "information.\n" "\nThe debugfs root path is "
    281                "/sys/kernel/debug/hyperv",
    282                formatter_class = RawDescriptionHelpFormatter)
    283        subparsers = parser.add_subparsers(dest = "action")
    284        parser.add_argument("--version", action = "version",
    285                version = '%(prog)s 0.1.0')
    286        parser.add_argument("-q","--quiet", action = "store_true",
    287                help = "silence none important test messages."
    288                       " This will only work when enabling testing"
    289                       " on a device.")
    290        # Use the path parser to hold the --path attribute so it can
    291        # be shared between subparsers. Also do the same for the state
    292        # parser, as all testing methods will use --enable_all and
    293        # enable_single.
    294        path_parser = argparse.ArgumentParser(add_help=False)
    295        path_parser.add_argument("-p","--path", metavar = "",
    296                help = "Debugfs path to a vmbus device. The path "
    297                "must be the absolute path to the device.")
    298        state_parser = argparse.ArgumentParser(add_help=False)
    299        state_group = state_parser.add_mutually_exclusive_group(required = True)
    300        state_group.add_argument("-E", "--enable_all", action = "store_const",
    301                                 const = "enable_all",
    302                                 help = "Enable the specified test type "
    303                                 "on ALL vmbus devices.")
    304        state_group.add_argument("-e", "--enable_single",
    305                                 action = "store_const",
    306                                 const = "enable_single",
    307                                 help = "Enable the specified test type on a "
    308                                 "SINGLE vmbus device.")
    309        parser_delay = subparsers.add_parser("delay",
    310                        parents = [state_parser, path_parser],
    311                        help = "Delay the ring buffer interrupt or the "
    312                        "ring buffer message reads in microseconds.",
    313                        prog = "vmbus_testing",
    314                        usage = "%(prog)s [-h]\n"
    315                        "%(prog)s -E -t [value] [value]\n"
    316                        "%(prog)s -e -t [value] [value] -p",
    317                        description = "Delay the ring buffer interrupt for "
    318                        "vmbus devices, or delay the ring buffer message "
    319                        "reads for vmbus devices (both in microseconds). This "
    320                        "is only on the host to guest channel.")
    321        parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2,
    322                        type = check_range, default =[0,0], required = (True),
    323                        help = "Set [buffer] & [message] delay time. "
    324                        "Value constraints: -1 == value "
    325                        "or 0 < value <= 1000.\n"
    326                        "Use -1 to keep the previous value for that delay "
    327                        "type, or a value > 0 <= 1000 to change the delay "
    328                        "time.")
    329        parser_dis_all = subparsers.add_parser("disable_all",
    330                        aliases = ['D'], prog = "vmbus_testing",
    331                        usage = "%(prog)s [disable_all | D] -h\n"
    332                        "%(prog)s [disable_all | D]\n",
    333                        help = "Disable ALL testing on ALL vmbus devices.",
    334                        description = "Disable ALL testing on ALL vmbus "
    335                        "devices.")
    336        parser_dis_single = subparsers.add_parser("disable_single",
    337                        aliases = ['d'],
    338                        parents = [path_parser], prog = "vmbus_testing",
    339                        usage = "%(prog)s [disable_single | d] -h\n"
    340                        "%(prog)s [disable_single | d] -p\n",
    341                        help = "Disable ALL testing on a SINGLE vmbus device.",
    342                        description = "Disable ALL testing on a SINGLE vmbus "
    343                        "device.")
    344        parser_view_all = subparsers.add_parser("view_all", aliases = ['V'],
    345                        help = "View the test state for ALL vmbus devices.",
    346                        prog = "vmbus_testing",
    347                        usage = "%(prog)s [view_all | V] -h\n"
    348                        "%(prog)s [view_all | V]\n",
    349                        description = "This shows the test state for ALL the "
    350                        "vmbus devices.")
    351        parser_view_single = subparsers.add_parser("view_single",
    352                        aliases = ['v'],parents = [path_parser],
    353                        help = "View the test values for a SINGLE vmbus "
    354                        "device.",
    355                        description = "This shows the test values for a SINGLE "
    356                        "vmbus device.", prog = "vmbus_testing",
    357                        usage = "%(prog)s [view_single | v] -h\n"
    358                        "%(prog)s [view_single | v] -p")
    359
    360        return  parser.parse_args()
    361
    362# value checking for range checking input in parser
    363def check_range(arg1):
    364
    365        try:
    366                val = int(arg1)
    367        except ValueError as err:
    368                raise argparse.ArgumentTypeError(str(err))
    369        if val < -1 or val > 1000:
    370                message = ("\n\nvalue must be -1 or  0 < value <= 1000. "
    371                           "Value program received: {}\n").format(val)
    372                raise argparse.ArgumentTypeError(message)
    373        return val
    374
    375if __name__ == "__main__":
    376        main()