diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/configverify.py | 103 | ||||
-rw-r--r-- | python/vyos/firewall.py | 6 | ||||
-rw-r--r-- | python/vyos/ifconfig/interface.py | 2 |
3 files changed, 70 insertions, 41 deletions
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index 651036bad..300647d21 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -169,43 +169,6 @@ def verify_tunnel(config): if 'source_address' in config and is_ipv6(config['source_address']): raise ConfigError('Can not use local IPv6 address is for mGRE tunnels') -def verify_eapol(config): - """ - Common helper function used by interface implementations to perform - recurring validation of EAPoL configuration. - """ - if 'eapol' in config: - if 'certificate' not in config['eapol']: - raise ConfigError('Certificate must be specified when using EAPoL!') - - if 'pki' not in config or 'certificate' not in config['pki']: - raise ConfigError('Invalid certificate specified for EAPoL') - - cert_name = config['eapol']['certificate'] - if cert_name not in config['pki']['certificate']: - raise ConfigError('Invalid certificate specified for EAPoL') - - cert = config['pki']['certificate'][cert_name] - - if 'certificate' not in cert or 'private' not in cert or 'key' not in cert['private']: - raise ConfigError('Invalid certificate/private key specified for EAPoL') - - if 'password_protected' in cert['private']: - raise ConfigError('Encrypted private key cannot be used for EAPoL') - - if 'ca_certificate' in config['eapol']: - if 'ca' not in config['pki']: - raise ConfigError('Invalid CA certificate specified for EAPoL') - - for ca_cert_name in config['eapol']['ca_certificate']: - if ca_cert_name not in config['pki']['ca']: - raise ConfigError('Invalid CA certificate specified for EAPoL') - - ca_cert = config['pki']['ca'][ca_cert_name] - - if 'certificate' not in ca_cert: - raise ConfigError('Invalid CA certificate specified for EAPoL') - def verify_mirror_redirect(config): """ Common helper function used by interface implementations to perform @@ -495,3 +458,69 @@ def verify_access_list(access_list, config, version=''): # Check if the specified ACL exists, if not error out if dict_search(f'policy.access-list{version}.{access_list}', config) == None: raise ConfigError(f'Specified access-list{version} "{access_list}" does not exist!') + +def verify_pki_certificate(config: dict, cert_name: str, no_password_protected: bool=False): + """ + Common helper function user by PKI consumers to perform recurring + validation functions for PEM based certificates + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'certificate' not in config['pki']: + raise ConfigError('PKI does not contain any certificates!') + + if cert_name not in config['pki']['certificate']: + raise ConfigError(f'Certificate "{cert_name}" not found in configuration!') + + pki_cert = config['pki']['certificate'][cert_name] + if 'certificate' not in pki_cert: + raise ConfigError(f'PEM certificate for "{cert_name}" missing in configuration!') + + if 'private' not in pki_cert or 'key' not in pki_cert['private']: + raise ConfigError(f'PEM private key for "{cert_name}" missing in configuration!') + + if no_password_protected and 'password_protected' in pki_cert['private']: + raise ConfigError('Password protected PEM private key is not supported!') + +def verify_pki_ca_certificate(config: dict, ca_name: str): + """ + Common helper function user by PKI consumers to perform recurring + validation functions for PEM based CA certificates + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'ca' not in config['pki']: + raise ConfigError('PKI does not contain any CA certificates!') + + if ca_name not in config['pki']['ca']: + raise ConfigError(f'CA Certificate "{ca_name}" not found in configuration!') + + pki_cert = config['pki']['ca'][ca_name] + if 'certificate' not in pki_cert: + raise ConfigError(f'PEM CA certificate for "{cert_name}" missing in configuration!') + +def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0): + """ + Common helper function user by PKI consumers to perform recurring + validation functions on DH parameters + """ + from vyos.pki import load_dh_parameters + + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'dh' not in config['pki']: + raise ConfigError('PKI does not contain any DH parameters!') + + if dh_name not in config['pki']['dh']: + raise ConfigError(f'DH parameter "{dh_name}" not found in configuration!') + + if min_key_size: + pki_dh = config['pki']['dh'][dh_name] + dh_params = load_dh_parameters(pki_dh['parameters']) + dh_numbers = dh_params.parameter_numbers() + dh_bits = dh_numbers.p.bit_length() + if dh_bits < min_key_size: + raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!') diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index e70b4f0d9..e29aeb0c6 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -66,7 +66,7 @@ def fqdn_config_parse(firewall): rule = path[4] suffix = path[5][0] set_name = f'{hook_name}_{priority}_{rule}_{suffix}' - + if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): firewall['ip_fqdn'][set_name] = domain elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): @@ -85,7 +85,7 @@ def fqdn_resolve(fqdn, ipv6=False): def find_nftables_rule(table, chain, rule_matches=[]): # Find rule in table/chain that matches all criteria and return the handle - results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n") + results = cmd(f'sudo nft --handle list chain {table} {chain}').split("\n") for line in results: if all(rule_match in line for rule_match in rule_matches): handle_search = re.search('handle (\d+)', line) @@ -655,7 +655,7 @@ def geoip_update(firewall, force=False): 'ipv6_sets': ipv6_sets }) - result = run(f'nft -f {nftables_geoip_conf}') + result = run(f'nft --file {nftables_geoip_conf}') if result != 0: print('Error: GeoIP failed to update firewall') return False diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index 430a8dfc3..b159b2367 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -400,7 +400,7 @@ class Interface(Control): else: nft_del_element = f'delete element inet vrf_zones ct_iface_map {{ "{self.ifname}" }}' # Check if deleting is possible first to avoid raising errors - _, err = self._popen(f'nft -c {nft_del_element}') + _, err = self._popen(f'nft --check {nft_del_element}') if not err: # Remove map element self._cmd(f'nft {nft_del_element}') |