diff options
| -rw-r--r-- | python/vyos/utils/network.py | 78 | ||||
| -rw-r--r-- | python/vyos/utils/process.py | 2 | 
2 files changed, 28 insertions, 52 deletions
| diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 8b4385cfb..4706dbfb7 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -96,23 +96,23 @@ def get_interface_address(interface):      tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0]      return tmp -def get_interface_namespace(iface): +def get_interface_namespace(interface: str):      """         Returns wich netns the interface belongs to      """      from json import loads      from vyos.utils.process import cmd -    # Check if netns exist -    tmp = loads(cmd(f'ip --json netns ls')) -    if len(tmp) == 0: -        return None -    for ns in tmp: +    # Bail out early if netns does not exist +    tmp = cmd(f'ip --json netns ls') +    if not tmp: return None + +    for ns in loads(tmp):          netns = f'{ns["name"]}'          # Search interface in each netns          data = loads(cmd(f'ip netns exec {netns} ip --json link show'))          for tmp in data: -            if iface == tmp["ifname"]: +            if interface == tmp["ifname"]:                  return netns  def is_wwan_connected(interface): @@ -271,57 +271,33 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:      return False -def is_intf_addr_assigned(intf, address) -> bool: +def is_intf_addr_assigned(ifname: str, addr: str, netns: str=None) -> bool:      """      Verify if the given IPv4/IPv6 address is assigned to specific interface.      It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR      address 192.0.2.1/24.      """ -    from vyos.template import is_ipv4 - -    from netifaces import ifaddresses -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    # check if the requested address type is configured at all -    # { -    # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], -    # 2:  [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], -    # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] -    # } -    try: -        addresses = ifaddresses(intf) -    except ValueError as e: -        print(e) -        return False - -    # determine IP version (AF_INET or AF_INET6) depending on passed address -    addr_type = AF_INET if is_ipv4(address) else AF_INET6 - -    # Check every IP address on this interface for a match -    netmask = None -    if '/' in address: -        address, netmask = address.split('/') -    for ip in addresses.get(addr_type, []): -        # ip can have the interface name in the 'addr' field, we need to remove it -        # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} -        ip_addr = ip['addr'].split('%')[0] - -        if not _are_same_ip(address, ip_addr): -            continue - -        # we do not have a netmask to compare against, they are the same -        if not netmask: -            return True +    import json +    import jmespath -        prefixlen = '' -        if is_ipv4(ip_addr): -            prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) -        else: -            prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) +    from vyos.utils.process import rc_cmd +    from ipaddress import ip_interface -        if str(prefixlen) == netmask: -            return True +    netns_cmd = f'ip netns exec {netns}' if netns else '' +    rc, out = rc_cmd(f'{netns_cmd} ip --json address show dev {ifname}') +    if rc == 0: +        json_out = json.loads(out) +        addresses = jmespath.search("[].addr_info[].{family: family, address: local, prefixlen: prefixlen}", json_out) +        for address_info in addresses: +            family = address_info['family'] +            address = address_info['address'] +            prefixlen = address_info['prefixlen'] +            # Remove the interface name if present in the given address +            if '%' in addr: +                addr = addr.split('%')[0] +            interface = ip_interface(f"{address}/{prefixlen}") +            if ip_interface(addr) == interface or address == addr: +                return True      return False diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py index e09c7d86d..c2ef98140 100644 --- a/python/vyos/utils/process.py +++ b/python/vyos/utils/process.py @@ -170,7 +170,7 @@ def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,      (1, 'Device "eth99" does not exist.')      """      out, code = popen( -        command, flag, +        command.lstrip(), flag,          stdout=stdout, stderr=stderr,          input=input, timeout=timeout,          env=env, shell=shell, | 
