summaryrefslogtreecommitdiff
path: root/src/conf_mode/firewall.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/conf_mode/firewall.py')
-rwxr-xr-xsrc/conf_mode/firewall.py163
1 files changed, 99 insertions, 64 deletions
diff --git a/src/conf_mode/firewall.py b/src/conf_mode/firewall.py
index 352d5cbb1..02bf00bcc 100755
--- a/src/conf_mode/firewall.py
+++ b/src/conf_mode/firewall.py
@@ -36,6 +36,7 @@ from vyos.utils.process import cmd
from vyos.utils.process import rc_cmd
from vyos import ConfigError
from vyos import airbag
+from subprocess import run as subp_run
airbag.enable()
@@ -47,7 +48,12 @@ valid_groups = [
'domain_group',
'network_group',
'port_group',
- 'interface_group'
+ 'interface_group',
+ ## Added for group ussage in bridge firewall
+ 'ipv4_address_group',
+ 'ipv6_address_group',
+ 'ipv4_network_group',
+ 'ipv6_network_group'
]
nested_group_types = [
@@ -128,41 +134,32 @@ def get_config(config=None):
return firewall
-def verify_jump_target(firewall, root_chain, jump_target, ipv6, recursive=False):
+def verify_jump_target(firewall, hook, jump_target, family, recursive=False):
targets_seen = []
targets_pending = [jump_target]
while targets_pending:
target = targets_pending.pop()
- if not ipv6:
- if target not in dict_search_args(firewall, 'ipv4', 'name'):
- raise ConfigError(f'Invalid jump-target. Firewall name {target} does not exist on the system')
- target_rules = dict_search_args(firewall, 'ipv4', 'name', target, 'rule')
- else:
- if target not in dict_search_args(firewall, 'ipv6', 'name'):
- raise ConfigError(f'Invalid jump-target. Firewall ipv6 name {target} does not exist on the system')
- target_rules = dict_search_args(firewall, 'ipv6', 'name', target, 'rule')
+ if 'name' not in firewall[family]:
+ raise ConfigError(f'Invalid jump-target. Firewall {family} name {target} does not exist on the system')
+ elif target not in dict_search_args(firewall, family, 'name'):
+ raise ConfigError(f'Invalid jump-target. Firewall {family} name {target} does not exist on the system')
- no_ipsec_in = root_chain in ('output', )
+ target_rules = dict_search_args(firewall, family, 'name', target, 'rule')
+ no_ipsec_in = hook in ('output', )
if target_rules:
for target_rule_conf in target_rules.values():
# Output hook types will not tolerate 'meta ipsec exists' matches even in jump targets:
if no_ipsec_in and (dict_search_args(target_rule_conf, 'ipsec', 'match_ipsec_in') is not None \
or dict_search_args(target_rule_conf, 'ipsec', 'match_none_in') is not None):
- if not ipv6:
- raise ConfigError(f'Invalid jump-target for {root_chain}. Firewall name {target} rules contain incompatible ipsec inbound matches')
- else:
- raise ConfigError(f'Invalid jump-target for {root_chain}. Firewall ipv6 name {target} rules contain incompatible ipsec inbound matches')
+ raise ConfigError(f'Invalid jump-target for {hook}. Firewall {family} name {target} rules contain incompatible ipsec inbound matches')
# Make sure we're not looping back on ourselves somewhere:
if recursive and 'jump_target' in target_rule_conf:
child_target = target_rule_conf['jump_target']
if child_target in targets_seen:
- if not ipv6:
- raise ConfigError(f'Loop detected in jump-targets, firewall name {target} refers to previously traversed name {child_target}')
- else:
- raise ConfigError(f'Loop detected in jump-targets, firewall ipv6 name {target} refers to previously traversed ipv6 name {child_target}')
+ raise ConfigError(f'Loop detected in jump-targets, firewall {family} name {target} refers to previously traversed {family} name {child_target}')
targets_pending.append(child_target)
if len(targets_seen) == 7:
path_txt = ' -> '.join(targets_seen)
@@ -170,7 +167,7 @@ def verify_jump_target(firewall, root_chain, jump_target, ipv6, recursive=False)
targets_seen.append(target)
-def verify_rule(firewall, chain_name, rule_conf, ipv6):
+def verify_rule(firewall, family, hook, priority, rule_id, rule_conf):
if 'action' not in rule_conf:
raise ConfigError('Rule action must be defined')
@@ -181,10 +178,10 @@ def verify_rule(firewall, chain_name, rule_conf, ipv6):
if 'jump' not in rule_conf['action']:
raise ConfigError('jump-target defined, but action jump needed and it is not defined')
target = rule_conf['jump_target']
- if chain_name != 'name': # This is a bit clumsy, but consolidates a chunk of code.
- verify_jump_target(firewall, chain_name, target, ipv6, recursive=True)
+ if hook != 'name': # This is a bit clumsy, but consolidates a chunk of code.
+ verify_jump_target(firewall, hook, target, family, recursive=True)
else:
- verify_jump_target(firewall, chain_name, target, ipv6, recursive=False)
+ verify_jump_target(firewall, hook, target, family, recursive=False)
if rule_conf['action'] == 'offload':
if 'offload_target' not in rule_conf:
@@ -246,9 +243,9 @@ def verify_rule(firewall, chain_name, rule_conf, ipv6):
raise ConfigError(f'Cannot match a tcp flag as set and not set')
if 'protocol' in rule_conf:
- if rule_conf['protocol'] == 'icmp' and ipv6:
+ if rule_conf['protocol'] == 'icmp' and family == 'ipv6':
raise ConfigError(f'Cannot match IPv4 ICMP protocol on IPv6, use ipv6-icmp')
- if rule_conf['protocol'] == 'ipv6-icmp' and not ipv6:
+ if rule_conf['protocol'] == 'ipv6-icmp' and family == 'ipv4':
raise ConfigError(f'Cannot match IPv6 ICMP protocol on IPv4, use icmp')
for side in ['destination', 'source']:
@@ -266,7 +263,18 @@ def verify_rule(firewall, chain_name, rule_conf, ipv6):
if group in side_conf['group']:
group_name = side_conf['group'][group]
- fw_group = f'ipv6_{group}' if ipv6 and group in ['address_group', 'network_group'] else group
+ if family == 'ipv6' and group in ['address_group', 'network_group']:
+ fw_group = f'ipv6_{group}'
+ elif family == 'bridge':
+ if group =='ipv4_address_group':
+ fw_group = 'address_group'
+ elif group == 'ipv4_network_group':
+ fw_group = 'network_group'
+ else:
+ fw_group = group
+ else:
+ fw_group = group
+
error_group = fw_group.replace("_", "-")
if group in ['address_group', 'network_group', 'domain_group']:
@@ -302,7 +310,7 @@ def verify_rule(firewall, chain_name, rule_conf, ipv6):
raise ConfigError(f'Dynamic address group must be defined.')
else:
target = rule_conf['add_address_to_group'][type]['address_group']
- fwall_group = 'ipv6_address_group' if ipv6 else 'address_group'
+ fwall_group = 'ipv6_address_group' if family == 'ipv6' else 'address_group'
group_obj = dict_search_args(firewall, 'group', 'dynamic_group', fwall_group, target)
if group_obj is None:
raise ConfigError(f'Invalid dynamic address group on firewall rule')
@@ -379,43 +387,25 @@ def verify(firewall):
for group_name, group in groups.items():
verify_nested_group(group_name, group, groups, [])
- if 'ipv4' in firewall:
- for name in ['name','forward','input','output', 'prerouting']:
- if name in firewall['ipv4']:
- for name_id, name_conf in firewall['ipv4'][name].items():
- if 'jump' in name_conf['default_action'] and 'default_jump_target' not in name_conf:
- raise ConfigError('default-action set to jump, but no default-jump-target specified')
- if 'default_jump_target' in name_conf:
- target = name_conf['default_jump_target']
- if 'jump' not in name_conf['default_action']:
- raise ConfigError('default-jump-target defined, but default-action jump needed and it is not defined')
- if name_conf['default_jump_target'] == name_id:
- raise ConfigError(f'Loop detected on default-jump-target.')
- verify_jump_target(firewall, name, target, False, recursive=True)
-
- if 'rule' in name_conf:
- for rule_id, rule_conf in name_conf['rule'].items():
- verify_rule(firewall, name, rule_conf, False)
-
- if 'ipv6' in firewall:
- for name in ['name','forward','input','output', 'prerouting']:
- if name in firewall['ipv6']:
- for name_id, name_conf in firewall['ipv6'][name].items():
- if 'jump' in name_conf['default_action'] and 'default_jump_target' not in name_conf:
- raise ConfigError('default-action set to jump, but no default-jump-target specified')
- if 'default_jump_target' in name_conf:
- target = name_conf['default_jump_target']
- if 'jump' not in name_conf['default_action']:
- raise ConfigError('default-jump-target defined, but default-action jump needed and it is not defined')
- if name_conf['default_jump_target'] == name_id:
- raise ConfigError(f'Loop detected on default-jump-target.')
- verify_jump_target(firewall, name, target, True, recursive=True)
-
- if 'rule' in name_conf:
- for rule_id, rule_conf in name_conf['rule'].items():
- verify_rule(firewall, name, rule_conf, True)
-
- #### ZONESSSS
+ for family in ['ipv4', 'ipv6', 'bridge']:
+ if family in firewall:
+ for chain in ['name','forward','input','output', 'prerouting']:
+ if chain in firewall[family]:
+ for priority, priority_conf in firewall[family][chain].items():
+ if 'jump' in priority_conf['default_action'] and 'default_jump_target' not in priority_conf:
+ raise ConfigError('default-action set to jump, but no default-jump-target specified')
+ if 'default_jump_target' in priority_conf:
+ target = priority_conf['default_jump_target']
+ if 'jump' not in priority_conf['default_action']:
+ raise ConfigError('default-jump-target defined, but default-action jump needed and it is not defined')
+ if priority_conf['default_jump_target'] == priority:
+ raise ConfigError(f'Loop detected on default-jump-target.')
+ if target not in dict_search_args(firewall[family], 'name'):
+ raise ConfigError(f'Invalid jump-target. Firewall name {target} does not exist on the system')
+ if 'rule' in priority_conf:
+ for rule_id, rule_conf in priority_conf['rule'].items():
+ verify_rule(firewall, family, chain, priority, rule_id, rule_conf)
+
local_zone = False
zone_interfaces = []
@@ -495,8 +485,53 @@ def generate(firewall):
render(sysctl_file, 'firewall/sysctl-firewall.conf.j2', firewall)
return None
+def parse_firewall_error(output):
+ # Define the regex patterns to extract the error message and the comment
+ error_pattern = re.compile(r'Error:\s*(.*?)\n')
+ comment_pattern = re.compile(r'comment\s+"([^"]+)"')
+ error_output = []
+
+ # Find all error messages in the output
+ error_matches = error_pattern.findall(output)
+ # Find all comment matches in the output
+ comment_matches = comment_pattern.findall(output)
+
+ if not error_matches or not comment_matches:
+ raise ConfigError(f'Unknown firewall error detected: {output}')
+
+ error_output.append('Fail to apply firewall')
+ # Loop over the matches and process them
+ for error_message, comment in zip(error_matches, comment_matches):
+ # Parse the comment
+ parsed_entries = comment.split('-')
+ family = 'bridge' if parsed_entries[0] == 'bri' else parsed_entries[0]
+ if parsed_entries[1] == 'NAM':
+ chain = 'name'
+ elif parsed_entries[1] == 'FWD':
+ chain = 'forward'
+ elif parsed_entries[1] == 'INP':
+ chain = 'input'
+ elif parsed_entries[1] == 'OUT':
+ chain = 'output'
+ elif parsed_entries[1] == 'PRE':
+ chain = 'prerouting'
+ error_output.append(f'Error found on: firewall {family} {chain} {parsed_entries[2]} rule {parsed_entries[3]}')
+ error_output.append(f'\tError message: {error_message.strip()}')
+
+ raise ConfigError('\n'.join(error_output))
+
def apply(firewall):
+ # Use nft -c option to check current configuration file
+ completed_process = subp_run(['nft', '-c', '--file', nftables_conf], capture_output=True)
+ install_result = completed_process.returncode
+ if install_result == 1:
+ # We need to handle firewall error
+ output = completed_process.stderr
+ parse_firewall_error(output.decode())
+
+ # No error detected during check, we can apply the new configuration
install_result, output = rc_cmd(f'nft --file {nftables_conf}')
+ # Double check just in case
if install_result == 1:
raise ConfigError(f'Failed to apply firewall: {output}')