diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/vpp.py | 79 |
1 files changed, 75 insertions, 4 deletions
diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py index d60ecc1b3..cf0d27eb1 100644 --- a/python/vyos/vpp.py +++ b/python/vyos/vpp.py @@ -13,20 +13,57 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +from functools import wraps +from pathlib import Path from re import search as re_search, MULTILINE as re_M +from time import sleep from vpp_papi import VPPApiClient +from vpp_papi import VPPIOError class VPPControl: """Control VPP network stack """ - def __init__(self) -> None: + class _Decorators: + """Decorators for VPPControl + """ + + @classmethod + def api_call(cls, decorated_func): + + @wraps(decorated_func) + def api_safe_wrapper(cls, *args, **kwargs): + if not cls.vpp_api_client.transport.connected: + raise VPPIOError(2, 'VPP API is not connected') + return decorated_func(cls, *args, **kwargs) + + return api_safe_wrapper + + def __init__(self, attempts: int = 5, interval: int = 1000) -> None: """Create VPP API connection + + Args: + attempts (int, optional): attempts to connect. Defaults to 5. + interval (int, optional): interval between attempts in ms. Defaults to 1000. + + Raises: + VPPIOError: Connection to API cannot be established """ self.vpp_api_client = VPPApiClient() - self.vpp_api_client.connect('vpp-vyos') + # connect with interval + while attempts: + try: + attempts -= 1 + self.vpp_api_client.connect('vpp-vyos') + break + except (ConnectionRefusedError, FileNotFoundError) as err: + print(f'VPP API connection timeout: {err}') + sleep(interval / 1000) + # raise exception if connection was not successful in the end + if not self.vpp_api_client.transport.connected: + raise VPPIOError(2, 'Cannot connect to VPP API') def __del__(self) -> None: """Disconnect from VPP API (destructor) @@ -36,8 +73,10 @@ class VPPControl: def disconnect(self) -> None: """Disconnect from VPP API """ - self.vpp_api_client.disconnect() + if self.vpp_api_client.transport.connected: + self.vpp_api_client.disconnect() + @_Decorators.api_call def cli_cmd(self, command: str, return_output: bool = False) -> str: """Send raw CLI command @@ -53,6 +92,7 @@ class VPPControl: return cli_answer.reply return '' + @_Decorators.api_call def get_mac(self, ifname: str) -> str: """Find MAC address by interface name in VPP @@ -67,6 +107,7 @@ class VPPControl: return iface.l2_address.mac_string return '' + @_Decorators.api_call def get_sw_if_index(self, ifname: str) -> int | None: """Find interface index by interface name in VPP @@ -81,6 +122,7 @@ class VPPControl: return iface.sw_if_index return None + @_Decorators.api_call def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None: """Create LCP interface pair between VPP and kernel @@ -95,6 +137,7 @@ class VPPControl: sw_if_index=iface_index, host_if_name=iface_name_kernel) + @_Decorators.api_call def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None: """Delete LCP interface pair between VPP and kernel @@ -109,6 +152,7 @@ class VPPControl: sw_if_index=iface_index, host_if_name=iface_name_kernel) + @_Decorators.api_call def iface_rxmode(self, iface_name: str, rx_mode: str) -> None: """Set interface rx-mode in VPP @@ -127,6 +171,7 @@ class VPPControl: self.vpp_api_client.api.sw_interface_set_rx_mode( sw_if_index=iface_index, mode=modes_dict[rx_mode]) + @_Decorators.api_call def get_pci_addr(self, ifname: str) -> str: """Find PCI address of interface by interface name in VPP @@ -147,4 +192,30 @@ class VPPControl: return '' address = re_obj.groupdict().get('address', '') - return address + + # we need to modify address to math kernel style + # for example: 0000:06:14.00 -> 0000:06:14.0 + address_chunks: list[str] | Any = address.split('.') + address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}' + + return address_normalized + + +class HostControl: + """Control Linux host + """ + + def pci_rescan(self, address: str = '') -> None: + """Rescan PCI device by removing it and rescan PCI bus + + If PCI address is not defined - just rescan PCI bus + + Args: + address (str, optional): PCI address of device. Defaults to ''. + """ + if address: + device_file = Path(f'/sys/bus/pci/devices/{address}/remove') + if device_file.exists(): + device_file.write_text('1') + rescan_file = Path('/sys/bus/pci/rescan') + rescan_file.write_text('1') |