summaryrefslogtreecommitdiff
path: root/src/op_mode/firewall.py
diff options
context:
space:
mode:
authorDaniil Baturin <daniil@vyos.io>2023-11-03 17:04:56 +0000
committerGitHub <noreply@github.com>2023-11-03 17:04:56 +0000
commit1512d64c929fbb8de37a2d5b816e352e5942bfe0 (patch)
treefa9f24f68312cd450642ae3f5d4c480c149b88b3 /src/op_mode/firewall.py
parent8391fb5f2c3db32a925f23c3df4fa08d4980c252 (diff)
parent72b8eb4ac66a5db2848db804faca7ec56c15aab0 (diff)
downloadvyos-1x-1512d64c929fbb8de37a2d5b816e352e5942bfe0.tar.gz
vyos-1x-1512d64c929fbb8de37a2d5b816e352e5942bfe0.zip
Merge pull request #2432 from nicolas-fort/T5513-fwall-show-sagitta
T5513: firewall - op-mode command backport
Diffstat (limited to 'src/op_mode/firewall.py')
-rwxr-xr-xsrc/op_mode/firewall.py284
1 files changed, 210 insertions, 74 deletions
diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py
index c5d5848c2..d426b62e5 100755
--- a/src/op_mode/firewall.py
+++ b/src/op_mode/firewall.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -24,27 +24,48 @@ from vyos.config import Config
from vyos.utils.process import cmd
from vyos.utils.dict import dict_search_args
-def get_config_firewall(conf, hook=None, priority=None, ipv6=False):
- config_path = ['firewall']
- if hook:
- config_path += ['ipv6' if ipv6 else 'ipv4', hook]
- if priority:
- config_path += [priority]
+def get_config_node(conf, node=None, family=None, hook=None, priority=None):
+ if node == 'nat':
+ if family == 'ipv6':
+ config_path = ['nat66']
+ else:
+ config_path = ['nat']
- firewall = conf.get_config_dict(config_path, key_mangling=('-', '_'),
+ elif node == 'policy':
+ config_path = ['policy']
+ else:
+ config_path = ['firewall']
+ if family:
+ config_path += [family]
+ if hook:
+ config_path += [hook]
+ if priority:
+ config_path += [priority]
+
+ node_config = conf.get_config_dict(config_path, key_mangling=('-', '_'),
get_first_key=True, no_tag_node_value_mangle=True)
- return firewall
+ return node_config
+
+def get_nftables_details(family, hook, priority):
+ if family == 'ipv6':
+ suffix = 'ip6'
+ name_prefix = 'NAME6_'
+ aux='IPV6_'
+ elif family == 'ipv4':
+ suffix = 'ip'
+ name_prefix = 'NAME_'
+ aux=''
+ else:
+ suffix = 'bridge'
+ name_prefix = 'NAME_'
+ aux=''
-def get_nftables_details(hook, priority, ipv6=False):
- suffix = '6' if ipv6 else ''
- aux = 'IPV6_' if ipv6 else ''
- name_prefix = 'NAME6_' if ipv6 else 'NAME_'
if hook == 'name' or hook == 'ipv6-name':
- command = f'sudo nft list chain ip{suffix} vyos_filter {name_prefix}{priority}'
+ command = f'sudo nft list chain {suffix} vyos_filter {name_prefix}{priority}'
else:
up_hook = hook.upper()
- command = f'sudo nft list chain ip{suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}'
+ command = f'sudo nft list chain {suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}'
try:
results = cmd(command)
@@ -68,11 +89,10 @@ def get_nftables_details(hook, priority, ipv6=False):
out[rule_id] = rule
return out
-def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_id=None):
- ip_str = 'IPv6' if ipv6 else 'IPv4'
- print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {priority}"\n')
+def output_firewall_name(family, hook, priority, firewall_conf, single_rule_id=None):
+ print(f'\n---------------------------------\n{family} Firewall "{hook} {priority}"\n')
- details = get_nftables_details(hook, priority, ipv6)
+ details = get_nftables_details(family, hook, priority)
rows = []
if 'rule' in firewall_conf:
@@ -91,7 +111,15 @@ def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_
row.append(rule_details['conditions'])
rows.append(row)
- if 'default_action' in firewall_conf and not single_rule_id:
+ if hook in ['input', 'forward', 'output']:
+ def_action = firewall_conf['default_action'] if 'default_action' in firewall_conf else 'accept'
+ row = ['default', def_action, 'all']
+ rule_details = details['default-action']
+ row.append(rule_details.get('packets', 0))
+ row.append(rule_details.get('bytes', 0))
+ rows.append(row)
+
+ elif 'default_action' in firewall_conf and not single_rule_id:
row = ['default', firewall_conf['default_action'], 'all']
if 'default-action' in details:
rule_details = details['default-action']
@@ -103,11 +131,10 @@ def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_
header = ['Rule', 'Action', 'Protocol', 'Packets', 'Bytes', 'Conditions']
print(tabulate.tabulate(rows, header) + '\n')
-def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_rule_id=None):
- ip_str = 'IPv6' if ipv6 else 'IPv4'
- print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {prior}"\n')
+def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule_id=None):
+ print(f'\n---------------------------------\n{family} Firewall "{hook} {prior}"\n')
- details = get_nftables_details(hook, prior, ipv6)
+ details = get_nftables_details(family, hook, prior)
rows = []
if 'rule' in prior_conf:
@@ -127,7 +154,15 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_
if not source_addr:
source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group')
if not source_addr:
- source_addr = '::/0' if ipv6 else '0.0.0.0/0'
+ source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
+ if source_addr:
+ source_addr = str(source_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
+ source_addr = 'NOT ' + str(source_addr)
+ if not source_addr:
+ source_addr = 'any'
# Get destination
dest_addr = dict_search_args(rule_conf, 'destination', 'address')
@@ -138,19 +173,27 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_
if not dest_addr:
dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group')
if not dest_addr:
- dest_addr = '::/0' if ipv6 else '0.0.0.0/0'
+ dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
+ if dest_addr:
+ dest_addr = str(dest_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
+ dest_addr = 'NOT ' + str(dest_addr)
+ if not dest_addr:
+ dest_addr = 'any'
# Get inbound interface
- iiface = dict_search_args(rule_conf, 'inbound_interface', 'interface_name')
+ iiface = dict_search_args(rule_conf, 'inbound_interface', 'name')
if not iiface:
- iiface = dict_search_args(rule_conf, 'inbound_interface', 'interface_group')
+ iiface = dict_search_args(rule_conf, 'inbound_interface', 'group')
if not iiface:
iiface = 'any'
# Get outbound interface
- oiface = dict_search_args(rule_conf, 'outbound_interface', 'interface_name')
+ oiface = dict_search_args(rule_conf, 'outbound_interface', 'name')
if not oiface:
- oiface = dict_search_args(rule_conf, 'outbound_interface', 'interface_group')
+ oiface = dict_search_args(rule_conf, 'outbound_interface', 'group')
if not oiface:
oiface = 'any'
@@ -169,7 +212,23 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_
row.append(oiface)
rows.append(row)
- if 'default_action' in prior_conf and not single_rule_id:
+
+ if hook in ['input', 'forward', 'output']:
+ row = ['default']
+ rule_details = details['default-action']
+ row.append(rule_details.get('packets', 0))
+ row.append(rule_details.get('bytes', 0))
+ if 'default_action' in prior_conf:
+ row.append(prior_conf['default_action'])
+ else:
+ row.append('accept')
+ row.append('any')
+ row.append('any')
+ row.append('any')
+ row.append('any')
+ rows.append(row)
+
+ elif 'default_action' in prior_conf and not single_rule_id:
row = ['default']
if 'default-action' in details:
rule_details = details['default-action']
@@ -179,8 +238,10 @@ def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_
row.append('0')
row.append('0')
row.append(prior_conf['default_action'])
- row.append('0.0.0.0/0') # Source
- row.append('0.0.0.0/0') # Dest
+ row.append('any') # Source
+ row.append('any') # Dest
+ row.append('any') # inbound-interface
+ row.append('any') # outbound-interface
rows.append(row)
if rows:
@@ -191,60 +252,56 @@ def show_firewall():
print('Rulesets Information')
conf = Config()
- firewall = get_config_firewall(conf)
+ firewall = get_config_node(conf)
if not firewall:
return
- if 'ipv4' in firewall:
- for hook, hook_conf in firewall['ipv4'].items():
- for prior, prior_conf in firewall['ipv4'][hook].items():
- output_firewall_name(hook, prior, prior_conf, ipv6=False)
-
- if 'ipv6' in firewall:
- for hook, hook_conf in firewall['ipv6'].items():
- for prior, prior_conf in firewall['ipv6'][hook].items():
- output_firewall_name(hook, prior, prior_conf, ipv6=True)
+ for family in ['ipv4', 'ipv6', 'bridge']:
+ if family in firewall:
+ for hook, hook_conf in firewall[family].items():
+ for prior, prior_conf in firewall[family][hook].items():
+ output_firewall_name(family, hook, prior, prior_conf)
def show_firewall_family(family):
print(f'Rulesets {family} Information')
conf = Config()
- firewall = get_config_firewall(conf)
+ firewall = get_config_node(conf)
- if not firewall:
+ if not firewall or family not in firewall:
return
for hook, hook_conf in firewall[family].items():
for prior, prior_conf in firewall[family][hook].items():
- if family == 'ipv6':
- output_firewall_name(hook, prior, prior_conf, ipv6=True)
- else:
- output_firewall_name(hook, prior, prior_conf, ipv6=False)
+ output_firewall_name(family, hook, prior, prior_conf)
-def show_firewall_name(hook, priority, ipv6=False):
+def show_firewall_name(family, hook, priority):
print('Ruleset Information')
conf = Config()
- firewall = get_config_firewall(conf, hook, priority, ipv6)
+ firewall = get_config_node(conf, 'firewall', family, hook, priority)
if firewall:
- output_firewall_name(hook, priority, firewall, ipv6)
+ output_firewall_name(family, hook, priority, firewall)
-def show_firewall_rule(hook, priority, rule_id, ipv6=False):
+def show_firewall_rule(family, hook, priority, rule_id):
print('Rule Information')
conf = Config()
- firewall = get_config_firewall(conf, hook, priority, ipv6)
+ firewall = get_config_node(conf, 'firewall', family, hook, priority)
if firewall:
- output_firewall_name(hook, priority, firewall, ipv6, rule_id)
+ output_firewall_name(family, hook, priority, firewall, rule_id)
def show_firewall_group(name=None):
conf = Config()
- firewall = get_config_firewall(conf)
+ firewall = get_config_node(conf, node='firewall')
if 'group' not in firewall:
return
+ nat = get_config_node(conf, node='nat')
+ policy = get_config_node(conf, node='policy')
+
def find_references(group_type, group_name):
out = []
family = []
@@ -260,6 +317,7 @@ def show_firewall_group(name=None):
family = ['ipv4', 'ipv6']
for item in family:
+ # Look references in firewall
for name_type in ['name', 'ipv6_name', 'forward', 'input', 'output']:
if item in firewall:
if name_type not in firewall[item]:
@@ -272,8 +330,8 @@ def show_firewall_group(name=None):
for rule_id, rule_conf in priority_conf['rule'].items():
source_group = dict_search_args(rule_conf, 'source', 'group', group_type)
dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type)
- in_interface = dict_search_args(rule_conf, 'inbound_interface', 'interface_group')
- out_interface = dict_search_args(rule_conf, 'outbound_interface', 'interface_group')
+ in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group')
+ out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group')
if source_group:
if source_group[0] == "!":
source_group = source_group[1:]
@@ -294,6 +352,76 @@ def show_firewall_group(name=None):
out_interface = out_interface[1:]
if group_name == out_interface:
out.append(f'{item}-{name_type}-{priority}-{rule_id}')
+
+ # Look references in route | route6
+ for name_type in ['route', 'route6']:
+ if name_type not in policy:
+ continue
+ if name_type == 'route' and item == 'ipv6':
+ continue
+ elif name_type == 'route6' and item == 'ipv4':
+ continue
+ else:
+ for policy_name, policy_conf in policy[name_type].items():
+ if 'rule' not in policy_conf:
+ continue
+ for rule_id, rule_conf in policy_conf['rule'].items():
+ source_group = dict_search_args(rule_conf, 'source', 'group', group_type)
+ dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type)
+ in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group')
+ out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group')
+ if source_group:
+ if source_group[0] == "!":
+ source_group = source_group[1:]
+ if group_name == source_group:
+ out.append(f'{name_type}-{policy_name}-{rule_id}')
+ if dest_group:
+ if dest_group[0] == "!":
+ dest_group = dest_group[1:]
+ if group_name == dest_group:
+ out.append(f'{name_type}-{policy_name}-{rule_id}')
+ if in_interface:
+ if in_interface[0] == "!":
+ in_interface = in_interface[1:]
+ if group_name == in_interface:
+ out.append(f'{name_type}-{policy_name}-{rule_id}')
+ if out_interface:
+ if out_interface[0] == "!":
+ out_interface = out_interface[1:]
+ if group_name == out_interface:
+ out.append(f'{name_type}-{policy_name}-{rule_id}')
+
+ ## Look references in nat table
+ for direction in ['source', 'destination']:
+ if direction in nat:
+ if 'rule' not in nat[direction]:
+ continue
+ for rule_id, rule_conf in nat[direction]['rule'].items():
+ source_group = dict_search_args(rule_conf, 'source', 'group', group_type)
+ dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type)
+ in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group')
+ out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group')
+ if source_group:
+ if source_group[0] == "!":
+ source_group = source_group[1:]
+ if group_name == source_group:
+ out.append(f'nat-{direction}-{rule_id}')
+ if dest_group:
+ if dest_group[0] == "!":
+ dest_group = dest_group[1:]
+ if group_name == dest_group:
+ out.append(f'nat-{direction}-{rule_id}')
+ if in_interface:
+ if in_interface[0] == "!":
+ in_interface = in_interface[1:]
+ if group_name == in_interface:
+ out.append(f'nat-{direction}-{rule_id}')
+ if out_interface:
+ if out_interface[0] == "!":
+ out_interface = out_interface[1:]
+ if group_name == out_interface:
+ out.append(f'nat-{direction}-{rule_id}')
+
return out
header = ['Name', 'Type', 'References', 'Members']
@@ -305,7 +433,7 @@ def show_firewall_group(name=None):
continue
references = find_references(group_type, group_name)
- row = [group_name, group_type, '\n'.join(references) or 'N/A']
+ row = [group_name, group_type, '\n'.join(references) or 'N/D']
if 'address' in group_conf:
row.append("\n".join(sorted(group_conf['address'])))
elif 'network' in group_conf:
@@ -317,9 +445,10 @@ def show_firewall_group(name=None):
elif 'interface' in group_conf:
row.append("\n".join(sorted(group_conf['interface'])))
else:
- row.append('N/A')
+ row.append('N/D')
rows.append(row)
+
if rows:
print('Firewall Groups\n')
print(tabulate.tabulate(rows, header))
@@ -328,7 +457,7 @@ def show_summary():
print('Ruleset Summary')
conf = Config()
- firewall = get_config_firewall(conf)
+ firewall = get_config_node(conf)
if not firewall:
return
@@ -336,6 +465,7 @@ def show_summary():
header = ['Ruleset Hook', 'Ruleset Priority', 'Description', 'References']
v4_out = []
v6_out = []
+ br_out = []
if 'ipv4' in firewall:
for hook, hook_conf in firewall['ipv4'].items():
@@ -349,6 +479,12 @@ def show_summary():
description = prior_conf.get('description', '')
v6_out.append([hook, prior, description])
+ if 'bridge' in firewall:
+ for hook, hook_conf in firewall['bridge'].items():
+ for prior, prior_conf in firewall['bridge'][hook].items():
+ description = prior_conf.get('description', '')
+ br_out.append([hook, prior, description])
+
if v6_out:
print('\nIPv6 Ruleset:\n')
print(tabulate.tabulate(v6_out, header) + '\n')
@@ -357,26 +493,26 @@ def show_summary():
print('\nIPv4 Ruleset:\n')
print(tabulate.tabulate(v4_out, header) + '\n')
+ if br_out:
+ print('\nBridge Ruleset:\n')
+ print(tabulate.tabulate(br_out, header) + '\n')
+
show_firewall_group()
def show_statistics():
print('Rulesets Statistics')
conf = Config()
- firewall = get_config_firewall(conf)
+ firewall = get_config_node(conf)
if not firewall:
return
- if 'ipv4' in firewall:
- for hook, hook_conf in firewall['ipv4'].items():
- for prior, prior_conf in firewall['ipv4'][hook].items():
- output_firewall_name_statistics(hook,prior, prior_conf, ipv6=False)
-
- if 'ipv6' in firewall:
- for hook, hook_conf in firewall['ipv6'].items():
- for prior, prior_conf in firewall['ipv6'][hook].items():
- output_firewall_name_statistics(hook,prior, prior_conf, ipv6=True)
+ for family in ['ipv4', 'ipv6', 'bridge']:
+ if family in firewall:
+ for hook, hook_conf in firewall[family].items():
+ for prior, prior_conf in firewall[family][hook].items():
+ output_firewall_name_statistics(family, hook,prior, prior_conf)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
@@ -392,9 +528,9 @@ if __name__ == '__main__':
if args.action == 'show':
if not args.rule:
- show_firewall_name(args.hook, args.priority, args.ipv6)
+ show_firewall_name(args.family, args.hook, args.priority)
else:
- show_firewall_rule(args.hook, args.priority, args.rule, args.ipv6)
+ show_firewall_rule(args.family, args.hook, args.priority, args.rule)
elif args.action == 'show_all':
show_firewall()
elif args.action == 'show_family':
@@ -404,4 +540,4 @@ if __name__ == '__main__':
elif args.action == 'show_statistics':
show_statistics()
elif args.action == 'show_summary':
- show_summary()
+ show_summary() \ No newline at end of file