Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
29581 views
1
# SPDX-License-Identifier: GPL-2.0
2
3
import os
4
import time
5
from pathlib import Path
6
from lib.py import KsftSkipEx, KsftXfailEx
7
from lib.py import ksft_setup, wait_file
8
from lib.py import cmd, ethtool, ip, CmdExitFailure
9
from lib.py import NetNS, NetdevSimDev
10
from .remote import Remote
11
12
13
class NetDrvEnvBase:
    """
    Base class for a NIC / host environments

    Attributes:
      src_path: Path to the test source file, as passed by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Result of ksft_setup() applied to the process environment
           overlaid with the entries of net.config (if that file exists
           in the test directory)
      dev: Device under test, must be set by inheriting classes
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    @staticmethod
    def _parse_env_line(line):
        """
        Parse a single line of a net.config file.

        Everything after the first '#' is treated as a comment. Whitespace
        around the key and the value is dropped, so "KEY = value" and
        "KEY=value" are equivalent.

        Returns:
          (key, value) tuple, or None if the line is blank / comment-only.

        Raises:
          Exception: the line has content but no '=' or an empty key.
        """
        content = line.split("#", maxsplit=1)[0].strip()
        if not content:
            return None

        # Only the first '=' separates key from value, the value itself
        # may legitimately contain more of them.
        key, sep, value = content.partition("=")
        key = key.strip()
        if not sep or not key:
            raise Exception("Can't parse configuration line:", line)
        return key, value.strip()

    def _load_env_file(self):
        """
        Build the test environment.

        Starts from a copy of os.environ, applies KEY=VALUE entries from
        net.config in the test's source directory (when the file exists),
        and hands the result to ksft_setup().
        """
        env = os.environ.copy()

        src_dir = Path(self.src_path).parent.resolve()
        config = src_dir / "net.config"
        if not config.exists():
            return ksft_setup(env)

        with open(config.as_posix(), 'r') as fp:
            for line in fp:
                parsed = self._parse_env_line(line)
                if parsed is None:
                    continue
                key, value = parsed
                env[key] = value
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release in the base class, subclasses override this
        pass

    def __enter__(self):
        """
        Bring the device up and wait for its carrier file to read "1".
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        # Run cleanup right away rather than waiting for garbage collection
        self.__del__()
69
70
71
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    Uses the interface named by NETIF in the environment when it is set,
    otherwise creates a netdevsim device.
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if either is fine
          **kwargs: passed through to NetdevSimDev() when one is created

        Raises:
          KsftXfailEx: the selected device type conflicts with nsim_test
        """
        # Must exist before anything below can raise: __del__ reads it even
        # when construction fails part way through.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # Clearing the reference makes repeated calls (__exit__ followed by
        # garbage collection) harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None
98
99
100
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if either is fine

        Raises:
          KsftXfailEx: the selected device type conflicts with nsim_test
          Exception: NETIF is set but the rest of the config is incomplete,
                     or the remote interface can't be resolved
        """
        # Things we try to destroy. They must exist before anything below
        # can raise: __del__ reads them even when construction fails part
        # way through.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is used as the default address family whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of command availability: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim
        device in the current namespace and one in the new namespace,
        linked together, with the nsim_v4_pfx / nsim_v6_pfx addresses
        (.1 / ::1 locally, .2 / ::2 on the peer) assigned and links up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device takes "<netns fd>:<ifindex>" for each end, so both
        # namespace files have to stay open across the write.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration needed when running on a real NETIF.

        Each inner list of vars_needed is a set of alternatives, at least
        one of which must be present. V4 / V6 addresses must additionally
        be given for both ends or for neither.

        Raises:
          Exception: listing the groups of variables which are missing
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which carries the remote
        address(es), by running "ip addr show to <addr>" on the remote.

        Returns:
          Interface name reported for the v6 address if there is one,
          otherwise the one reported for the v4 address.

        Raises:
          Exception: v4 and v6 resolve to different interfaces, more than
                     one interface matches, or no interface matches
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")

        matches = v6 or v4
        if not matches:
            # Neither query ran or both came back empty; fail with a clear
            # message instead of an IndexError / TypeError on the lookup.
            raise Exception("Can't resolve remote interface name, no interface matches")
        return matches[0]["ifname"]

    def __del__(self):
        # Every reference is cleared once released so that repeated calls
        # (__exit__ followed by garbage collection) are harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test unless both ends have an address of family @ipver
        ("4" or "6").
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Xfail the test unless the local device is a netdevsim created by
        this environment.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command @comm exists ("command -v") on @host, caching
        the answer under @key ("local" / "remote") so each command is
        probed at most once per location.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test if command @comm is not available locally (@local)
        and / or on the remote endpoint (@remote).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing info is fine, fall back to the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported stats period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
286
287