diff options
Diffstat (limited to 'src/conf_mode')
-rwxr-xr-x | src/conf_mode/interfaces_macsec.py | 10 | ||||
-rwxr-xr-x | src/conf_mode/interfaces_wireless.py | 11 | ||||
-rwxr-xr-x | src/conf_mode/nat_cgnat.py | 51 | ||||
-rwxr-xr-x | src/conf_mode/pki.py | 2 | ||||
-rwxr-xr-x | src/conf_mode/system_login.py | 50 | ||||
-rw-r--r-- | src/conf_mode/system_wireless.py | 64 | ||||
-rwxr-xr-x | src/conf_mode/vpn_openconnect.py | 43 |
7 files changed, 172 insertions, 59 deletions
diff --git a/src/conf_mode/interfaces_macsec.py b/src/conf_mode/interfaces_macsec.py index eb0ca9a8b..3ede4377a 100755 --- a/src/conf_mode/interfaces_macsec.py +++ b/src/conf_mode/interfaces_macsec.py @@ -103,9 +103,9 @@ def verify(macsec): # Logic to check static configuration if dict_search('security.static', macsec) != None: - # tx-key must be defined + # key must be defined if dict_search('security.static.key', macsec) == None: - raise ConfigError('Static MACsec tx-key must be defined.') + raise ConfigError('Static MACsec key must be defined.') tx_len = len(dict_search('security.static.key', macsec)) @@ -119,12 +119,12 @@ def verify(macsec): if 'peer' not in macsec['security']['static']: raise ConfigError('Must have at least one peer defined for static MACsec') - # For every enabled peer, make sure a MAC and rx-key is defined + # For every enabled peer, make sure a MAC and key is defined for peer, peer_config in macsec['security']['static']['peer'].items(): if 'disable' not in peer_config and ('mac' not in peer_config or 'key' not in peer_config): - raise ConfigError('Every enabled MACsec static peer must have a MAC address and rx-key defined.') + raise ConfigError('Every enabled MACsec static peer must have a MAC address and key defined!') - # check rx-key length against cipher suite + # check key length against cipher suite rx_len = len(peer_config['key']) if dict_search('security.cipher', macsec) == 'gcm-aes-128' and rx_len != GCM_AES_128_LEN: diff --git a/src/conf_mode/interfaces_wireless.py b/src/conf_mode/interfaces_wireless.py index c0a17c0bc..998ff9dba 100755 --- a/src/conf_mode/interfaces_wireless.py +++ b/src/conf_mode/interfaces_wireless.py @@ -44,6 +44,8 @@ hostapd_conf = '/run/hostapd/{ifname}.conf' hostapd_accept_station_conf = '/run/hostapd/{ifname}_station_accept.conf' hostapd_deny_station_conf = '/run/hostapd/{ifname}_station_deny.conf' +country_code_path = ['system', 'wireless', 'country-code'] + def find_other_stations(conf, base, ifname): """ Only one wireless interface per phy can be in station mode - @@ -78,7 +80,11 @@ def get_config(config=None): conf = Config() base = ['interfaces', 'wireless'] - ifname, wifi = get_interface_dict(conf, base) + _, wifi = get_interface_dict(conf, base) + + # retrieve global Wireless regulatory domain setting + if conf.exists(country_code_path): + wifi['country_code'] = conf.return_value(country_code_path) if 'deleted' not in wifi: # then get_interface_dict provides default keys @@ -131,7 +137,8 @@ def verify(wifi): if wifi['type'] == 'access-point': if 'country_code' not in wifi: - raise ConfigError('Wireless country-code is mandatory') + raise ConfigError(f'Wireless country-code is mandatory, use: '\ + f'"set {" ".join(country_code_path)}"!') if 'channel' not in wifi: raise ConfigError('Wireless channel must be configured!') diff --git a/src/conf_mode/nat_cgnat.py b/src/conf_mode/nat_cgnat.py index cb336a35c..34ec64fce 100755 --- a/src/conf_mode/nat_cgnat.py +++ b/src/conf_mode/nat_cgnat.py @@ -23,6 +23,7 @@ from sys import exit from logging.handlers import SysLogHandler from vyos.config import Config +from vyos.configdict import is_node_changed from vyos.template import render from vyos.utils.process import cmd from vyos.utils.process import run @@ -118,6 +119,41 @@ class IPOperations: + [self.ip_network.broadcast_address] ] + def get_prefix_by_ip_range(self): + """Return the common prefix for the address range + + Example: + % ip = IPOperations('100.64.0.1-100.64.0.5') + % ip.get_prefix_by_ip_range() + 100.64.0.0/29 + """ + if '-' in self.ip_prefix: + ip_start, ip_end = self.ip_prefix.split('-') + start_ip = ipaddress.IPv4Address(ip_start.strip()) + end_ip = ipaddress.IPv4Address(ip_end.strip()) + + start_int = int(start_ip) + end_int = int(end_ip) + + # XOR to find differing bits + xor = start_int ^ end_int + + # Count the number of leading zeros in the XOR result to find the prefix length + prefix_length = 32 - xor.bit_length() + + # Calculate the network address + network_int = start_int & (0xFFFFFFFF << (32 - prefix_length)) + network_address = ipaddress.IPv4Address(network_int) + + return f"{network_address}/{prefix_length}" + return self.ip_prefix + + +def _delete_conntrack_entries(source_prefixes: list) -> None: + """Delete all conntrack entries for the list of prefixes""" + for source_prefix in source_prefixes: + run(f'conntrack -D -s {source_prefix}') + def generate_port_rules( external_hosts: list, @@ -188,6 +224,9 @@ def get_config(config=None): with_recursive_defaults=True, ) + if conf.exists(base) and is_node_changed(conf, base + ['pool']): + config.update({'delete_conntrack_entries': {}}) + return config @@ -386,6 +425,18 @@ def apply(config): # Log error message logger.error(f"Error processing line '{allocation}': {e}") + # Delete conntrack entries + if 'delete_conntrack_entries' in config: + internal_pool_prefix_list = [] + for rule, rule_config in config['rule'].items(): + internal_pool = rule_config['source']['pool'] + internal_ip_ranges: list = config['pool']['internal'][internal_pool]['range'] + for internal_range in internal_ip_ranges: + ip_prefix = IPOperations(internal_range).get_prefix_by_ip_range() + internal_pool_prefix_list.append(ip_prefix) + # Deleta required sources for conntrack + _delete_conntrack_entries(internal_pool_prefix_list) + if __name__ == '__main__': try: diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index f37cac524..4a0e86f32 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -232,7 +232,7 @@ def get_config(config=None): path = search['path'] path_str = ' '.join(path + found_path) - print(f'PKI: Updating config: {path_str} {item_name}') + #print(f'PKI: Updating config: {path_str} {item_name}') if path[0] == 'interfaces': ifname = found_path[0] diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 20121f170..439fa645b 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2023 VyOS maintainers and contributors +# Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -26,14 +26,15 @@ from time import sleep from vyos.config import Config from vyos.configverify import verify_vrf -from vyos.defaults import directories from vyos.template import render from vyos.template import is_ipv4 +from vyos.utils.auth import get_current_user +from vyos.utils.configfs import delete_cli_node +from vyos.utils.configfs import add_cli_node from vyos.utils.dict import dict_search from vyos.utils.file import chown from vyos.utils.process import cmd from vyos.utils.process import call -from vyos.utils.process import rc_cmd from vyos.utils.process import run from vyos.utils.process import DEVNULL from vyos import ConfigError @@ -125,10 +126,9 @@ def verify(login): # This check is required as the script is also executed from vyos-router # init script and there is no SUDO_USER environment variable available # during system boot. - if 'SUDO_USER' in os.environ: - cur_user = os.environ['SUDO_USER'] - if cur_user in login['rm_users']: - raise ConfigError(f'Attempting to delete current user: {cur_user}') + tmp = get_current_user() + if tmp in login['rm_users']: + raise ConfigError(f'Attempting to delete current user: {tmp}') if 'user' in login: system_users = getpwall() @@ -221,35 +221,13 @@ def generate(login): login['user'][user]['authentication']['encrypted_password'] = encrypted_password del login['user'][user]['authentication']['plaintext_password'] - # remove old plaintext password and set new encrypted password - env = os.environ.copy() - env['vyos_libexec_dir'] = directories['base'] - # Set default commands for re-adding user with encrypted password - del_user_plain = f"system login user {user} authentication plaintext-password" - add_user_encrypt = f"system login user {user} authentication encrypted-password '{encrypted_password}'" - - lvl = env['VYATTA_EDIT_LEVEL'] - # We're in config edit level, for example "edit system login" - # Change default commands for re-adding user with encrypted password - if lvl != '/': - # Replace '/system/login' to 'system login' - lvl = lvl.strip('/').split('/') - # Convert command str to list - del_user_plain = del_user_plain.split() - # New command exclude level, for example "edit system login" - del_user_plain = del_user_plain[len(lvl):] - # Convert string to list - del_user_plain = " ".join(del_user_plain) - - add_user_encrypt = add_user_encrypt.split() - add_user_encrypt = add_user_encrypt[len(lvl):] - add_user_encrypt = " ".join(add_user_encrypt) - - ret, out = rc_cmd(f"/opt/vyatta/sbin/my_delete {del_user_plain}", env=env) - if ret: raise ConfigError(out) - ret, out = rc_cmd(f"/opt/vyatta/sbin/my_set {add_user_encrypt}", env=env) - if ret: raise ConfigError(out) + del_user_plain = ['system', 'login', 'user', user, 'authentication', 'plaintext-password'] + add_user_encrypt = ['system', 'login', 'user', user, 'authentication', 'encrypted-password'] + + delete_cli_node(del_user_plain) + add_cli_node(add_user_encrypt, value=encrypted_password) + else: try: if get_shadow_password(user) == dict_search('authentication.encrypted_password', user_config): @@ -283,8 +261,6 @@ def generate(login): if os.path.isfile(tacacs_nss_config_file): os.unlink(tacacs_nss_config_file) - - # NSS must always be present on the system render(nss_config_file, 'login/nsswitch.conf.j2', login, permission=0o644, user='root', group='root') diff --git a/src/conf_mode/system_wireless.py b/src/conf_mode/system_wireless.py new file mode 100644 index 000000000..e0ca0ab8e --- /dev/null +++ b/src/conf_mode/system_wireless.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sys import exit + +from vyos.config import Config +from vyos.configdep import set_dependents +from vyos.configdep import call_dependents +from vyos import ConfigError +from vyos import airbag +airbag.enable() + +def get_config(config=None): + if config: + conf = config + else: + conf = Config() + base = ['system', 'wireless'] + interface_base = ['interfaces', 'wireless'] + + wireless = conf.get_config_dict(base, key_mangling=('-', '_'), + get_first_key=True) + + + if conf.exists(interface_base): + wireless['interfaces'] = conf.list_nodes(interface_base) + for interface in wireless['interfaces']: + set_dependents('wireless', conf, interface) + + return wireless + +def verify(wireless): + pass + +def generate(wireless): + pass + +def apply(wireless): + if 'interfaces' in wireless: + call_dependents() + pass + +if __name__ == '__main__': + try: + c = get_config() + verify(c) + generate(c) + apply(c) + except ConfigError as e: + print(e) + exit(1) diff --git a/src/conf_mode/vpn_openconnect.py b/src/conf_mode/vpn_openconnect.py index 8159fedea..42785134f 100755 --- a/src/conf_mode/vpn_openconnect.py +++ b/src/conf_mode/vpn_openconnect.py @@ -21,14 +21,17 @@ from vyos.base import Warning from vyos.config import Config from vyos.configverify import verify_pki_certificate from vyos.configverify import verify_pki_ca_certificate -from vyos.pki import wrap_certificate +from vyos.pki import find_chain +from vyos.pki import encode_certificate +from vyos.pki import load_certificate from vyos.pki import wrap_private_key from vyos.template import render -from vyos.utils.process import call +from vyos.utils.dict import dict_search +from vyos.utils.file import write_file from vyos.utils.network import check_port_availability -from vyos.utils.process import is_systemd_service_running from vyos.utils.network import is_listen_port_bind_service -from vyos.utils.dict import dict_search +from vyos.utils.process import call +from vyos.utils.process import is_systemd_service_running from vyos import ConfigError from passlib.hash import sha512_crypt from time import sleep @@ -142,7 +145,8 @@ def verify(ocserv): verify_pki_certificate(ocserv, ocserv['ssl']['certificate']) if 'ca_certificate' in ocserv['ssl']: - verify_pki_ca_certificate(ocserv, ocserv['ssl']['ca_certificate']) + for ca_cert in ocserv['ssl']['ca_certificate']: + verify_pki_ca_certificate(ocserv, ca_cert) # Check network settings if "network_settings" in ocserv: @@ -219,25 +223,36 @@ def generate(ocserv): if "ssl" in ocserv: cert_file_path = os.path.join(cfg_dir, 'cert.pem') cert_key_path = os.path.join(cfg_dir, 'cert.key') - ca_cert_file_path = os.path.join(cfg_dir, 'ca.pem') + if 'certificate' in ocserv['ssl']: cert_name = ocserv['ssl']['certificate'] pki_cert = ocserv['pki']['certificate'][cert_name] - with open(cert_file_path, 'w') as f: - f.write(wrap_certificate(pki_cert['certificate'])) + loaded_pki_cert = load_certificate(pki_cert['certificate']) + loaded_ca_certs = {load_certificate(c['certificate']) + for c in ocserv['pki']['ca'].values()} if 'ca' in ocserv['pki'] else {} + + cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs) + + write_file(cert_file_path, + '\n'.join(encode_certificate(c) for c in cert_full_chain)) if 'private' in pki_cert and 'key' in pki_cert['private']: - with open(cert_key_path, 'w') as f: - f.write(wrap_private_key(pki_cert['private']['key'])) + write_file(cert_key_path, wrap_private_key(pki_cert['private']['key'])) if 'ca_certificate' in ocserv['ssl']: - ca_name = ocserv['ssl']['ca_certificate'] - pki_ca_cert = ocserv['pki']['ca'][ca_name] + ca_cert_file_path = os.path.join(cfg_dir, 'ca.pem') + ca_chains = [] + + for ca_name in ocserv['ssl']['ca_certificate']: + pki_ca_cert = ocserv['pki']['ca'][ca_name] + loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + ca_chains.append( + '\n'.join(encode_certificate(c) for c in ca_full_chain)) - with open(ca_cert_file_path, 'w') as f: - f.write(wrap_certificate(pki_ca_cert['certificate'])) + write_file(ca_cert_file_path, '\n'.join(ca_chains)) # Render config render(ocserv_conf, 'ocserv/ocserv_config.j2', ocserv) |