summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2023-09-03 14:34:39 +0200
committerChristian Breunig <christian@breunig.cc>2023-09-03 14:34:42 +0200
commit00a9f6a39f556ee6eb8ac669dd86f5095c21b22f (patch)
treef9a796b0a5a8ac3a3994cb7babddcf4c580bbdd7
parent114f8a9a66e49449e09ac3a1721db42626e54212 (diff)
downloadvyos-1x-00a9f6a39f556ee6eb8ac669dd86f5095c21b22f.tar.gz
vyos-1x-00a9f6a39f556ee6eb8ac669dd86f5095c21b22f.zip
netns: T5241: improve get_interface_namespace() robustness
-rw-r--r--python/vyos/utils/network.py78
-rw-r--r--python/vyos/utils/process.py2
2 files changed, 28 insertions, 52 deletions
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 8b4385cfb..4706dbfb7 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -96,23 +96,23 @@ def get_interface_address(interface):
tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0]
return tmp
-def get_interface_namespace(iface):
+def get_interface_namespace(interface: str):
"""
Returns wich netns the interface belongs to
"""
from json import loads
from vyos.utils.process import cmd
- # Check if netns exist
- tmp = loads(cmd(f'ip --json netns ls'))
- if len(tmp) == 0:
- return None
- for ns in tmp:
+ # Bail out early if netns does not exist
+ tmp = cmd(f'ip --json netns ls')
+ if not tmp: return None
+
+ for ns in loads(tmp):
netns = f'{ns["name"]}'
# Search interface in each netns
data = loads(cmd(f'ip netns exec {netns} ip --json link show'))
for tmp in data:
- if iface == tmp["ifname"]:
+ if interface == tmp["ifname"]:
return netns
def is_wwan_connected(interface):
@@ -271,57 +271,33 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
return False
-def is_intf_addr_assigned(intf, address) -> bool:
+def is_intf_addr_assigned(ifname: str, addr: str, netns: str=None) -> bool:
"""
Verify if the given IPv4/IPv6 address is assigned to specific interface.
It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
address 192.0.2.1/24.
"""
- from vyos.template import is_ipv4
-
- from netifaces import ifaddresses
- from netifaces import AF_INET
- from netifaces import AF_INET6
-
- # check if the requested address type is configured at all
- # {
- # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}],
- # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}],
- # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}]
- # }
- try:
- addresses = ifaddresses(intf)
- except ValueError as e:
- print(e)
- return False
-
- # determine IP version (AF_INET or AF_INET6) depending on passed address
- addr_type = AF_INET if is_ipv4(address) else AF_INET6
-
- # Check every IP address on this interface for a match
- netmask = None
- if '/' in address:
- address, netmask = address.split('/')
- for ip in addresses.get(addr_type, []):
- # ip can have the interface name in the 'addr' field, we need to remove it
- # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'}
- ip_addr = ip['addr'].split('%')[0]
-
- if not _are_same_ip(address, ip_addr):
- continue
-
- # we do not have a netmask to compare against, they are the same
- if not netmask:
- return True
+ import json
+ import jmespath
- prefixlen = ''
- if is_ipv4(ip_addr):
- prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')])
- else:
- prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _])
+ from vyos.utils.process import rc_cmd
+ from ipaddress import ip_interface
- if str(prefixlen) == netmask:
- return True
+ netns_cmd = f'ip netns exec {netns}' if netns else ''
+ rc, out = rc_cmd(f'{netns_cmd} ip --json address show dev {ifname}')
+ if rc == 0:
+ json_out = json.loads(out)
+ addresses = jmespath.search("[].addr_info[].{family: family, address: local, prefixlen: prefixlen}", json_out)
+ for address_info in addresses:
+ family = address_info['family']
+ address = address_info['address']
+ prefixlen = address_info['prefixlen']
+ # Remove the interface name if present in the given address
+ if '%' in addr:
+ addr = addr.split('%')[0]
+ interface = ip_interface(f"{address}/{prefixlen}")
+ if ip_interface(addr) == interface or address == addr:
+ return True
return False
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
index e09c7d86d..c2ef98140 100644
--- a/python/vyos/utils/process.py
+++ b/python/vyos/utils/process.py
@@ -170,7 +170,7 @@ def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
(1, 'Device "eth99" does not exist.')
"""
out, code = popen(
- command, flag,
+ command.lstrip(), flag,
stdout=stdout, stderr=stderr,
input=input, timeout=timeout,
env=env, shell=shell,