diff options
Diffstat (limited to 'python/vyos/utils')
-rw-r--r-- | python/vyos/utils/network.py | 128 | ||||
-rw-r--r-- | python/vyos/utils/process.py | 2 |
2 files changed, 70 insertions, 60 deletions
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 2f181d8d9..55ff29f0c 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -40,13 +40,19 @@ def interface_exists(interface) -> bool: import os return os.path.exists(f'/sys/class/net/{interface}') -def interface_exists_in_netns(interface_name, netns): +def is_netns_interface(interface, netns): from vyos.utils.process import rc_cmd - rc, out = rc_cmd(f'ip netns exec {netns} ip link show dev {interface_name}') + rc, out = rc_cmd(f'sudo ip netns exec {netns} ip link show dev {interface}') if rc == 0: return True return False +def get_netns_all() -> list: + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd('ip --json netns ls')) + return [ netns['name'] for netns in tmp ] + def get_vrf_members(vrf: str) -> list: """ Get list of interface VRF members @@ -78,8 +84,7 @@ def get_interface_config(interface): """ Returns the used encapsulation protocol for given interface. If interface does not exist, None is returned. """ - import os - if not os.path.exists(f'/sys/class/net/{interface}'): + if not interface_exists(interface): return None from json import loads from vyos.utils.process import cmd @@ -90,33 +95,63 @@ def get_interface_address(interface): """ Returns the used encapsulation protocol for given interface. If interface does not exist, None is returned. """ - import os - if not os.path.exists(f'/sys/class/net/{interface}'): + if not interface_exists(interface): return None from json import loads from vyos.utils.process import cmd tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0] return tmp -def get_interface_namespace(iface): +def get_interface_namespace(interface: str): """ Returns wich netns the interface belongs to """ from json import loads from vyos.utils.process import cmd - # Check if netns exist - tmp = loads(cmd(f'ip --json netns ls')) - if len(tmp) == 0: - return None - for ns in tmp: + # Bail out early if netns does not exist + tmp = cmd(f'ip --json netns ls') + if not tmp: return None + + for ns in loads(tmp): netns = f'{ns["name"]}' # Search interface in each netns data = loads(cmd(f'ip netns exec {netns} ip --json link show')) for tmp in data: - if iface == tmp["ifname"]: + if interface == tmp["ifname"]: return netns +def is_ipv6_tentative(iface: str, ipv6_address: str) -> bool: + """Check if IPv6 address is in tentative state. + + This function checks if an IPv6 address on a specific network interface is + in the tentative state. IPv6 tentative addresses are not fully configured + and are undergoing Duplicate Address Detection (DAD) to ensure they are + unique on the network. + + Args: + iface (str): The name of the network interface. + ipv6_address (str): The IPv6 address to check. + + Returns: + bool: True if the IPv6 address is tentative, False otherwise. + """ + import json + from vyos.utils.process import rc_cmd + + rc, out = rc_cmd(f'ip -6 --json address show dev {iface} scope global') + if rc: + return False + + data = json.loads(out) + for addr_info in data[0]['addr_info']: + if ( + addr_info.get('local') == ipv6_address and + addr_info.get('tentative', False) + ): + return True + return False + def is_wwan_connected(interface): """ Determine if a given WWAN interface, e.g. wwan0 is connected to the carrier network or not """ @@ -141,8 +176,7 @@ def is_wwan_connected(interface): def get_bridge_fdb(interface): """ Returns the forwarding database entries for a given interface """ - import os - if not os.path.exists(f'/sys/class/net/{interface}'): + if not interface_exists(interface): return None from json import loads from vyos.utils.process import cmd @@ -274,57 +308,33 @@ def is_addr_assigned(ip_address, vrf=None) -> bool: return False -def is_intf_addr_assigned(intf, address) -> bool: +def is_intf_addr_assigned(ifname: str, addr: str, netns: str=None) -> bool: """ Verify if the given IPv4/IPv6 address is assigned to specific interface. It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR address 192.0.2.1/24. """ - from vyos.template import is_ipv4 - - from netifaces import ifaddresses - from netifaces import AF_INET - from netifaces import AF_INET6 - - # check if the requested address type is configured at all - # { - # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], - # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], - # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] - # } - try: - addresses = ifaddresses(intf) - except ValueError as e: - print(e) - return False - - # determine IP version (AF_INET or AF_INET6) depending on passed address - addr_type = AF_INET if is_ipv4(address) else AF_INET6 - - # Check every IP address on this interface for a match - netmask = None - if '/' in address: - address, netmask = address.split('/') - for ip in addresses.get(addr_type, []): - # ip can have the interface name in the 'addr' field, we need to remove it - # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} - ip_addr = ip['addr'].split('%')[0] - - if not _are_same_ip(address, ip_addr): - continue - - # we do not have a netmask to compare against, they are the same - if not netmask: - return True + import json + import jmespath - prefixlen = '' - if is_ipv4(ip_addr): - prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) - else: - prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) + from vyos.utils.process import rc_cmd + from ipaddress import ip_interface - if str(prefixlen) == netmask: - return True + netns_cmd = f'ip netns exec {netns}' if netns else '' + rc, out = rc_cmd(f'{netns_cmd} ip --json address show dev {ifname}') + if rc == 0: + json_out = json.loads(out) + addresses = jmespath.search("[].addr_info[].{family: family, address: local, prefixlen: prefixlen}", json_out) + for address_info in addresses: + family = address_info['family'] + address = address_info['address'] + prefixlen = address_info['prefixlen'] + # Remove the interface name if present in the given address + if '%' in addr: + addr = addr.split('%')[0] + interface = ip_interface(f"{address}/{prefixlen}") + if ip_interface(addr) == interface or address == addr: + return True return False diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py index e09c7d86d..c2ef98140 100644 --- a/python/vyos/utils/process.py +++ b/python/vyos/utils/process.py @@ -170,7 +170,7 @@ def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None, (1, 'Device "eth99" does not exist.') """ out, code = popen( - command, flag, + command.lstrip(), flag, stdout=stdout, stderr=stderr, input=input, timeout=timeout, env=env, shell=shell, |