summaryrefslogtreecommitdiff
path: root/python/vyos/utils/network.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/utils/network.py')
-rw-r--r--python/vyos/utils/network.py106
1 files changed, 97 insertions, 9 deletions
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 9354bd495..cac59475d 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -61,14 +61,17 @@ def get_vrf_members(vrf: str) -> list:
"""
import json
from vyos.utils.process import cmd
- if not interface_exists(vrf):
- raise ValueError(f'VRF "{vrf}" does not exist!')
- output = cmd(f'ip --json --brief link show master {vrf}')
- answer = json.loads(output)
interfaces = []
- for data in answer:
- if 'ifname' in data:
- interfaces.append(data.get('ifname'))
+ try:
+ if not interface_exists(vrf):
+ raise ValueError(f'VRF "{vrf}" does not exist!')
+ output = cmd(f'ip --json --brief link show vrf {vrf}')
+ answer = json.loads(output)
+ for data in answer:
+ if 'ifname' in data:
+ interfaces.append(data.get('ifname'))
+ except:
+ pass
return interfaces
def get_interface_vrf(interface):
@@ -156,7 +159,9 @@ def is_wwan_connected(interface):
""" Determine if a given WWAN interface, e.g. wwan0 is connected to the
carrier network or not """
import json
+ from vyos.utils.dict import dict_search
from vyos.utils.process import cmd
+ from vyos.utils.process import is_systemd_service_active
if not interface.startswith('wwan'):
raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
@@ -197,6 +202,22 @@ def get_all_vrfs():
data[name] = entry
return data
+def interface_list() -> list:
+ from vyos.ifconfig import Section
+ """
+ Get list of interfaces in system
+ :rtype: list
+ """
+ return Section.interfaces()
+
+
+def vrf_list() -> list:
+ """
+ Get list of VRFs in system
+ :rtype: list
+ """
+ return list(get_all_vrfs().keys())
+
def mac2eui64(mac, prefix=None):
"""
Convert a MAC address to a EUI64 address or, with prefix provided, a full
@@ -289,7 +310,7 @@ def is_ipv6_link_local(addr):
return False
-def is_addr_assigned(ip_address, vrf=None) -> bool:
+def is_addr_assigned(ip_address, vrf=None, return_ifname=False) -> bool | str:
""" Verify if the given IPv4/IPv6 address is assigned to any interface """
from netifaces import interfaces
from vyos.utils.network import get_interface_config
@@ -304,7 +325,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
continue
if is_intf_addr_assigned(interface, ip_address):
- return True
+ return interface if return_ifname else True
return False
@@ -468,3 +489,70 @@ def get_vxlan_vlan_tunnels(interface: str) -> list:
os_configured_vlan_ids.append(str(vlanStart))
return os_configured_vlan_ids
+
+def get_vxlan_vni_filter(interface: str) -> list:
+ """ Return a list of strings with VNIs configured in the Kernel"""
+ from json import loads
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('vxlan'):
+ raise ValueError('Only applicable for VXLAN interfaces!')
+
+ # Determine current OS Kernel configured VNI filters in VXLAN interface
+ #
+ # $ bridge -j vni show dev vxlan1
+ # [{"ifname":"vxlan1","vnis":[{"vni":100},{"vni":200},{"vni":300,"vniEnd":399}]}]
+ #
+ # Example output: ['10010', '10020', '10021', '10022']
+ os_configured_vnis = []
+ tmp = loads(cmd(f'bridge --json vni show dev {interface}'))
+ if tmp:
+ for tunnel in tmp[0].get('vnis', {}):
+ vniStart = tunnel['vni']
+ if 'vniEnd' in tunnel:
+ vniEnd = tunnel['vniEnd']
+ # Build a real list for user VNIs
+ vni_list = list(range(vniStart, vniEnd +1))
+ # Convert list of integers to list or strings
+ os_configured_vnis.extend(map(str, vni_list))
+ # Proceed with next tunnel - this one is complete
+ continue
+
+ # Add single tunel id - not part of a range
+ os_configured_vnis.append(str(vniStart))
+
+ return os_configured_vnis
+
+# Calculate prefix length of an IPv6 range, where possible
+# Python-ified from source: https://gitlab.isc.org/isc-projects/dhcp/-/blob/master/keama/confparse.c#L4591
+def ipv6_prefix_length(low, high):
+ import socket
+
+ bytemasks = [0x80, 0xc0, 0xe0, 0xf0, 0xf8, 0xfc, 0xfe, 0xff]
+
+ try:
+ lo = bytearray(socket.inet_pton(socket.AF_INET6, low))
+ hi = bytearray(socket.inet_pton(socket.AF_INET6, high))
+ except:
+ return None
+
+ xor = bytearray(a ^ b for a, b in zip(lo, hi))
+
+ plen = 0
+ while plen < 128 and xor[plen // 8] == 0:
+ plen += 8
+
+ if plen == 128:
+ return plen
+
+ for i in range((plen // 8) + 1, 16):
+ if xor[i] != 0:
+ return None
+
+ for i in range(8):
+ msk = ~xor[plen // 8] & 0xff
+
+ if msk == bytemasks[i]:
+ return plen + i + 1
+
+ return None