summaryrefslogtreecommitdiff
path: root/src/conf_mode/firewall.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/conf_mode/firewall.py')
-rwxr-xr-xsrc/conf_mode/firewall.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/src/conf_mode/firewall.py b/src/conf_mode/firewall.py
index 352d5cbb1..77218cc77 100755
--- a/src/conf_mode/firewall.py
+++ b/src/conf_mode/firewall.py
@@ -36,6 +36,7 @@ from vyos.utils.process import cmd
from vyos.utils.process import rc_cmd
from vyos import ConfigError
from vyos import airbag
+from subprocess import run as subp_run
airbag.enable()
@@ -495,8 +496,53 @@ def generate(firewall):
render(sysctl_file, 'firewall/sysctl-firewall.conf.j2', firewall)
return None
+def parse_firewall_error(output):
+ # Define the regex patterns to extract the error message and the comment
+ error_pattern = re.compile(r'Error:\s*(.*?)\n')
+ comment_pattern = re.compile(r'comment\s+"([^"]+)"')
+ error_output = []
+
+ # Find all error messages in the output
+ error_matches = error_pattern.findall(output)
+ # Find all comment matches in the output
+ comment_matches = comment_pattern.findall(output)
+
+ if not error_matches or not comment_matches:
+ raise ConfigError(f'Unknown firewall error detected: {output}')
+
+ error_output.append('Fail to apply firewall')
+ # Loop over the matches and process them
+ for error_message, comment in zip(error_matches, comment_matches):
+ # Parse the comment
+ parsed_entries = comment.split('-')
+ family = 'bridge' if parsed_entries[0] == 'bri' else parsed_entries[0]
+ if parsed_entries[1] == 'NAM':
+ chain = 'name'
+ elif parsed_entries[1] == 'FWD':
+ chain = 'forward'
+ elif parsed_entries[1] == 'INP':
+ chain = 'input'
+ elif parsed_entries[1] == 'OUT':
+ chain = 'output'
+ elif parsed_entries[1] == 'PRE':
+ chain = 'prerouting'
+ error_output.append(f'Error found on: firewall {family} {chain} {parsed_entries[2]} rule {parsed_entries[3]}')
+ error_output.append(f'\tError message: {error_message.strip()}')
+
+ raise ConfigError('\n'.join(error_output))
+
def apply(firewall):
+ # Use nft -c option to check current configuration file
+ completed_process = subp_run(['nft', '-c', '--file', nftables_conf], capture_output=True)
+ install_result = completed_process.returncode
+ if install_result == 1:
+ # We need to handle firewall error
+ output = completed_process.stderr
+ parse_firewall_error(output.decode())
+
+ # No error detected during check, we can apply the new configuration
install_result, output = rc_cmd(f'nft --file {nftables_conf}')
+ # Double check just in case
if install_result == 1:
raise ConfigError(f'Failed to apply firewall: {output}')