cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

devlink_port_split.py (7517B)


      1#!/usr/bin/env python3
      2# SPDX-License-Identifier: GPL-2.0
      3
      4from subprocess import PIPE, Popen
      5import json
      6import time
      7import argparse
      8import collections
      9import sys
     10
     11#
     12# Test port split configuration using devlink-port lanes attribute.
     13# The test is skipped in case the attribute is not available.
     14#
     15# First, check that all the ports with 1 lane fail to split.
     16# Second, check that all the ports with more than 1 lane can be split
     17# to all valid configurations (e.g., split to 2, split to 4 etc.)
     18#
     19
     20
     21# Kselftest framework requirement - SKIP code is 4
     22KSFT_SKIP=4
     23Port = collections.namedtuple('Port', 'bus_info name')
     24
     25
     26def run_command(cmd, should_fail=False):
     27    """
     28    Run a command in subprocess.
     29    Return: Tuple of (stdout, stderr).
     30    """
     31
     32    p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
     33    stdout, stderr = p.communicate()
     34    stdout, stderr = stdout.decode(), stderr.decode()
     35
     36    if stderr != "" and not should_fail:
     37        print("Error sending command: %s" % cmd)
     38        print(stdout)
     39        print(stderr)
     40    return stdout, stderr
     41
     42
     43class devlink_ports(object):
     44    """
     45    Class that holds information on the devlink ports, required to the tests;
     46    if_names: A list of interfaces in the devlink ports.
     47    """
     48
     49    def get_if_names(dev):
     50        """
     51        Get a list of physical devlink ports.
     52        Return: Array of tuples (bus_info/port, if_name).
     53        """
     54
     55        arr = []
     56
     57        cmd = "devlink -j port show"
     58        stdout, stderr = run_command(cmd)
     59        assert stderr == ""
     60        ports = json.loads(stdout)['port']
     61
     62        for port in ports:
     63            if dev in port:
     64                if ports[port]['flavour'] == 'physical':
     65                    arr.append(Port(bus_info=port, name=ports[port]['netdev']))
     66
     67        return arr
     68
     69    def __init__(self, dev):
     70        self.if_names = devlink_ports.get_if_names(dev)
     71
     72
     73def get_max_lanes(port):
     74    """
     75    Get the $port's maximum number of lanes.
     76    Return: number of lanes, e.g. 1, 2, 4 and 8.
     77    """
     78
     79    cmd = "devlink -j port show %s" % port
     80    stdout, stderr = run_command(cmd)
     81    assert stderr == ""
     82    values = list(json.loads(stdout)['port'].values())[0]
     83
     84    if 'lanes' in values:
     85        lanes = values['lanes']
     86    else:
     87        lanes = 0
     88    return lanes
     89
     90
     91def get_split_ability(port):
     92    """
     93    Get the $port split ability.
     94    Return: split ability, true or false.
     95    """
     96
     97    cmd = "devlink -j port show %s" % port.name
     98    stdout, stderr = run_command(cmd)
     99    assert stderr == ""
    100    values = list(json.loads(stdout)['port'].values())[0]
    101
    102    return values['splittable']
    103
    104
    105def split(k, port, should_fail=False):
    106    """
    107    Split $port into $k ports.
    108    If should_fail == True, the split should fail. Otherwise, should pass.
    109    Return: Array of sub ports after splitting.
    110            If the $port wasn't split, the array will be empty.
    111    """
    112
    113    cmd = "devlink port split %s count %s" % (port.bus_info, k)
    114    stdout, stderr = run_command(cmd, should_fail=should_fail)
    115
    116    if should_fail:
    117        if not test(stderr != "", "%s is unsplittable" % port.name):
    118            print("split an unsplittable port %s" % port.name)
    119            return create_split_group(port, k)
    120    else:
    121        if stderr == "":
    122            return create_split_group(port, k)
    123        print("didn't split a splittable port %s" % port.name)
    124
    125    return []
    126
    127
    128def unsplit(port):
    129    """
    130    Unsplit $port.
    131    """
    132
    133    cmd = "devlink port unsplit %s" % port
    134    stdout, stderr = run_command(cmd)
    135    test(stderr == "", "Unsplit port %s" % port)
    136
    137
    138def exists(port, dev):
    139    """
    140    Check if $port exists in the devlink ports.
    141    Return: True is so, False otherwise.
    142    """
    143
    144    return any(dev_port.name == port
    145               for dev_port in devlink_ports.get_if_names(dev))
    146
    147
    148def exists_and_lanes(ports, lanes, dev):
    149    """
    150    Check if every port in the list $ports exists in the devlink ports and has
    151    $lanes number of lanes after splitting.
    152    Return: True if both are True, False otherwise.
    153    """
    154
    155    for port in ports:
    156        max_lanes = get_max_lanes(port)
    157        if not exists(port, dev):
    158            print("port %s doesn't exist in devlink ports" % port)
    159            return False
    160        if max_lanes != lanes:
    161            print("port %s has %d lanes, but %s were expected"
    162                  % (port, lanes, max_lanes))
    163            return False
    164    return True
    165
    166
    167def test(cond, msg):
    168    """
    169    Check $cond and print a message accordingly.
    170    Return: True is pass, False otherwise.
    171    """
    172
    173    if cond:
    174        print("TEST: %-60s [ OK ]" % msg)
    175    else:
    176        print("TEST: %-60s [FAIL]" % msg)
    177
    178    return cond
    179
    180
    181def create_split_group(port, k):
    182    """
    183    Create the split group for $port.
    184    Return: Array with $k elements, which are the split port group.
    185    """
    186
    187    return list(port.name + "s" + str(i) for i in range(k))
    188
    189
    190def split_unsplittable_port(port, k):
    191    """
    192    Test that splitting of unsplittable port fails.
    193    """
    194
    195    # split to max
    196    new_split_group = split(k, port, should_fail=True)
    197
    198    if new_split_group != []:
    199        unsplit(port.bus_info)
    200
    201
    202def split_splittable_port(port, k, lanes, dev):
    203    """
    204    Test that splitting of splittable port passes correctly.
    205    """
    206
    207    new_split_group = split(k, port)
    208
    209    # Once the split command ends, it takes some time to the sub ifaces'
    210    # to get their names. Use udevadm to continue only when all current udev
    211    # events are handled.
    212    cmd = "udevadm settle"
    213    stdout, stderr = run_command(cmd)
    214    assert stderr == ""
    215
    216    if new_split_group != []:
    217        test(exists_and_lanes(new_split_group, lanes/k, dev),
    218             "split port %s into %s" % (port.name, k))
    219
    220    unsplit(port.bus_info)
    221
    222
    223def make_parser():
    224    parser = argparse.ArgumentParser(description='A test for port splitting.')
    225    parser.add_argument('--dev',
    226                        help='The devlink handle of the device under test. ' +
    227                             'The default is the first registered devlink ' +
    228                             'handle.')
    229
    230    return parser
    231
    232
    233def main(cmdline=None):
    234    parser = make_parser()
    235    args = parser.parse_args(cmdline)
    236
    237    dev = args.dev
    238    if not dev:
    239        cmd = "devlink -j dev show"
    240        stdout, stderr = run_command(cmd)
    241        assert stderr == ""
    242
    243        devs = json.loads(stdout)['dev']
    244        if devs:
    245            dev = list(devs.keys())[0]
    246        else:
    247            print("no devlink device was found, test skipped")
    248            sys.exit(KSFT_SKIP)
    249
    250    cmd = "devlink dev show %s" % dev
    251    stdout, stderr = run_command(cmd)
    252    if stderr != "":
    253        print("devlink device %s can not be found" % dev)
    254        sys.exit(1)
    255
    256    ports = devlink_ports(dev)
    257
    258    for port in ports.if_names:
    259        max_lanes = get_max_lanes(port.name)
    260
    261        # If max lanes is 0, do not test port splitting at all
    262        if max_lanes == 0:
    263            continue
    264
    265        # If 1 lane, shouldn't be able to split
    266        elif max_lanes == 1:
    267            test(not get_split_ability(port),
    268                 "%s should not be able to split" % port.name)
    269            split_unsplittable_port(port, max_lanes)
    270
    271        # Else, splitting should pass and all the split ports should exist.
    272        else:
    273            lane = max_lanes
    274            test(get_split_ability(port),
    275                 "%s should be able to split" % port.name)
    276            while lane > 1:
    277                split_splittable_port(port, lane, max_lanes, dev)
    278
    279                lane //= 2
    280
    281
# Run the test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()