summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/configdep.py2
-rw-r--r--python/vyos/configdict.py14
-rw-r--r--python/vyos/firewall.py17
-rw-r--r--python/vyos/ifconfig/bond.py13
-rw-r--r--python/vyos/ifconfig/ethernet.py34
-rw-r--r--python/vyos/ifconfig/vxlan.py25
-rw-r--r--python/vyos/ifconfig/wireguard.py5
-rw-r--r--python/vyos/nat.py34
-rw-r--r--python/vyos/qos/trafficshaper.py9
-rw-r--r--python/vyos/template.py3
-rw-r--r--python/vyos/utils/dict.py59
-rw-r--r--python/vyos/utils/network.py15
-rw-r--r--python/vyos/vpp.py315
13 files changed, 197 insertions, 348 deletions
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index 05d9a3fa3..8a28811eb 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -43,7 +43,7 @@ def canon_name_of_path(path: str) -> str:
return canon_name(script)
def caller_name() -> str:
- return stack()[-1].filename
+ return stack()[2].filename
def read_dependency_dict(dependency_dir: str = dependency_dir) -> dict:
res = {}
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 71a06b625..075ffe466 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -258,10 +258,10 @@ def has_address_configured(conf, intf):
old_level = conf.get_level()
conf.set_level([])
- intfpath = 'interfaces ' + Section.get_config_path(intf)
- if ( conf.exists(f'{intfpath} address') or
- conf.exists(f'{intfpath} ipv6 address autoconf') or
- conf.exists(f'{intfpath} ipv6 address eui64') ):
+ intfpath = ['interfaces', Section.get_config_path(intf)]
+ if (conf.exists([intfpath, 'address']) or
+ conf.exists([intfpath, 'ipv6', 'address', 'autoconf']) or
+ conf.exists([intfpath, 'ipv6', 'address', 'eui64'])):
ret = True
conf.set_level(old_level)
@@ -279,8 +279,7 @@ def has_vrf_configured(conf, intf):
old_level = conf.get_level()
conf.set_level([])
- tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
- if conf.exists(tmp):
+ if conf.exists(['interfaces', Section.get_config_path(intf), 'vrf']):
ret = True
conf.set_level(old_level)
@@ -298,8 +297,7 @@ def has_vlan_subinterface_configured(conf, intf):
ret = False
intfpath = ['interfaces', Section.section(intf), intf]
- if ( conf.exists(intfpath + ['vif']) or
- conf.exists(intfpath + ['vif-s'])):
+ if (conf.exists(intfpath + ['vif']) or conf.exists(intfpath + ['vif-s'])):
ret = True
return ret
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index c07ed1adf..a2622fa00 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -87,7 +87,6 @@ def nft_action(vyos_action):
def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output = []
- #def_suffix = '6' if ip_name == 'ip6' else ''
if ip_name == 'ip6':
def_suffix = '6'
@@ -97,7 +96,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
family = 'bri' if ip_name == 'bri' else 'ipv4'
if 'state' in rule_conf and rule_conf['state']:
- states = ",".join([s for s, v in rule_conf['state'].items() if v == 'enable'])
+ states = ",".join([s for s in rule_conf['state']])
if states:
output.append(f'ct state {{{states}}}')
@@ -275,14 +274,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'inbound_interface' in rule_conf:
operator = ''
- if 'interface_name' in rule_conf['inbound_interface']:
- iiface = rule_conf['inbound_interface']['interface_name']
+ if 'name' in rule_conf['inbound_interface']:
+ iiface = rule_conf['inbound_interface']['name']
if iiface[0] == '!':
operator = '!='
iiface = iiface[1:]
output.append(f'iifname {operator} {{{iiface}}}')
else:
- iiface = rule_conf['inbound_interface']['interface_group']
+ iiface = rule_conf['inbound_interface']['group']
if iiface[0] == '!':
operator = '!='
iiface = iiface[1:]
@@ -290,14 +289,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'outbound_interface' in rule_conf:
operator = ''
- if 'interface_name' in rule_conf['outbound_interface']:
- oiface = rule_conf['outbound_interface']['interface_name']
+ if 'name' in rule_conf['outbound_interface']:
+ oiface = rule_conf['outbound_interface']['name']
if oiface[0] == '!':
operator = '!='
oiface = oiface[1:]
output.append(f'oifname {operator} {{{oiface}}}')
else:
- oiface = rule_conf['outbound_interface']['interface_group']
+ oiface = rule_conf['outbound_interface']['group']
if oiface[0] == '!':
operator = '!='
oiface = oiface[1:]
@@ -395,7 +394,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'priority' in rule_conf['vlan']:
output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}')
- if 'log' in rule_conf and rule_conf['log'] == 'enable':
+ if 'log' in rule_conf:
action = rule_conf['action'] if 'action' in rule_conf else 'accept'
#output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]"')
output.append(f'log prefix "[{family}-{hook}-{fw_name}-{rule_id}-{action[:1].upper()}]"')
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index d1d7d48c4..45e6e4c16 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -92,6 +92,19 @@ class BondIf(Interface):
}
}}
+ @staticmethod
+ def get_inherit_bond_options() -> list:
+ """
+ Returns list of option
+ which are inherited from bond interface to member interfaces
+ :return: List of interface options
+ :rtype: list
+ """
+ options = [
+ 'mtu'
+ ]
+ return options
+
def remove(self):
"""
Remove interface from operating system. Removing the interface
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 285542057..aa1e87744 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -75,6 +75,40 @@ class EthernetIf(Interface):
},
}}
+ @staticmethod
+ def get_bond_member_allowed_options() -> list:
+ """
+ Return list of options which are allowed for changing,
+ when interface is a bond member
+ :return: List of interface options
+ :rtype: list
+ """
+ bond_allowed_sections = [
+ 'description',
+ 'disable',
+ 'disable_flow_control',
+ 'disable_link_detect',
+ 'duplex',
+ 'eapol.ca_certificate',
+ 'eapol.certificate',
+ 'eapol.passphrase',
+ 'mirror.egress',
+ 'mirror.ingress',
+ 'offload.gro',
+ 'offload.gso',
+ 'offload.lro',
+ 'offload.rfs',
+ 'offload.rps',
+ 'offload.sg',
+ 'offload.tso',
+ 'redirect',
+ 'ring_buffer.rx',
+ 'ring_buffer.tx',
+ 'speed',
+ 'hw_id'
+ ]
+ return bond_allowed_sections
+
def __init__(self, ifname, **kargs):
super().__init__(ifname, **kargs)
self.ethtool = Ethtool(ifname)
diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py
index 1fe5db7cd..8c5a0220e 100644
--- a/python/vyos/ifconfig/vxlan.py
+++ b/python/vyos/ifconfig/vxlan.py
@@ -56,6 +56,10 @@ class VXLANIf(Interface):
}
_command_set = {**Interface._command_set, **{
+ 'neigh_suppress': {
+ 'validate': lambda v: assert_list(v, ['on', 'off']),
+ 'shellcmd': 'bridge link set dev {ifname} neigh_suppress {value} learning off',
+ },
'vlan_tunnel': {
'validate': lambda v: assert_list(v, ['on', 'off']),
'shellcmd': 'bridge link set dev {ifname} vlan_tunnel {value}',
@@ -68,8 +72,8 @@ class VXLANIf(Interface):
# - https://man7.org/linux/man-pages/man8/ip-link.8.html
mapping = {
'group' : 'group',
- 'external' : 'external',
'gpe' : 'gpe',
+ 'parameters.external' : 'external',
'parameters.ip.df' : 'df',
'parameters.ip.tos' : 'tos',
'parameters.ip.ttl' : 'ttl',
@@ -113,6 +117,19 @@ class VXLANIf(Interface):
'port {port} dev {ifname}'
self._cmd(cmd.format(**self.config))
+ def set_neigh_suppress(self, state):
+ """
+ Controls whether neigh discovery (arp and nd) proxy and suppression
+ is enabled on the port. By default this flag is off.
+ """
+
+ # Determine current OS Kernel neigh_suppress setting - only adjust when needed
+ tmp = get_interface_config(self.ifname)
+ cur_state = 'on' if dict_search(f'linkinfo.info_slave_data.neigh_suppress', tmp) == True else 'off'
+ new_state = 'on' if state else 'off'
+ if cur_state != new_state:
+ self.set_interface('neigh_suppress', state)
+
def set_vlan_vni_mapping(self, state):
"""
Controls whether vlan to tunnel mapping is enabled on the port.
@@ -163,3 +180,9 @@ class VXLANIf(Interface):
# Enable/Disable VLAN tunnel mapping
# This is only possible after the interface was assigned to the bridge
self.set_vlan_vni_mapping(dict_search('vlan_to_vni', config) != None)
+
+ # Enable/Disable neighbor suppression and learning, there is no need to
+ # explicitly "disable" it, as VXLAN interface will be recreated if anything
+ # under "parameters" changes.
+ if dict_search('parameters.neighbor_suppress', config) != None:
+ self.set_neigh_suppress('on')
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index 4aac103ec..5704f8b64 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -167,11 +167,6 @@ class WireGuardIf(Interface):
interface setup code and provide a single point of entry when workin
on any interface. """
- # remove no longer associated peers first
- if 'peer_remove' in config:
- for peer, public_key in config['peer_remove'].items():
- self._cmd(f'wg set {self.ifname} peer {public_key} remove')
-
tmp_file = NamedTemporaryFile('w')
tmp_file.write(config['private_key'])
tmp_file.flush()
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 9cbc2b96e..392d38772 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -32,14 +32,34 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
translation_str = ''
if 'inbound_interface' in rule_conf:
- ifname = rule_conf['inbound_interface']
- if ifname != 'any':
- output.append(f'iifname "{ifname}"')
+ operator = ''
+ if 'name' in rule_conf['inbound_interface']:
+ iiface = rule_conf['inbound_interface']['name']
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} {{{iiface}}}')
+ else:
+ iiface = rule_conf['inbound_interface']['group']
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} @I_{iiface}')
if 'outbound_interface' in rule_conf:
- ifname = rule_conf['outbound_interface']
- if ifname != 'any':
- output.append(f'oifname "{ifname}"')
+ operator = ''
+ if 'name' in rule_conf['outbound_interface']:
+ oiface = rule_conf['outbound_interface']['name']
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} {{{oiface}}}')
+ else:
+ oiface = rule_conf['outbound_interface']['group']
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} @I_{oiface}')
if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':
protocol = rule_conf['protocol']
@@ -150,7 +170,7 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
operator = ''
if addr_prefix[:1] == '!':
operator = '!='
- addr_prefix = addr[1:]
+ addr_prefix = addr_prefix[1:]
output.append(f'ip6 {prefix}addr {operator} {addr_prefix}')
port = dict_search_args(side_conf, 'port')
diff --git a/python/vyos/qos/trafficshaper.py b/python/vyos/qos/trafficshaper.py
index c63c7cf39..0d5f9a8a1 100644
--- a/python/vyos/qos/trafficshaper.py
+++ b/python/vyos/qos/trafficshaper.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2022-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -89,6 +89,10 @@ class TrafficShaper(QoSBase):
if 'priority' in cls_config:
priority = cls_config['priority']
tmp += f' prio {priority}'
+
+ if 'ceiling' in cls_config:
+ f_ceil = self._rate_convert(cls_config['ceiling'])
+ tmp += f' ceil {f_ceil}'
self._cmd(tmp)
tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{cls:x} sfq'
@@ -102,6 +106,9 @@ class TrafficShaper(QoSBase):
if 'priority' in config['default']:
priority = config['default']['priority']
tmp += f' prio {priority}'
+ if 'ceiling' in config['default']:
+ f_ceil = self._rate_convert(config['default']['ceiling'])
+ tmp += f' ceil {f_ceil}'
self._cmd(tmp)
tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{default_minor_id:x} sfq'
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 3be486cc4..c778d0de8 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -582,10 +582,11 @@ def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):
def nft_default_rule(fw_conf, fw_name, ipv6=False):
output = ['counter']
default_action = fw_conf['default_action']
+ family = 'ipv6' if ipv6 else 'ipv4'
if 'enable_default_log' in fw_conf:
action_suffix = default_action[:1].upper()
- output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"')
+ output.append(f'log prefix "[{family}-{fw_name[:19]}-default-{action_suffix}]"')
#output.append(nft_action(default_action))
output.append(f'{default_action}')
diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py
index 9484eacdd..d36b6fcfb 100644
--- a/python/vyos/utils/dict.py
+++ b/python/vyos/utils/dict.py
@@ -199,6 +199,31 @@ def dict_search_recursive(dict_object, key, path=[]):
for x in dict_search_recursive(j, key, new_path):
yield x
+
+def dict_set(key_path, value, dict_object):
+ """ Set value to Python dictionary (dict_object) using path to key delimited by dot (.).
+ The key will be added if it does not exist.
+ """
+ path_list = key_path.split(".")
+ dynamic_dict = dict_object
+ if len(path_list) > 0:
+ for i in range(0, len(path_list)-1):
+ dynamic_dict = dynamic_dict[path_list[i]]
+ dynamic_dict[path_list[len(path_list)-1]] = value
+
+def dict_delete(key_path, dict_object):
+ """ Delete key in Python dictionary (dict_object) using path to key delimited by dot (.).
+ """
+ path_dict = dict_object
+ path_list = key_path.split('.')
+ inside = path_list[:-1]
+ if not inside:
+ del dict_object[path_list]
+ else:
+ for key in path_list[:-1]:
+ path_dict = path_dict[key]
+ del path_dict[path_list[len(path_list)-1]]
+
def dict_to_list(d, save_key_to=None):
""" Convert a dict to a list of dicts.
@@ -228,6 +253,39 @@ def dict_to_list(d, save_key_to=None):
return collect
+def dict_to_paths_values(conf: dict) -> dict:
+ """
+ Convert nested dictionary to simple dictionary, where key is a path is delimited by dot (.).
+ """
+ list_of_paths = []
+ dict_of_options ={}
+ for path in dict_to_key_paths(conf):
+ str_path = '.'.join(path)
+ list_of_paths.append(str_path)
+
+ for path in list_of_paths:
+ dict_of_options[path] = dict_search(path,conf)
+
+ return dict_of_options
+def dict_to_key_paths(d: dict) -> list:
+ """ Generator to return list of key paths from dict of list[str]|str
+ """
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ for r in func(v, path + [k]):
+ yield r
+ elif isinstance(d, list):
+ yield path
+ elif isinstance(d, str):
+ yield path
+ else:
+ raise ValueError('object is not a dict of strings/list of strings')
+ for r in func(d, []):
+ yield r
+
def dict_to_paths(d: dict) -> list:
""" Generator to return list of paths from dict of list[str]|str
"""
@@ -305,3 +363,4 @@ class FixedDict(dict):
if k not in self._allowed:
raise ConfigError(f'Option "{k}" has no defined default')
super().__setitem__(k, v)
+
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 9354bd495..5d19f256b 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -197,6 +197,21 @@ def get_all_vrfs():
data[name] = entry
return data
+def interface_list() -> list:
+ """
+ Get list of interfaces in system
+ :rtype: list
+ """
+ return Section.interfaces()
+
+
+def vrf_list() -> list:
+ """
+ Get list of VRFs in system
+ :rtype: list
+ """
+ return list(get_all_vrfs().keys())
+
def mac2eui64(mac, prefix=None):
"""
Convert a MAC address to a EUI64 address or, with prefix provided, a full
diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py
deleted file mode 100644
index 76e5d29c3..000000000
--- a/python/vyos/vpp.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
-from functools import wraps
-from pathlib import Path
-from re import search as re_search, fullmatch as re_fullmatch, MULTILINE as re_M
-from subprocess import run
-from time import sleep
-
-from vpp_papi import VPPApiClient
-from vpp_papi import VPPIOError, VPPValueError
-
-
-class VPPControl:
- """Control VPP network stack
- """
-
- class _Decorators:
- """Decorators for VPPControl
- """
-
- @classmethod
- def api_call(cls, decorated_func):
- """Check if API is connected before API call
-
- Args:
- decorated_func: function to decorate
-
- Raises:
- VPPIOError: Connection to API is not established
- """
-
- @wraps(decorated_func)
- def api_safe_wrapper(cls, *args, **kwargs):
- if not cls.vpp_api_client.transport.connected:
- raise VPPIOError(2, 'VPP API is not connected')
- return decorated_func(cls, *args, **kwargs)
-
- return api_safe_wrapper
-
- @classmethod
- def check_retval(cls, decorated_func):
- """Check retval from API response
-
- Args:
- decorated_func: function to decorate
-
- Raises:
- VPPValueError: raised when retval is not 0
- """
-
- @wraps(decorated_func)
- def check_retval_wrapper(cls, *args, **kwargs):
- return_value = decorated_func(cls, *args, **kwargs)
- if not return_value.retval == 0:
- raise VPPValueError(
- f'VPP API call failed: {return_value.retval}')
- return return_value
-
- return check_retval_wrapper
-
- def __init__(self, attempts: int = 5, interval: int = 1000) -> None:
- """Create VPP API connection
-
- Args:
- attempts (int, optional): attempts to connect. Defaults to 5.
- interval (int, optional): interval between attempts in ms. Defaults to 1000.
-
- Raises:
- VPPIOError: Connection to API cannot be established
- """
- self.vpp_api_client = VPPApiClient()
- # connect with interval
- while attempts:
- try:
- attempts -= 1
- self.vpp_api_client.connect('vpp-vyos')
- break
- except (ConnectionRefusedError, FileNotFoundError) as err:
- print(f'VPP API connection timeout: {err}')
- sleep(interval / 1000)
- # raise exception if connection was not successful in the end
- if not self.vpp_api_client.transport.connected:
- raise VPPIOError(2, 'Cannot connect to VPP API')
-
- def __del__(self) -> None:
- """Disconnect from VPP API (destructor)
- """
- self.disconnect()
-
- def disconnect(self) -> None:
- """Disconnect from VPP API
- """
- if self.vpp_api_client.transport.connected:
- self.vpp_api_client.disconnect()
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def cli_cmd(self, command: str):
- """Send raw CLI command
-
- Args:
- command (str): command to send
-
- Returns:
- vpp_papi.vpp_serializer.cli_inband_reply: CLI reply class
- """
- return self.vpp_api_client.api.cli_inband(cmd=command)
-
- @_Decorators.api_call
- def get_mac(self, ifname: str) -> str:
- """Find MAC address by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- str: MAC address
- """
- for iface in self.vpp_api_client.api.sw_interface_dump():
- if iface.interface_name == ifname:
- return iface.l2_address.mac_string
- return ''
-
- @_Decorators.api_call
- def get_sw_if_index(self, ifname: str) -> int | None:
- """Find interface index by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- int | None: Interface index or None (if was not fount)
- """
- for iface in self.vpp_api_client.api.sw_interface_dump():
- if iface.interface_name == ifname:
- return iface.sw_if_index
- return None
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
- """Create LCP interface pair between VPP and kernel
-
- Args:
- iface_name_vpp (str): interface name in VPP
- iface_name_kernel (str): interface name in kernel
- """
- iface_index = self.get_sw_if_index(iface_name_vpp)
- if iface_index:
- return self.vpp_api_client.api.lcp_itf_pair_add_del(
- is_add=True,
- sw_if_index=iface_index,
- host_if_name=iface_name_kernel)
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
- """Delete LCP interface pair between VPP and kernel
-
- Args:
- iface_name_vpp (str): interface name in VPP
- iface_name_kernel (str): interface name in kernel
- """
- iface_index = self.get_sw_if_index(iface_name_vpp)
- if iface_index:
- return self.vpp_api_client.api.lcp_itf_pair_add_del(
- is_add=False,
- sw_if_index=iface_index,
- host_if_name=iface_name_kernel)
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def iface_rxmode(self, iface_name: str, rx_mode: str) -> None:
- """Set interface rx-mode in VPP
-
- Args:
- iface_name (str): interface name in VPP
- rx_mode (str): mode (polling, interrupt, adaptive)
- """
- modes_dict: dict[str, int] = {
- 'polling': 1,
- 'interrupt': 2,
- 'adaptive': 3
- }
- if rx_mode not in modes_dict:
- raise VPPValueError(f'Mode {rx_mode} is not known')
- iface_index = self.get_sw_if_index(iface_name)
- return self.vpp_api_client.api.sw_interface_set_rx_mode(
- sw_if_index=iface_index, mode=modes_dict[rx_mode])
-
- @_Decorators.api_call
- def get_pci_addr(self, ifname: str) -> str:
- """Find PCI address of interface by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- str: PCI address
- """
- hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}').reply
-
- regex_filter = r'^\s+pci: device (?P<device>\w+:\w+) subsystem (?P<subsystem>\w+:\w+) address (?P<address>\w+:\w+:\w+\.\w+) numa (?P<numa>\w+)$'
- re_obj = re_search(regex_filter, hw_info, re_M)
-
- # return empty string if no interface or no PCI info was found
- if not hw_info or not re_obj:
- return ''
-
- address = re_obj.groupdict().get('address', '')
-
- # we need to modify address to math kernel style
- # for example: 0000:06:14.00 -> 0000:06:14.0
- address_chunks: list[str] = address.split('.')
- address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}'
-
- return address_normalized
-
-
-class HostControl:
- """Control Linux host
- """
-
- @staticmethod
- def pci_rescan(pci_addr: str = '') -> None:
- """Rescan PCI device by removing it and rescan PCI bus
-
- If PCI address is not defined - just rescan PCI bus
-
- Args:
- address (str, optional): PCI address of device. Defaults to ''.
- """
- if pci_addr:
- device_file = Path(f'/sys/bus/pci/devices/{pci_addr}/remove')
- if device_file.exists():
- device_file.write_text('1')
- # wait 10 seconds max until device will be removed
- attempts = 100
- while device_file.exists() and attempts:
- attempts -= 1
- sleep(0.1)
- if device_file.exists():
- raise TimeoutError(
- f'Timeout was reached for removing PCI device {pci_addr}'
- )
- else:
- raise FileNotFoundError(f'PCI device {pci_addr} does not exist')
- rescan_file = Path('/sys/bus/pci/rescan')
- rescan_file.write_text('1')
- if pci_addr:
- # wait 10 seconds max until device will be installed
- attempts = 100
- while not device_file.exists() and attempts:
- attempts -= 1
- sleep(0.1)
- if not device_file.exists():
- raise TimeoutError(
- f'Timeout was reached for installing PCI device {pci_addr}')
-
- @staticmethod
- def get_eth_name(pci_addr: str) -> str:
- """Find Ethernet interface name by PCI address
-
- Args:
- pci_addr (str): PCI address
-
- Raises:
- FileNotFoundError: no Ethernet interface was found
-
- Returns:
- str: Ethernet interface name
- """
- # find all PCI devices with eth* names
- net_devs: dict[str, str] = {}
- net_devs_dir = Path('/sys/class/net')
- regex_filter = r'^/sys/devices/pci[\w/:\.]+/(?P<pci_addr>\w+:\w+:\w+\.\w+)/[\w/:\.]+/(?P<iface_name>eth\d+)$'
- for dir in net_devs_dir.iterdir():
- real_dir: str = dir.resolve().as_posix()
- re_obj = re_fullmatch(regex_filter, real_dir)
- if re_obj:
- iface_name: str = re_obj.group('iface_name')
- iface_addr: str = re_obj.group('pci_addr')
- net_devs.update({iface_addr: iface_name})
- # match to provided PCI address and return a name if found
- if pci_addr in net_devs:
- return net_devs[pci_addr]
- # raise error if device was not found
- raise FileNotFoundError(
- f'PCI device {pci_addr} not found in ethernet interfaces')
-
- @staticmethod
- def rename_iface(name_old: str, name_new: str) -> None:
- """Rename interface
-
- Args:
- name_old (str): old name
- name_new (str): new name
- """
- rename_cmd: list[str] = [
- 'ip', 'link', 'set', name_old, 'name', name_new
- ]
- run(rename_cmd)