diff options
Diffstat (limited to 'src/op_mode/firewall.py')
-rwxr-xr-x | src/op_mode/firewall.py | 159 |
1 files changed, 102 insertions, 57 deletions
diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index 581710b31..3434707ec 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -24,27 +24,39 @@ from vyos.config import Config from vyos.utils.process import cmd from vyos.utils.dict import dict_search_args -def get_config_firewall(conf, hook=None, priority=None, ipv6=False): +def get_config_firewall(conf, family=None, hook=None, priority=None): config_path = ['firewall'] - if hook: - config_path += ['ipv6' if ipv6 else 'ipv4', hook] - if priority: - config_path += [priority] + if family: + config_path += [family] + if hook: + config_path += [hook] + if priority: + config_path += [priority] firewall = conf.get_config_dict(config_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) return firewall -def get_nftables_details(hook, priority, ipv6=False): - suffix = '6' if ipv6 else '' - aux = 'IPV6_' if ipv6 else '' - name_prefix = 'NAME6_' if ipv6 else 'NAME_' +def get_nftables_details(family, hook, priority): + if family == 'ipv6': + suffix = 'ip6' + name_prefix = 'NAME6_' + aux='IPV6_' + elif family == 'ipv4': + suffix = 'ip' + name_prefix = 'NAME_' + aux='' + else: + suffix = 'bridge' + name_prefix = 'NAME_' + aux='' + if hook == 'name' or hook == 'ipv6-name': - command = f'sudo nft list chain ip{suffix} vyos_filter {name_prefix}{priority}' + command = f'sudo nft list chain {suffix} vyos_filter {name_prefix}{priority}' else: up_hook = hook.upper() - command = f'sudo nft list chain ip{suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}' + command = f'sudo nft list chain {suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}' try: results = cmd(command) @@ -68,11 +80,10 @@ def get_nftables_details(hook, priority, ipv6=False): out[rule_id] = rule return out -def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_id=None): - ip_str = 'IPv6' if ipv6 else 'IPv4' - print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {priority}"\n') +def output_firewall_name(family, hook, priority, firewall_conf, single_rule_id=None): + print(f'\n---------------------------------\n{family} Firewall "{hook} {priority}"\n') - details = get_nftables_details(hook, priority, ipv6) + details = get_nftables_details(family, hook, priority) rows = [] if 'rule' in firewall_conf: @@ -103,11 +114,10 @@ def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_ header = ['Rule', 'Action', 'Protocol', 'Packets', 'Bytes', 'Conditions'] print(tabulate.tabulate(rows, header) + '\n') -def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_rule_id=None): - ip_str = 'IPv6' if ipv6 else 'IPv4' - print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {prior}"\n') +def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule_id=None): + print(f'\n---------------------------------\n{family} Firewall "{hook} {prior}"\n') - details = get_nftables_details(hook, prior, ipv6) + details = get_nftables_details(family, hook, prior) rows = [] if 'rule' in prior_conf: @@ -127,7 +137,15 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_ if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group') if not source_addr: - source_addr = '::/0' if ipv6 else '0.0.0.0/0' + source_addr = dict_search_args(rule_conf, 'source', 'fqdn') + if not source_addr: + source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') + if source_addr: + source_addr = str(source_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): + source_addr = 'NOT ' + str(source_addr) + if not source_addr: + source_addr = 'any' # Get destination dest_addr = dict_search_args(rule_conf, 'destination', 'address') @@ -138,7 +156,15 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_ if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group') if not dest_addr: - dest_addr = '::/0' if ipv6 else '0.0.0.0/0' + dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') + if not dest_addr: + dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') + if dest_addr: + dest_addr = str(dest_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): + dest_addr = 'NOT ' + str(dest_addr) + if not dest_addr: + dest_addr = 'any' # Get inbound interface iiface = dict_search_args(rule_conf, 'inbound_interface', 'interface_name') @@ -169,7 +195,22 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_ row.append(oiface) rows.append(row) - if 'default_action' in prior_conf and not single_rule_id: + + if hook in ['input', 'forward', 'output']: + row = ['default'] + row.append('N/A') + row.append('N/A') + if 'default_action' in prior_conf: + row.append(prior_conf['default_action']) + else: + row.append('accept') + row.append('any') + row.append('any') + row.append('any') + row.append('any') + rows.append(row) + + elif 'default_action' in prior_conf and not single_rule_id: row = ['default'] if 'default-action' in details: rule_details = details['default-action'] @@ -179,8 +220,10 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_ row.append('0') row.append('0') row.append(prior_conf['default_action']) - row.append('0.0.0.0/0') # Source - row.append('0.0.0.0/0') # Dest + row.append('any') # Source + row.append('any') # Dest + row.append('any') # inbound-interface + row.append('any') # outbound-interface rows.append(row) if rows: @@ -196,15 +239,11 @@ def show_firewall(): if not firewall: return - if 'ipv4' in firewall: - for hook, hook_conf in firewall['ipv4'].items(): - for prior, prior_conf in firewall['ipv4'][hook].items(): - output_firewall_name(hook, prior, prior_conf, ipv6=False) - - if 'ipv6' in firewall: - for hook, hook_conf in firewall['ipv6'].items(): - for prior, prior_conf in firewall['ipv6'][hook].items(): - output_firewall_name(hook, prior, prior_conf, ipv6=True) + for family in ['ipv4', 'ipv6', 'bridge']: + if family in firewall: + for hook, hook_conf in firewall[family].items(): + for prior, prior_conf in firewall[family][hook].items(): + output_firewall_name(family, hook, prior, prior_conf) def show_firewall_family(family): print(f'Rulesets {family} Information') @@ -212,31 +251,28 @@ def show_firewall_family(family): conf = Config() firewall = get_config_firewall(conf) - if not firewall: + if not firewall or family not in firewall: return for hook, hook_conf in firewall[family].items(): for prior, prior_conf in firewall[family][hook].items(): - if family == 'ipv6': - output_firewall_name(hook, prior, prior_conf, ipv6=True) - else: - output_firewall_name(hook, prior, prior_conf, ipv6=False) + output_firewall_name(family, hook, prior, prior_conf) -def show_firewall_name(hook, priority, ipv6=False): +def show_firewall_name(family, hook, priority): print('Ruleset Information') conf = Config() - firewall = get_config_firewall(conf, hook, priority, ipv6) + firewall = get_config_firewall(conf, family, hook, priority) if firewall: - output_firewall_name(hook, priority, firewall, ipv6) + output_firewall_name(family, hook, priority, firewall) -def show_firewall_rule(hook, priority, rule_id, ipv6=False): +def show_firewall_rule(family, hook, priority, rule_id): print('Rule Information') conf = Config() - firewall = get_config_firewall(conf, hook, priority, ipv6) + firewall = get_config_firewall(conf, family, hook, priority) if firewall: - output_firewall_name(hook, priority, firewall, ipv6, rule_id) + output_firewall_name(family, hook, priority, firewall, rule_id) def show_firewall_group(name=None): conf = Config() @@ -267,6 +303,8 @@ def show_firewall_group(name=None): for priority, priority_conf in firewall[item][name_type].items(): if priority not in firewall[item][name_type]: continue + if 'rule' not in priority_conf: + continue for rule_id, rule_conf in priority_conf['rule'].items(): source_group = dict_search_args(rule_conf, 'source', 'group', group_type) dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type) @@ -303,7 +341,7 @@ def show_firewall_group(name=None): continue references = find_references(group_type, group_name) - row = [group_name, group_type, '\n'.join(references) or 'N/A'] + row = [group_name, group_type, '\n'.join(references) or 'N/D'] if 'address' in group_conf: row.append("\n".join(sorted(group_conf['address']))) elif 'network' in group_conf: @@ -315,7 +353,7 @@ def show_firewall_group(name=None): elif 'interface' in group_conf: row.append("\n".join(sorted(group_conf['interface']))) else: - row.append('N/A') + row.append('N/D') rows.append(row) if rows: @@ -334,6 +372,7 @@ def show_summary(): header = ['Ruleset Hook', 'Ruleset Priority', 'Description', 'References'] v4_out = [] v6_out = [] + br_out = [] if 'ipv4' in firewall: for hook, hook_conf in firewall['ipv4'].items(): @@ -347,6 +386,12 @@ def show_summary(): description = prior_conf.get('description', '') v6_out.append([hook, prior, description]) + if 'bridge' in firewall: + for hook, hook_conf in firewall['bridge'].items(): + for prior, prior_conf in firewall['bridge'][hook].items(): + description = prior_conf.get('description', '') + br_out.append([hook, prior, description]) + if v6_out: print('\nIPv6 Ruleset:\n') print(tabulate.tabulate(v6_out, header) + '\n') @@ -355,6 +400,10 @@ def show_summary(): print('\nIPv4 Ruleset:\n') print(tabulate.tabulate(v4_out, header) + '\n') + if br_out: + print('\nBridge Ruleset:\n') + print(tabulate.tabulate(br_out, header) + '\n') + show_firewall_group() def show_statistics(): @@ -366,15 +415,11 @@ def show_statistics(): if not firewall: return - if 'ipv4' in firewall: - for hook, hook_conf in firewall['ipv4'].items(): - for prior, prior_conf in firewall['ipv4'][hook].items(): - output_firewall_name_statistics(hook,prior, prior_conf, ipv6=False) - - if 'ipv6' in firewall: - for hook, hook_conf in firewall['ipv6'].items(): - for prior, prior_conf in firewall['ipv6'][hook].items(): - output_firewall_name_statistics(hook,prior, prior_conf, ipv6=True) + for family in ['ipv4', 'ipv6', 'bridge']: + if family in firewall: + for hook, hook_conf in firewall[family].items(): + for prior, prior_conf in firewall[family][hook].items(): + output_firewall_name_statistics(family, hook,prior, prior_conf) if __name__ == '__main__': parser = argparse.ArgumentParser() @@ -390,9 +435,9 @@ if __name__ == '__main__': if args.action == 'show': if not args.rule: - show_firewall_name(args.hook, args.priority, args.ipv6) + show_firewall_name(args.family, args.hook, args.priority) else: - show_firewall_rule(args.hook, args.priority, args.rule, args.ipv6) + show_firewall_rule(args.family, args.hook, args.priority, args.rule) elif args.action == 'show_all': show_firewall() elif args.action == 'show_family': |