-rw-r--r--python/vyos/util.py13
-rwxr-xr-xsrc/conf_mode/vpn_ipsec.py24
2 files changed, 25 insertions, 12 deletions
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 8247ccb2d..c64b477ef 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -705,6 +705,19 @@ def dict_search(path, my_dict):
c = c.get(p, {})
return c.get(parts[-1], None)
+def dict_search_args(dict_object, *path):
+ # Traverse dictionary using variable arguments
+ # Added due to above function not allowing for '.' in the key names
+ # Example: dict_search_args(some_dict, 'key', 'subkey', 'subsubkey', ...)
+ if not isinstance(dict_object, dict) or not path:
+ return None
+
+ for item in path:
+ if item not in dict_object:
+ return None
+ dict_object = dict_object[item]
+ return dict_object
+
def get_interface_config(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
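Review bot: The loop in `dict_search_args` type-checks only the initial argument; after the first step, `item not in dict_object` runs against whatever value the previous key held. If an intermediate value is a string, the test becomes a substring match and the following subscript raises TypeError; if it is None, the membership test itself raises. Guarding each step with `if not isinstance(dict_object, dict) or item not in dict_object: return None` keeps the helper returning None for any path that cannot be followed, which is what the PKI and RSA key lookups in vpn_ipsec.py rely on. The leading comments would also serve better as a docstring stating that keys are matched literally, so names containing '.' are supported.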
diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py
index 50223320d..76ee64a20 100755
--- a/src/conf_mode/vpn_ipsec.py
+++ b/src/conf_mode/vpn_ipsec.py
@@ -33,7 +33,7 @@ from vyos.template import ip_from_cidr
from vyos.template import render
from vyos.validate import is_ipv6_link_local
from vyos.util import call
-from vyos.util import dict_search
+from vyos.util import dict_search_args
from vyos.util import run
from vyos.xml import defaults
from vyos import ConfigError
@@ -116,7 +116,7 @@ def get_config(config=None):
return ipsec
def get_rsa_local_key(ipsec):
- return dict_search('local_key.file', ipsec['rsa_keys'])
+ return dict_search_args(ipsec['rsa_keys'], 'local_key', 'file')
def verify_rsa_local_key(ipsec):
file = get_rsa_local_key(ipsec)
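Review bot: `get_rsa_local_key` still subscripts `ipsec['rsa_keys']` before calling the helper, so a configuration without an `rsa_keys` node raises KeyError instead of returning None; the same applies to `verify_rsa_key` in the next hunk. Whether `get_config` always populates that key is not visible here. Because the new helper takes the path as separate arguments, the lookup can start one level higher and tolerate the missing node: `return dict_search_args(ipsec, 'rsa_keys', 'local_key', 'file')`.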
@@ -132,7 +132,7 @@ def verify_rsa_local_key(ipsec):
return False
def verify_rsa_key(ipsec, key_name):
- return dict_search(f'rsa_key_name.{key_name}.rsa_key', ipsec['rsa_keys'])
+ return dict_search_args(ipsec['rsa_keys'], 'rsa_key_name', key_name, 'rsa_key')
def get_dhcp_address(iface):
addresses = Interface(iface).get_addr()
@@ -150,13 +150,13 @@ def verify_pki(pki, x509_conf):
ca_cert_name = x509_conf['ca_certificate']
cert_name = x509_conf['certificate']
- if not dict_search(f'ca.{ca_cert_name}.certificate', ipsec['pki']):
+ if not dict_search_args(ipsec['pki'], 'ca', ca_cert_name, 'certificate'):
raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"')
- if not dict_search(f'certificate.{cert_name}.certificate', ipsec['pki']):
+ if not dict_search_args(ipsec['pki'], 'certificate', cert_name, 'certificate'):
raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"')
- if not dict_search(f'certificate.{cert_name}.private.key', ipsec['pki']):
+ if not dict_search_args(ipsec['pki'], 'certificate', cert_name, 'private', 'key'):
raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"')
return True
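Review bot: The three rewritten checks in `verify_pki` still read `ipsec['pki']`, but the signature shown in the hunk header is `verify_pki(pki, x509_conf)` and has no `ipsec` parameter. Unless `ipsec` is bound at module scope or earlier in the body (which starts outside this hunk), these lines raise NameError; if it is a global, they validate a dictionary other than the one the caller passed. Either way the certificate and private-key presence checks may not cover the data that `generate_pki_files(pki, ...)` later writes out. Use the parameter instead: `if not dict_search_args(pki, 'ca', ca_cert_name, 'certificate'):`, and likewise for the two `'certificate', cert_name, ...` lookups.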
@@ -284,13 +284,13 @@ def verify(ipsec):
def generate_pki_files(pki, x509_conf):
ca_cert_name = x509_conf['ca_certificate']
- ca_cert_data = dict_search(f'ca.{ca_cert_name}.certificate', pki)
- ca_cert_crls = dict_search(f'ca.{ca_cert_name}.crl', pki) or []
+ ca_cert_data = dict_search_args(pki, 'ca', ca_cert_name, 'certificate')
+ ca_cert_crls = dict_search_args(pki, 'ca', ca_cert_name, 'crl') or []
crl_index = 1
cert_name = x509_conf['certificate']
- cert_data = dict_search(f'certificate.{cert_name}.certificate', pki)
- key_data = dict_search(f'certificate.{cert_name}.private.key', pki)
+ cert_data = dict_search_args(pki, 'certificate', cert_name, 'certificate')
+ key_data = dict_search_args(pki, 'certificate', cert_name, 'private', 'key')
protected = 'passphrase' in x509_conf
with open(os.path.join(CA_PATH, f'{ca_cert_name}.pem'), 'w') as f:
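Review bot: `ca_cert_name` comes straight from `x509_conf['ca_certificate']` and is interpolated into `os.path.join(CA_PATH, f'{ca_cert_name}.pem')` on the last line of this hunk. With dotted names now resolving correctly, it is worth confirming that the name cannot contain a path separator or be `..`. Whether the CLI schema already constrains it is not visible here. A cheap guard before building the path would be `if os.path.basename(ca_cert_name) != ca_cert_name: raise ConfigError(...)`, applied to `cert_name` as well if it is used the same way further down (the function body continues outside the hunk). Separately, `ca_cert_data` and `key_data` are None when the lookup misses, so the write that follows should either rely on `verify_pki` having run first or check for None explicitly.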
@@ -351,8 +351,8 @@ def generate(ipsec):
if 'tunnel' in peer_conf:
for tunnel, tunnel_conf in peer_conf['tunnel'].items():
- local_prefixes = dict_search('local.prefix', tunnel_conf)
- remote_prefixes = dict_search('remote.prefix', tunnel_conf)
+ local_prefixes = dict_search_args(tunnel_conf, 'local', 'prefix')
+ remote_prefixes = dict_search_args(tunnel_conf, 'remote', 'prefix')
if not local_prefixes or not remote_prefixes:
continue