diff options
-rw-r--r-- | interface-definitions/pki.xml.in | 1 | ||||
-rw-r--r-- | python/vyos/util.py | 17 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_pki.py | 57 | ||||
-rwxr-xr-x | src/conf_mode/pki.py | 129 |
4 files changed, 184 insertions, 20 deletions
diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in index 6d137c2ce..c4fde2c78 100644 --- a/interface-definitions/pki.xml.in +++ b/interface-definitions/pki.xml.in @@ -3,6 +3,7 @@ <node name="pki" owner="${vyos_conf_scripts_dir}/pki.py"> <properties> <help>VyOS PKI configuration</help> + <priority>300</priority> </properties> <children> <tagNode name="ca"> diff --git a/python/vyos/util.py b/python/vyos/util.py index de55e108b..0d62fbfe9 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -757,21 +757,26 @@ def dict_search_args(dict_object, *path): dict_object = dict_object[item] return dict_object -def dict_search_recursive(dict_object, key): +def dict_search_recursive(dict_object, key, path=[]): """ Traverse a dictionary recurisvely and return the value of the key we are looking for. Thankfully copied from https://stackoverflow.com/a/19871956 + + Modified to yield optional path to found keys """ if isinstance(dict_object, list): for i in dict_object: - for x in dict_search_recursive(i, key): - yield x + new_path = path + [i] + for x in dict_search_recursive(i, key, new_path): + yield x elif isinstance(dict_object, dict): if key in dict_object: - yield dict_object[key] - for j in dict_object.values(): - for x in dict_search_recursive(j, key): + new_path = path + [key] + yield dict_object[key], new_path + for k, j in dict_object.items(): + new_path = path + [k] + for x in dict_search_recursive(j, key, new_path): yield x def get_bridge_fdb(interface): diff --git a/smoketest/scripts/cli/test_pki.py b/smoketest/scripts/cli/test_pki.py index e92123dbc..cba5ffdde 100755 --- a/smoketest/scripts/cli/test_pki.py +++ b/smoketest/scripts/cli/test_pki.py @@ -128,6 +128,27 @@ g6a75NnEXo0J6YLAOOxd8fD2/HidhbceCmTF+3msidIzCsBidBkgn6V5TXx2IyMS xGsJxVHfSKeooUQn6q76sg== """ +valid_update_cert = """ +MIICJTCCAcugAwIBAgIUZJqjNmPfVQwePjNFBtB6WI31ThMwCgYIKoZIzj0EAwIw +VzELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv +bWUtQ2l0eTENMAsGA1UECgwEVnlPUzEQMA4GA1UEAwwHdnlvcy5pbzAeFw0yMjA1 +MzExNTE3NDlaFw0yMzA1MzExNTE3NDlaMFcxCzAJBgNVBAYTAkdCMRMwEQYDVQQI +DApTb21lLVN0YXRlMRIwEAYDVQQHDAlTb21lLUNpdHkxDTALBgNVBAoMBFZ5T1Mx +EDAOBgNVBAMMB3Z5b3MuaW8wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQMe0h/ +3CdD8mEgy+klk55QfJ8R3ZycefxCn4abWjzTXz/TuCIxqb4wpRT8DZtIn4NRimFT +mODYdEDOYxFtZm37o3UwczAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIHgDAT +BgNVHSUEDDAKBggrBgEFBQcDAjAdBgNVHQ4EFgQUqH7KSZpzArpMFuxLXqI8e1QD +fBkwHwYDVR0jBBgwFoAUqH7KSZpzArpMFuxLXqI8e1QDfBkwCgYIKoZIzj0EAwID +SAAwRQIhAKofUgRtcUljmbubPF6sqHtn/3TRvuafl8VfPbk3s2bJAiBp3Q1AnU/O +i7t5FGhCgnv5m8DW2F3LZPCJdW4ELQ3d9A== +""" + +valid_update_private_key = """ +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgvyODf22w/p7Zgfz9 +dyLIT09LqLOrUN6zbAecfukiiiyhRANCAAQMe0h/3CdD8mEgy+klk55QfJ8R3Zyc +efxCn4abWjzTXz/TuCIxqb4wpRT8DZtIn4NRimFTmODYdEDOYxFtZm37 +""" + class TestPKI(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): @@ -189,5 +210,41 @@ class TestPKI(VyOSUnitTestSHIM.TestCase): with self.assertRaises(ConfigSessionError): self.cli_commit() + def test_certificate_in_use(self): + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')]) + self.cli_commit() + + self.cli_set(['service', 'https', 'certificates', 'certificate', 'smoketest']) + self.cli_commit() + + self.cli_delete(base_path + ['certificate', 'smoketest']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['service', 'https', 'certificates', 'certificate']) + + def test_certificate_https_update(self): + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')]) + self.cli_commit() + + self.cli_set(['service', 'https', 'certificates', 'certificate', 'smoketest']) + self.cli_commit() + + cert_data = None + + with open('/etc/ssl/certs/smoketest.pem') as f: + cert_data = f.read() + + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_update_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_update_private_key.replace('\n','')]) + self.cli_commit() + + with open('/etc/ssl/certs/smoketest.pem') as f: + self.assertNotEqual(cert_data, f.read()) + + self.cli_delete(['service', 'https', 'certificates', 'certificate']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index efa3578b4..29ed7b1b7 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -29,12 +29,60 @@ from vyos.pki import load_private_key from vyos.pki import load_crl from vyos.pki import load_dh_parameters from vyos.util import ask_input +from vyos.util import call +from vyos.util import dict_search_args from vyos.util import dict_search_recursive from vyos.xml import defaults from vyos import ConfigError from vyos import airbag airbag.enable() +# keys to recursively search for under specified path, script to call if update required +sync_search = [ + { + 'keys': ['certificate'], + 'path': ['service', 'https'], + 'script': '/usr/libexec/vyos/conf_mode/https.py' + }, + { + 'keys': ['certificate', 'ca_certificate'], + 'path': ['interfaces', 'ethernet'], + 'script': '/usr/libexec/vyos/conf_mode/interfaces-ethernet.py' + }, + { + 'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'], + 'path': ['interfaces', 'openvpn'], + 'script': '/usr/libexec/vyos/conf_mode/interfaces-openvpn.py' + }, + { + 'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'], + 'path': ['vpn', 'ipsec'], + 'script': '/usr/libexec/vyos/conf_mode/vpn_ipsec.py' + }, + { + 'keys': ['certificate', 'ca_certificate'], + 'path': ['vpn', 'openconnect'], + 'script': '/usr/libexec/vyos/conf_mode/vpn_openconnect.py' + }, + { + 'keys': ['certificate', 'ca_certificate'], + 'path': ['vpn', 'sstp'], + 'script': '/usr/libexec/vyos/conf_mode/vpn_sstp.py' + } +] + +# key from other config nodes -> key in pki['changed'] and pki +sync_translate = { + 'certificate': 'certificate', + 'ca_certificate': 'ca', + 'dh_params': 'dh', + 'local_key': 'key_pair', + 'remote_key': 'key_pair', + 'shared_secret_key': 'openvpn', + 'auth_key': 'openvpn', + 'crypt_key': 'openvpn' +} + def get_config(config=None): if config: conf = config @@ -47,12 +95,21 @@ def get_config(config=None): no_tag_node_value_mangle=True) pki['changed'] = {} - tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_')) + tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'), recursive=True) if tmp: pki['changed'].update({'ca' : tmp}) - tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_')) + tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), recursive=True) if tmp: pki['changed'].update({'certificate' : tmp}) + tmp = node_changed(conf, base + ['dh'], key_mangling=('-', '_'), recursive=True) + if tmp: pki['changed'].update({'dh' : tmp}) + + tmp = node_changed(conf, base + ['key-pair'], key_mangling=('-', '_'), recursive=True) + if tmp: pki['changed'].update({'key_pair' : tmp}) + + tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], key_mangling=('-', '_'), recursive=True) + if tmp: pki['changed'].update({'openvpn' : tmp}) + # We only merge on the defaults of there is a configuration at all if conf.exists(base): default_values = defaults(base) @@ -164,17 +221,30 @@ def verify(pki): if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the # search key as value from line 173 or 176 - for cert_type in ['ca', 'certificate']: - if not cert_type in pki['changed']: - continue - for certificate in pki['changed'][cert_type]: - if cert_type not in pki or certificate not in pki['changed'][cert_type]: - if cert_type == 'ca': - if certificate in dict_search_recursive(pki['system'], 'ca_certificate'): - raise ConfigError(f'CA certificate "{certificate}" is still in use!') - elif cert_type == 'certificate': - if certificate in dict_search_recursive(pki['system'], 'certificate'): - raise ConfigError(f'Certificate "{certificate}" is still in use!') + for search in sync_search: + for key in search['keys']: + changed_key = sync_translate[key] + + if changed_key not in pki['changed']: + continue + + for item_name in pki['changed'][changed_key]: + node_present = False + if changed_key == 'openvpn': + node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) + else: + node_present = dict_search_args(pki, changed_key, item_name) + + if not node_present: + search_dict = dict_search_args(pki['system'], *search['path']) + + if not search_dict: + continue + + for found_name, found_path in dict_search_recursive(search_dict, key): + if found_name == item_name: + path_str = " ".join(search['path'] + found_path) + raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"') return None @@ -188,7 +258,38 @@ def apply(pki): if not pki: return None - # XXX: restart services if the content of a certificate changes + if 'changed' in pki: + for search in sync_search: + for key in search['keys']: + changed_key = sync_translate[key] + + if changed_key not in pki['changed']: + continue + + for item_name in pki['changed'][changed_key]: + node_present = False + if changed_key == 'openvpn': + node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) + else: + node_present = dict_search_args(pki, changed_key, item_name) + + if node_present: + search_dict = dict_search_args(pki['system'], *search['path']) + + if not search_dict: + continue + + for found_name, found_path in dict_search_recursive(search_dict, key): + if found_name == item_name: + path_str = ' '.join(search['path'] + found_path) + print(f'pki: Updating config: {path_str} {found_name}') + + script = search['script'] + if found_path[0] == 'interfaces': + ifname = found_path[2] + call(f'VYOS_TAGNODE_VALUE={ifname} {script}') + else: + call(script) return None |