summaryrefslogtreecommitdiff
path: root/src/op_mode/firewall.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode/firewall.py')
-rwxr-xr-xsrc/op_mode/firewall.py337
1 files changed, 213 insertions, 124 deletions
diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py
index 950feb625..11cbd977d 100755
--- a/src/op_mode/firewall.py
+++ b/src/op_mode/firewall.py
@@ -21,65 +21,31 @@ import re
import tabulate
from vyos.config import Config
-from vyos.util import cmd
-from vyos.util import dict_search_args
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search_args
-def get_firewall_interfaces(firewall, name=None, ipv6=False):
- directions = ['in', 'out', 'local']
-
- if 'interface' in firewall:
- for ifname, if_conf in firewall['interface'].items():
- for direction in directions:
- if direction not in if_conf:
- continue
-
- fw_conf = if_conf[direction]
- name_str = f'({ifname},{direction})'
-
- if 'name' in fw_conf:
- fw_name = fw_conf['name']
-
- if not name:
- firewall['name'][fw_name]['interface'].append(name_str)
- elif not ipv6 and name == fw_name:
- firewall['interface'].append(name_str)
-
- if 'ipv6_name' in fw_conf:
- fw_name = fw_conf['ipv6_name']
-
- if not name:
- firewall['ipv6_name'][fw_name]['interface'].append(name_str)
- elif ipv6 and name == fw_name:
- firewall['interface'].append(name_str)
-
- return firewall
-
-def get_config_firewall(conf, name=None, ipv6=False, interfaces=True):
+def get_config_firewall(conf, hook=None, priority=None, ipv6=False):
config_path = ['firewall']
- if name:
- config_path += ['ipv6-name' if ipv6 else 'name', name]
+ if hook:
+ config_path += ['ipv6' if ipv6 else 'ipv4', hook]
+ if priority:
+ config_path += [priority]
firewall = conf.get_config_dict(config_path, key_mangling=('-', '_'),
get_first_key=True, no_tag_node_value_mangle=True)
- if firewall and interfaces:
- if name:
- firewall['interface'] = []
- else:
- if 'name' in firewall:
- for fw_name, name_conf in firewall['name'].items():
- name_conf['interface'] = []
- if 'ipv6_name' in firewall:
- for fw_name, name_conf in firewall['ipv6_name'].items():
- name_conf['interface'] = []
-
- get_firewall_interfaces(firewall, name, ipv6)
return firewall
-def get_nftables_details(name, ipv6=False):
+def get_nftables_details(hook, priority, ipv6=False):
suffix = '6' if ipv6 else ''
+ aux = 'IPV6_' if ipv6 else ''
name_prefix = 'NAME6_' if ipv6 else 'NAME_'
- command = f'sudo nft list chain ip{suffix} vyos_filter {name_prefix}{name}'
+ if hook == 'name' or hook == 'ipv6-name':
+ command = f'sudo nft list chain ip{suffix} vyos_filter {name_prefix}{priority}'
+ else:
+ up_hook = hook.upper()
+ command = f'sudo nft list chain ip{suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}'
+
try:
results = cmd(command)
except:
@@ -87,7 +53,7 @@ def get_nftables_details(name, ipv6=False):
out = {}
for line in results.split('\n'):
- comment_search = re.search(rf'{name}[\- ](\d+|default-action)', line)
+ comment_search = re.search(rf'{priority}[\- ](\d+|default-action)', line)
if not comment_search:
continue
@@ -102,18 +68,15 @@ def get_nftables_details(name, ipv6=False):
out[rule_id] = rule
return out
-def output_firewall_name(name, name_conf, ipv6=False, single_rule_id=None):
+def output_firewall_name(hook, priority, firewall_conf, ipv6=False, single_rule_id=None):
ip_str = 'IPv6' if ipv6 else 'IPv4'
- print(f'\n---------------------------------\n{ip_str} Firewall "{name}"\n')
+ print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {priority}"\n')
- if name_conf['interface']:
- print('Active on: {0}\n'.format(" ".join(name_conf['interface'])))
-
- details = get_nftables_details(name, ipv6)
+ details = get_nftables_details(hook, priority, ipv6)
rows = []
- if 'rule' in name_conf:
- for rule_id, rule_conf in name_conf['rule'].items():
+ if 'rule' in firewall_conf:
+ for rule_id, rule_conf in firewall_conf['rule'].items():
if single_rule_id and rule_id != single_rule_id:
continue
@@ -128,8 +91,8 @@ def output_firewall_name(name, name_conf, ipv6=False, single_rule_id=None):
row.append(rule_details['conditions'])
rows.append(row)
- if 'default_action' in name_conf and not single_rule_id:
- row = ['default', name_conf['default_action'], 'all']
+ if 'default_action' in firewall_conf and not single_rule_id:
+ row = ['default', firewall_conf['default_action'], 'all']
if 'default-action' in details:
rule_details = details['default-action']
row.append(rule_details.get('packets', 0))
@@ -140,26 +103,72 @@ def output_firewall_name(name, name_conf, ipv6=False, single_rule_id=None):
header = ['Rule', 'Action', 'Protocol', 'Packets', 'Bytes', 'Conditions']
print(tabulate.tabulate(rows, header) + '\n')
-def output_firewall_name_statistics(name, name_conf, ipv6=False, single_rule_id=None):
+def output_firewall_name_statistics(hook, prior, prior_conf, ipv6=False, single_rule_id=None):
ip_str = 'IPv6' if ipv6 else 'IPv4'
- print(f'\n---------------------------------\n{ip_str} Firewall "{name}"\n')
-
- if name_conf['interface']:
- print('Active on: {0}\n'.format(" ".join(name_conf['interface'])))
+ print(f'\n---------------------------------\n{ip_str} Firewall "{hook} {prior}"\n')
- details = get_nftables_details(name, ipv6)
+ details = get_nftables_details(hook, prior, ipv6)
rows = []
- if 'rule' in name_conf:
- for rule_id, rule_conf in name_conf['rule'].items():
+ if 'rule' in prior_conf:
+ for rule_id, rule_conf in prior_conf['rule'].items():
if single_rule_id and rule_id != single_rule_id:
continue
if 'disable' in rule_conf:
continue
- source_addr = dict_search_args(rule_conf, 'source', 'address') or '0.0.0.0/0'
- dest_addr = dict_search_args(rule_conf, 'destination', 'address') or '0.0.0.0/0'
+ # Get source
+ source_addr = dict_search_args(rule_conf, 'source', 'address')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'group', 'address_group')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'group', 'network_group')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
+ if not source_addr:
+ source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
+ if source_addr:
+ source_addr = str(source_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
+ source_addr = 'NOT ' + str(source_addr)
+ if not source_addr:
+ source_addr = 'any'
+
+ # Get destination
+ dest_addr = dict_search_args(rule_conf, 'destination', 'address')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'address_group')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'network_group')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
+ if not dest_addr:
+ dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
+ if dest_addr:
+ dest_addr = str(dest_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
+ dest_addr = 'NOT ' + str(dest_addr)
+ if not dest_addr:
+ dest_addr = 'any'
+
+ # Get inbound interface
+ iiface = dict_search_args(rule_conf, 'inbound_interface', 'interface_name')
+ if not iiface:
+ iiface = dict_search_args(rule_conf, 'inbound_interface', 'interface_group')
+ if not iiface:
+ iiface = 'any'
+
+ # Get outbound interface
+ oiface = dict_search_args(rule_conf, 'outbound_interface', 'interface_name')
+ if not oiface:
+ oiface = dict_search_args(rule_conf, 'outbound_interface', 'interface_group')
+ if not oiface:
+ oiface = 'any'
row = [rule_id]
if rule_id in details:
@@ -172,9 +181,26 @@ def output_firewall_name_statistics(name, name_conf, ipv6=False, single_rule_id=
row.append(rule_conf['action'])
row.append(source_addr)
row.append(dest_addr)
+ row.append(iiface)
+ row.append(oiface)
rows.append(row)
- if 'default_action' in name_conf and not single_rule_id:
+
+ if hook in ['input', 'forward', 'output']:
+ row = ['default']
+ row.append('N/A')
+ row.append('N/A')
+ if 'default_action' in prior_conf:
+ row.append(prior_conf['default_action'])
+ else:
+ row.append('accept')
+ row.append('any')
+ row.append('any')
+ row.append('any')
+ row.append('any')
+ rows.append(row)
+
+ elif 'default_action' in prior_conf and not single_rule_id:
row = ['default']
if 'default-action' in details:
rule_details = details['default-action']
@@ -183,13 +209,15 @@ def output_firewall_name_statistics(name, name_conf, ipv6=False, single_rule_id=
else:
row.append('0')
row.append('0')
- row.append(name_conf['default_action'])
- row.append('0.0.0.0/0') # Source
- row.append('0.0.0.0/0') # Dest
+ row.append(prior_conf['default_action'])
+ row.append('any') # Source
+ row.append('any') # Dest
+ row.append('any') # inbound-interface
+ row.append('any') # outbound-interface
rows.append(row)
if rows:
- header = ['Rule', 'Packets', 'Bytes', 'Action', 'Source', 'Destination']
+ header = ['Rule', 'Packets', 'Bytes', 'Action', 'Source', 'Destination', 'Inbound-Interface', 'Outbound-interface']
print(tabulate.tabulate(rows, header) + '\n')
def show_firewall():
@@ -201,52 +229,104 @@ def show_firewall():
if not firewall:
return
- if 'name' in firewall:
- for name, name_conf in firewall['name'].items():
- output_firewall_name(name, name_conf, ipv6=False)
+ if 'ipv4' in firewall:
+ for hook, hook_conf in firewall['ipv4'].items():
+ for prior, prior_conf in firewall['ipv4'][hook].items():
+ output_firewall_name(hook, prior, prior_conf, ipv6=False)
+
+ if 'ipv6' in firewall:
+ for hook, hook_conf in firewall['ipv6'].items():
+ for prior, prior_conf in firewall['ipv6'][hook].items():
+ output_firewall_name(hook, prior, prior_conf, ipv6=True)
- if 'ipv6_name' in firewall:
- for name, name_conf in firewall['ipv6_name'].items():
- output_firewall_name(name, name_conf, ipv6=True)
+def show_firewall_family(family):
+ print(f'Rulesets {family} Information')
-def show_firewall_name(name, ipv6=False):
+ conf = Config()
+ firewall = get_config_firewall(conf)
+
+ if not firewall:
+ return
+
+ for hook, hook_conf in firewall[family].items():
+ for prior, prior_conf in firewall[family][hook].items():
+ if family == 'ipv6':
+ output_firewall_name(hook, prior, prior_conf, ipv6=True)
+ else:
+ output_firewall_name(hook, prior, prior_conf, ipv6=False)
+
+def show_firewall_name(hook, priority, ipv6=False):
print('Ruleset Information')
conf = Config()
- firewall = get_config_firewall(conf, name, ipv6)
+ firewall = get_config_firewall(conf, hook, priority, ipv6)
if firewall:
- output_firewall_name(name, firewall, ipv6)
+ output_firewall_name(hook, priority, firewall, ipv6)
-def show_firewall_rule(name, rule_id, ipv6=False):
+def show_firewall_rule(hook, priority, rule_id, ipv6=False):
print('Rule Information')
conf = Config()
- firewall = get_config_firewall(conf, name, ipv6)
+ firewall = get_config_firewall(conf, hook, priority, ipv6)
if firewall:
- output_firewall_name(name, firewall, ipv6, rule_id)
+ output_firewall_name(hook, priority, firewall, ipv6, rule_id)
def show_firewall_group(name=None):
conf = Config()
- firewall = get_config_firewall(conf, interfaces=False)
+ firewall = get_config_firewall(conf)
if 'group' not in firewall:
return
def find_references(group_type, group_name):
out = []
- for name_type in ['name', 'ipv6_name']:
- if name_type not in firewall:
- continue
- for name, name_conf in firewall[name_type].items():
- if 'rule' not in name_conf:
- continue
- for rule_id, rule_conf in name_conf['rule'].items():
- source_group = dict_search_args(rule_conf, 'source', 'group', group_type)
- dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type)
- if source_group and group_name == source_group:
- out.append(f'{name}-{rule_id}')
- elif dest_group and group_name == dest_group:
- out.append(f'{name}-{rule_id}')
+ family = []
+ if group_type in ['address_group', 'network_group']:
+ family = ['ipv4']
+ elif group_type == 'ipv6_address_group':
+ family = ['ipv6']
+ group_type = 'address_group'
+ elif group_type == 'ipv6_network_group':
+ family = ['ipv6']
+ group_type = 'network_group'
+ else:
+ family = ['ipv4', 'ipv6']
+
+ for item in family:
+ for name_type in ['name', 'ipv6_name', 'forward', 'input', 'output']:
+ if item in firewall:
+ if name_type not in firewall[item]:
+ continue
+ for priority, priority_conf in firewall[item][name_type].items():
+ if priority not in firewall[item][name_type]:
+ continue
+ if 'rule' not in priority_conf:
+ continue
+ for rule_id, rule_conf in priority_conf['rule'].items():
+ source_group = dict_search_args(rule_conf, 'source', 'group', group_type)
+ dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type)
+ in_interface = dict_search_args(rule_conf, 'inbound_interface', 'interface_group')
+ out_interface = dict_search_args(rule_conf, 'outbound_interface', 'interface_group')
+ if source_group:
+ if source_group[0] == "!":
+ source_group = source_group[1:]
+ if group_name == source_group:
+ out.append(f'{item}-{name_type}-{priority}-{rule_id}')
+ if dest_group:
+ if dest_group[0] == "!":
+ dest_group = dest_group[1:]
+ if group_name == dest_group:
+ out.append(f'{item}-{name_type}-{priority}-{rule_id}')
+ if in_interface:
+ if in_interface[0] == "!":
+ in_interface = in_interface[1:]
+ if group_name == in_interface:
+ out.append(f'{item}-{name_type}-{priority}-{rule_id}')
+ if out_interface:
+ if out_interface[0] == "!":
+ out_interface = out_interface[1:]
+ if group_name == out_interface:
+ out.append(f'{item}-{name_type}-{priority}-{rule_id}')
return out
header = ['Name', 'Type', 'References', 'Members']
@@ -258,7 +338,7 @@ def show_firewall_group(name=None):
continue
references = find_references(group_type, group_name)
- row = [group_name, group_type, '\n'.join(references) or 'N/A']
+ row = [group_name, group_type, '\n'.join(references) or 'N/D']
if 'address' in group_conf:
row.append("\n".join(sorted(group_conf['address'])))
elif 'network' in group_conf:
@@ -267,8 +347,10 @@ def show_firewall_group(name=None):
row.append("\n".join(sorted(group_conf['mac_address'])))
elif 'port' in group_conf:
row.append("\n".join(sorted(group_conf['port'])))
+ elif 'interface' in group_conf:
+ row.append("\n".join(sorted(group_conf['interface'])))
else:
- row.append('N/A')
+ row.append('N/D')
rows.append(row)
if rows:
@@ -284,28 +366,28 @@ def show_summary():
if not firewall:
return
- header = ['Ruleset Name', 'Description', 'References']
+ header = ['Ruleset Hook', 'Ruleset Priority', 'Description', 'References']
v4_out = []
v6_out = []
- if 'name' in firewall:
- for name, name_conf in firewall['name'].items():
- description = name_conf.get('description', '')
- interfaces = ", ".join(name_conf['interface'])
- v4_out.append([name, description, interfaces])
+ if 'ipv4' in firewall:
+ for hook, hook_conf in firewall['ipv4'].items():
+ for prior, prior_conf in firewall['ipv4'][hook].items():
+ description = prior_conf.get('description', '')
+ v4_out.append([hook, prior, description])
- if 'ipv6_name' in firewall:
- for name, name_conf in firewall['ipv6_name'].items():
- description = name_conf.get('description', '')
- interfaces = ", ".join(name_conf['interface'])
- v6_out.append([name, description, interfaces or 'N/A'])
+ if 'ipv6' in firewall:
+ for hook, hook_conf in firewall['ipv6'].items():
+ for prior, prior_conf in firewall['ipv6'][hook].items():
+ description = prior_conf.get('description', '')
+ v6_out.append([hook, prior, description])
if v6_out:
- print('\nIPv6 name:\n')
+ print('\nIPv6 Ruleset:\n')
print(tabulate.tabulate(v6_out, header) + '\n')
if v4_out:
- print('\nIPv4 name:\n')
+ print('\nIPv4 Ruleset:\n')
print(tabulate.tabulate(v4_out, header) + '\n')
show_firewall_group()
@@ -319,18 +401,23 @@ def show_statistics():
if not firewall:
return
- if 'name' in firewall:
- for name, name_conf in firewall['name'].items():
- output_firewall_name_statistics(name, name_conf, ipv6=False)
+ if 'ipv4' in firewall:
+ for hook, hook_conf in firewall['ipv4'].items():
+ for prior, prior_conf in firewall['ipv4'][hook].items():
+ output_firewall_name_statistics(hook,prior, prior_conf, ipv6=False)
- if 'ipv6_name' in firewall:
- for name, name_conf in firewall['ipv6_name'].items():
- output_firewall_name_statistics(name, name_conf, ipv6=True)
+ if 'ipv6' in firewall:
+ for hook, hook_conf in firewall['ipv6'].items():
+ for prior, prior_conf in firewall['ipv6'][hook].items():
+ output_firewall_name_statistics(hook,prior, prior_conf, ipv6=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--action', help='Action', required=False)
parser.add_argument('--name', help='Firewall name', required=False, action='store', nargs='?', default='')
+ parser.add_argument('--family', help='IP family', required=False, action='store', nargs='?', default='')
+ parser.add_argument('--hook', help='Firewall hook', required=False, action='store', nargs='?', default='')
+ parser.add_argument('--priority', help='Firewall priority', required=False, action='store', nargs='?', default='')
parser.add_argument('--rule', help='Firewall Rule ID', required=False)
parser.add_argument('--ipv6', help='IPv6 toggle', action='store_true')
@@ -338,11 +425,13 @@ if __name__ == '__main__':
if args.action == 'show':
if not args.rule:
- show_firewall_name(args.name, args.ipv6)
+ show_firewall_name(args.hook, args.priority, args.ipv6)
else:
- show_firewall_rule(args.name, args.rule, args.ipv6)
+ show_firewall_rule(args.hook, args.priority, args.rule, args.ipv6)
elif args.action == 'show_all':
show_firewall()
+ elif args.action == 'show_family':
+ show_firewall_family(args.family)
elif args.action == 'show_group':
show_firewall_group(args.name)
elif args.action == 'show_statistics':