diff options
| -rw-r--r-- | python/vyos/vpp.py | 132 | ||||
| -rwxr-xr-x | src/conf_mode/vpp.py | 14 | 
2 files changed, 123 insertions, 23 deletions
| diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py index cf0d27eb1..76e5d29c3 100644 --- a/python/vyos/vpp.py +++ b/python/vyos/vpp.py @@ -15,11 +15,12 @@  from functools import wraps  from pathlib import Path -from re import search as re_search, MULTILINE as re_M +from re import search as re_search, fullmatch as re_fullmatch, MULTILINE as re_M +from subprocess import run  from time import sleep  from vpp_papi import VPPApiClient -from vpp_papi import VPPIOError +from vpp_papi import VPPIOError, VPPValueError  class VPPControl: @@ -32,6 +33,14 @@ class VPPControl:          @classmethod          def api_call(cls, decorated_func): +            """Check if API is connected before API call + +            Args: +                decorated_func: function to decorate + +            Raises: +                VPPIOError: Connection to API is not established +            """              @wraps(decorated_func)              def api_safe_wrapper(cls, *args, **kwargs): @@ -41,6 +50,27 @@ class VPPControl:              return api_safe_wrapper +        @classmethod +        def check_retval(cls, decorated_func): +            """Check retval from API response + +            Args: +                decorated_func: function to decorate + +            Raises: +                VPPValueError: raised when retval is not 0 +            """ + +            @wraps(decorated_func) +            def check_retval_wrapper(cls, *args, **kwargs): +                return_value = decorated_func(cls, *args, **kwargs) +                if not return_value.retval == 0: +                    raise VPPValueError( +                        f'VPP API call failed: {return_value.retval}') +                return return_value + +            return check_retval_wrapper +      def __init__(self, attempts: int = 5, interval: int = 1000) -> None:          """Create VPP API connection @@ -76,21 +106,18 @@ class VPPControl:          if self.vpp_api_client.transport.connected:              self.vpp_api_client.disconnect() +    @_Decorators.check_retval      @_Decorators.api_call -    def cli_cmd(self, command: str, return_output: bool = False) -> str: +    def cli_cmd(self, command: str):          """Send raw CLI command          Args:              command (str): command to send -            return_output (bool, optional): Return command output. Defaults to False.          Returns: -            str: output of the command, only if it was successful +            vpp_papi.vpp_serializer.cli_inband_reply: CLI reply class          """ -        cli_answer = self.vpp_api_client.api.cli_inband(cmd=command) -        if return_output and cli_answer.retval == 0: -            return cli_answer.reply -        return '' +        return self.vpp_api_client.api.cli_inband(cmd=command)      @_Decorators.api_call      def get_mac(self, ifname: str) -> str: @@ -122,6 +149,7 @@ class VPPControl:                  return iface.sw_if_index          return None +    @_Decorators.check_retval      @_Decorators.api_call      def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None:          """Create LCP interface pair between VPP and kernel @@ -132,11 +160,12 @@ class VPPControl:          """          iface_index = self.get_sw_if_index(iface_name_vpp)          if iface_index: -            self.vpp_api_client.api.lcp_itf_pair_add_del( +            return self.vpp_api_client.api.lcp_itf_pair_add_del(                  is_add=True,                  sw_if_index=iface_index,                  host_if_name=iface_name_kernel) +    @_Decorators.check_retval      @_Decorators.api_call      def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None:          """Delete LCP interface pair between VPP and kernel @@ -147,11 +176,12 @@ class VPPControl:          """          iface_index = self.get_sw_if_index(iface_name_vpp)          if iface_index: -            self.vpp_api_client.api.lcp_itf_pair_add_del( +            return self.vpp_api_client.api.lcp_itf_pair_add_del(                  is_add=False,                  sw_if_index=iface_index,                  host_if_name=iface_name_kernel) +    @_Decorators.check_retval      @_Decorators.api_call      def iface_rxmode(self, iface_name: str, rx_mode: str) -> None:          """Set interface rx-mode in VPP @@ -166,9 +196,9 @@ class VPPControl:              'adaptive': 3          }          if rx_mode not in modes_dict: -            return +            raise VPPValueError(f'Mode {rx_mode} is not known')          iface_index = self.get_sw_if_index(iface_name) -        self.vpp_api_client.api.sw_interface_set_rx_mode( +        return self.vpp_api_client.api.sw_interface_set_rx_mode(              sw_if_index=iface_index, mode=modes_dict[rx_mode])      @_Decorators.api_call @@ -181,8 +211,7 @@ class VPPControl:          Returns:              str: PCI address          """ -        hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}', -                               return_output=True) +        hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}').reply          regex_filter = r'^\s+pci: device (?P<device>\w+:\w+) subsystem (?P<subsystem>\w+:\w+) address (?P<address>\w+:\w+:\w+\.\w+) numa (?P<numa>\w+)$'          re_obj = re_search(regex_filter, hw_info, re_M) @@ -195,7 +224,7 @@ class VPPControl:          # we need to modify address to math kernel style          # for example: 0000:06:14.00 -> 0000:06:14.0 -        address_chunks: list[str] | Any = address.split('.') +        address_chunks: list[str] = address.split('.')          address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}'          return address_normalized @@ -205,7 +234,8 @@ class HostControl:      """Control Linux host      """ -    def pci_rescan(self, address: str = '') -> None: +    @staticmethod +    def pci_rescan(pci_addr: str = '') -> None:          """Rescan PCI device by removing it and rescan PCI bus          If PCI address is not defined - just rescan PCI bus @@ -213,9 +243,73 @@ class HostControl:          Args:              address (str, optional): PCI address of device. Defaults to ''.          """ -        if address: -            device_file = Path(f'/sys/bus/pci/devices/{address}/remove') +        if pci_addr: +            device_file = Path(f'/sys/bus/pci/devices/{pci_addr}/remove')              if device_file.exists():                  device_file.write_text('1') +                # wait 10 seconds max until device will be removed +                attempts = 100 +                while device_file.exists() and attempts: +                    attempts -= 1 +                    sleep(0.1) +                if device_file.exists(): +                    raise TimeoutError( +                        f'Timeout was reached for removing PCI device {pci_addr}' +                    ) +            else: +                raise FileNotFoundError(f'PCI device {pci_addr} does not exist')          rescan_file = Path('/sys/bus/pci/rescan')          rescan_file.write_text('1') +        if pci_addr: +            # wait 10 seconds max until device will be installed +            attempts = 100 +            while not device_file.exists() and attempts: +                attempts -= 1 +                sleep(0.1) +            if not device_file.exists(): +                raise TimeoutError( +                    f'Timeout was reached for installing PCI device {pci_addr}') + +    @staticmethod +    def get_eth_name(pci_addr: str) -> str: +        """Find Ethernet interface name by PCI address + +        Args: +            pci_addr (str): PCI address + +        Raises: +            FileNotFoundError: no Ethernet interface was found + +        Returns: +            str: Ethernet interface name +        """ +        # find all PCI devices with eth* names +        net_devs: dict[str, str] = {} +        net_devs_dir = Path('/sys/class/net') +        regex_filter = r'^/sys/devices/pci[\w/:\.]+/(?P<pci_addr>\w+:\w+:\w+\.\w+)/[\w/:\.]+/(?P<iface_name>eth\d+)$' +        for dir in net_devs_dir.iterdir(): +            real_dir: str = dir.resolve().as_posix() +            re_obj = re_fullmatch(regex_filter, real_dir) +            if re_obj: +                iface_name: str = re_obj.group('iface_name') +                iface_addr: str = re_obj.group('pci_addr') +                net_devs.update({iface_addr: iface_name}) +        # match to provided PCI address and return a name if found +        if pci_addr in net_devs: +            return net_devs[pci_addr] +        # raise error if device was not found +        raise FileNotFoundError( +            f'PCI device {pci_addr} not found in ethernet interfaces') + +    @staticmethod +    def rename_iface(name_old: str, name_new: str) -> None: +        """Rename interface + +        Args: +            name_old (str): old name +            name_new (str): new name +        """ +        rename_cmd: list[str] = [ +            'ip', 'link', 'set', name_old, 'name', name_new +        ] +        run(rename_cmd) diff --git a/src/conf_mode/vpp.py b/src/conf_mode/vpp.py index 25fe159f8..dd01da87e 100755 --- a/src/conf_mode/vpp.py +++ b/src/conf_mode/vpp.py @@ -53,12 +53,12 @@ def _get_pci_address_by_interface(iface) -> str:              address = re_obj.groupdict().get('address', '')              return address      # use VPP - maybe interface already attached to it -    vpp_control = VPPControl() +    vpp_control = VPPControl(attempts=20, interval=500)      pci_addr = vpp_control.get_pci_addr(iface)      if pci_addr:          return pci_addr -    # return empty string if address was not found -    return '' +    # raise error if PCI address was not found +    raise ConfigError(f'Cannot find PCI address for interface {iface}') @@ -148,8 +148,14 @@ def apply(config):          call('systemctl daemon-reload')          call(f'systemctl restart {service_name}.service') +    # Initialize interfaces removed from VPP      for iface in config.get('removed_ifaces', []): -        HostControl().pci_rescan(iface['iface_pci_addr']) +        host_control = HostControl() +        # rescan PCI to use a proper driver +        host_control.pci_rescan(iface['iface_pci_addr']) +        # rename to the proper name +        iface_new_name: str = host_control.get_eth_name(iface['iface_pci_addr']) +        host_control.rename_iface(iface_new_name, iface['iface_name'])      if 'interface' in config:          # connect to VPP | 
