summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/firewall.py26
-rwxr-xr-xsrc/conf_mode/system_login.py3
-rwxr-xr-xsrc/op_mode/firewall.py34
-rwxr-xr-xsrc/op_mode/image_installer.py3
-rwxr-xr-xsrc/op_mode/qos.py2
-rwxr-xr-xsrc/services/vyos-domain-resolver58
-rw-r--r--src/tests/test_config_parser.py4
-rwxr-xr-xsrc/validators/base6422
8 files changed, 120 insertions, 32 deletions
diff --git a/src/conf_mode/firewall.py b/src/conf_mode/firewall.py
index 768bb127d..cebe57092 100755
--- a/src/conf_mode/firewall.py
+++ b/src/conf_mode/firewall.py
@@ -44,6 +44,7 @@ airbag.enable()
nftables_conf = '/run/nftables.conf'
domain_resolver_usage = '/run/use-vyos-domain-resolver-firewall'
+firewall_config_dir = "/config/firewall"
sysctl_file = r'/run/sysctl/10-vyos-firewall.conf'
@@ -53,7 +54,8 @@ valid_groups = [
'network_group',
'port_group',
'interface_group',
- ## Added for group ussage in bridge firewall
+ 'remote_group',
+ ## Added for group usage in bridge firewall
'ipv4_address_group',
'ipv6_address_group',
'ipv4_network_group',
@@ -311,8 +313,8 @@ def verify_rule(firewall, family, hook, priority, rule_id, rule_conf):
raise ConfigError('Only one of address, fqdn or geoip can be specified')
if 'group' in side_conf:
- if len({'address_group', 'network_group', 'domain_group'} & set(side_conf['group'])) > 1:
- raise ConfigError('Only one address-group, network-group or domain-group can be specified')
+ if len({'address_group', 'network_group', 'domain_group', 'remote_group'} & set(side_conf['group'])) > 1:
+ raise ConfigError('Only one address-group, network-group, remote-group or domain-group can be specified')
for group in valid_groups:
if group in side_conf['group']:
@@ -332,7 +334,7 @@ def verify_rule(firewall, family, hook, priority, rule_id, rule_conf):
error_group = fw_group.replace("_", "-")
- if group in ['address_group', 'network_group', 'domain_group']:
+ if group in ['address_group', 'network_group', 'domain_group', 'remote_group']:
types = [t for t in ['address', 'fqdn', 'geoip'] if t in side_conf]
if types:
raise ConfigError(f'{error_group} and {types[0]} cannot both be defined')
@@ -442,6 +444,11 @@ def verify(firewall):
for group_name, group in groups.items():
verify_nested_group(group_name, group, groups, [])
+ if 'remote_group' in firewall['group']:
+ for group_name, group in firewall['group']['remote_group'].items():
+ if 'url' not in group:
+ raise ConfigError(f'remote-group {group_name} must have a url configured')
+
for family in ['ipv4', 'ipv6', 'bridge']:
if family in firewall:
for chain in ['name','forward','input','output', 'prerouting']:
@@ -539,6 +546,15 @@ def verify(firewall):
def generate(firewall):
render(nftables_conf, 'firewall/nftables.j2', firewall)
render(sysctl_file, 'firewall/sysctl-firewall.conf.j2', firewall)
+
+ # Cleanup remote-group cache files
+ if os.path.exists(firewall_config_dir):
+ for fw_file in os.listdir(firewall_config_dir):
+ # Delete matching files in 'config/firewall' that no longer exist as a remote-group in config
+ if fw_file.startswith("R_") and fw_file.endswith(".txt"):
+ if 'group' not in firewall or 'remote_group' not in firewall['group'] or fw_file[2:-4] not in firewall['group']['remote_group'].keys():
+ os.unlink(os.path.join(firewall_config_dir, fw_file))
+
return None
def parse_firewall_error(output):
@@ -598,7 +614,7 @@ def apply(firewall):
## DOMAIN RESOLVER
domain_action = 'restart'
- if dict_search_args(firewall, 'group', 'domain_group') or firewall['ip_fqdn'].items() or firewall['ip6_fqdn'].items():
+ if dict_search_args(firewall, 'group', 'remote_group') or dict_search_args(firewall, 'group', 'domain_group') or firewall['ip_fqdn'].items() or firewall['ip6_fqdn'].items():
text = f'# Automatically generated by firewall.py\nThis file indicates that vyos-domain-resolver service is used by the firewall.\n'
Path(domain_resolver_usage).write_text(text)
else:
diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py
index 1e6061ecf..3fed6d273 100755
--- a/src/conf_mode/system_login.py
+++ b/src/conf_mode/system_login.py
@@ -160,9 +160,10 @@ def verify(login):
dict_object=user_config
) or None
+ failed_check_status = [EPasswdStrength.WEAK, EPasswdStrength.ERROR]
if plaintext_password is not None:
result = evaluate_strength(plaintext_password)
- if result['strength'] == EPasswdStrength.WEAK:
+ if result['strength'] in failed_check_status:
Warning(result['error'])
for pubkey, pubkey_options in (dict_search('authentication.public_keys', user_config) or {}).items():
diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py
index c197ca434..7a3ab921d 100755
--- a/src/op_mode/firewall.py
+++ b/src/op_mode/firewall.py
@@ -253,15 +253,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule
if not source_addr:
source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group')
if not source_addr:
- source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
+ source_addr = dict_search_args(rule_conf, 'source', 'group', 'remote_group')
if not source_addr:
- source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
- if source_addr:
- source_addr = str(source_addr)[1:-1].replace('\'','')
- if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
- source_addr = 'NOT ' + str(source_addr)
+ source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
if not source_addr:
- source_addr = 'any'
+ source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
+ if source_addr:
+ source_addr = str(source_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
+ source_addr = 'NOT ' + str(source_addr)
+ if not source_addr:
+ source_addr = 'any'
# Get destination
dest_addr = dict_search_args(rule_conf, 'destination', 'address')
@@ -272,15 +274,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule
if not dest_addr:
dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group')
if not dest_addr:
- dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
+ dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'remote_group')
if not dest_addr:
- dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
- if dest_addr:
- dest_addr = str(dest_addr)[1:-1].replace('\'','')
- if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
- dest_addr = 'NOT ' + str(dest_addr)
+ dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
if not dest_addr:
- dest_addr = 'any'
+ dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
+ if dest_addr:
+ dest_addr = str(dest_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
+ dest_addr = 'NOT ' + str(dest_addr)
+ if not dest_addr:
+ dest_addr = 'any'
# Get inbound interface
iiface = dict_search_args(rule_conf, 'inbound_interface', 'name')
@@ -571,6 +575,8 @@ def show_firewall_group(name=None):
row.append("\n".join(sorted(group_conf['port'])))
elif 'interface' in group_conf:
row.append("\n".join(sorted(group_conf['interface'])))
+ elif 'url' in group_conf:
+ row.append(group_conf['url'])
else:
row.append('N/D')
rows.append(row)
diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py
index c6e9c7f6f..82756daec 100755
--- a/src/op_mode/image_installer.py
+++ b/src/op_mode/image_installer.py
@@ -783,6 +783,7 @@ def install_image() -> None:
break
print(MSG_WARN_IMAGE_NAME_WRONG)
+ failed_check_status = [EPasswdStrength.WEAK, EPasswdStrength.ERROR]
# ask for password
while True:
user_password: str = ask_input(MSG_INPUT_PASSWORD, no_echo=True,
@@ -792,7 +793,7 @@ def install_image() -> None:
Warning(MSG_WARN_CHANGE_PASSWORD)
else:
result = evaluate_strength(user_password)
- if result['strength'] == EPasswdStrength.WEAK:
+ if result['strength'] in failed_check_status:
Warning(result['error'])
confirm: str = ask_input(MSG_INPUT_PASSWORD_CONFIRM, no_echo=True,
diff --git a/src/op_mode/qos.py b/src/op_mode/qos.py
index b8ca149a0..464b552ee 100755
--- a/src/op_mode/qos.py
+++ b/src/op_mode/qos.py
@@ -38,7 +38,7 @@ def get_tc_info(interface_dict, interface_name, policy_type):
if not policy_name:
return None, None
- class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], key_mangling=('-', '_'),
+ class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name],
get_first_key=True)
if not class_dict:
return None, None
diff --git a/src/services/vyos-domain-resolver b/src/services/vyos-domain-resolver
index 48c6b86d8..aba5ba9db 100755
--- a/src/services/vyos-domain-resolver
+++ b/src/services/vyos-domain-resolver
@@ -13,19 +13,22 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
import json
import time
import logging
+import os
from vyos.configdict import dict_merge
from vyos.configquery import ConfigTreeQuery
from vyos.firewall import fqdn_config_parse
from vyos.firewall import fqdn_resolve
from vyos.ifconfig import WireGuardIf
+from vyos.remote import download
from vyos.utils.commit import commit_in_progress
from vyos.utils.dict import dict_search_args
from vyos.utils.kernel import WIREGUARD_REKEY_AFTER_TIME
+from vyos.utils.file import makedir, chmod_775, write_file, read_file
+from vyos.utils.network import is_valid_ipv4_address_or_range
from vyos.utils.process import cmd
from vyos.utils.process import run
from vyos.xml_ref import get_defaults
@@ -37,6 +40,8 @@ base_firewall = ['firewall']
base_nat = ['nat']
base_interfaces = ['interfaces']
+firewall_config_dir = "/config/firewall"
+
domain_state = {}
ipv4_tables = {
@@ -121,6 +126,56 @@ def nft_valid_sets():
except:
return []
+def update_remote_group(config):
+ conf_lines = []
+ count = 0
+ valid_sets = nft_valid_sets()
+
+ remote_groups = dict_search_args(config, 'group', 'remote_group')
+ if remote_groups:
+ # Create directory for list files if necessary
+ if not os.path.isdir(firewall_config_dir):
+ makedir(firewall_config_dir, group='vyattacfg')
+ chmod_775(firewall_config_dir)
+
+ for set_name, remote_config in remote_groups.items():
+ if 'url' not in remote_config:
+ continue
+ nft_set_name = f'R_{set_name}'
+
+ # Create list file if necessary
+ list_file = os.path.join(firewall_config_dir, f"{nft_set_name}.txt")
+ if not os.path.exists(list_file):
+ write_file(list_file, '', user="root", group="vyattacfg", mode=0o644)
+
+ # Attempt to download file, use cached version if download fails
+ try:
+ download(list_file, remote_config['url'], raise_error=True)
+ except:
+ logger.error(f'Failed to download list-file for {set_name} remote group')
+ logger.info(f'Using cached list-file for {set_name} remote group')
+
+ # Read list file
+ ip_list = []
+ for line in read_file(list_file).splitlines():
+ line_first_word = line.strip().partition(' ')[0]
+
+ if is_valid_ipv4_address_or_range(line_first_word):
+ ip_list.append(line_first_word)
+
+ # Load tables
+ for table in ipv4_tables:
+ if (table, nft_set_name) in valid_sets:
+ conf_lines += nft_output(table, nft_set_name, ip_list)
+
+ count += 1
+
+ nft_conf_str = "\n".join(conf_lines) + "\n"
+ code = run(f'nft --file -', input=nft_conf_str)
+
+ logger.info(f'Updated {count} remote-groups in firewall - result: {code}')
+
+
def update_fqdn(config, node):
conf_lines = []
count = 0
@@ -234,5 +289,6 @@ if __name__ == '__main__':
while True:
update_fqdn(firewall, 'firewall')
update_fqdn(nat, 'nat')
+ update_remote_group(firewall)
update_interfaces(interfaces, 'interfaces')
time.sleep(timeout)
diff --git a/src/tests/test_config_parser.py b/src/tests/test_config_parser.py
index 9a4f02859..1b4a57311 100644
--- a/src/tests/test_config_parser.py
+++ b/src/tests/test_config_parser.py
@@ -51,3 +51,7 @@ class TestConfigParser(TestCase):
def test_rename_duplicate(self):
with self.assertRaises(vyos.configtree.ConfigTreeError):
self.config.rename(["top-level-tag-node", "foo"], "bar")
+
+ def test_leading_slashes(self):
+ self.assertTrue(self.config.exists(["normal-node", "value-with-leading-slashes"]))
+ self.assertEqual(self.config.return_value(["normal-node", "value-with-leading-slashes"]), "//other-value")
diff --git a/src/validators/base64 b/src/validators/base64
index e2b1e730d..a54168ef7 100755
--- a/src/validators/base64
+++ b/src/validators/base64
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -15,13 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
-from sys import argv
+import argparse
-if __name__ == '__main__':
- if len(argv) != 2:
- exit(1)
- try:
- base64.b64decode(argv[1])
- except:
+parser = argparse.ArgumentParser(description="Validate base64 input.")
+parser.add_argument("base64", help="Base64 encoded string to validate")
+parser.add_argument("--decoded-len", type=int, help="Optional list of valid lengths for the decoded input")
+args = parser.parse_args()
+
+try:
+ decoded = base64.b64decode(args.base64)
+ if args.decoded_len and len(decoded) != args.decoded_len:
exit(1)
- exit(0)
+except:
+ exit(1)
+exit(0)