summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/configdict.py20
-rw-r--r--python/vyos/configverify.py62
-rw-r--r--python/vyos/ifconfig/bond.py8
-rw-r--r--python/vyos/ifconfig/bridge.py8
-rw-r--r--python/vyos/ifconfig/ethernet.py12
-rw-r--r--python/vyos/ifconfig/interface.py36
-rw-r--r--python/vyos/ifconfig/vtun.py44
-rw-r--r--python/vyos/template.py71
-rw-r--r--python/vyos/util.py2
-rw-r--r--python/vyos/validate.py72
10 files changed, 210 insertions, 125 deletions
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index db17c33fc..e43b68f6f 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -18,7 +18,7 @@ A library for retrieving value dicts from VyOS configs in a declarative fashion.
"""
import os
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.xml import defaults
from vyos import ConfigError
@@ -174,10 +174,10 @@ def T2665_set_dhcpv6pd_defaults(config_dict):
pd_defaults = defaults(['interfaces', 'ethernet', 'dhcpv6-options', 'pd'])
# Implant default dictionary for DHCPv6-PD instances
- if vyos_dict_search('dhcpv6_options.pd.length', config_dict):
+ if dict_search('dhcpv6_options.pd.length', config_dict):
del config_dict['dhcpv6_options']['pd']['length']
- for pd in (vyos_dict_search('dhcpv6_options.pd', config_dict) or []):
+ for pd in (dict_search('dhcpv6_options.pd', config_dict) or []):
config_dict['dhcpv6_options']['pd'][pd] = dict_merge(pd_defaults,
config_dict['dhcpv6_options']['pd'][pd])
@@ -332,7 +332,7 @@ def get_interface_dict(config, base, ifname=''):
eui64 = leaf_node_changed(config, ['ipv6', 'address', 'eui64'])
if eui64:
- tmp = vyos_dict_search('ipv6.address', dict)
+ tmp = dict_search('ipv6.address', dict)
if not tmp:
dict.update({'ipv6': {'address': {'eui64_old': eui64}}})
else:
@@ -419,12 +419,12 @@ def get_accel_dict(config, base, chap_secrets):
# defaults include RADIUS server specifics per TAG node which need to be
# added to individual RADIUS servers instead - so we can simply delete them
- if vyos_dict_search('authentication.radius.server', default_values):
+ if dict_search('authentication.radius.server', default_values):
del default_values['authentication']['radius']['server']
# defaults include static-ip address per TAG node which need to be added to
# individual local users instead - so we can simply delete them
- if vyos_dict_search('authentication.local_users.username', default_values):
+ if dict_search('authentication.local_users.username', default_values):
del default_values['authentication']['local_users']['username']
dict = dict_merge(default_values, dict)
@@ -448,10 +448,10 @@ def get_accel_dict(config, base, chap_secrets):
del dict['name_server']
# Add individual RADIUS server default values
- if vyos_dict_search('authentication.radius.server', dict):
+ if dict_search('authentication.radius.server', dict):
default_values = defaults(base + ['authentication', 'radius', 'server'])
- for server in vyos_dict_search('authentication.radius.server', dict):
+ for server in dict_search('authentication.radius.server', dict):
dict['authentication']['radius']['server'][server] = dict_merge(
default_values, dict['authentication']['radius']['server'][server])
@@ -461,10 +461,10 @@ def get_accel_dict(config, base, chap_secrets):
dict['authentication']['radius']['server'][server]['acct_port'] = '0'
# Add individual local-user default values
- if vyos_dict_search('authentication.local_users.username', dict):
+ if dict_search('authentication.local_users.username', dict):
default_values = defaults(base + ['authentication', 'local-users', 'username'])
- for username in vyos_dict_search('authentication.local_users.username', dict):
+ for username in dict_search('authentication.local_users.username', dict):
dict['authentication']['local_users']['username'][username] = dict_merge(
default_values, dict['authentication']['local_users']['username'][username])
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 422483663..babb0feb7 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -22,7 +22,7 @@
# makes use of it!
from vyos import ConfigError
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
def verify_mtu(config):
"""
@@ -60,19 +60,19 @@ def verify_mtu_ipv6(config):
error_msg = f'IPv6 address will be configured on interface "{interface}" ' \
f'thus the minimum MTU requirement is {min_mtu}!'
- if not vyos_dict_search('ipv6.address.no_default_link_local', config):
- raise ConfigError('link-local ' + error_msg)
-
- for address in (vyos_dict_search('address', config) or []):
+ for address in (dict_search('address', config) or []):
if address in ['dhcpv6'] or is_ipv6(address):
raise ConfigError(error_msg)
- if vyos_dict_search('ipv6.address.autoconf', config):
- raise ConfigError(error_msg)
+ tmp = dict_search('ipv6.address', config)
+ if tmp and 'no_default_link_local' not in tmp:
+ raise ConfigError('link-local ' + error_msg)
- if vyos_dict_search('ipv6.address.eui64', config):
+ if tmp and 'autoconf' in tmp:
raise ConfigError(error_msg)
+ if tmp and 'eui64' in tmp:
+ raise ConfigError(error_msg)
def verify_vrf(config):
"""
@@ -154,7 +154,7 @@ def verify_dhcpv6(config):
recurring validation of DHCPv6 options which are mutually exclusive.
"""
if 'dhcpv6_options' in config:
- from vyos.util import vyos_dict_search
+ from vyos.util import dict_search
if {'parameters_only', 'temporary'} <= set(config['dhcpv6_options']):
raise ConfigError('DHCPv6 temporary and parameters-only options '
@@ -162,15 +162,15 @@ def verify_dhcpv6(config):
# It is not allowed to have duplicate SLA-IDs as those identify an
# assigned IPv6 subnet from a delegated prefix
- for pd in vyos_dict_search('dhcpv6_options.pd', config):
+ for pd in dict_search('dhcpv6_options.pd', config):
sla_ids = []
- if not vyos_dict_search(f'dhcpv6_options.pd.{pd}.interface', config):
+ if not dict_search(f'dhcpv6_options.pd.{pd}.interface', config):
raise ConfigError('DHCPv6-PD requires an interface where to assign '
'the delegated prefix!')
- for interface in vyos_dict_search(f'dhcpv6_options.pd.{pd}.interface', config):
- sla_id = vyos_dict_search(
+ for interface in dict_search(f'dhcpv6_options.pd.{pd}.interface', config):
+ sla_id = dict_search(
f'dhcpv6_options.pd.{pd}.interface.{interface}.sla_id', config)
sla_ids.append(sla_id)
@@ -211,11 +211,11 @@ def verify_accel_ppp_base_service(config):
on get_config_dict()
"""
# vertify auth settings
- if vyos_dict_search('authentication.mode', config) == 'local':
- if not vyos_dict_search('authentication.local_users', config):
+ if dict_search('authentication.mode', config) == 'local':
+ if not dict_search('authentication.local_users', config):
raise ConfigError('PPPoE local auth mode requires local users to be configured!')
- for user in vyos_dict_search('authentication.local_users.username', config):
+ for user in dict_search('authentication.local_users.username', config):
user_config = config['authentication']['local_users']['username'][user]
if 'password' not in user_config:
@@ -227,11 +227,11 @@ def verify_accel_ppp_base_service(config):
raise ConfigError(f'User "{user}" has rate-limit configured for only one ' \
'direction but both upload and download must be given!')
- elif vyos_dict_search('authentication.mode', config) == 'radius':
- if not vyos_dict_search('authentication.radius.server', config):
+ elif dict_search('authentication.mode', config) == 'radius':
+ if not dict_search('authentication.radius.server', config):
raise ConfigError('RADIUS authentication requires at least one server')
- for server in vyos_dict_search('authentication.radius.server', config):
+ for server in dict_search('authentication.radius.server', config):
radius_config = config['authentication']['radius']['server'][server]
if 'key' not in radius_config:
raise ConfigError(f'Missing RADIUS secret key for server "{server}"')
@@ -259,3 +259,27 @@ def verify_accel_ppp_base_service(config):
if 'delegation_prefix' not in ipv6_pool['delegate'][delegate]:
raise ConfigError('delegation-prefix length required!')
+def verify_diffie_hellman_length(file, min_keysize):
+ """ Verify Diffie-Hellamn keypair length given via file. It must be greater
+ then or equal to min_keysize """
+
+ try:
+ keysize = str(min_keysize)
+ except:
+ return False
+
+ import os
+ import re
+ from vyos.util import cmd
+
+ if os.path.exists(file):
+
+ out = cmd(f'openssl dhparam -inform PEM -in {file} -text')
+ prog = re.compile('\d+\s+bit')
+ if prog.search(out):
+ bits = prog.search(out)[0].split()[0]
+ if int(min_keysize) >= int(bits):
+ return True
+
+ return False
+
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index 9108fc180..709222b09 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -17,7 +17,7 @@ import os
from vyos.ifconfig.interface import Interface
from vyos.util import cmd
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.validate import assert_list
from vyos.validate import assert_positive
@@ -360,7 +360,7 @@ class BondIf(Interface):
self.set_arp_ip_target('-' + addr)
# Add configured ARP target addresses
- value = vyos_dict_search('arp_monitor.target', config)
+ value = dict_search('arp_monitor.target', config)
if isinstance(value, str):
value = [value]
if value:
@@ -384,7 +384,7 @@ class BondIf(Interface):
# Removing an interface from a bond will always place the underlaying
# physical interface in admin-down state! If physical interface is
# not disabled, re-enable it.
- if not vyos_dict_search(f'member.interface_remove.{interface}.disable', config):
+ if not dict_search(f'member.interface_remove.{interface}.disable', config):
Interface(interface).set_admin_state('up')
# Bonding policy/mode
@@ -392,7 +392,7 @@ class BondIf(Interface):
if value: self.set_mode(value)
# Add (enslave) interfaces to bond
- value = vyos_dict_search('member.interface', config)
+ value = dict_search('member.interface', config)
for interface in (value or []):
# if we've come here we already verified the interface
# does not have an addresses configured so just flush
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index bf78f8972..f7388b298 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -19,7 +19,7 @@ from vyos.ifconfig.interface import Interface
from vyos.validate import assert_boolean
from vyos.validate import assert_positive
from vyos.util import cmd
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
@Interface.register
class BridgeIf(Interface):
@@ -223,17 +223,17 @@ class BridgeIf(Interface):
self.set_stp(value)
# enable or disable IGMP querier
- tmp = vyos_dict_search('igmp.querier', config)
+ tmp = dict_search('igmp.querier', config)
value = '1' if (tmp != None) else '0'
self.set_multicast_querier(value)
# remove interface from bridge
- tmp = vyos_dict_search('member.interface_remove', config)
+ tmp = dict_search('member.interface_remove', config)
for member in (tmp or []):
if member in interfaces():
self.del_port(member)
- tmp = vyos_dict_search('member.interface', config)
+ tmp = dict_search('member.interface', config)
if tmp:
for interface, interface_config in tmp.items():
# if interface does yet not exist bail out early and
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 1d48941f9..12d1ec265 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -19,7 +19,7 @@ import re
from vyos.ifconfig.interface import Interface
from vyos.validate import assert_list
from vyos.util import run
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
@Interface.register
class EthernetIf(Interface):
@@ -282,27 +282,27 @@ class EthernetIf(Interface):
self.set_flow_control(value)
# GRO (generic receive offload)
- tmp = vyos_dict_search('offload_options.generic_receive', config)
+ tmp = dict_search('offload_options.generic_receive', config)
value = tmp if (tmp != None) else 'off'
self.set_gro(value)
# GSO (generic segmentation offload)
- tmp = vyos_dict_search('offload_options.generic_segmentation', config)
+ tmp = dict_search('offload_options.generic_segmentation', config)
value = tmp if (tmp != None) else 'off'
self.set_gso(value)
# scatter-gather option
- tmp = vyos_dict_search('offload_options.scatter_gather', config)
+ tmp = dict_search('offload_options.scatter_gather', config)
value = tmp if (tmp != None) else 'off'
self.set_sg(value)
# TSO (TCP segmentation offloading)
- tmp = vyos_dict_search('offload_options.udp_fragmentation', config)
+ tmp = dict_search('offload_options.udp_fragmentation', config)
value = tmp if (tmp != None) else 'off'
self.set_tso(value)
# UDP fragmentation offloading
- tmp = vyos_dict_search('offload_options.udp_fragmentation', config)
+ tmp = dict_search('offload_options.udp_fragmentation', config)
value = tmp if (tmp != None) else 'off'
self.set_ufo(value)
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index ae747e87c..894410871 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -34,7 +34,7 @@ from vyos.configdict import list_diff
from vyos.configdict import dict_merge
from vyos.template import render
from vyos.util import mac2eui64
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.validate import is_ipv4
from vyos.validate import is_ipv6
from vyos.validate import is_intf_addr_assigned
@@ -880,7 +880,7 @@ class Interface(Control):
lease_file = f'{config_base}_{ifname}.leases'
if enable and 'disable' not in self._config:
- if vyos_dict_search('dhcp_options.host_name', self._config) == None:
+ if dict_search('dhcp_options.host_name', self._config) == None:
# read configured system hostname.
# maybe change to vyos hostd client ???
hostname = 'vyos'
@@ -959,7 +959,7 @@ class Interface(Control):
# always ensure DHCPv6 client is stopped (when not configured as client
# for IPv6 address or prefix delegation
- dhcpv6pd = vyos_dict_search('dhcpv6_options.pd', config)
+ dhcpv6pd = dict_search('dhcpv6_options.pd', config)
if 'dhcpv6' not in new_addr or dhcpv6pd == None:
self.del_addr('dhcpv6')
@@ -987,64 +987,64 @@ class Interface(Control):
self.set_vrf(config.get('vrf', ''))
# Configure ARP cache timeout in milliseconds - has default value
- tmp = vyos_dict_search('ip.arp_cache_timeout', config)
+ tmp = dict_search('ip.arp_cache_timeout', config)
value = tmp if (tmp != None) else '30'
self.set_arp_cache_tmo(value)
# Configure ARP filter configuration
- tmp = vyos_dict_search('ip.disable_arp_filter', config)
+ tmp = dict_search('ip.disable_arp_filter', config)
value = '0' if (tmp != None) else '1'
self.set_arp_filter(value)
# Configure ARP accept
- tmp = vyos_dict_search('ip.enable_arp_accept', config)
+ tmp = dict_search('ip.enable_arp_accept', config)
value = '1' if (tmp != None) else '0'
self.set_arp_accept(value)
# Configure ARP announce
- tmp = vyos_dict_search('ip.enable_arp_announce', config)
+ tmp = dict_search('ip.enable_arp_announce', config)
value = '1' if (tmp != None) else '0'
self.set_arp_announce(value)
# Configure ARP ignore
- tmp = vyos_dict_search('ip.enable_arp_ignore', config)
+ tmp = dict_search('ip.enable_arp_ignore', config)
value = '1' if (tmp != None) else '0'
self.set_arp_ignore(value)
# Enable proxy-arp on this interface
- tmp = vyos_dict_search('ip.enable_proxy_arp', config)
+ tmp = dict_search('ip.enable_proxy_arp', config)
value = '1' if (tmp != None) else '0'
self.set_proxy_arp(value)
# Enable private VLAN proxy ARP on this interface
- tmp = vyos_dict_search('ip.proxy_arp_pvlan', config)
+ tmp = dict_search('ip.proxy_arp_pvlan', config)
value = '1' if (tmp != None) else '0'
self.set_proxy_arp_pvlan(value)
# IPv4 forwarding
- tmp = vyos_dict_search('ip.disable_forwarding', config)
+ tmp = dict_search('ip.disable_forwarding', config)
value = '0' if (tmp != None) else '1'
self.set_ipv4_forwarding(value)
# IPv6 forwarding
- tmp = vyos_dict_search('ipv6.disable_forwarding', config)
+ tmp = dict_search('ipv6.disable_forwarding', config)
value = '0' if (tmp != None) else '1'
self.set_ipv6_forwarding(value)
# IPv6 router advertisements
- tmp = vyos_dict_search('ipv6.address.autoconf', config)
+ tmp = dict_search('ipv6.address.autoconf', config)
value = '2' if (tmp != None) else '1'
if 'dhcpv6' in new_addr:
value = '2'
self.set_ipv6_accept_ra(value)
# IPv6 address autoconfiguration
- tmp = vyos_dict_search('ipv6.address.autoconf', config)
+ tmp = dict_search('ipv6.address.autoconf', config)
value = '1' if (tmp != None) else '0'
self.set_ipv6_autoconf(value)
# IPv6 Duplicate Address Detection (DAD) tries
- tmp = vyos_dict_search('ipv6.dup_addr_detect_transmits', config)
+ tmp = dict_search('ipv6.dup_addr_detect_transmits', config)
value = tmp if (tmp != None) else '1'
self.set_ipv6_dad_messages(value)
@@ -1053,7 +1053,7 @@ class Interface(Control):
self.set_mtu(config.get('mtu'))
# Delete old IPv6 EUI64 addresses before changing MAC
- tmp = vyos_dict_search('ipv6.address.eui64_old', config)
+ tmp = dict_search('ipv6.address.eui64_old', config)
if tmp:
for addr in tmp:
self.del_ipv6_eui64_address(addr)
@@ -1068,7 +1068,7 @@ class Interface(Control):
self.set_mac(mac)
# Manage IPv6 link-local addresses
- tmp = vyos_dict_search('ipv6.address.no_default_link_local', config)
+ tmp = dict_search('ipv6.address.no_default_link_local', config)
# we must check explicitly for None type as if the key is set we will
# get an empty dict (<class 'dict'>)
if tmp is not None:
@@ -1077,7 +1077,7 @@ class Interface(Control):
self.add_ipv6_eui64_address('fe80::/64')
# Add IPv6 EUI-based addresses
- tmp = vyos_dict_search('ipv6.address.eui64', config)
+ tmp = dict_search('ipv6.address.eui64', config)
if tmp:
for addr in tmp:
self.add_ipv6_eui64_address(addr)
diff --git a/python/vyos/ifconfig/vtun.py b/python/vyos/ifconfig/vtun.py
index b25e32d63..99a592b3e 100644
--- a/python/vyos/ifconfig/vtun.py
+++ b/python/vyos/ifconfig/vtun.py
@@ -19,6 +19,7 @@ from vyos.ifconfig.interface import Interface
class VTunIf(Interface):
default = {
'type': 'vtun',
+ 'device_type': 'tun',
}
definition = {
**Interface.definition,
@@ -28,15 +29,44 @@ class VTunIf(Interface):
'bridgeable': True,
},
}
-
- # stub this interface is created in the configure script
+ options = Interface.options + ['device_type']
def _create(self):
- # we can not create this interface as it is managed outside
- # it requires configuring OpenVPN
+ """ Depending on OpenVPN operation mode the interface is created
+ immediately (e.g. Server mode) or once the connection to the server is
+ established (client mode). The latter will only be brought up once the
+ server can be reached, thus we might need to create this interface in
+ advance for the service to be operational. """
+ try:
+ cmd = 'openvpn --mktun --dev-type {device_type} --dev {ifname}'.format(**self.config)
+ return self._cmd(cmd)
+ except PermissionError:
+ # interface created by OpenVPN daemon in the meantime ...
+ pass
+
+ def add_addr(self, addr):
+ # IP addresses are managed by OpenVPN daemon
pass
- def _delete(self):
- # we can not create this interface as it is managed outside
- # it requires configuring OpenVPN
+ def del_addr(self, addr):
+ # IP addresses are managed by OpenVPN daemon
pass
+
+ def update(self, config):
+ """ General helper function which works on a dictionary retrived by
+ get_config_dict(). It's main intention is to consolidate the scattered
+ interface setup code and provide a single point of entry when workin
+ on any interface. """
+
+ # call base class first
+ super().update(config)
+
+ # Enable/Disable of an interface must always be done at the end of the
+ # derived class to make use of the ref-counting set_admin_state()
+ # function. We will only enable the interface if 'up' was called as
+ # often as 'down'. This is required by some interface implementations
+ # as certain parameters can only be changed when the interface is
+ # in admin-down state. This ensures the link does not flap during
+ # reconfiguration.
+ state = 'down' if 'disable' in config else 'up'
+ self.set_admin_state(state)
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 1f1ddc000..389f6927f 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -16,7 +16,6 @@
import functools
import os
-from ipaddress import ip_network
from jinja2 import Environment
from jinja2 import FileSystemLoader
from vyos.defaults import directories
@@ -124,20 +123,78 @@ def render(
# Custom template filters follow #
##################################
-
-@register_filter("address_from_cidr")
+@register_filter('address_from_cidr')
def vyos_address_from_cidr(text):
""" Take an IPv4/IPv6 CIDR prefix and convert the network to an "address".
Example:
192.0.2.0/24 -> 192.0.2.0, 2001:db8::/48 -> 2001:db8::
"""
+ from ipaddress import ip_network
return str(ip_network(text).network_address)
-
-@register_filter("netmask_from_cidr")
+@register_filter('netmask_from_cidr')
def vyos_netmask_from_cidr(text):
- """ Take an IPv4/IPv6 CIDR prefix and convert the prefix length to a "subnet mask".
+ """ Take CIDR prefix and convert the prefix length to a "subnet mask".
Example:
- 192.0.2.0/24 -> 255.255.255.0, 2001:db8::/48 -> ffff:ffff:ffff::
+ - 192.0.2.0/24 -> 255.255.255.0
+ - 2001:db8::/48 -> ffff:ffff:ffff::
"""
+ from ipaddress import ip_network
return str(ip_network(text).netmask)
+
+@register_filter('ipv4')
+def vyos_ipv4(text):
+ """ Filter IP address, return True on IPv4 address, False otherwise """
+ from ipaddress import ip_interface
+ try: return ip_interface(text).version == 4
+ except: return False
+
+@register_filter('ipv6')
+def vyos_ipv6(text):
+ """ Filter IP address, return True on IPv6 address, False otherwise """
+ from ipaddress import ip_interface
+ try: return ip_interface(text).version == 6
+ except: return False
+
+@register_filter('first_host_address')
+def vyos_first_host_address(text):
+ """ Return first usable (host) IP address from given prefix.
+ Example:
+ - 10.0.0.0/24 -> 10.0.0.1
+ - 2001:db8::/64 -> 2001:db8::
+ """
+ from ipaddress import ip_interface
+ from ipaddress import IPv4Network
+ from ipaddress import IPv6Network
+
+ addr = ip_interface(text)
+ if addr.version == 4:
+ return str(addr.ip +1)
+ return str(addr.ip)
+
+@register_filter('last_host_address')
+def vyos_last_host_address(text):
+ """ Return first usable IP address from given prefix.
+ Example:
+ - 10.0.0.0/24 -> 10.0.0.254
+ - 2001:db8::/64 -> 2001:db8::ffff:ffff:ffff:ffff
+ """
+ from ipaddress import ip_interface
+ from ipaddress import IPv4Network
+ from ipaddress import IPv6Network
+
+ addr = ip_interface(text)
+ if addr.version == 4:
+ return str(IPv4Network(addr).broadcast_address - 1)
+
+ return str(IPv6Network(addr).broadcast_address)
+
+@register_filter('inc_ip')
+def vyos_inc_ip(text, increment):
+ """ Return first usable IP address from given prefix.
+ Example:
+ - 10.0.0.0/24 -> 10.0.0.1
+ - 2001:db8::/64 -> 2001:db8::1
+ """
+ from ipaddress import ip_interface
+ return str(ip_interface(text).ip + int(increment))
diff --git a/python/vyos/util.py b/python/vyos/util.py
index e3e389baf..fc6915687 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -603,7 +603,7 @@ def find_device_file(device):
return None
-def vyos_dict_search(path, dict):
+def dict_search(path, dict):
""" Traverse Python dictionary (dict) delimited by dot (.).
Return value of key if found, None otherwise.
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index 691cf3c8e..74488bed6 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -1,4 +1,4 @@
-# Copyright 2018 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2018-2020 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -13,11 +13,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-import json
-import socket
import netifaces
-import ipaddress
-
from vyos.util import cmd
# Important note when you are adding new validation functions:
@@ -29,60 +25,36 @@ from vyos.util import cmd
# parameters with default will be left unset
# all other paramters will receive the value to check
-
def is_ip(addr):
- """
- Check addr if it is an IPv4 or IPv6 address
- """
+ """ Check addr if it is an IPv4 or IPv6 address """
return is_ipv4(addr) or is_ipv6(addr)
def is_ipv4(addr):
- """
- Check addr if it is an IPv4 address/network. Returns True/False
- """
-
- # With the below statement we can check for IPv4 networks and host
- # addresses at the same time
- try:
- if ipaddress.ip_address(addr.split(r'/')[0]).version == 4:
- return True
- except:
- pass
-
- return False
+ from vyos.template import vyos_ipv4
+ return vyos_ipv4(addr)
def is_ipv6(addr):
- """
- Check addr if it is an IPv6 address/network. Returns True/False
- """
-
- # With the below statement we can check for IPv4 networks and host
- # addresses at the same time
- try:
- if ipaddress.ip_network(addr.split(r'/')[0]).version == 6:
- return True
- except:
- pass
-
- return False
+ from vyos.template import vyos_ipv6
+ return vyos_ipv6(addr)
def is_ipv6_link_local(addr):
- """
- Check addr if it is an IPv6 link-local address/network. Returns True/False
- """
-
+ """ Check if addrsss is an IPv6 link-local address. Returns True/False """
+ from ipaddress import IPv6Address
addr = addr.split('%')[0]
if is_ipv6(addr):
- if ipaddress.IPv6Address(addr).is_link_local:
+ if IPv6Address(addr).is_link_local:
return True
return False
def _are_same_ip(one, two):
+ from socket import AF_INET
+ from socket import AF_INET6
+ from socket import inet_pton
# compare the binary representation of the IP
- f_one = socket.AF_INET if is_ipv4(one) else socket.AF_INET6
- s_two = socket.AF_INET if is_ipv4(two) else socket.AF_INET6
- return socket.inet_pton(f_one, one) == socket.inet_pton(f_one, two)
+ f_one = AF_INET if is_ipv4(one) else AF_INET6
+ s_two = AF_INET if is_ipv4(two) else AF_INET6
+ return inet_pton(f_one, one) == inet_pton(f_one, two)
def is_intf_addr_assigned(intf, addr):
if '/' in addr:
@@ -149,10 +121,9 @@ def is_addr_assigned(addr):
return False
def is_loopback_addr(addr):
- """
- Check if supplied IPv4/IPv6 address is a loopback address
- """
- return ipaddress.ip_address(addr).is_loopback
+ """ Check if supplied IPv4/IPv6 address is a loopback address """
+ from ipaddress import ip_address
+ return ip_address(addr).is_loopback
def is_subnet_connected(subnet, primary=False):
"""
@@ -165,6 +136,8 @@ def is_subnet_connected(subnet, primary=False):
Return True/False
"""
+ from ipaddress import ip_address
+ from ipaddress import ip_network
# determine IP version (AF_INET or AF_INET6) depending on passed address
addr_type = netifaces.AF_INET
@@ -180,7 +153,7 @@ def is_subnet_connected(subnet, primary=False):
# only support the primary address :(
if primary:
ip = netifaces.ifaddresses(interface)[addr_type][0]['addr']
- if ipaddress.ip_address(ip) in ipaddress.ip_network(subnet):
+ if ip_address(ip) in ip_network(subnet):
return True
else:
# Check every assigned IP address if it is connected to the subnet
@@ -188,7 +161,7 @@ def is_subnet_connected(subnet, primary=False):
for ip in netifaces.ifaddresses(interface)[addr_type]:
# remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
addr = ip['addr'].split('%')[0]
- if ipaddress.ip_address(addr) in ipaddress.ip_network(subnet):
+ if ip_address(addr) in ip_network(subnet):
return True
return False
@@ -224,6 +197,7 @@ def assert_positive(n, smaller=0):
def assert_mtu(mtu, ifname):
assert_number(mtu)
+ import json
out = cmd(f'ip -j -d link show dev {ifname}')
# [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
parsed = json.loads(out)[0]