diff options
Diffstat (limited to 'python/vyos/template.py')
-rw-r--r-- | python/vyos/template.py | 101 |
1 files changed, 99 insertions, 2 deletions
diff --git a/python/vyos/template.py b/python/vyos/template.py index 6f65c6c98..132f5ddde 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -18,7 +18,7 @@ import os from jinja2 import Environment from jinja2 import FileSystemLoader - +from jinja2 import ChainableUndefined from vyos.defaults import directories from vyos.util import chmod from vyos.util import chown @@ -27,6 +27,7 @@ from vyos.util import makedir # Holds template filters registered via register_filter() _FILTERS = {} +_TESTS = {} # reuse Environments with identical settings to improve performance @functools.lru_cache(maxsize=2) @@ -42,8 +43,10 @@ def _get_environment(location=None): cache_size=100, loader=loc_loader, trim_blocks=True, + undefined=ChainableUndefined, ) env.filters.update(_FILTERS) + env.tests.update(_TESTS) return env @@ -67,6 +70,26 @@ def register_filter(name, func=None): _FILTERS[name] = func return func +def register_test(name, func=None): + """Register a function to be available as test in templates under given name. + + It can also be used as a decorator, see below in this module for examples. + + :raise RuntimeError: + when trying to register a test after a template has been rendered already + :raise ValueError: when trying to register a name which was taken already + """ + if func is None: + return functools.partial(register_test, name) + if _get_environment.cache_info().currsize: + raise RuntimeError( + "Tests can only be registered before rendering the first template" + ) + if name in _TESTS: + raise ValueError(f"A test with name {name!r} was registered already") + _TESTS[name] = func + return func + def render_to_string(template, content, formater=None, location=None): """Render a template from the template directory, raise on any errors. @@ -127,6 +150,14 @@ def render( ################################## # Custom template filters follow # ################################## +@register_filter('force_to_list') +def force_to_list(value): + """ Convert scalars to single-item lists and leave lists untouched """ + if isinstance(value, list): + return value + else: + return [value] + @register_filter('ip_from_cidr') def ip_from_cidr(prefix): """ Take an IPv4/IPv6 CIDR host and strip cidr mask. @@ -516,6 +547,19 @@ def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'): from vyos.firewall import parse_rule return parse_rule(rule_conf, fw_name, rule_id, ip_name) +@register_filter('nft_default_rule') +def nft_default_rule(fw_conf, fw_name): + output = ['counter'] + default_action = fw_conf.get('default_action', 'accept') + + if 'enable_default_log' in fw_conf: + action_suffix = default_action[:1].upper() + output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}] "') + + output.append(nft_action(default_action)) + output.append(f'comment "{fw_name} default-action {default_action}"') + return " ".join(output) + @register_filter('nft_state_policy') def nft_state_policy(conf, state, ipv6=False): out = [f'ct state {state}'] @@ -535,6 +579,7 @@ def nft_intra_zone_action(zone_conf, ipv6=False): if 'intra_zone_filtering' in zone_conf: intra_zone = zone_conf['intra_zone_filtering'] fw_name = 'ipv6_name' if ipv6 else 'name' + name_prefix = 'NAME6_' if ipv6 else 'NAME_' if 'action' in intra_zone: if intra_zone['action'] == 'accept': @@ -542,5 +587,57 @@ def nft_intra_zone_action(zone_conf, ipv6=False): return intra_zone['action'] elif dict_search_args(intra_zone, 'firewall', fw_name): name = dict_search_args(intra_zone, 'firewall', fw_name) - return f'jump {name}' + return f'jump {name_prefix}{name}' return 'return' + +@register_test('vyos_defined') +def vyos_defined(value, test_value=None, var_type=None): + """ + Jinja2 plugin to test if a variable is defined and not none - vyos_defined + will test value if defined and is not none and return true or false. + + If test_value is supplied, the value must also pass == test_value to return true. + If var_type is supplied, the value must also be of the specified class/type + + Examples: + 1. Test if var is defined and not none: + {% if foo is vyos_defined %} + ... + {% endif %} + + 2. Test if variable is defined, not none and has value "something" + {% if bar is vyos_defined("something") %} + ... + {% endif %} + + Parameters + ---------- + value : any + Value to test from ansible + test_value : any, optional + Value to test in addition of defined and not none, by default None + var_type : ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'], optional + Type or Class to test for + + Returns + ------- + boolean + True if variable matches criteria, False in other cases. + + Implementation inspired and re-used from https://github.com/aristanetworks/ansible-avd/ + """ + + from jinja2 import Undefined + + if isinstance(value, Undefined) or value is None: + # Invalid value - return false + return False + elif test_value is not None and value != test_value: + # Valid value but not matching the optional argument + return False + elif str(var_type).lower() in ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'] and str(var_type).lower() != type(value).__name__: + # Invalid class - return false + return False + else: + # Valid value and is matching optional argument if provided - return true + return True |