summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--interface-definitions/pki.xml.in1
-rw-r--r--python/vyos/util.py17
-rwxr-xr-xsmoketest/scripts/cli/test_pki.py57
-rwxr-xr-xsrc/conf_mode/pki.py129
4 files changed, 184 insertions, 20 deletions
diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in
index 6d137c2ce..c4fde2c78 100644
--- a/interface-definitions/pki.xml.in
+++ b/interface-definitions/pki.xml.in
@@ -3,6 +3,7 @@
<node name="pki" owner="${vyos_conf_scripts_dir}/pki.py">
<properties>
<help>VyOS PKI configuration</help>
+ <priority>300</priority>
</properties>
<children>
<tagNode name="ca">
diff --git a/python/vyos/util.py b/python/vyos/util.py
index de55e108b..0d62fbfe9 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -757,21 +757,26 @@ def dict_search_args(dict_object, *path):
dict_object = dict_object[item]
return dict_object
-def dict_search_recursive(dict_object, key):
+def dict_search_recursive(dict_object, key, path=[]):
""" Traverse a dictionary recurisvely and return the value of the key
we are looking for.
Thankfully copied from https://stackoverflow.com/a/19871956
+
+ Modified to yield optional path to found keys
"""
if isinstance(dict_object, list):
for i in dict_object:
- for x in dict_search_recursive(i, key):
- yield x
+ new_path = path + [i]
+ for x in dict_search_recursive(i, key, new_path):
+ yield x
elif isinstance(dict_object, dict):
if key in dict_object:
- yield dict_object[key]
- for j in dict_object.values():
- for x in dict_search_recursive(j, key):
+ new_path = path + [key]
+ yield dict_object[key], new_path
+ for k, j in dict_object.items():
+ new_path = path + [k]
+ for x in dict_search_recursive(j, key, new_path):
yield x
def get_bridge_fdb(interface):
diff --git a/smoketest/scripts/cli/test_pki.py b/smoketest/scripts/cli/test_pki.py
index e92123dbc..cba5ffdde 100755
--- a/smoketest/scripts/cli/test_pki.py
+++ b/smoketest/scripts/cli/test_pki.py
@@ -128,6 +128,27 @@ g6a75NnEXo0J6YLAOOxd8fD2/HidhbceCmTF+3msidIzCsBidBkgn6V5TXx2IyMS
xGsJxVHfSKeooUQn6q76sg==
"""
+valid_update_cert = """
+MIICJTCCAcugAwIBAgIUZJqjNmPfVQwePjNFBtB6WI31ThMwCgYIKoZIzj0EAwIw
+VzELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv
+bWUtQ2l0eTENMAsGA1UECgwEVnlPUzEQMA4GA1UEAwwHdnlvcy5pbzAeFw0yMjA1
+MzExNTE3NDlaFw0yMzA1MzExNTE3NDlaMFcxCzAJBgNVBAYTAkdCMRMwEQYDVQQI
+DApTb21lLVN0YXRlMRIwEAYDVQQHDAlTb21lLUNpdHkxDTALBgNVBAoMBFZ5T1Mx
+EDAOBgNVBAMMB3Z5b3MuaW8wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQMe0h/
+3CdD8mEgy+klk55QfJ8R3ZycefxCn4abWjzTXz/TuCIxqb4wpRT8DZtIn4NRimFT
+mODYdEDOYxFtZm37o3UwczAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIHgDAT
+BgNVHSUEDDAKBggrBgEFBQcDAjAdBgNVHQ4EFgQUqH7KSZpzArpMFuxLXqI8e1QD
+fBkwHwYDVR0jBBgwFoAUqH7KSZpzArpMFuxLXqI8e1QDfBkwCgYIKoZIzj0EAwID
+SAAwRQIhAKofUgRtcUljmbubPF6sqHtn/3TRvuafl8VfPbk3s2bJAiBp3Q1AnU/O
+i7t5FGhCgnv5m8DW2F3LZPCJdW4ELQ3d9A==
+"""
+
+valid_update_private_key = """
+MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgvyODf22w/p7Zgfz9
+dyLIT09LqLOrUN6zbAecfukiiiyhRANCAAQMe0h/3CdD8mEgy+klk55QfJ8R3Zyc
+efxCn4abWjzTXz/TuCIxqb4wpRT8DZtIn4NRimFTmODYdEDOYxFtZm37
+"""
+
class TestPKI(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -189,5 +210,41 @@ class TestPKI(VyOSUnitTestSHIM.TestCase):
with self.assertRaises(ConfigSessionError):
self.cli_commit()
+ def test_certificate_in_use(self):
+ self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')])
+ self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')])
+ self.cli_commit()
+
+ self.cli_set(['service', 'https', 'certificates', 'certificate', 'smoketest'])
+ self.cli_commit()
+
+ self.cli_delete(base_path + ['certificate', 'smoketest'])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ self.cli_delete(['service', 'https', 'certificates', 'certificate'])
+
+ def test_certificate_https_update(self):
+ self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')])
+ self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')])
+ self.cli_commit()
+
+ self.cli_set(['service', 'https', 'certificates', 'certificate', 'smoketest'])
+ self.cli_commit()
+
+ cert_data = None
+
+ with open('/etc/ssl/certs/smoketest.pem') as f:
+ cert_data = f.read()
+
+ self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_update_cert.replace('\n','')])
+ self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_update_private_key.replace('\n','')])
+ self.cli_commit()
+
+ with open('/etc/ssl/certs/smoketest.pem') as f:
+ self.assertNotEqual(cert_data, f.read())
+
+ self.cli_delete(['service', 'https', 'certificates', 'certificate'])
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index efa3578b4..29ed7b1b7 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -29,12 +29,60 @@ from vyos.pki import load_private_key
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
from vyos.util import ask_input
+from vyos.util import call
+from vyos.util import dict_search_args
from vyos.util import dict_search_recursive
from vyos.xml import defaults
from vyos import ConfigError
from vyos import airbag
airbag.enable()
+# keys to recursively search for under specified path, script to call if update required
+sync_search = [
+ {
+ 'keys': ['certificate'],
+ 'path': ['service', 'https'],
+ 'script': '/usr/libexec/vyos/conf_mode/https.py'
+ },
+ {
+ 'keys': ['certificate', 'ca_certificate'],
+ 'path': ['interfaces', 'ethernet'],
+ 'script': '/usr/libexec/vyos/conf_mode/interfaces-ethernet.py'
+ },
+ {
+ 'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'],
+ 'path': ['interfaces', 'openvpn'],
+ 'script': '/usr/libexec/vyos/conf_mode/interfaces-openvpn.py'
+ },
+ {
+ 'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'],
+ 'path': ['vpn', 'ipsec'],
+ 'script': '/usr/libexec/vyos/conf_mode/vpn_ipsec.py'
+ },
+ {
+ 'keys': ['certificate', 'ca_certificate'],
+ 'path': ['vpn', 'openconnect'],
+ 'script': '/usr/libexec/vyos/conf_mode/vpn_openconnect.py'
+ },
+ {
+ 'keys': ['certificate', 'ca_certificate'],
+ 'path': ['vpn', 'sstp'],
+ 'script': '/usr/libexec/vyos/conf_mode/vpn_sstp.py'
+ }
+]
+
+# key from other config nodes -> key in pki['changed'] and pki
+sync_translate = {
+ 'certificate': 'certificate',
+ 'ca_certificate': 'ca',
+ 'dh_params': 'dh',
+ 'local_key': 'key_pair',
+ 'remote_key': 'key_pair',
+ 'shared_secret_key': 'openvpn',
+ 'auth_key': 'openvpn',
+ 'crypt_key': 'openvpn'
+}
+
def get_config(config=None):
if config:
conf = config
@@ -47,12 +95,21 @@ def get_config(config=None):
no_tag_node_value_mangle=True)
pki['changed'] = {}
- tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'))
+ tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'), recursive=True)
if tmp: pki['changed'].update({'ca' : tmp})
- tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'))
+ tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), recursive=True)
if tmp: pki['changed'].update({'certificate' : tmp})
+ tmp = node_changed(conf, base + ['dh'], key_mangling=('-', '_'), recursive=True)
+ if tmp: pki['changed'].update({'dh' : tmp})
+
+ tmp = node_changed(conf, base + ['key-pair'], key_mangling=('-', '_'), recursive=True)
+ if tmp: pki['changed'].update({'key_pair' : tmp})
+
+ tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], key_mangling=('-', '_'), recursive=True)
+ if tmp: pki['changed'].update({'openvpn' : tmp})
+
# We only merge on the defaults of there is a configuration at all
if conf.exists(base):
default_values = defaults(base)
@@ -164,17 +221,30 @@ def verify(pki):
if 'changed' in pki:
# if the list is getting longer, we can move to a dict() and also embed the
# search key as value from line 173 or 176
- for cert_type in ['ca', 'certificate']:
- if not cert_type in pki['changed']:
- continue
- for certificate in pki['changed'][cert_type]:
- if cert_type not in pki or certificate not in pki['changed'][cert_type]:
- if cert_type == 'ca':
- if certificate in dict_search_recursive(pki['system'], 'ca_certificate'):
- raise ConfigError(f'CA certificate "{certificate}" is still in use!')
- elif cert_type == 'certificate':
- if certificate in dict_search_recursive(pki['system'], 'certificate'):
- raise ConfigError(f'Certificate "{certificate}" is still in use!')
+ for search in sync_search:
+ for key in search['keys']:
+ changed_key = sync_translate[key]
+
+ if changed_key not in pki['changed']:
+ continue
+
+ for item_name in pki['changed'][changed_key]:
+ node_present = False
+ if changed_key == 'openvpn':
+ node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
+ else:
+ node_present = dict_search_args(pki, changed_key, item_name)
+
+ if not node_present:
+ search_dict = dict_search_args(pki['system'], *search['path'])
+
+ if not search_dict:
+ continue
+
+ for found_name, found_path in dict_search_recursive(search_dict, key):
+ if found_name == item_name:
+ path_str = " ".join(search['path'] + found_path)
+ raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"')
return None
@@ -188,7 +258,38 @@ def apply(pki):
if not pki:
return None
- # XXX: restart services if the content of a certificate changes
+ if 'changed' in pki:
+ for search in sync_search:
+ for key in search['keys']:
+ changed_key = sync_translate[key]
+
+ if changed_key not in pki['changed']:
+ continue
+
+ for item_name in pki['changed'][changed_key]:
+ node_present = False
+ if changed_key == 'openvpn':
+ node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
+ else:
+ node_present = dict_search_args(pki, changed_key, item_name)
+
+ if node_present:
+ search_dict = dict_search_args(pki['system'], *search['path'])
+
+ if not search_dict:
+ continue
+
+ for found_name, found_path in dict_search_recursive(search_dict, key):
+ if found_name == item_name:
+ path_str = ' '.join(search['path'] + found_path)
+ print(f'pki: Updating config: {path_str} {found_name}')
+
+ script = search['script']
+ if found_path[0] == 'interfaces':
+ ifname = found_path[2]
+ call(f'VYOS_TAGNODE_VALUE={ifname} {script}')
+ else:
+ call(script)
return None