diff options
author | Christian Poessinger <christian@poessinger.com> | 2022-12-01 14:20:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-01 14:20:28 +0100 |
commit | 33c0d77bfa10c81dfc93c4eca781279df9cf1034 (patch) | |
tree | 274033b3413e383820c665de2b874c929a91524c | |
parent | 97f36fe0c1f337d73e5f0af4e2fecefadc3325b4 (diff) | |
parent | 032de023c21d92262243a2abb34bfd4c2009958e (diff) | |
download | vyos-1x-33c0d77bfa10c81dfc93c4eca781279df9cf1034.tar.gz vyos-1x-33c0d77bfa10c81dfc93c4eca781279df9cf1034.zip |
Merge pull request #1684 from jestabro/config-script-dependency
pki: T4847: correct calling of config mode script dependencies from pki.py
-rw-r--r-- | data/config-mode-dependencies.json | 12 | ||||
-rw-r--r-- | python/vyos/configdep.py | 24 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_pki.py | 22 | ||||
-rwxr-xr-x | src/conf_mode/interfaces-ethernet.py | 2 | ||||
-rwxr-xr-x | src/conf_mode/pki.py | 71 |
5 files changed, 86 insertions, 45 deletions
diff --git a/data/config-mode-dependencies.json b/data/config-mode-dependencies.json index dd0efda10..ad12cff87 100644 --- a/data/config-mode-dependencies.json +++ b/data/config-mode-dependencies.json @@ -1 +1,11 @@ -{"firewall": {"group_resync": ["nat", "policy-route"]}} +{ + "firewall": {"group_resync": ["nat", "policy-route"]}, + "pki": { + "ethernet": ["interfaces-ethernet"], + "openvpn": ["interfaces-openvpn"], + "https": ["https"], + "ipsec": ["vpn_ipsec"], + "openconnect": ["vpn_openconnect"], + "sstp": ["vpn_sstp"] + } +} diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py index ca05cb092..d4b2cc78f 100644 --- a/python/vyos/configdep.py +++ b/python/vyos/configdep.py @@ -15,6 +15,7 @@ import os import json +import typing from inspect import stack from vyos.util import load_as_module @@ -22,7 +23,12 @@ from vyos.defaults import directories from vyos.configsource import VyOSError from vyos import ConfigError -dependent_func = {} +# https://peps.python.org/pep-0484/#forward-references +# for type 'Config' +if typing.TYPE_CHECKING: + from vyos.config import Config + +dependent_func: dict[str, list[typing.Callable]] = {} def canon_name(name: str) -> str: return os.path.splitext(name)[0].replace('-', '_') @@ -34,14 +40,14 @@ def canon_name_of_path(path: str) -> str: def caller_name() -> str: return stack()[-1].filename -def read_dependency_dict(): +def read_dependency_dict() -> dict: path = os.path.join(directories['data'], 'config-mode-dependencies.json') with open(path) as f: d = json.load(f) return d -def get_dependency_dict(config): +def get_dependency_dict(config: 'Config') -> dict: if hasattr(config, 'cached_dependency_dict'): d = getattr(config, 'cached_dependency_dict') else: @@ -49,7 +55,7 @@ def get_dependency_dict(config): setattr(config, 'cached_dependency_dict', d) return d -def run_config_mode_script(script: str, config): +def run_config_mode_script(script: str, config: 'Config'): path = os.path.join(directories['conf_mode'], script) name = canon_name(script) mod = load_as_module(name, path) @@ -63,18 +69,22 @@ def run_config_mode_script(script: str, config): except (VyOSError, ConfigError) as e: raise ConfigError(repr(e)) -def def_closure(target: str, config): +def def_closure(target: str, config: 'Config', + tagnode: typing.Optional[str] = None) -> typing.Callable: script = target + '.py' def func_impl(): + if tagnode: + os.environ['VYOS_TAGNODE_VALUE'] = tagnode run_config_mode_script(script, config) return func_impl -def set_dependents(case, config): +def set_dependents(case: str, config: 'Config', + tagnode: typing.Optional[str] = None): d = get_dependency_dict(config) k = canon_name_of_path(caller_name()) l = dependent_func.setdefault(k, []) for target in d[k][case]: - func = def_closure(target, config) + func = def_closure(target, config, tagnode) l.append(func) def call_dependents(): diff --git a/smoketest/scripts/cli/test_pki.py b/smoketest/scripts/cli/test_pki.py index cba5ffdde..b18b0b039 100755 --- a/smoketest/scripts/cli/test_pki.py +++ b/smoketest/scripts/cli/test_pki.py @@ -246,5 +246,27 @@ class TestPKI(VyOSUnitTestSHIM.TestCase): self.cli_delete(['service', 'https', 'certificates', 'certificate']) + def test_certificate_eapol_update(self): + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')]) + self.cli_commit() + + self.cli_set(['interfaces', 'ethernet', 'eth1', 'eapol', 'certificate', 'smoketest']) + self.cli_commit() + + cert_data = None + + with open('/run/wpa_supplicant/eth1_cert.pem') as f: + cert_data = f.read() + + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_update_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_update_private_key.replace('\n','')]) + self.cli_commit() + + with open('/run/wpa_supplicant/eth1_cert.pem') as f: + self.assertNotEqual(cert_data, f.read()) + + self.cli_delete(['interfaces', 'ethernet', 'eth1', 'eapol']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/interfaces-ethernet.py b/src/conf_mode/interfaces-ethernet.py index e02841831..b49c945cd 100755 --- a/src/conf_mode/interfaces-ethernet.py +++ b/src/conf_mode/interfaces-ethernet.py @@ -175,7 +175,7 @@ def generate(ethernet): loaded_pki_cert = load_certificate(pki_cert['certificate']) loaded_ca_certs = {load_certificate(c['certificate']) - for c in ethernet['pki']['ca'].values()} + for c in ethernet['pki']['ca'].values()} if 'ca' in ethernet['pki'] else {} cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index 29ed7b1b7..e8f3cc87a 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -16,20 +16,16 @@ from sys import exit -import jmespath - from vyos.config import Config +from vyos.configdep import set_dependents, call_dependents from vyos.configdict import dict_merge from vyos.configdict import node_changed from vyos.pki import is_ca_certificate from vyos.pki import load_certificate -from vyos.pki import load_certificate_request from vyos.pki import load_public_key from vyos.pki import load_private_key from vyos.pki import load_crl from vyos.pki import load_dh_parameters -from vyos.util import ask_input -from vyos.util import call from vyos.util import dict_search_args from vyos.util import dict_search_recursive from vyos.xml import defaults @@ -121,6 +117,39 @@ def get_config(config=None): get_first_key=True, no_tag_node_value_mangle=True) + if 'changed' in pki: + for search in sync_search: + for key in search['keys']: + changed_key = sync_translate[key] + + if changed_key not in pki['changed']: + continue + + for item_name in pki['changed'][changed_key]: + node_present = False + if changed_key == 'openvpn': + node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) + else: + node_present = dict_search_args(pki, changed_key, item_name) + + if node_present: + search_dict = dict_search_args(pki['system'], *search['path']) + + if not search_dict: + continue + + for found_name, found_path in dict_search_recursive(search_dict, key): + if found_name == item_name: + path = search['path'] + path_str = ' '.join(path + found_path) + print(f'pki: Updating config: {path_str} {found_name}') + + if path[0] == 'interfaces': + ifname = found_path[0] + set_dependents(path[1], conf, ifname) + else: + set_dependents(path[1], conf) + return pki def is_valid_certificate(raw_data): @@ -259,37 +288,7 @@ def apply(pki): return None if 'changed' in pki: - for search in sync_search: - for key in search['keys']: - changed_key = sync_translate[key] - - if changed_key not in pki['changed']: - continue - - for item_name in pki['changed'][changed_key]: - node_present = False - if changed_key == 'openvpn': - node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) - else: - node_present = dict_search_args(pki, changed_key, item_name) - - if node_present: - search_dict = dict_search_args(pki['system'], *search['path']) - - if not search_dict: - continue - - for found_name, found_path in dict_search_recursive(search_dict, key): - if found_name == item_name: - path_str = ' '.join(search['path'] + found_path) - print(f'pki: Updating config: {path_str} {found_name}') - - script = search['script'] - if found_path[0] == 'interfaces': - ifname = found_path[2] - call(f'VYOS_TAGNODE_VALUE={ifname} {script}') - else: - call(script) + call_dependents() return None |