summaryrefslogtreecommitdiff
path: root/python/vyos/template.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/template.py')
-rw-r--r--python/vyos/template.py101
1 files changed, 99 insertions, 2 deletions
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 6f65c6c98..132f5ddde 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -18,7 +18,7 @@ import os
from jinja2 import Environment
from jinja2 import FileSystemLoader
-
+from jinja2 import ChainableUndefined
from vyos.defaults import directories
from vyos.util import chmod
from vyos.util import chown
@@ -27,6 +27,7 @@ from vyos.util import makedir
# Holds template filters registered via register_filter()
_FILTERS = {}
+_TESTS = {}
# reuse Environments with identical settings to improve performance
@functools.lru_cache(maxsize=2)
@@ -42,8 +43,10 @@ def _get_environment(location=None):
cache_size=100,
loader=loc_loader,
trim_blocks=True,
+ undefined=ChainableUndefined,
)
env.filters.update(_FILTERS)
+ env.tests.update(_TESTS)
return env
@@ -67,6 +70,26 @@ def register_filter(name, func=None):
_FILTERS[name] = func
return func
+def register_test(name, func=None):
+ """Register a function to be available as test in templates under given name.
+
+ It can also be used as a decorator, see below in this module for examples.
+
+ :raise RuntimeError:
+ when trying to register a test after a template has been rendered already
+ :raise ValueError: when trying to register a name which was taken already
+ """
+ if func is None:
+ return functools.partial(register_test, name)
+ if _get_environment.cache_info().currsize:
+ raise RuntimeError(
+ "Tests can only be registered before rendering the first template"
+ )
+ if name in _TESTS:
+ raise ValueError(f"A test with name {name!r} was registered already")
+ _TESTS[name] = func
+ return func
+
def render_to_string(template, content, formater=None, location=None):
"""Render a template from the template directory, raise on any errors.
@@ -127,6 +150,14 @@ def render(
##################################
# Custom template filters follow #
##################################
+@register_filter('force_to_list')
+def force_to_list(value):
+ """ Convert scalars to single-item lists and leave lists untouched """
+ if isinstance(value, list):
+ return value
+ else:
+ return [value]
+
@register_filter('ip_from_cidr')
def ip_from_cidr(prefix):
""" Take an IPv4/IPv6 CIDR host and strip cidr mask.
@@ -516,6 +547,19 @@ def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'):
from vyos.firewall import parse_rule
return parse_rule(rule_conf, fw_name, rule_id, ip_name)
+@register_filter('nft_default_rule')
+def nft_default_rule(fw_conf, fw_name):
+ output = ['counter']
+ default_action = fw_conf.get('default_action', 'accept')
+
+ if 'enable_default_log' in fw_conf:
+ action_suffix = default_action[:1].upper()
+ output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}] "')
+
+ output.append(nft_action(default_action))
+ output.append(f'comment "{fw_name} default-action {default_action}"')
+ return " ".join(output)
+
@register_filter('nft_state_policy')
def nft_state_policy(conf, state, ipv6=False):
out = [f'ct state {state}']
@@ -535,6 +579,7 @@ def nft_intra_zone_action(zone_conf, ipv6=False):
if 'intra_zone_filtering' in zone_conf:
intra_zone = zone_conf['intra_zone_filtering']
fw_name = 'ipv6_name' if ipv6 else 'name'
+ name_prefix = 'NAME6_' if ipv6 else 'NAME_'
if 'action' in intra_zone:
if intra_zone['action'] == 'accept':
@@ -542,5 +587,57 @@ def nft_intra_zone_action(zone_conf, ipv6=False):
return intra_zone['action']
elif dict_search_args(intra_zone, 'firewall', fw_name):
name = dict_search_args(intra_zone, 'firewall', fw_name)
- return f'jump {name}'
+ return f'jump {name_prefix}{name}'
return 'return'
+
+@register_test('vyos_defined')
+def vyos_defined(value, test_value=None, var_type=None):
+ """
+ Jinja2 plugin to test if a variable is defined and not none - vyos_defined
+ will test value if defined and is not none and return true or false.
+
+ If test_value is supplied, the value must also pass == test_value to return true.
+ If var_type is supplied, the value must also be of the specified class/type
+
+ Examples:
+ 1. Test if var is defined and not none:
+ {% if foo is vyos_defined %}
+ ...
+ {% endif %}
+
+ 2. Test if variable is defined, not none and has value "something"
+ {% if bar is vyos_defined("something") %}
+ ...
+ {% endif %}
+
+ Parameters
+ ----------
+ value : any
+ Value to test from ansible
+ test_value : any, optional
+ Value to test in addition of defined and not none, by default None
+ var_type : ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'], optional
+ Type or Class to test for
+
+ Returns
+ -------
+ boolean
+ True if variable matches criteria, False in other cases.
+
+ Implementation inspired and re-used from https://github.com/aristanetworks/ansible-avd/
+ """
+
+ from jinja2 import Undefined
+
+ if isinstance(value, Undefined) or value is None:
+ # Invalid value - return false
+ return False
+ elif test_value is not None and value != test_value:
+ # Valid value but not matching the optional argument
+ return False
+ elif str(var_type).lower() in ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'] and str(var_type).lower() != type(value).__name__:
+ # Invalid class - return false
+ return False
+ else:
+ # Valid value and is matching optional argument if provided - return true
+ return True