summaryrefslogtreecommitdiff
path: root/python/vyos/utils/network.py
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2023-08-06 19:29:50 +0200
committerChristian Breunig <christian@breunig.cc>2023-08-06 20:25:01 +0200
commitd1c4294534dd04f075f89f1bb60736d56fc6c22a (patch)
treeac57287abb0842f5fa00dfa753b17b69e7df1de1 /python/vyos/utils/network.py
parent0d4a19b42d0f0c70cb3ec2119b3d8dbb67efdc75 (diff)
downloadvyos-1x-d1c4294534dd04f075f89f1bb60736d56fc6c22a.tar.gz
vyos-1x-d1c4294534dd04f075f89f1bb60736d56fc6c22a.zip
T5195: move helpers from vyos.validate to vyos.utils package
Diffstat (limited to 'python/vyos/utils/network.py')
-rw-r--r--python/vyos/utils/network.py186
1 files changed, 184 insertions, 2 deletions
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 3786caf26..3f9a3ef4b 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -13,7 +13,15 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-import os
+def _are_same_ip(one, two):
+ from socket import AF_INET
+ from socket import AF_INET6
+ from socket import inet_pton
+ from vyos.template import is_ipv4
+ # compare the binary representation of the IP
+ f_one = AF_INET if is_ipv4(one) else AF_INET6
+ s_two = AF_INET if is_ipv4(two) else AF_INET6
+ return inet_pton(f_one, one) == inet_pton(f_one, two)
def get_protocol_by_name(protocol_name):
"""Get protocol number by protocol name
@@ -48,6 +56,7 @@ def get_interface_config(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
"""
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -59,6 +68,7 @@ def get_interface_address(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
"""
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -85,7 +95,6 @@ def get_interface_namespace(iface):
if iface == tmp["ifname"]:
return netns
-
def is_wwan_connected(interface):
""" Determine if a given WWAN interface, e.g. wwan0 is connected to the
carrier network or not """
@@ -110,6 +119,7 @@ def is_wwan_connected(interface):
def get_bridge_fdb(interface):
""" Returns the forwarding database entries for a given interface """
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -211,3 +221,175 @@ def is_listen_port_bind_service(port: int, service: str) -> bool:
if service == pid_name and port == pid_port:
return True
return False
+
+def is_ipv6_link_local(addr):
+ """ Check if addrsss is an IPv6 link-local address. Returns True/False """
+ from ipaddress import ip_interface
+ from vyos.template import is_ipv6
+ addr = addr.split('%')[0]
+ if is_ipv6(addr):
+ if ip_interface(addr).is_link_local:
+ return True
+
+ return False
+
+def is_addr_assigned(ip_address, vrf=None) -> bool:
+ """ Verify if the given IPv4/IPv6 address is assigned to any interface """
+ from netifaces import interfaces
+ from vyos.utils.network import get_interface_config
+ from vyos.utils.dict import dict_search
+
+ for interface in interfaces():
+ # Check if interface belongs to the requested VRF, if this is not the
+ # case there is no need to proceed with this data set - continue loop
+ # with next element
+ tmp = get_interface_config(interface)
+ if dict_search('master', tmp) != vrf:
+ continue
+
+ if is_intf_addr_assigned(interface, ip_address):
+ return True
+
+ return False
+
+def is_intf_addr_assigned(intf, address) -> bool:
+ """
+ Verify if the given IPv4/IPv6 address is assigned to specific interface.
+ It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
+ address 192.0.2.1/24.
+ """
+ from vyos.template import is_ipv4
+
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ # check if the requested address type is configured at all
+ # {
+ # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}],
+ # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}],
+ # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}]
+ # }
+ try:
+ addresses = ifaddresses(intf)
+ except ValueError as e:
+ print(e)
+ return False
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET if is_ipv4(address) else AF_INET6
+
+ # Check every IP address on this interface for a match
+ netmask = None
+ if '/' in address:
+ address, netmask = address.split('/')
+ for ip in addresses.get(addr_type, []):
+ # ip can have the interface name in the 'addr' field, we need to remove it
+ # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'}
+ ip_addr = ip['addr'].split('%')[0]
+
+ if not _are_same_ip(address, ip_addr):
+ continue
+
+ # we do not have a netmask to compare against, they are the same
+ if not netmask:
+ return True
+
+ prefixlen = ''
+ if is_ipv4(ip_addr):
+ prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')])
+ else:
+ prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _])
+
+ if str(prefixlen) == netmask:
+ return True
+
+ return False
+
+def is_loopback_addr(addr):
+ """ Check if supplied IPv4/IPv6 address is a loopback address """
+ from ipaddress import ip_address
+ return ip_address(addr).is_loopback
+
+def is_wireguard_key_pair(private_key: str, public_key:str) -> bool:
+ """
+ Checks if public/private keys are keypair
+ :param private_key: Wireguard private key
+ :type private_key: str
+ :param public_key: Wireguard public key
+ :type public_key: str
+ :return: If public/private keys are keypair returns True else False
+ :rtype: bool
+ """
+ from vyos.utils.process import cmd
+ gen_public_key = cmd('wg pubkey', input=private_key)
+ if gen_public_key == public_key:
+ return True
+ else:
+ return False
+
+def is_subnet_connected(subnet, primary=False):
+ """
+ Verify is the given IPv4/IPv6 subnet is connected to any interface on this
+ system.
+
+ primary check if the subnet is reachable via the primary IP address of this
+ interface, or in other words has a broadcast address configured. ISC DHCP
+ for instance will complain if it should listen on non broadcast interfaces.
+
+ Return True/False
+ """
+ from ipaddress import ip_address
+ from ipaddress import ip_network
+
+ from netifaces import ifaddresses
+ from netifaces import interfaces
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ from vyos.template import is_ipv6
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET
+ if is_ipv6(subnet):
+ addr_type = AF_INET6
+
+ for interface in interfaces():
+ # check if the requested address type is configured at all
+ if addr_type not in ifaddresses(interface).keys():
+ continue
+
+ # An interface can have multiple addresses, but some software components
+ # only support the primary address :(
+ if primary:
+ ip = ifaddresses(interface)[addr_type][0]['addr']
+ if ip_address(ip) in ip_network(subnet):
+ return True
+ else:
+ # Check every assigned IP address if it is connected to the subnet
+ # in question
+ for ip in ifaddresses(interface)[addr_type]:
+ # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
+ addr = ip['addr'].split('%')[0]
+ if ip_address(addr) in ip_network(subnet):
+ return True
+
+ return False
+
+def is_afi_configured(interface, afi):
+ """ Check if given address family is configured, or in other words - an IP
+ address is assigned to the interface. """
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ if afi not in [AF_INET, AF_INET6]:
+ raise ValueError('Address family must be in [AF_INET, AF_INET6]')
+
+ try:
+ addresses = ifaddresses(interface)
+ except ValueError as e:
+ print(e)
+ return False
+
+ return afi in addresses