From 6ce5fedb602c5ea0df52049a5e9c4fb4f5a86122 Mon Sep 17 00:00:00 2001 From: Nicolas Fort Date: Fri, 5 Jan 2024 12:13:17 +0000 Subject: T4839: firewall: Add dynamic address group in firewall configuration, and appropiate commands to populate such groups using source and destination address of the packet. --- src/conf_mode/nat.py | 4 +++ src/conf_mode/policy_route.py | 4 +++ src/op_mode/firewall.py | 57 +++++++++++++++++++++++++++++-------------- 3 files changed, 47 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index 20570da62..19b206c59 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -69,6 +69,10 @@ def get_config(config=None): nat['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) + # Remove dynamic firewall groups if present: + if 'dynamic_group' in nat['firewall_group']: + del nat['firewall_group']['dynamic_group'] + return nat def verify_rule(config, err_msg, groups_dict): diff --git a/src/conf_mode/policy_route.py b/src/conf_mode/policy_route.py index adad012de..6d7a06714 100755 --- a/src/conf_mode/policy_route.py +++ b/src/conf_mode/policy_route.py @@ -53,6 +53,10 @@ def get_config(config=None): policy['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) + # Remove dynamic firewall groups if present: + if 'dynamic_group' in policy['firewall_group']: + del policy['firewall_group']['dynamic_group'] + return policy def verify_rule(policy, name, rule_conf, ipv6, rule_id): diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index 36bb013fe..4dcffc412 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -327,6 +327,8 @@ def show_firewall_group(name=None): dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type) in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group') out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group') + dyn_group_source = dict_search_args(rule_conf, 'add_address_to_group', 'source_address', group_type) + dyn_group_dst = dict_search_args(rule_conf, 'add_address_to_group', 'destination_address', group_type) if source_group: if source_group[0] == "!": source_group = source_group[1:] @@ -348,6 +350,14 @@ def show_firewall_group(name=None): if group_name == out_interface: out.append(f'{item}-{name_type}-{priority}-{rule_id}') + if dyn_group_source: + if group_name == dyn_group_source: + out.append(f'{item}-{name_type}-{priority}-{rule_id}') + if dyn_group_dst: + if group_name == dyn_group_dst: + out.append(f'{item}-{name_type}-{priority}-{rule_id}') + + # Look references in route | route6 for name_type in ['route', 'route6']: if name_type not in policy: @@ -423,26 +433,37 @@ def show_firewall_group(name=None): rows = [] for group_type, group_type_conf in firewall['group'].items(): - for group_name, group_conf in group_type_conf.items(): - if name and name != group_name: - continue + ## + if group_type != 'dynamic_group': - references = find_references(group_type, group_name) - row = [group_name, group_type, '\n'.join(references) or 'N/D'] - if 'address' in group_conf: - row.append("\n".join(sorted(group_conf['address']))) - elif 'network' in group_conf: - row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) - elif 'mac_address' in group_conf: - row.append("\n".join(sorted(group_conf['mac_address']))) - elif 'port' in group_conf: - row.append("\n".join(sorted(group_conf['port']))) - elif 'interface' in group_conf: - row.append("\n".join(sorted(group_conf['interface']))) - else: - row.append('N/D') - rows.append(row) + for group_name, group_conf in group_type_conf.items(): + if name and name != group_name: + continue + references = find_references(group_type, group_name) + row = [group_name, group_type, '\n'.join(references) or 'N/D'] + if 'address' in group_conf: + row.append("\n".join(sorted(group_conf['address']))) + elif 'network' in group_conf: + row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) + elif 'mac_address' in group_conf: + row.append("\n".join(sorted(group_conf['mac_address']))) + elif 'port' in group_conf: + row.append("\n".join(sorted(group_conf['port']))) + elif 'interface' in group_conf: + row.append("\n".join(sorted(group_conf['interface']))) + else: + row.append('N/D') + rows.append(row) + + else: + for dynamic_type in ['address_group', 'ipv6_address_group']: + if dynamic_type in firewall['group']['dynamic_group']: + for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items(): + references = find_references(dynamic_type, dynamic_name) + row = [dynamic_name, dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D'] + row.append('N/D') + rows.append(row) if rows: print('Firewall Groups\n') -- cgit v1.2.3