From 5a8e3089b35e2eca2f896a01410fcdf6ac928278 Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Sun, 3 Sep 2023 13:08:00 +0200 Subject: conntrack: T4309: T4903: Refactor `system conntrack ignore` rule generation, add IPv6 support and firewall groups --- python/vyos/template.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) (limited to 'python') diff --git a/python/vyos/template.py b/python/vyos/template.py index e167488c6..c1b57b883 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -663,6 +663,84 @@ def nat_static_rule(rule_conf, rule_id, nat_type): from vyos.nat import parse_nat_static_rule return parse_nat_static_rule(rule_conf, rule_id, nat_type) +@register_filter('conntrack_ignore_rule') +def conntrack_ignore_rule(rule_conf, rule_id, ipv6=False): + ip_prefix = 'ip6' if ipv6 else 'ip' + def_suffix = '6' if ipv6 else '' + output = [] + + if 'inbound_interface' in rule_conf: + ifname = rule_conf['inbound_interface'] + output.append(f'iifname {ifname}') + + if 'protocol' in rule_conf: + proto = rule_conf['protocol'] + output.append(f'meta l4proto {proto}') + + for side in ['source', 'destination']: + if side in rule_conf: + side_conf = rule_conf[side] + prefix = side[0] + + if 'address' in side_conf: + address = side_conf['address'] + operator = '' + if address[0] == '!': + operator = '!=' + address = address[1:] + output.append(f'{ip_prefix} {prefix}addr {operator} {address}') + + if 'port' in side_conf: + port = side_conf['port'] + operator = '' + if port[0] == '!': + operator = '!=' + port = port[1:] + output.append(f'th {prefix}port {operator} {port}') + + if 'group' in side_conf: + group = side_conf['group'] + + if 'address_group' in group: + group_name = group['address_group'] + operator = '' + if group_name[0] == '!': + operator = '!=' + group_name = group_name[1:] + output.append(f'{ip_prefix} {prefix}addr {operator} @A{def_suffix}_{group_name}') + # Generate firewall group domain-group + elif 'domain_group' in group: + group_name = group['domain_group'] + operator = '' + if group_name[0] == '!': + operator = '!=' + group_name = group_name[1:] + output.append(f'{ip_prefix} {prefix}addr {operator} @D_{group_name}') + elif 'network_group' in group: + group_name = group['network_group'] + operator = '' + if group_name[0] == '!': + operator = '!=' + group_name = group_name[1:] + output.append(f'{ip_prefix} {prefix}addr {operator} @N{def_suffix}_{group_name}') + if 'port_group' in group: + group_name = group['port_group'] + + if proto == 'tcp_udp': + proto = 'th' + + operator = '' + if group_name[0] == '!': + operator = '!=' + group_name = group_name[1:] + + output.append(f'{proto} {prefix}port {operator} @P_{group_name}') + + output.append('counter notrack') + output.append(f'comment "ignore-{rule_id}"') + + return " ".join(output) + @register_filter('range_to_regex') def range_to_regex(num_range): """Convert range of numbers or list of ranges -- cgit v1.2.3