summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/configverify.py103
-rw-r--r--python/vyos/firewall.py6
-rw-r--r--python/vyos/ifconfig/interface.py2
3 files changed, 70 insertions, 41 deletions
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 651036bad..300647d21 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -169,43 +169,6 @@ def verify_tunnel(config):
if 'source_address' in config and is_ipv6(config['source_address']):
raise ConfigError('Can not use local IPv6 address is for mGRE tunnels')
-def verify_eapol(config):
- """
- Common helper function used by interface implementations to perform
- recurring validation of EAPoL configuration.
- """
- if 'eapol' in config:
- if 'certificate' not in config['eapol']:
- raise ConfigError('Certificate must be specified when using EAPoL!')
-
- if 'pki' not in config or 'certificate' not in config['pki']:
- raise ConfigError('Invalid certificate specified for EAPoL')
-
- cert_name = config['eapol']['certificate']
- if cert_name not in config['pki']['certificate']:
- raise ConfigError('Invalid certificate specified for EAPoL')
-
- cert = config['pki']['certificate'][cert_name]
-
- if 'certificate' not in cert or 'private' not in cert or 'key' not in cert['private']:
- raise ConfigError('Invalid certificate/private key specified for EAPoL')
-
- if 'password_protected' in cert['private']:
- raise ConfigError('Encrypted private key cannot be used for EAPoL')
-
- if 'ca_certificate' in config['eapol']:
- if 'ca' not in config['pki']:
- raise ConfigError('Invalid CA certificate specified for EAPoL')
-
- for ca_cert_name in config['eapol']['ca_certificate']:
- if ca_cert_name not in config['pki']['ca']:
- raise ConfigError('Invalid CA certificate specified for EAPoL')
-
- ca_cert = config['pki']['ca'][ca_cert_name]
-
- if 'certificate' not in ca_cert:
- raise ConfigError('Invalid CA certificate specified for EAPoL')
-
def verify_mirror_redirect(config):
"""
Common helper function used by interface implementations to perform
@@ -495,3 +458,69 @@ def verify_access_list(access_list, config, version=''):
# Check if the specified ACL exists, if not error out
if dict_search(f'policy.access-list{version}.{access_list}', config) == None:
raise ConfigError(f'Specified access-list{version} "{access_list}" does not exist!')
+
+def verify_pki_certificate(config: dict, cert_name: str, no_password_protected: bool=False):
+ """
+ Common helper function user by PKI consumers to perform recurring
+ validation functions for PEM based certificates
+ """
+ if 'pki' not in config:
+ raise ConfigError('PKI is not configured!')
+
+ if 'certificate' not in config['pki']:
+ raise ConfigError('PKI does not contain any certificates!')
+
+ if cert_name not in config['pki']['certificate']:
+ raise ConfigError(f'Certificate "{cert_name}" not found in configuration!')
+
+ pki_cert = config['pki']['certificate'][cert_name]
+ if 'certificate' not in pki_cert:
+ raise ConfigError(f'PEM certificate for "{cert_name}" missing in configuration!')
+
+ if 'private' not in pki_cert or 'key' not in pki_cert['private']:
+ raise ConfigError(f'PEM private key for "{cert_name}" missing in configuration!')
+
+ if no_password_protected and 'password_protected' in pki_cert['private']:
+ raise ConfigError('Password protected PEM private key is not supported!')
+
+def verify_pki_ca_certificate(config: dict, ca_name: str):
+ """
+ Common helper function user by PKI consumers to perform recurring
+ validation functions for PEM based CA certificates
+ """
+ if 'pki' not in config:
+ raise ConfigError('PKI is not configured!')
+
+ if 'ca' not in config['pki']:
+ raise ConfigError('PKI does not contain any CA certificates!')
+
+ if ca_name not in config['pki']['ca']:
+ raise ConfigError(f'CA Certificate "{ca_name}" not found in configuration!')
+
+ pki_cert = config['pki']['ca'][ca_name]
+ if 'certificate' not in pki_cert:
+ raise ConfigError(f'PEM CA certificate for "{cert_name}" missing in configuration!')
+
+def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0):
+ """
+ Common helper function user by PKI consumers to perform recurring
+ validation functions on DH parameters
+ """
+ from vyos.pki import load_dh_parameters
+
+ if 'pki' not in config:
+ raise ConfigError('PKI is not configured!')
+
+ if 'dh' not in config['pki']:
+ raise ConfigError('PKI does not contain any DH parameters!')
+
+ if dh_name not in config['pki']['dh']:
+ raise ConfigError(f'DH parameter "{dh_name}" not found in configuration!')
+
+ if min_key_size:
+ pki_dh = config['pki']['dh'][dh_name]
+ dh_params = load_dh_parameters(pki_dh['parameters'])
+ dh_numbers = dh_params.parameter_numbers()
+ dh_bits = dh_numbers.p.bit_length()
+ if dh_bits < min_key_size:
+ raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!')
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index e70b4f0d9..e29aeb0c6 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -66,7 +66,7 @@ def fqdn_config_parse(firewall):
rule = path[4]
suffix = path[5][0]
set_name = f'{hook_name}_{priority}_{rule}_{suffix}'
-
+
if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
firewall['ip_fqdn'][set_name] = domain
elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
@@ -85,7 +85,7 @@ def fqdn_resolve(fqdn, ipv6=False):
def find_nftables_rule(table, chain, rule_matches=[]):
# Find rule in table/chain that matches all criteria and return the handle
- results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n")
+ results = cmd(f'sudo nft --handle list chain {table} {chain}').split("\n")
for line in results:
if all(rule_match in line for rule_match in rule_matches):
handle_search = re.search('handle (\d+)', line)
@@ -655,7 +655,7 @@ def geoip_update(firewall, force=False):
'ipv6_sets': ipv6_sets
})
- result = run(f'nft -f {nftables_geoip_conf}')
+ result = run(f'nft --file {nftables_geoip_conf}')
if result != 0:
print('Error: GeoIP failed to update firewall')
return False
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 430a8dfc3..b159b2367 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -400,7 +400,7 @@ class Interface(Control):
else:
nft_del_element = f'delete element inet vrf_zones ct_iface_map {{ "{self.ifname}" }}'
# Check if deleting is possible first to avoid raising errors
- _, err = self._popen(f'nft -c {nft_del_element}')
+ _, err = self._popen(f'nft --check {nft_del_element}')
if not err:
# Remove map element
self._cmd(f'nft {nft_del_element}')