summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/pki.py46
-rw-r--r--src/tests/test_dict_search.py21
2 files changed, 61 insertions, 6 deletions
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index ef1b57650..efa3578b4 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -16,8 +16,11 @@
from sys import exit
+import jmespath
+
from vyos.config import Config
from vyos.configdict import dict_merge
+from vyos.configdict import node_changed
from vyos.pki import is_ca_certificate
from vyos.pki import load_certificate
from vyos.pki import load_certificate_request
@@ -26,6 +29,7 @@ from vyos.pki import load_private_key
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
from vyos.util import ask_input
+from vyos.util import dict_search_recursive
from vyos.xml import defaults
from vyos import ConfigError
from vyos import airbag
@@ -37,14 +41,29 @@ def get_config(config=None):
else:
conf = Config()
base = ['pki']
- if not conf.exists(base):
- return None
pki = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
+
+ pki['changed'] = {}
+ tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'))
+ if tmp: pki['changed'].update({'ca' : tmp})
+
+ tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'))
+ if tmp: pki['changed'].update({'certificate' : tmp})
+
+ # We only merge on the defaults of there is a configuration at all
+ if conf.exists(base):
+ default_values = defaults(base)
+ pki = dict_merge(default_values, pki)
+
+ # We need to get the entire system configuration to verify that we are not
+ # deleting a certificate that is still referenced somewhere!
+ pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
- default_values = defaults(base)
- pki = dict_merge(default_values, pki)
return pki
def is_valid_certificate(raw_data):
@@ -142,6 +161,21 @@ def verify(pki):
if len(country) != 2 or not country.isalpha():
raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.')
+ if 'changed' in pki:
+ # if the list is getting longer, we can move to a dict() and also embed the
+ # search key as value from line 173 or 176
+ for cert_type in ['ca', 'certificate']:
+ if not cert_type in pki['changed']:
+ continue
+ for certificate in pki['changed'][cert_type]:
+ if cert_type not in pki or certificate not in pki['changed'][cert_type]:
+ if cert_type == 'ca':
+ if certificate in dict_search_recursive(pki['system'], 'ca_certificate'):
+ raise ConfigError(f'CA certificate "{certificate}" is still in use!')
+ elif cert_type == 'certificate':
+ if certificate in dict_search_recursive(pki['system'], 'certificate'):
+ raise ConfigError(f'Certificate "{certificate}" is still in use!')
+
return None
def generate(pki):
@@ -154,6 +188,8 @@ def apply(pki):
if not pki:
return None
+ # XXX: restart services if the content of a certificate changes
+
return None
if __name__ == '__main__':
diff --git a/src/tests/test_dict_search.py b/src/tests/test_dict_search.py
index 991722f0f..1028437b2 100644
--- a/src/tests/test_dict_search.py
+++ b/src/tests/test_dict_search.py
@@ -16,13 +16,25 @@
from unittest import TestCase
from vyos.util import dict_search
+from vyos.util import dict_search_recursive
data = {
'string': 'fooo',
'nested': {'string': 'bar', 'empty': '', 'list': ['foo', 'bar']},
'non': {},
'list': ['bar', 'baz'],
- 'dict': {'key_1': {}, 'key_2': 'vyos'}
+ 'dict': {'key_1': {}, 'key_2': 'vyos'},
+ 'interfaces': {'dummy': {'dum0': {'address': ['192.0.2.17/29']}},
+ 'ethernet': {'eth0': {'address': ['2001:db8::1/64', '192.0.2.1/29'],
+ 'description': 'Test123',
+ 'duplex': 'auto',
+ 'hw_id': '00:00:00:00:00:01',
+ 'speed': 'auto'},
+ 'eth1': {'address': ['192.0.2.9/29'],
+ 'description': 'Test456',
+ 'duplex': 'auto',
+ 'hw_id': '00:00:00:00:00:02',
+ 'speed': 'auto'}}}
}
class TestDictSearch(TestCase):
@@ -63,3 +75,10 @@ class TestDictSearch(TestCase):
# TestDictSearch: Return list items when querying nested list
self.assertEqual(dict_search('nested.list', None), None)
self.assertEqual(dict_search(None, data), None)
+
+ def test_dict_search_recursive(self):
+ # Test nested search in dictionary
+ tmp = list(dict_search_recursive(data, 'hw_id'))
+ self.assertEqual(len(tmp), 2)
+ tmp = list(dict_search_recursive(data, 'address'))
+ self.assertEqual(len(tmp), 3)