summaryrefslogtreecommitdiff
path: root/python/vyos
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos')
-rw-r--r--python/vyos/configinterface.py153
-rw-r--r--python/vyos/configsession.py9
-rw-r--r--python/vyos/interfaceconfig.py376
-rw-r--r--python/vyos/validate.py78
4 files changed, 596 insertions, 20 deletions
diff --git a/python/vyos/configinterface.py b/python/vyos/configinterface.py
new file mode 100644
index 000000000..0f5b0842c
--- /dev/null
+++ b/python/vyos/configinterface.py
@@ -0,0 +1,153 @@
+# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import vyos.validate
+
+def validate_mac_address(addr):
+ # a mac address consits out of 6 octets
+ octets = len(addr.split(':'))
+ if octets != 6:
+ raise ValueError('wrong number of MAC octets: {} '.format(octets))
+
+ # validate against the first mac address byte if it's a multicast address
+ if int(addr.split(':')[0]) & 1:
+ raise ValueError('{} is a multicast MAC address'.format(addr))
+
+ # overall mac address is not allowed to be 00:00:00:00:00:00
+ if sum(int(i, 16) for i in addr.split(':')) == 0:
+ raise ValueError('00:00:00:00:00:00 is not a valid MAC address')
+
+ # check for VRRP mac address
+ if addr.split(':')[0] == '0' and addr.split(':')[1] == '0' and addr.split(':')[2] == '94' and addr.split(':')[3] == '0' and addr.split(':')[4] == '1':
+ raise ValueError('{} is a VRRP MAC address')
+
+ pass
+
+def set_mac_address(intf, addr):
+ """
+ Configure interface mac address using iproute2 command
+ """
+ validate_mac_address(addr)
+
+ os.system('ip link set {} address {}'.format(intf, addr))
+ pass
+
+def set_description(intf, desc):
+ """
+ Sets the interface secription reported usually by SNMP
+ """
+ with open('/sys/class/net/' + intf + '/ifalias', 'w') as f:
+ f.write(desc)
+
+ pass
+
+def set_arp_cache_timeout(intf, tmoMS):
+ """
+ Configure the ARP cache entry timeout in milliseconds
+ """
+ with open('/proc/sys/net/ipv4/neigh/' + intf + '/base_reachable_time_ms', 'w') as f:
+ f.write(tmoMS)
+
+ pass
+
+def set_multicast_querier(intf, enable):
+ """
+ Sets whether the bridge actively runs a multicast querier or not. When a
+ bridge receives a 'multicast host membership' query from another network host,
+ that host is tracked based on the time that the query was received plus the
+ multicast query interval time.
+
+ use enable=1 to enable or enable=0 to disable
+ """
+
+ if int(enable) >= 0 and int(enable) <= 1:
+ with open('/sys/devices/virtual/net/' + intf + '/bridge/multicast_querier', 'w') as f:
+ f.write(str(enable))
+ else:
+ raise ValueError("malformed configuration string on interface {}: enable={}".format(intf, enable))
+
+ pass
+
+def set_link_detect(intf, enable):
+ """
+ 0 - Allow packets to be received for the address on this interface
+ even if interface is disabled or no carrier.
+
+ 1 - Ignore packets received if interface associated with the incoming
+ address is down.
+
+ 2 - Ignore packets received if interface associated with the incoming
+ address is down or has no carrier.
+
+ Kernel Source: Documentation/networking/ip-sysctl.txt
+ """
+
+ # Note can't use sysctl it is broken for vif name because of dots
+ # link_filter values:
+ # 0 - always receive
+ # 1 - ignore receive if admin_down
+ # 2 - ignore receive if admin_down or link down
+
+ with open('/proc/sys/net/ipv4/conf/' + intf + '/link_filter', 'w') as f:
+ if enable == True or enable == 1:
+ f.write('2')
+ if os.path.isfile('/usr/bin/vtysh'):
+ os.system('/usr/bin/vtysh -c "configure terminal" -c "interface {}" -c "link-detect"'.format(intf))
+ else:
+ f.write('1')
+ if os.path.isfile('/usr/bin/vtysh'):
+ os.system('/usr/bin/vtysh -c "configure terminal" -c "interface {}" -c "no link-detect"'.format(intf))
+
+ pass
+
+def add_interface_address(intf, addr):
+ """
+ Configure an interface IPv4/IPv6 address
+ """
+ if addr == "dhcp":
+ os.system('/opt/vyatta/sbin/vyatta-interfaces.pl --dev="{}" --dhcp=start'.format(intf))
+ elif addr == "dhcpv6":
+ os.system('/opt/vyatta/sbin/vyatta-dhcpv6-client.pl --start -ifname "{}"'.format(intf))
+ elif vyos.validate.is_ipv4(addr):
+ if not vyos.validate.is_intf_addr_assigned(intf, addr):
+ print("Assigning {} to {}".format(addr, intf))
+ os.system('sudo ip -4 addr add "{}" broadcast + dev "{}"'.format(addr, intf))
+ elif vyos.validate.is_ipv6(addr):
+ if not vyos.validate.is_intf_addr_assigned(intf, addr):
+ print("Assigning {} to {}".format(addr, intf))
+ os.system('sudo ip -6 addr add "{}" dev "{}"'.format(addr, intf))
+ else:
+ raise ConfigError('{} is not a valid interface address'.format(addr))
+
+ pass
+
+def remove_interface_address(intf, addr):
+ """
+ Remove IPv4/IPv6 address from given interface
+ """
+
+ if addr == "dhcp":
+ os.system('/opt/vyatta/sbin/vyatta-interfaces.pl --dev="{}" --dhcp=stop'.format(intf))
+ elif addr == "dhcpv6":
+ os.system('/opt/vyatta/sbin/vyatta-dhcpv6-client.pl --stop -ifname "{}"'.format(intf))
+ elif vyos.validate.is_ipv4(addr):
+ os.system('ip -4 addr del "{}" dev "{}"'.format(addr, intf))
+ elif vyos.validate.is_ipv6(addr):
+ os.system('ip -6 addr del "{}" dev "{}"'.format(addr, intf))
+ else:
+ raise ConfigError('{} is not a valid interface address'.format(addr))
+
+ pass
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 78f332d66..8626839f2 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -23,6 +23,7 @@ DELETE = '/opt/vyatta/sbin/my_delete'
COMMENT = '/opt/vyatta/sbin/my_comment'
COMMIT = '/opt/vyatta/sbin/my_commit'
DISCARD = '/opt/vyatta/sbin/my_discard'
+SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig']
# Default "commit via" string
APP = "vyos-http-api"
@@ -116,6 +117,7 @@ class ConfigSession(object):
output = p.stdout.read().decode()
if result != 0:
raise ConfigSessionError(output)
+ return output
def get_session_env(self):
return self.__session_env
@@ -146,3 +148,10 @@ class ConfigSession(object):
def discard(self):
self.__run_command([DISCARD])
+
+ def show_config(self, path, format='raw'):
+ config_data = self.__run_command(SHOW_CONFIG + path)
+
+ if format == 'raw':
+ return config_data
+
diff --git a/python/vyos/interfaceconfig.py b/python/vyos/interfaceconfig.py
new file mode 100644
index 000000000..b8bfb707e
--- /dev/null
+++ b/python/vyos/interfaceconfig.py
@@ -0,0 +1,376 @@
+#!/usr/bin/python3
+
+# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import os
+import re
+import json
+import socket
+import subprocess
+
+dhclient_conf_dir = r'/var/lib/dhcp/dhclient_'
+
+class Interface:
+ def __init__(self, ifname=None, type=None):
+ if not ifname:
+ raise Exception("interface name required")
+ if not os.path.exists('/sys/class/net/{0}'.format(ifname)) and not type:
+ raise Exception("interface {0} not found".format(str(ifname)))
+ else:
+ if not os.path.exists('/sys/class/net/{0}'.format(ifname)):
+ try:
+ ret = subprocess.check_output(['ip link add dev ' + str(ifname) + ' type ' + type], stderr=subprocess.STDOUT, shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ if "Operation not supported" in str(e.output.decode()):
+ print(str(e.output.decode()))
+ sys.exit(0)
+
+ self._ifname = str(ifname)
+
+
+ @property
+ def mtu(self):
+ return self._mtu
+
+ @mtu.setter
+ def mtu(self, mtu=None):
+ if mtu < 68 or mtu > 9000:
+ raise ValueError("mtu size invalid value")
+ self._mtu = mtu
+ try:
+ ret = subprocess.check_output(['ip link set mtu ' + str(mtu) + ' dev ' + self._ifname], shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+
+
+ @property
+ def macaddr(self):
+ return self._macaddr
+
+ @macaddr.setter
+ def macaddr(self, mac=None):
+ if not re.search('^[a-f0-9:]{17}$', str(mac)):
+ raise ValueError("mac address invalid")
+ self._macaddr = str(mac)
+ try:
+ ret = subprocess.check_output(['ip link set address ' + mac + ' ' + self._ifname], shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+
+ @property
+ def ifalias(self):
+ return self._ifalias
+
+ @ifalias.setter
+ def ifalias(self, ifalias=None):
+ if not ifalias:
+ self._ifalias = self._ifname
+ else:
+ self._ifalias = str(ifalias)
+ open('/sys/class/net/{0}/ifalias'.format(self._ifname),'w').write(self._ifalias)
+
+ @property
+ def linkstate(self):
+ return self._linkstate
+
+ @linkstate.setter
+ def linkstate(self, state='up'):
+ if str(state).lower() == 'up' or str(state).lower() == 'down':
+ self._linkstate = str(state).lower()
+ else:
+ self._linkstate = 'up'
+ try:
+ ret = subprocess.check_output(['ip link set dev ' + self._ifname + ' ' + state], shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+
+
+
+ def _debug(self, e=None):
+ """
+ export DEBUG=1 to see debug messages
+ """
+ if os.getenv('DEBUG') == '1':
+ if e:
+ print ("Exception raised:\ncommand: {0}\nerror code: {1}\nsubprocess output: {2}".format(e.cmd, e.returncode, e.output.decode()) )
+ return True
+ return False
+
+ def get_mtu(self):
+ try:
+ ret = subprocess.check_output(['ip -j link list dev ' + self._ifname], shell=True).decode()
+ a = json.loads(ret)[0]
+ return a['mtu']
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def get_macaddr(self):
+ try:
+ ret = subprocess.check_output(['ip -j -4 link show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ j = json.loads(ret)
+ return j[0]['address']
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def get_alias(self):
+ return open('/sys/class/net/{0}/ifalias'.format(self._ifname),'r').read()
+
+ def del_alias(self):
+ open('/sys/class/net/{0}/ifalias'.format(self._ifname),'w').write()
+
+ def get_link_state(self):
+ """
+ returns either up/down or None if it can't find the state
+ """
+ try:
+ ret = subprocess.check_output(['ip -j link show ' + self._ifname], shell=True).decode()
+ s = json.loads(ret)
+ return s[0]['operstate'].lower()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def remove_interface(self):
+ try:
+ ret = subprocess.check_output(['ip link del dev ' + self._ifname], shell=True).decode()
+ return 0
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def get_ipv4_addr(self):
+ """
+ reads all IPs assigned to an interface and returns it in a list,
+ or None if no IP address is assigned to the interface
+ """
+ ips = []
+ try:
+ ret = subprocess.check_output(['ip -j -4 addr show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ j = json.loads(ret)
+ for i in j:
+ if len(i) != 0:
+ for addr in i['addr_info']:
+ ips.append(addr['local'])
+ return ips
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+
+ def get_ipv6_addr(self):
+ """
+ reads all IPs assigned to an interface and returns it in a list,
+ or None if no IP address is assigned to the interface
+ """
+ ips = []
+ try:
+ ret = subprocess.check_output(['ip -j -6 addr show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ j = json.loads(ret)
+ for i in j:
+ if len(i) != 0:
+ for addr in i['addr_info']:
+ ips.append(addr['local'])
+ return ips
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+
+ def add_ipv4_addr(self, ipaddr=[]):
+ """
+ add addresses on the interface
+ """
+ for ip in ipaddr:
+ try:
+ ret = subprocess.check_output(['ip -4 address add ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+ return True
+
+
+ def del_ipv4_addr(self, ipaddr=[]):
+ """
+ delete addresses on the interface
+ """
+ for ip in ipaddr:
+ try:
+ ret = subprocess.check_output(['ip -4 address del ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+ return True
+
+
+ def add_ipv6_addr(self, ipaddr=[]):
+ """
+ add addresses on the interface
+ """
+ for ip in ipaddr:
+ try:
+ ret = subprocess.check_output(['ip -6 address add ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+ return True
+
+
+ def del_ipv6_addr(self, ipaddr=[]):
+ """
+ delete addresses on the interface
+ """
+ for ip in ipaddr:
+ try:
+ ret = subprocess.check_output(['ip -6 address del ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+ return True
+
+
+ #### replace dhcpv4/v6 with systemd.networkd?
+ def set_dhcpv4(self):
+ conf_file = dhclient_conf_dir + self._ifname + '.conf'
+ pidfile = dhclient_conf_dir + self._ifname + '.pid'
+ leasefile = dhclient_conf_dir + self._ifname + '.leases'
+
+ a = [
+ '# generated by interface_config.py',
+ 'option rfc3442-classless-static-routes code 121 = array of unsigned integer 8;',
+ 'interface \"' + self._ifname + '\" {',
+ '\tsend host-name \"' + socket.gethostname() +'\";',
+ '\trequest subnet-mask, broadcast-address, routers, domain-name-servers, rfc3442-classless-static-routes, domain-name, interface-mtu;',
+ '}'
+ ]
+
+ cnf = ""
+ for ln in a:
+ cnf +=str(ln + "\n")
+ open(conf_file, 'w').write(cnf)
+ if os.path.exists(dhclient_conf_dir + self._ifname + '.pid'):
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -4 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self._ifname], shell=True).decode()
+ return True
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def del_dhcpv4(self):
+ conf_file = dhclient_conf_dir + self._ifname + '.conf'
+ pidfile = dhclient_conf_dir + self._ifname + '.pid'
+ leasefile = dhclient_conf_dir + self._ifname + '.leases'
+ if not os.path.exists(pidfile):
+ return 1
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode()
+ return True
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def get_dhcpv4(self):
+ pidfile = dhclient_conf_dir + self._ifname + '.pid'
+ if not os.path.exists(pidfile):
+ print ("no dhcp client running on interface {0}".format(self._ifname))
+ return False
+ else:
+ pid = open(pidfile, 'r').read()
+ print("dhclient running on {0} with pid {1}".format(self._ifname, pid))
+ return True
+
+
+ def set_dhcpv6(self):
+ conf_file = dhclient_conf_dir + self._ifname + '.v6conf'
+ pidfile = dhclient_conf_dir + self._ifname + '.v6pid'
+ leasefile = dhclient_conf_dir + self._ifname + '.v6leases'
+ a = [
+ '# generated by interface_config.py',
+ 'interface \"' + self._ifname + '\" {',
+ '\trequest routers, domain-name-servers, domain-name;',
+ '}'
+ ]
+ cnf = ""
+ for ln in a:
+ cnf +=str(ln + "\n")
+ open(conf_file, 'w').write(cnf)
+ subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self._ifname + '.accept_ra=0'])
+ if os.path.exists(pidfile):
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode()
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -6 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self._ifname], shell=True).decode()
+ return True
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def del_dhcpv6(self):
+ conf_file = dhclient_conf_dir + self._ifname + '.v6conf'
+ pidfile = dhclient_conf_dir + self._ifname + '.v6pid'
+ leasefile = dhclient_conf_dir + self._ifname + '.v6leases'
+ if not os.path.exists(pidfile):
+ return 1
+ try:
+ ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode()
+ subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self._ifname + '.accept_ra=1'])
+ return True
+ except subprocess.CalledProcessError as e:
+ if self._debug():
+ self._debug(e)
+ return None
+
+ def get_dhcpv6(self):
+ pidfile = dhclient_conf_dir + self._ifname + '.v6pid'
+ if not os.path.exists(pidfile):
+ print ("no dhcpv6 client running on interface {0}".format(self._ifname))
+ return False
+ else:
+ pid = open(pidfile, 'r').read()
+ print("dhclientv6 running on {0} with pid {1}".format(self._ifname, pid))
+ return True
+
+
+#### TODO: dhcpv6-pd via dhclient
+
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index 8def0a510..97a401423 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -18,32 +18,33 @@ import ipaddress
def is_ipv4(addr):
"""
- Check addr if it is an IPv4 address/network.
-
- Return True/False
+ Check addr if it is an IPv4 address/network. Returns True/False
"""
- if ipaddress.ip_network(addr).version == 4:
+
+ # With the below statement we can check for IPv4 networks and host
+ # addresses at the same time
+ if ipaddress.ip_address(addr.split(r'/')[0]).version == 4:
return True
else:
return False
def is_ipv6(addr):
"""
- Check addr if it is an IPv6 address/network.
-
- Return True/False
+ Check addr if it is an IPv6 address/network. Returns True/False
"""
- if ipaddress.ip_network(addr).version == 6:
+
+ # With the below statement we can check for IPv4 networks and host
+ # addresses at the same time
+ if ipaddress.ip_network(addr.split(r'/')[0]).version == 6:
return True
else:
return False
-def is_addr_assigned(addr):
+def is_intf_addr_assigned(intf, addr):
"""
- Verify if the given IPv4/IPv6 address is assigned to any interface on this
- system.
-
- Return True/False
+ Verify if the given IPv4/IPv6 address is assigned to specific interface.
+ It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
+ address 192.0.2.1/24.
"""
# determine IP version (AF_INET or AF_INET6) depending on passed address
@@ -51,15 +52,52 @@ def is_addr_assigned(addr):
if is_ipv6(addr):
addr_type = netifaces.AF_INET6
- for interface in netifaces.interfaces():
- # check if the requested address type is configured at all
- if addr_type in netifaces.ifaddresses(interface).keys():
- # Check every IP address on this interface for a match
- for ip in netifaces.ifaddresses(interface)[addr_type]:
- # Check if it matches to the address requested
- if ip['addr'] == addr:
+ # check if the requested address type is configured at all
+ try:
+ netifaces.ifaddresses(intf)
+ except ValueError as e:
+ print(e)
+ return False
+
+ if addr_type in netifaces.ifaddresses(intf).keys():
+ # Check every IP address on this interface for a match
+ for ip in netifaces.ifaddresses(intf)[addr_type]:
+ # Check if it matches to the address requested
+ # If passed address contains a '/' indicating a normalized IP
+ # address we have to take this into account, too
+ if r'/' in addr:
+ prefixlen = ''
+ if is_ipv6(addr):
+ # Note that currently expanded netmasks are not supported. That means
+ # 2001:db00::0/24 is a valid argument while 2001:db00::0/ffff:ff00:: not.
+ # see https://docs.python.org/3/library/ipaddress.html
+ bits = bin( int(ip['netmask'].replace(':',''), 16) ).count('1')
+ prefixlen = '/' + str(bits)
+
+ else:
+ prefixlen = '/' + str(ipaddress.IPv4Network('0.0.0.0/' + ip['netmask']).prefixlen)
+
+ # construct temporary variable holding IPv6 address and netmask
+ # in CIDR notation
+ tmp = ip['addr'] + prefixlen
+ if addr == tmp:
return True
+ elif ip['addr'] == addr:
+ return True
+
+ return False
+
+def is_addr_assigned(addr):
+ """
+ Verify if the given IPv4/IPv6 address is assigned to any interface
+ """
+
+ for intf in netifaces.interfaces():
+ tmp = is_intf_addr_assigned(intf, addr)
+ if tmp == True:
+ return True
+
return False
def is_subnet_connected(subnet, primary=False):