diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/vyos/configdep.py | 2 | ||||
| -rw-r--r-- | python/vyos/configdict.py | 14 | ||||
| -rw-r--r-- | python/vyos/firewall.py | 17 | ||||
| -rw-r--r-- | python/vyos/ifconfig/bond.py | 13 | ||||
| -rw-r--r-- | python/vyos/ifconfig/ethernet.py | 34 | ||||
| -rw-r--r-- | python/vyos/ifconfig/vxlan.py | 25 | ||||
| -rw-r--r-- | python/vyos/ifconfig/wireguard.py | 5 | ||||
| -rw-r--r-- | python/vyos/nat.py | 34 | ||||
| -rw-r--r-- | python/vyos/qos/trafficshaper.py | 9 | ||||
| -rw-r--r-- | python/vyos/template.py | 3 | ||||
| -rw-r--r-- | python/vyos/utils/dict.py | 59 | ||||
| -rw-r--r-- | python/vyos/utils/network.py | 15 | ||||
| -rw-r--r-- | python/vyos/vpp.py | 315 | 
13 files changed, 197 insertions, 348 deletions
| diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py index 05d9a3fa3..8a28811eb 100644 --- a/python/vyos/configdep.py +++ b/python/vyos/configdep.py @@ -43,7 +43,7 @@ def canon_name_of_path(path: str) -> str:      return canon_name(script)  def caller_name() -> str: -    return stack()[-1].filename +    return stack()[2].filename  def read_dependency_dict(dependency_dir: str = dependency_dir) -> dict:      res = {} diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index 71a06b625..075ffe466 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -258,10 +258,10 @@ def has_address_configured(conf, intf):      old_level = conf.get_level()      conf.set_level([]) -    intfpath = 'interfaces ' + Section.get_config_path(intf) -    if ( conf.exists(f'{intfpath} address') or -            conf.exists(f'{intfpath} ipv6 address autoconf') or -            conf.exists(f'{intfpath} ipv6 address eui64') ): +    intfpath = ['interfaces', Section.get_config_path(intf)] +    if (conf.exists([intfpath, 'address']) or +        conf.exists([intfpath, 'ipv6', 'address', 'autoconf']) or +        conf.exists([intfpath, 'ipv6', 'address', 'eui64'])):          ret = True      conf.set_level(old_level) @@ -279,8 +279,7 @@ def has_vrf_configured(conf, intf):      old_level = conf.get_level()      conf.set_level([]) -    tmp = ['interfaces', Section.get_config_path(intf), 'vrf'] -    if conf.exists(tmp): +    if conf.exists(['interfaces', Section.get_config_path(intf), 'vrf']):          ret = True      conf.set_level(old_level) @@ -298,8 +297,7 @@ def has_vlan_subinterface_configured(conf, intf):      ret = False      intfpath = ['interfaces', Section.section(intf), intf] -    if ( conf.exists(intfpath + ['vif']) or -            conf.exists(intfpath + ['vif-s'])): +    if (conf.exists(intfpath + ['vif']) or conf.exists(intfpath + ['vif-s'])):          ret = True      return ret diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index c07ed1adf..a2622fa00 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -87,7 +87,6 @@ def nft_action(vyos_action):  def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):      output = [] -    #def_suffix = '6' if ip_name == 'ip6' else ''      if ip_name == 'ip6':          def_suffix = '6' @@ -97,7 +96,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):          family = 'bri' if ip_name == 'bri' else 'ipv4'      if 'state' in rule_conf and rule_conf['state']: -        states = ",".join([s for s, v in rule_conf['state'].items() if v == 'enable']) +        states = ",".join([s for s in rule_conf['state']])          if states:              output.append(f'ct state {{{states}}}') @@ -275,14 +274,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):      if 'inbound_interface' in rule_conf:          operator = '' -        if 'interface_name' in rule_conf['inbound_interface']: -            iiface = rule_conf['inbound_interface']['interface_name'] +        if 'name' in rule_conf['inbound_interface']: +            iiface = rule_conf['inbound_interface']['name']              if iiface[0] == '!':                  operator = '!='                  iiface = iiface[1:]              output.append(f'iifname {operator} {{{iiface}}}')          else: -            iiface = rule_conf['inbound_interface']['interface_group'] +            iiface = rule_conf['inbound_interface']['group']              if iiface[0] == '!':                  operator = '!='                  iiface = iiface[1:] @@ -290,14 +289,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):      if 'outbound_interface' in rule_conf:          operator = '' -        if 'interface_name' in rule_conf['outbound_interface']: -            oiface = rule_conf['outbound_interface']['interface_name'] +        if 'name' in rule_conf['outbound_interface']: +            oiface = rule_conf['outbound_interface']['name']              if oiface[0] == '!':                  operator = '!='                  oiface = oiface[1:]              output.append(f'oifname {operator} {{{oiface}}}')          else: -            oiface = rule_conf['outbound_interface']['interface_group'] +            oiface = rule_conf['outbound_interface']['group']              if oiface[0] == '!':                  operator = '!='                  oiface = oiface[1:] @@ -395,7 +394,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):          if 'priority' in rule_conf['vlan']:              output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}') -    if 'log' in rule_conf and rule_conf['log'] == 'enable': +    if 'log' in rule_conf:          action = rule_conf['action'] if 'action' in rule_conf else 'accept'          #output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]"')          output.append(f'log prefix "[{family}-{hook}-{fw_name}-{rule_id}-{action[:1].upper()}]"') diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py index d1d7d48c4..45e6e4c16 100644 --- a/python/vyos/ifconfig/bond.py +++ b/python/vyos/ifconfig/bond.py @@ -92,6 +92,19 @@ class BondIf(Interface):          }      }} +    @staticmethod +    def get_inherit_bond_options() -> list: +        """ +        Returns list of option +        which are inherited from bond interface to member interfaces +        :return: List of interface options +        :rtype: list +        """ +        options = [ +            'mtu' +        ] +        return options +      def remove(self):          """          Remove interface from operating system. Removing the interface diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py index 285542057..aa1e87744 100644 --- a/python/vyos/ifconfig/ethernet.py +++ b/python/vyos/ifconfig/ethernet.py @@ -75,6 +75,40 @@ class EthernetIf(Interface):          },      }} +    @staticmethod +    def get_bond_member_allowed_options() -> list: +        """ +        Return list of options which are allowed for changing, +        when interface is a bond member +        :return: List of interface options +        :rtype: list +        """ +        bond_allowed_sections = [ +            'description', +            'disable', +            'disable_flow_control', +            'disable_link_detect', +            'duplex', +            'eapol.ca_certificate', +            'eapol.certificate', +            'eapol.passphrase', +            'mirror.egress', +            'mirror.ingress', +            'offload.gro', +            'offload.gso', +            'offload.lro', +            'offload.rfs', +            'offload.rps', +            'offload.sg', +            'offload.tso', +            'redirect', +            'ring_buffer.rx', +            'ring_buffer.tx', +            'speed', +            'hw_id' +        ] +        return bond_allowed_sections +      def __init__(self, ifname, **kargs):          super().__init__(ifname, **kargs)          self.ethtool = Ethtool(ifname) diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py index 1fe5db7cd..8c5a0220e 100644 --- a/python/vyos/ifconfig/vxlan.py +++ b/python/vyos/ifconfig/vxlan.py @@ -56,6 +56,10 @@ class VXLANIf(Interface):      }      _command_set = {**Interface._command_set, **{ +        'neigh_suppress': { +            'validate': lambda v: assert_list(v, ['on', 'off']), +            'shellcmd': 'bridge link set dev {ifname} neigh_suppress {value} learning off', +        },          'vlan_tunnel': {              'validate': lambda v: assert_list(v, ['on', 'off']),              'shellcmd': 'bridge link set dev {ifname} vlan_tunnel {value}', @@ -68,8 +72,8 @@ class VXLANIf(Interface):          # - https://man7.org/linux/man-pages/man8/ip-link.8.html          mapping = {              'group'                      : 'group', -            'external'                   : 'external',              'gpe'                        : 'gpe', +            'parameters.external'        : 'external',              'parameters.ip.df'           : 'df',              'parameters.ip.tos'          : 'tos',              'parameters.ip.ttl'          : 'ttl', @@ -113,6 +117,19 @@ class VXLANIf(Interface):                         'port {port} dev {ifname}'                  self._cmd(cmd.format(**self.config)) +    def set_neigh_suppress(self, state): +        """ +        Controls whether neigh discovery (arp and nd) proxy and suppression +        is enabled on the port. By default this flag is off. +        """ + +        # Determine current OS Kernel neigh_suppress setting - only adjust when needed +        tmp = get_interface_config(self.ifname) +        cur_state = 'on' if dict_search(f'linkinfo.info_slave_data.neigh_suppress', tmp) == True else 'off' +        new_state = 'on' if state else 'off' +        if cur_state != new_state: +            self.set_interface('neigh_suppress', state) +      def set_vlan_vni_mapping(self, state):          """          Controls whether vlan to tunnel mapping is enabled on the port. @@ -163,3 +180,9 @@ class VXLANIf(Interface):          # Enable/Disable VLAN tunnel mapping          # This is only possible after the interface was assigned to the bridge          self.set_vlan_vni_mapping(dict_search('vlan_to_vni', config) != None) + +        # Enable/Disable neighbor suppression and learning, there is no need to +        # explicitly "disable" it, as VXLAN interface will be recreated if anything +        # under "parameters" changes. +        if dict_search('parameters.neighbor_suppress', config) != None: +            self.set_neigh_suppress('on') diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py index 4aac103ec..5704f8b64 100644 --- a/python/vyos/ifconfig/wireguard.py +++ b/python/vyos/ifconfig/wireguard.py @@ -167,11 +167,6 @@ class WireGuardIf(Interface):          interface setup code and provide a single point of entry when workin          on any interface. """ -        # remove no longer associated peers first -        if 'peer_remove' in config: -            for peer, public_key in config['peer_remove'].items(): -                self._cmd(f'wg set {self.ifname} peer {public_key} remove') -          tmp_file = NamedTemporaryFile('w')          tmp_file.write(config['private_key'])          tmp_file.flush() diff --git a/python/vyos/nat.py b/python/vyos/nat.py index 9cbc2b96e..392d38772 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -32,14 +32,34 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):      translation_str = ''      if 'inbound_interface' in rule_conf: -        ifname = rule_conf['inbound_interface'] -        if ifname != 'any': -            output.append(f'iifname "{ifname}"') +        operator = '' +        if 'name' in rule_conf['inbound_interface']: +            iiface = rule_conf['inbound_interface']['name'] +            if iiface[0] == '!': +                operator = '!=' +                iiface = iiface[1:] +            output.append(f'iifname {operator} {{{iiface}}}') +        else: +            iiface = rule_conf['inbound_interface']['group'] +            if iiface[0] == '!': +                operator = '!=' +                iiface = iiface[1:] +            output.append(f'iifname {operator} @I_{iiface}')      if 'outbound_interface' in rule_conf: -        ifname = rule_conf['outbound_interface'] -        if ifname != 'any': -            output.append(f'oifname "{ifname}"') +        operator = '' +        if 'name' in rule_conf['outbound_interface']: +            oiface = rule_conf['outbound_interface']['name'] +            if oiface[0] == '!': +                operator = '!=' +                oiface = oiface[1:] +            output.append(f'oifname {operator} {{{oiface}}}') +        else: +            oiface = rule_conf['outbound_interface']['group'] +            if oiface[0] == '!': +                operator = '!=' +                oiface = oiface[1:] +            output.append(f'oifname {operator} @I_{oiface}')      if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':          protocol = rule_conf['protocol'] @@ -150,7 +170,7 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):              operator = ''              if addr_prefix[:1] == '!':                  operator = '!=' -                addr_prefix = addr[1:] +                addr_prefix = addr_prefix[1:]              output.append(f'ip6 {prefix}addr {operator} {addr_prefix}')          port = dict_search_args(side_conf, 'port') diff --git a/python/vyos/qos/trafficshaper.py b/python/vyos/qos/trafficshaper.py index c63c7cf39..0d5f9a8a1 100644 --- a/python/vyos/qos/trafficshaper.py +++ b/python/vyos/qos/trafficshaper.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2022-2023 VyOS maintainers and contributors <maintainers@vyos.io>  #  # This library is free software; you can redistribute it and/or  # modify it under the terms of the GNU Lesser General Public @@ -89,6 +89,10 @@ class TrafficShaper(QoSBase):                  if 'priority' in cls_config:                      priority = cls_config['priority']                      tmp += f' prio {priority}' + +                if 'ceiling' in cls_config: +                    f_ceil = self._rate_convert(cls_config['ceiling']) +                    tmp += f' ceil {f_ceil}'                  self._cmd(tmp)                  tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{cls:x} sfq' @@ -102,6 +106,9 @@ class TrafficShaper(QoSBase):                  if 'priority' in config['default']:                      priority = config['default']['priority']                      tmp += f' prio {priority}' +                if 'ceiling' in config['default']: +                    f_ceil = self._rate_convert(config['default']['ceiling']) +                    tmp += f' ceil {f_ceil}'                  self._cmd(tmp)                  tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{default_minor_id:x} sfq' diff --git a/python/vyos/template.py b/python/vyos/template.py index 3be486cc4..c778d0de8 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -582,10 +582,11 @@ def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):  def nft_default_rule(fw_conf, fw_name, ipv6=False):      output = ['counter']      default_action = fw_conf['default_action'] +    family = 'ipv6' if ipv6 else 'ipv4'      if 'enable_default_log' in fw_conf:          action_suffix = default_action[:1].upper() -        output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"') +        output.append(f'log prefix "[{family}-{fw_name[:19]}-default-{action_suffix}]"')      #output.append(nft_action(default_action))      output.append(f'{default_action}') diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py index 9484eacdd..d36b6fcfb 100644 --- a/python/vyos/utils/dict.py +++ b/python/vyos/utils/dict.py @@ -199,6 +199,31 @@ def dict_search_recursive(dict_object, key, path=[]):              for x in dict_search_recursive(j, key, new_path):                  yield x + +def dict_set(key_path, value, dict_object): +    """ Set value to Python dictionary (dict_object) using path to key delimited by dot (.). +        The key will be added if it does not exist. +    """ +    path_list = key_path.split(".") +    dynamic_dict = dict_object +    if len(path_list) > 0: +        for i in range(0, len(path_list)-1): +            dynamic_dict = dynamic_dict[path_list[i]] +        dynamic_dict[path_list[len(path_list)-1]] = value + +def dict_delete(key_path, dict_object): +    """ Delete key in Python dictionary (dict_object) using path to key delimited by dot (.). +    """ +    path_dict = dict_object +    path_list = key_path.split('.') +    inside = path_list[:-1] +    if not inside: +        del dict_object[path_list] +    else: +        for key in path_list[:-1]: +            path_dict = path_dict[key] +        del path_dict[path_list[len(path_list)-1]] +  def dict_to_list(d, save_key_to=None):      """ Convert a dict to a list of dicts. @@ -228,6 +253,39 @@ def dict_to_list(d, save_key_to=None):      return collect +def dict_to_paths_values(conf: dict) -> dict: +    """ +    Convert nested dictionary to simple dictionary, where key is a path is delimited by dot (.). +    """ +    list_of_paths = [] +    dict_of_options ={} +    for path in dict_to_key_paths(conf): +        str_path = '.'.join(path) +        list_of_paths.append(str_path) + +    for path in list_of_paths: +        dict_of_options[path] = dict_search(path,conf) + +    return dict_of_options +def dict_to_key_paths(d: dict) -> list: +    """ Generator to return list of key paths from dict of list[str]|str +    """ +    def func(d, path): +        if isinstance(d, dict): +            if not d: +                yield path +            for k, v in d.items(): +                for r in func(v, path + [k]): +                    yield r +        elif isinstance(d, list): +            yield path +        elif isinstance(d, str): +            yield path +        else: +            raise ValueError('object is not a dict of strings/list of strings') +    for r in func(d, []): +        yield r +  def dict_to_paths(d: dict) -> list:      """ Generator to return list of paths from dict of list[str]|str      """ @@ -305,3 +363,4 @@ class FixedDict(dict):          if k not in self._allowed:              raise ConfigError(f'Option "{k}" has no defined default')          super().__setitem__(k, v) + diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 9354bd495..5d19f256b 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -197,6 +197,21 @@ def get_all_vrfs():          data[name] = entry      return data +def interface_list() -> list: +    """ +    Get list of interfaces in system +    :rtype: list +    """ +    return Section.interfaces() + + +def vrf_list() -> list: +    """ +    Get list of VRFs in system +    :rtype: list +    """ +    return list(get_all_vrfs().keys()) +  def mac2eui64(mac, prefix=None):      """      Convert a MAC address to a EUI64 address or, with prefix provided, a full diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py deleted file mode 100644 index 76e5d29c3..000000000 --- a/python/vyos/vpp.py +++ /dev/null @@ -1,315 +0,0 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library.  If not, see <http://www.gnu.org/licenses/>. - -from functools import wraps -from pathlib import Path -from re import search as re_search, fullmatch as re_fullmatch, MULTILINE as re_M -from subprocess import run -from time import sleep - -from vpp_papi import VPPApiClient -from vpp_papi import VPPIOError, VPPValueError - - -class VPPControl: -    """Control VPP network stack -    """ - -    class _Decorators: -        """Decorators for VPPControl -        """ - -        @classmethod -        def api_call(cls, decorated_func): -            """Check if API is connected before API call - -            Args: -                decorated_func: function to decorate - -            Raises: -                VPPIOError: Connection to API is not established -            """ - -            @wraps(decorated_func) -            def api_safe_wrapper(cls, *args, **kwargs): -                if not cls.vpp_api_client.transport.connected: -                    raise VPPIOError(2, 'VPP API is not connected') -                return decorated_func(cls, *args, **kwargs) - -            return api_safe_wrapper - -        @classmethod -        def check_retval(cls, decorated_func): -            """Check retval from API response - -            Args: -                decorated_func: function to decorate - -            Raises: -                VPPValueError: raised when retval is not 0 -            """ - -            @wraps(decorated_func) -            def check_retval_wrapper(cls, *args, **kwargs): -                return_value = decorated_func(cls, *args, **kwargs) -                if not return_value.retval == 0: -                    raise VPPValueError( -                        f'VPP API call failed: {return_value.retval}') -                return return_value - -            return check_retval_wrapper - -    def __init__(self, attempts: int = 5, interval: int = 1000) -> None: -        """Create VPP API connection - -        Args: -            attempts (int, optional): attempts to connect. Defaults to 5. -            interval (int, optional): interval between attempts in ms. Defaults to 1000. - -        Raises: -            VPPIOError: Connection to API cannot be established -        """ -        self.vpp_api_client = VPPApiClient() -        # connect with interval -        while attempts: -            try: -                attempts -= 1 -                self.vpp_api_client.connect('vpp-vyos') -                break -            except (ConnectionRefusedError, FileNotFoundError) as err: -                print(f'VPP API connection timeout: {err}') -                sleep(interval / 1000) -        # raise exception if connection was not successful in the end -        if not self.vpp_api_client.transport.connected: -            raise VPPIOError(2, 'Cannot connect to VPP API') - -    def __del__(self) -> None: -        """Disconnect from VPP API (destructor) -        """ -        self.disconnect() - -    def disconnect(self) -> None: -        """Disconnect from VPP API -        """ -        if self.vpp_api_client.transport.connected: -            self.vpp_api_client.disconnect() - -    @_Decorators.check_retval -    @_Decorators.api_call -    def cli_cmd(self, command: str): -        """Send raw CLI command - -        Args: -            command (str): command to send - -        Returns: -            vpp_papi.vpp_serializer.cli_inband_reply: CLI reply class -        """ -        return self.vpp_api_client.api.cli_inband(cmd=command) - -    @_Decorators.api_call -    def get_mac(self, ifname: str) -> str: -        """Find MAC address by interface name in VPP - -        Args: -            ifname (str): interface name inside VPP - -        Returns: -            str: MAC address -        """ -        for iface in self.vpp_api_client.api.sw_interface_dump(): -            if iface.interface_name == ifname: -                return iface.l2_address.mac_string -        return '' - -    @_Decorators.api_call -    def get_sw_if_index(self, ifname: str) -> int | None: -        """Find interface index by interface name in VPP - -        Args: -            ifname (str): interface name inside VPP - -        Returns: -            int | None: Interface index or None (if was not fount) -        """ -        for iface in self.vpp_api_client.api.sw_interface_dump(): -            if iface.interface_name == ifname: -                return iface.sw_if_index -        return None - -    @_Decorators.check_retval -    @_Decorators.api_call -    def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None: -        """Create LCP interface pair between VPP and kernel - -        Args: -            iface_name_vpp (str): interface name in VPP -            iface_name_kernel (str): interface name in kernel -        """ -        iface_index = self.get_sw_if_index(iface_name_vpp) -        if iface_index: -            return self.vpp_api_client.api.lcp_itf_pair_add_del( -                is_add=True, -                sw_if_index=iface_index, -                host_if_name=iface_name_kernel) - -    @_Decorators.check_retval -    @_Decorators.api_call -    def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None: -        """Delete LCP interface pair between VPP and kernel - -        Args: -            iface_name_vpp (str): interface name in VPP -            iface_name_kernel (str): interface name in kernel -        """ -        iface_index = self.get_sw_if_index(iface_name_vpp) -        if iface_index: -            return self.vpp_api_client.api.lcp_itf_pair_add_del( -                is_add=False, -                sw_if_index=iface_index, -                host_if_name=iface_name_kernel) - -    @_Decorators.check_retval -    @_Decorators.api_call -    def iface_rxmode(self, iface_name: str, rx_mode: str) -> None: -        """Set interface rx-mode in VPP - -        Args: -            iface_name (str): interface name in VPP -            rx_mode (str): mode (polling, interrupt, adaptive) -        """ -        modes_dict: dict[str, int] = { -            'polling': 1, -            'interrupt': 2, -            'adaptive': 3 -        } -        if rx_mode not in modes_dict: -            raise VPPValueError(f'Mode {rx_mode} is not known') -        iface_index = self.get_sw_if_index(iface_name) -        return self.vpp_api_client.api.sw_interface_set_rx_mode( -            sw_if_index=iface_index, mode=modes_dict[rx_mode]) - -    @_Decorators.api_call -    def get_pci_addr(self, ifname: str) -> str: -        """Find PCI address of interface by interface name in VPP - -        Args: -            ifname (str): interface name inside VPP - -        Returns: -            str: PCI address -        """ -        hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}').reply - -        regex_filter = r'^\s+pci: device (?P<device>\w+:\w+) subsystem (?P<subsystem>\w+:\w+) address (?P<address>\w+:\w+:\w+\.\w+) numa (?P<numa>\w+)$' -        re_obj = re_search(regex_filter, hw_info, re_M) - -        # return empty string if no interface or no PCI info was found -        if not hw_info or not re_obj: -            return '' - -        address = re_obj.groupdict().get('address', '') - -        # we need to modify address to math kernel style -        # for example: 0000:06:14.00 -> 0000:06:14.0 -        address_chunks: list[str] = address.split('.') -        address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}' - -        return address_normalized - - -class HostControl: -    """Control Linux host -    """ - -    @staticmethod -    def pci_rescan(pci_addr: str = '') -> None: -        """Rescan PCI device by removing it and rescan PCI bus - -        If PCI address is not defined - just rescan PCI bus - -        Args: -            address (str, optional): PCI address of device. Defaults to ''. -        """ -        if pci_addr: -            device_file = Path(f'/sys/bus/pci/devices/{pci_addr}/remove') -            if device_file.exists(): -                device_file.write_text('1') -                # wait 10 seconds max until device will be removed -                attempts = 100 -                while device_file.exists() and attempts: -                    attempts -= 1 -                    sleep(0.1) -                if device_file.exists(): -                    raise TimeoutError( -                        f'Timeout was reached for removing PCI device {pci_addr}' -                    ) -            else: -                raise FileNotFoundError(f'PCI device {pci_addr} does not exist') -        rescan_file = Path('/sys/bus/pci/rescan') -        rescan_file.write_text('1') -        if pci_addr: -            # wait 10 seconds max until device will be installed -            attempts = 100 -            while not device_file.exists() and attempts: -                attempts -= 1 -                sleep(0.1) -            if not device_file.exists(): -                raise TimeoutError( -                    f'Timeout was reached for installing PCI device {pci_addr}') - -    @staticmethod -    def get_eth_name(pci_addr: str) -> str: -        """Find Ethernet interface name by PCI address - -        Args: -            pci_addr (str): PCI address - -        Raises: -            FileNotFoundError: no Ethernet interface was found - -        Returns: -            str: Ethernet interface name -        """ -        # find all PCI devices with eth* names -        net_devs: dict[str, str] = {} -        net_devs_dir = Path('/sys/class/net') -        regex_filter = r'^/sys/devices/pci[\w/:\.]+/(?P<pci_addr>\w+:\w+:\w+\.\w+)/[\w/:\.]+/(?P<iface_name>eth\d+)$' -        for dir in net_devs_dir.iterdir(): -            real_dir: str = dir.resolve().as_posix() -            re_obj = re_fullmatch(regex_filter, real_dir) -            if re_obj: -                iface_name: str = re_obj.group('iface_name') -                iface_addr: str = re_obj.group('pci_addr') -                net_devs.update({iface_addr: iface_name}) -        # match to provided PCI address and return a name if found -        if pci_addr in net_devs: -            return net_devs[pci_addr] -        # raise error if device was not found -        raise FileNotFoundError( -            f'PCI device {pci_addr} not found in ethernet interfaces') - -    @staticmethod -    def rename_iface(name_old: str, name_new: str) -> None: -        """Rename interface - -        Args: -            name_old (str): old name -            name_new (str): new name -        """ -        rename_cmd: list[str] = [ -            'ip', 'link', 'set', name_old, 'name', name_new -        ] -        run(rename_cmd) | 
