summaryrefslogtreecommitdiff
path: root/python/vyos
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos')
-rw-r--r--python/vyos/base.py6
-rw-r--r--python/vyos/configverify.py57
-rw-r--r--python/vyos/firewall.py2
-rwxr-xr-xpython/vyos/ifconfig/interface.py190
-rw-r--r--python/vyos/ifconfig/loopback.py12
-rw-r--r--python/vyos/template.py77
-rw-r--r--python/vyos/util.py9
7 files changed, 223 insertions, 130 deletions
diff --git a/python/vyos/base.py b/python/vyos/base.py
index c78045548..fd22eaccd 100644
--- a/python/vyos/base.py
+++ b/python/vyos/base.py
@@ -15,6 +15,12 @@
from textwrap import fill
+class DeprecationWarning():
+ def __init__(self, message):
+ # Reformat the message and trim it to 72 characters in length
+ message = fill(message, width=72)
+ print(f'\nDEPRECATION WARNING: {message}\n')
+
class ConfigError(Exception):
def __init__(self, message):
# Reformat the message and trim it to 72 characters in length
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 7f1258575..1062d51ee 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -178,31 +178,37 @@ def verify_eapol(config):
if 'certificate' not in ca_cert:
raise ConfigError('Invalid CA certificate specified for EAPoL')
-def verify_mirror(config):
+def verify_mirror_redirect(config):
"""
Common helper function used by interface implementations to perform
- recurring validation of mirror interface configuration.
+ recurring validation of mirror and redirect interface configuration via tc(8)
It makes no sense to mirror traffic back at yourself!
"""
+ import os
+ if {'mirror', 'redirect'} <= set(config):
+ raise ConfigError('Mirror and redirect can not be enabled at the same time!')
+
if 'mirror' in config:
for direction, mirror_interface in config['mirror'].items():
+ if not os.path.exists(f'/sys/class/net/{mirror_interface}'):
+ raise ConfigError(f'Requested mirror interface "{mirror_interface}" '\
+ 'does not exist!')
+
if mirror_interface == config['ifname']:
- raise ConfigError(f'Can not mirror "{direction}" traffic back ' \
+ raise ConfigError(f'Can not mirror "{direction}" traffic back '\
'the originating interface!')
-def verify_redirect(config):
- """
- Common helper function used by interface implementations to perform
- recurring validation of the redirect interface configuration.
-
- It makes no sense to mirror and redirect traffic at the same time!
- """
- if {'mirror', 'redirect'} <= set(config):
- raise ConfigError('Can not do both redirect and mirror')
+ if 'redirect' in config:
+ redirect_ifname = config['redirect']
+ if not os.path.exists(f'/sys/class/net/{redirect_ifname}'):
+ raise ConfigError(f'Requested redirect interface "{redirect_ifname}" '\
+ 'does not exist!')
if dict_search('traffic_policy.in', config) != None:
- raise ConfigError('Can not use ingress policy and redirect')
+ # XXX: support combination of limiting and redirect/mirror - this is an
+ # artificial limitation
+ raise ConfigError('Can not use ingress policy tigether with mirror or redirect!')
def verify_authentication(config):
"""
@@ -322,30 +328,37 @@ def verify_vlan_config(config):
if duplicate:
raise ConfigError(f'Duplicate VLAN id "{duplicate[0]}" used for vif and vif-s interfaces!')
+ parent_ifname = config['ifname']
# 802.1q VLANs
- for vlan in config.get('vif', {}):
- vlan = config['vif'][vlan]
+ for vlan_id in config.get('vif', {}):
+ vlan = config['vif'][vlan_id]
+ vlan['ifname'] = f'{parent_ifname}.{vlan_id}'
+
verify_dhcpv6(vlan)
verify_address(vlan)
verify_vrf(vlan)
- verify_redirect(vlan)
+ verify_mirror_redirect(vlan)
verify_mtu_parent(vlan, config)
# 802.1ad (Q-in-Q) VLANs
- for s_vlan in config.get('vif_s', {}):
- s_vlan = config['vif_s'][s_vlan]
+ for s_vlan_id in config.get('vif_s', {}):
+ s_vlan = config['vif_s'][s_vlan_id]
+ s_vlan['ifname'] = f'{parent_ifname}.{s_vlan_id}'
+
verify_dhcpv6(s_vlan)
verify_address(s_vlan)
verify_vrf(s_vlan)
- verify_redirect(s_vlan)
+ verify_mirror_redirect(s_vlan)
verify_mtu_parent(s_vlan, config)
- for c_vlan in s_vlan.get('vif_c', {}):
- c_vlan = s_vlan['vif_c'][c_vlan]
+ for c_vlan_id in s_vlan.get('vif_c', {}):
+ c_vlan = s_vlan['vif_c'][c_vlan_id]
+ c_vlan['ifname'] = f'{parent_ifname}.{s_vlan_id}.{c_vlan_id}'
+
verify_dhcpv6(c_vlan)
verify_address(c_vlan)
verify_vrf(c_vlan)
- verify_redirect(c_vlan)
+ verify_mirror_redirect(c_vlan)
verify_mtu_parent(c_vlan, config)
verify_mtu_parent(c_vlan, s_vlan)
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 55ce318e7..ff8623592 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -174,7 +174,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'limit' in rule_conf:
if 'rate' in rule_conf['limit']:
- output.append(f'limit rate {rule_conf["limit"]["rate"]}/second')
+ output.append(f'limit rate {rule_conf["limit"]["rate"]}')
if 'burst' in rule_conf['limit']:
output.append(f'burst {rule_conf["limit"]["burst"]} packets')
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index f39da90e4..6b0f08fd4 100755
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-from netifaces import interfaces
import os
import re
import json
@@ -39,7 +38,6 @@ from vyos.util import read_file
from vyos.util import get_interface_config
from vyos.util import get_interface_namespace
from vyos.util import is_systemd_service_active
-from vyos.util import is_ipv6_enabled
from vyos.template import is_ipv4
from vyos.template import is_ipv6
from vyos.validate import is_intf_addr_assigned
@@ -1081,12 +1079,6 @@ class Interface(Control):
if addr in self._addr:
return False
- addr_is_v4 = is_ipv4(addr)
-
- # Failsave - do not add IPv6 address if IPv6 is disabled
- if is_ipv6(addr) and not is_ipv6_enabled():
- return False
-
# add to interface
if addr == 'dhcp':
self.set_dhcp(True)
@@ -1294,48 +1286,57 @@ class Interface(Control):
if os.path.isfile(config_file):
os.remove(config_file)
- def set_mirror(self):
+ def set_mirror_redirect(self):
# Please refer to the document for details
# - https://man7.org/linux/man-pages/man8/tc.8.html
# - https://man7.org/linux/man-pages/man8/tc-mirred.8.html
# Depening if we are the source or the target interface of the port
# mirror we need to setup some variables.
source_if = self._config['ifname']
- config = self._config.get('mirror', None)
+ mirror_config = None
+ if 'mirror' in self._config:
+ mirror_config = self._config['mirror']
if 'is_mirror_intf' in self._config:
source_if = next(iter(self._config['is_mirror_intf']))
- config = self._config['is_mirror_intf'][source_if].get('mirror', None)
-
- # Check configuration stored by old perl code before delete T3782/T4056
- if not 'redirect' in self._config and not 'traffic_policy' in self._config:
- # Please do not clear the 'set $? = 0 '. It's meant to force a return of 0
- # Remove existing mirroring rules
- delete_tc_cmd = f'tc qdisc del dev {source_if} handle ffff: ingress 2> /dev/null;'
- delete_tc_cmd += f'tc qdisc del dev {source_if} handle 1: root prio 2> /dev/null;'
- delete_tc_cmd += 'set $?=0'
- self._popen(delete_tc_cmd)
-
- # Bail out early if nothing needs to be configured
- if not config:
- return
-
- for direction, mirror_if in config.items():
- if mirror_if not in interfaces():
- continue
-
- if direction == 'ingress':
- handle = 'ffff: ingress'
- parent = 'ffff:'
- elif direction == 'egress':
- handle = '1: root prio'
- parent = '1:'
-
- # Mirror egress traffic
- mirror_cmd = f'tc qdisc add dev {source_if} handle {handle}; '
- # Export the mirrored traffic to the interface
- mirror_cmd += f'tc filter add dev {source_if} parent {parent} protocol all prio 10 u32 match u32 0 0 flowid 1:1 action mirred egress mirror dev {mirror_if}'
- self._popen(mirror_cmd)
+ mirror_config = self._config['is_mirror_intf'][source_if].get('mirror', None)
+
+ redirect_config = None
+
+ # clear existing ingess - ignore errors (e.g. "Error: Cannot find specified
+ # qdisc on specified device") - we simply cleanup all stuff here
+ self._popen(f'tc qdisc del dev {source_if} parent ffff: 2>/dev/null');
+ self._popen(f'tc qdisc del dev {source_if} parent 1: 2>/dev/null');
+
+ # Apply interface mirror policy
+ if mirror_config:
+ for direction, target_if in mirror_config.items():
+ if direction == 'ingress':
+ handle = 'ffff: ingress'
+ parent = 'ffff:'
+ elif direction == 'egress':
+ handle = '1: root prio'
+ parent = '1:'
+
+ # Mirror egress traffic
+ mirror_cmd = f'tc qdisc add dev {source_if} handle {handle}; '
+ # Export the mirrored traffic to the interface
+ mirror_cmd += f'tc filter add dev {source_if} parent {parent} protocol '\
+ f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\
+ f'egress mirror dev {target_if}'
+ _, err = self._popen(mirror_cmd)
+ if err: print('tc qdisc(filter for mirror port failed')
+
+ # Apply interface traffic redirection policy
+ elif 'redirect' in self._config:
+ _, err = self._popen(f'tc qdisc add dev {source_if} handle ffff: ingress')
+ if err: print(f'tc qdisc add for redirect failed!')
+
+ target_if = self._config['redirect']
+ _, err = self._popen(f'tc filter add dev {source_if} parent ffff: protocol '\
+ f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\
+ f'egress redirect dev {target_if}')
+ if err: print('tc filter add for redirect failed')
def set_xdp(self, state):
"""
@@ -1434,9 +1435,6 @@ class Interface(Control):
else:
self.del_addr(addr)
- for addr in new_addr:
- self.add_addr(addr)
-
# start DHCPv6 client when only PD was configured
if dhcpv6pd:
self.set_dhcpv6(True)
@@ -1451,6 +1449,10 @@ class Interface(Control):
# checked before
self.set_vrf(config.get('vrf', ''))
+ # Add this section after vrf T4331
+ for addr in new_addr:
+ self.add_addr(addr)
+
# Configure MSS value for IPv4 TCP connections
tmp = dict_search('ip.adjust_mss', config)
value = tmp if (tmp != None) else '0'
@@ -1501,55 +1503,56 @@ class Interface(Control):
value = tmp if (tmp != None) else '0'
self.set_ipv4_source_validation(value)
- # Only change IPv6 parameters if IPv6 was not explicitly disabled
- if is_ipv6_enabled():
- # Configure MSS value for IPv6 TCP connections
- tmp = dict_search('ipv6.adjust_mss', config)
- value = tmp if (tmp != None) else '0'
- self.set_tcp_ipv6_mss(value)
-
- # IPv6 forwarding
- tmp = dict_search('ipv6.disable_forwarding', config)
- value = '0' if (tmp != None) else '1'
- self.set_ipv6_forwarding(value)
-
- # IPv6 router advertisements
- tmp = dict_search('ipv6.address.autoconf', config)
- value = '2' if (tmp != None) else '1'
- if 'dhcpv6' in new_addr:
- value = '2'
- self.set_ipv6_accept_ra(value)
-
- # IPv6 address autoconfiguration
- tmp = dict_search('ipv6.address.autoconf', config)
- value = '1' if (tmp != None) else '0'
- self.set_ipv6_autoconf(value)
-
- # IPv6 Duplicate Address Detection (DAD) tries
- tmp = dict_search('ipv6.dup_addr_detect_transmits', config)
- value = tmp if (tmp != None) else '1'
- self.set_ipv6_dad_messages(value)
-
- # Delete old IPv6 EUI64 addresses before changing MAC
- for addr in (dict_search('ipv6.address.eui64_old', config) or []):
- self.del_ipv6_eui64_address(addr)
-
- # Manage IPv6 link-local addresses
- if dict_search('ipv6.address.no_default_link_local', config) != None:
- self.del_ipv6_eui64_address('fe80::/64')
- else:
- self.add_ipv6_eui64_address('fe80::/64')
-
- # Add IPv6 EUI-based addresses
- tmp = dict_search('ipv6.address.eui64', config)
- if tmp:
- for addr in tmp:
- self.add_ipv6_eui64_address(addr)
-
- # MTU - Maximum Transfer Unit
+ # MTU - Maximum Transfer Unit has a default value. It must ALWAYS be set
+ # before mangling any IPv6 option. If MTU is less then 1280 IPv6 will be
+ # automatically disabled by the kernel. Also MTU must be increased before
+ # configuring any IPv6 address on the interface.
if 'mtu' in config:
self.set_mtu(config.get('mtu'))
+ # Configure MSS value for IPv6 TCP connections
+ tmp = dict_search('ipv6.adjust_mss', config)
+ value = tmp if (tmp != None) else '0'
+ self.set_tcp_ipv6_mss(value)
+
+ # IPv6 forwarding
+ tmp = dict_search('ipv6.disable_forwarding', config)
+ value = '0' if (tmp != None) else '1'
+ self.set_ipv6_forwarding(value)
+
+ # IPv6 router advertisements
+ tmp = dict_search('ipv6.address.autoconf', config)
+ value = '2' if (tmp != None) else '1'
+ if 'dhcpv6' in new_addr:
+ value = '2'
+ self.set_ipv6_accept_ra(value)
+
+ # IPv6 address autoconfiguration
+ tmp = dict_search('ipv6.address.autoconf', config)
+ value = '1' if (tmp != None) else '0'
+ self.set_ipv6_autoconf(value)
+
+ # IPv6 Duplicate Address Detection (DAD) tries
+ tmp = dict_search('ipv6.dup_addr_detect_transmits', config)
+ value = tmp if (tmp != None) else '1'
+ self.set_ipv6_dad_messages(value)
+
+ # Delete old IPv6 EUI64 addresses before changing MAC
+ for addr in (dict_search('ipv6.address.eui64_old', config) or []):
+ self.del_ipv6_eui64_address(addr)
+
+ # Manage IPv6 link-local addresses
+ if dict_search('ipv6.address.no_default_link_local', config) != None:
+ self.del_ipv6_eui64_address('fe80::/64')
+ else:
+ self.add_ipv6_eui64_address('fe80::/64')
+
+ # Add IPv6 EUI-based addresses
+ tmp = dict_search('ipv6.address.eui64', config)
+ if tmp:
+ for addr in tmp:
+ self.add_ipv6_eui64_address(addr)
+
# re-add ourselves to any bridge we might have fallen out of
if 'is_bridge_member' in config:
bridge_dict = config.get('is_bridge_member')
@@ -1558,8 +1561,8 @@ class Interface(Control):
# eXpress Data Path - highly experimental
self.set_xdp('xdp' in config)
- # configure port mirror
- self.set_mirror()
+ # configure interface mirror or redirection target
+ self.set_mirror_redirect()
# Enable/Disable of an interface must always be done at the end of the
# derived class to make use of the ref-counting set_admin_state()
@@ -1718,6 +1721,3 @@ class VLANIf(Interface):
return None
return super().set_admin_state(state)
-
- def set_mirror(self):
- return
diff --git a/python/vyos/ifconfig/loopback.py b/python/vyos/ifconfig/loopback.py
index 30c890fdf..b3babfadc 100644
--- a/python/vyos/ifconfig/loopback.py
+++ b/python/vyos/ifconfig/loopback.py
@@ -14,7 +14,6 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
-from vyos.util import is_ipv6_enabled
@Interface.register
class LoopbackIf(Interface):
@@ -58,15 +57,14 @@ class LoopbackIf(Interface):
interface setup code and provide a single point of entry when workin
on any interface. """
- addr = config.get('address', [])
-
+ address = config.get('address', [])
# We must ensure that the loopback addresses are never deleted from the system
- addr.append('127.0.0.1/8')
- if is_ipv6_enabled():
- addr.append('::1/128')
+ for tmp in self._persistent_addresses:
+ if tmp not in address:
+ address.append(tmp)
# Update IP address entry in our dictionary
- config.update({'address' : addr})
+ config.update({'address' : address})
# call base class
super().update(config)
diff --git a/python/vyos/template.py b/python/vyos/template.py
index dabf53692..132f5ddde 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -18,7 +18,7 @@ import os
from jinja2 import Environment
from jinja2 import FileSystemLoader
-
+from jinja2 import ChainableUndefined
from vyos.defaults import directories
from vyos.util import chmod
from vyos.util import chown
@@ -27,6 +27,7 @@ from vyos.util import makedir
# Holds template filters registered via register_filter()
_FILTERS = {}
+_TESTS = {}
# reuse Environments with identical settings to improve performance
@functools.lru_cache(maxsize=2)
@@ -42,8 +43,10 @@ def _get_environment(location=None):
cache_size=100,
loader=loc_loader,
trim_blocks=True,
+ undefined=ChainableUndefined,
)
env.filters.update(_FILTERS)
+ env.tests.update(_TESTS)
return env
@@ -67,6 +70,26 @@ def register_filter(name, func=None):
_FILTERS[name] = func
return func
+def register_test(name, func=None):
+ """Register a function to be available as test in templates under given name.
+
+ It can also be used as a decorator, see below in this module for examples.
+
+ :raise RuntimeError:
+ when trying to register a test after a template has been rendered already
+ :raise ValueError: when trying to register a name which was taken already
+ """
+ if func is None:
+ return functools.partial(register_test, name)
+ if _get_environment.cache_info().currsize:
+ raise RuntimeError(
+ "Tests can only be registered before rendering the first template"
+ )
+ if name in _TESTS:
+ raise ValueError(f"A test with name {name!r} was registered already")
+ _TESTS[name] = func
+ return func
+
def render_to_string(template, content, formater=None, location=None):
"""Render a template from the template directory, raise on any errors.
@@ -566,3 +589,55 @@ def nft_intra_zone_action(zone_conf, ipv6=False):
name = dict_search_args(intra_zone, 'firewall', fw_name)
return f'jump {name_prefix}{name}'
return 'return'
+
+@register_test('vyos_defined')
+def vyos_defined(value, test_value=None, var_type=None):
+ """
+ Jinja2 plugin to test if a variable is defined and not none - vyos_defined
+ will test value if defined and is not none and return true or false.
+
+ If test_value is supplied, the value must also pass == test_value to return true.
+ If var_type is supplied, the value must also be of the specified class/type
+
+ Examples:
+ 1. Test if var is defined and not none:
+ {% if foo is vyos_defined %}
+ ...
+ {% endif %}
+
+ 2. Test if variable is defined, not none and has value "something"
+ {% if bar is vyos_defined("something") %}
+ ...
+ {% endif %}
+
+ Parameters
+ ----------
+ value : any
+ Value to test from ansible
+ test_value : any, optional
+ Value to test in addition of defined and not none, by default None
+ var_type : ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'], optional
+ Type or Class to test for
+
+ Returns
+ -------
+ boolean
+ True if variable matches criteria, False in other cases.
+
+ Implementation inspired and re-used from https://github.com/aristanetworks/ansible-avd/
+ """
+
+ from jinja2 import Undefined
+
+ if isinstance(value, Undefined) or value is None:
+ # Invalid value - return false
+ return False
+ elif test_value is not None and value != test_value:
+ # Valid value but not matching the optional argument
+ return False
+ elif str(var_type).lower() in ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'] and str(var_type).lower() != type(value).__name__:
+ # Invalid class - return false
+ return False
+ else:
+ # Valid value and is matching optional argument if provided - return true
+ return True
diff --git a/python/vyos/util.py b/python/vyos/util.py
index f3f323c34..de55e108b 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -989,6 +989,11 @@ def is_wwan_connected(interface):
if not interface.startswith('wwan'):
raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
+ # ModemManager is required for connection(s) - if service is not running,
+ # there won't be any connection at all!
+ if not is_systemd_service_active('ModemManager.service'):
+ return False
+
modem = interface.lstrip('wwan')
tmp = cmd(f'mmcli --modem {modem} --output-json')
@@ -1019,7 +1024,3 @@ def sysctl_write(name, value):
call(f'sysctl -wq {name}={value}')
return True
return False
-
-def is_ipv6_enabled() -> bool:
- """ Check if IPv6 support on the system is enabled or not """
- return (sysctl_read('net.ipv6.conf.all.disable_ipv6') == '0')