Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
29581 views
# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments

    Attributes:
      src_path: Path of the test source file, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: environment mapping returned by ksft_setup() (process
           environment overlaid with the contents of net.config, if any)
      dev: device description dict, must be filled in by subclasses
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    def _load_env_file(self):
        """
        Build the environment for the test.

        Starts from a copy of os.environ and, if a "net.config" file exists
        next to the test source, overlays every KEY=VALUE line found in it.
        Anything after a '#' on a line is treated as a comment, blank lines
        are skipped. The resulting dict is handed to ksft_setup() and
        whatever that returns is returned to the caller.

        Raises:
          Exception: a non-empty, non-comment line has no '=' in it.
        """
        env = os.environ.copy()

        src_dir = Path(self.src_path).parent.resolve()
        if not (src_dir / "net.config").exists():
            return ksft_setup(env)

        with open((src_dir / "net.config").as_posix(), 'r') as fp:
            for raw_line in fp:
                # Strip comments
                line = raw_line
                pos = line.find("#")
                if pos >= 0:
                    line = line[:pos]
                line = line.strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    # Report the untouched line, comment included
                    raise Exception("Can't parse configuration line:", raw_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        pass

    def __enter__(self):
        """
        Bring the device under test up and call wait_file() on its sysfs
        carrier file with a predicate matching the content "1".
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the environment that interface is used,
    otherwise a netdevsim device is created (kwargs are passed to
    NetdevSimDev).

    nsim_test selects where the test may run: True - netdevsim only,
    False - never on netdevsim, None - either. A mismatch raises
    KsftXfailEx.
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # __del__ also runs when __init__ raised before _ns was assigned
        # (e.g. the base class failed to parse net.config), so don't assume
        # the attribute exists.
        ns = getattr(self, "_ns", None)
        if ns:
            ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        nsim_test selects where the test may run: True - netdevsim only,
        False - never on netdevsim, None - either. A mismatch raises
        KsftXfailEx.
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 whenever a local v6 address is available
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of require_cmd() lookups: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a new netns, one netdevsim in the
        current namespace and one in the new namespace, linked together
        and configured with the nsim_v4_pfx / nsim_v6_pfx addresses
        (.1 / ::1 locally, .2 / ::2 on the peer).
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The namespace files must stay open while their fd numbers are
        # written to the netdevsim 'link_device' control file.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration used when NETIF is set.

        At least one variable of each group in vars_needed must be present,
        and the v4 / v6 address variables must be given for both ends or
        for neither.

        Raises:
          Exception: listing the groups of variables which are missing.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface carrying the remote
        address(es), by running "ip addr show to <addr>" on the remote.

        Returns:
          The interface name, taken from the v6 lookup if it matched,
          otherwise from the v4 lookup.

        Raises:
          Exception: no interface matches, the v4 and v6 addresses sit on
                     different interfaces, or a lookup matches more than
                     one interface.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        # Nothing matched (or no address configured): fail with a clear
        # message rather than an IndexError / TypeError on v4[0] below.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # __del__ also runs when __init__ raised before these attributes
        # were assigned (e.g. the base class failed to parse net.config),
        # so look them up defensively. Each step resets its attribute to
        # None, which keeps repeated calls (__exit__, then GC) harmless.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Raise KsftSkipEx unless both the local and the remote address are
        known for the given IP version ("4" or "6").
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the environment runs on netdevsim.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check if command comm exists on host ("command -v"), caching the
        answer in self._required_cmd under the given key.

        Returns:
          True if the command was found, False otherwise.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) and / or on the remote (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing support just means no extra wait is known
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 ms base delay plus the reported sync period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)