summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Poessinger <christian@poessinger.com>2022-12-01 14:20:28 +0100
committerGitHub <noreply@github.com>2022-12-01 14:20:28 +0100
commit33c0d77bfa10c81dfc93c4eca781279df9cf1034 (patch)
tree274033b3413e383820c665de2b874c929a91524c
parent97f36fe0c1f337d73e5f0af4e2fecefadc3325b4 (diff)
parent032de023c21d92262243a2abb34bfd4c2009958e (diff)
downloadvyos-1x-33c0d77bfa10c81dfc93c4eca781279df9cf1034.tar.gz
vyos-1x-33c0d77bfa10c81dfc93c4eca781279df9cf1034.zip
Merge pull request #1684 from jestabro/config-script-dependency
pki: T4847: correct calling of config mode script dependencies from pki.py
-rw-r--r--data/config-mode-dependencies.json12
-rw-r--r--python/vyos/configdep.py24
-rwxr-xr-xsmoketest/scripts/cli/test_pki.py22
-rwxr-xr-xsrc/conf_mode/interfaces-ethernet.py2
-rwxr-xr-xsrc/conf_mode/pki.py71
5 files changed, 86 insertions, 45 deletions
diff --git a/data/config-mode-dependencies.json b/data/config-mode-dependencies.json
index dd0efda10..ad12cff87 100644
--- a/data/config-mode-dependencies.json
+++ b/data/config-mode-dependencies.json
@@ -1 +1,11 @@
-{"firewall": {"group_resync": ["nat", "policy-route"]}}
+{
+ "firewall": {"group_resync": ["nat", "policy-route"]},
+ "pki": {
+ "ethernet": ["interfaces-ethernet"],
+ "openvpn": ["interfaces-openvpn"],
+ "https": ["https"],
+ "ipsec": ["vpn_ipsec"],
+ "openconnect": ["vpn_openconnect"],
+ "sstp": ["vpn_sstp"]
+ }
+}
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index ca05cb092..d4b2cc78f 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -15,6 +15,7 @@
import os
import json
+import typing
from inspect import stack
from vyos.util import load_as_module
@@ -22,7 +23,12 @@ from vyos.defaults import directories
from vyos.configsource import VyOSError
from vyos import ConfigError
-dependent_func = {}
+# https://peps.python.org/pep-0484/#forward-references
+# for type 'Config'
+if typing.TYPE_CHECKING:
+ from vyos.config import Config
+
+dependent_func: dict[str, list[typing.Callable]] = {}
def canon_name(name: str) -> str:
return os.path.splitext(name)[0].replace('-', '_')
@@ -34,14 +40,14 @@ def canon_name_of_path(path: str) -> str:
def caller_name() -> str:
return stack()[-1].filename
-def read_dependency_dict():
+def read_dependency_dict() -> dict:
path = os.path.join(directories['data'],
'config-mode-dependencies.json')
with open(path) as f:
d = json.load(f)
return d
-def get_dependency_dict(config):
+def get_dependency_dict(config: 'Config') -> dict:
if hasattr(config, 'cached_dependency_dict'):
d = getattr(config, 'cached_dependency_dict')
else:
@@ -49,7 +55,7 @@ def get_dependency_dict(config):
setattr(config, 'cached_dependency_dict', d)
return d
-def run_config_mode_script(script: str, config):
+def run_config_mode_script(script: str, config: 'Config'):
path = os.path.join(directories['conf_mode'], script)
name = canon_name(script)
mod = load_as_module(name, path)
@@ -63,18 +69,22 @@ def run_config_mode_script(script: str, config):
except (VyOSError, ConfigError) as e:
raise ConfigError(repr(e))
-def def_closure(target: str, config):
+def def_closure(target: str, config: 'Config',
+ tagnode: typing.Optional[str] = None) -> typing.Callable:
script = target + '.py'
def func_impl():
+ if tagnode:
+ os.environ['VYOS_TAGNODE_VALUE'] = tagnode
run_config_mode_script(script, config)
return func_impl
-def set_dependents(case, config):
+def set_dependents(case: str, config: 'Config',
+ tagnode: typing.Optional[str] = None):
d = get_dependency_dict(config)
k = canon_name_of_path(caller_name())
l = dependent_func.setdefault(k, [])
for target in d[k][case]:
- func = def_closure(target, config)
+ func = def_closure(target, config, tagnode)
l.append(func)
def call_dependents():
diff --git a/smoketest/scripts/cli/test_pki.py b/smoketest/scripts/cli/test_pki.py
index cba5ffdde..b18b0b039 100755
--- a/smoketest/scripts/cli/test_pki.py
+++ b/smoketest/scripts/cli/test_pki.py
@@ -246,5 +246,27 @@ class TestPKI(VyOSUnitTestSHIM.TestCase):
self.cli_delete(['service', 'https', 'certificates', 'certificate'])
+ def test_certificate_eapol_update(self):
+ self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')])
+ self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')])
+ self.cli_commit()
+
+ self.cli_set(['interfaces', 'ethernet', 'eth1', 'eapol', 'certificate', 'smoketest'])
+ self.cli_commit()
+
+ cert_data = None
+
+ with open('/run/wpa_supplicant/eth1_cert.pem') as f:
+ cert_data = f.read()
+
+ self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_update_cert.replace('\n','')])
+ self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_update_private_key.replace('\n','')])
+ self.cli_commit()
+
+ with open('/run/wpa_supplicant/eth1_cert.pem') as f:
+ self.assertNotEqual(cert_data, f.read())
+
+ self.cli_delete(['interfaces', 'ethernet', 'eth1', 'eapol'])
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/conf_mode/interfaces-ethernet.py b/src/conf_mode/interfaces-ethernet.py
index e02841831..b49c945cd 100755
--- a/src/conf_mode/interfaces-ethernet.py
+++ b/src/conf_mode/interfaces-ethernet.py
@@ -175,7 +175,7 @@ def generate(ethernet):
loaded_pki_cert = load_certificate(pki_cert['certificate'])
loaded_ca_certs = {load_certificate(c['certificate'])
- for c in ethernet['pki']['ca'].values()}
+ for c in ethernet['pki']['ca'].values()} if 'ca' in ethernet['pki'] else {}
cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs)
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index 29ed7b1b7..e8f3cc87a 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -16,20 +16,16 @@
from sys import exit
-import jmespath
-
from vyos.config import Config
+from vyos.configdep import set_dependents, call_dependents
from vyos.configdict import dict_merge
from vyos.configdict import node_changed
from vyos.pki import is_ca_certificate
from vyos.pki import load_certificate
-from vyos.pki import load_certificate_request
from vyos.pki import load_public_key
from vyos.pki import load_private_key
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
-from vyos.util import ask_input
-from vyos.util import call
from vyos.util import dict_search_args
from vyos.util import dict_search_recursive
from vyos.xml import defaults
@@ -121,6 +117,39 @@ def get_config(config=None):
get_first_key=True,
no_tag_node_value_mangle=True)
+ if 'changed' in pki:
+ for search in sync_search:
+ for key in search['keys']:
+ changed_key = sync_translate[key]
+
+ if changed_key not in pki['changed']:
+ continue
+
+ for item_name in pki['changed'][changed_key]:
+ node_present = False
+ if changed_key == 'openvpn':
+ node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
+ else:
+ node_present = dict_search_args(pki, changed_key, item_name)
+
+ if node_present:
+ search_dict = dict_search_args(pki['system'], *search['path'])
+
+ if not search_dict:
+ continue
+
+ for found_name, found_path in dict_search_recursive(search_dict, key):
+ if found_name == item_name:
+ path = search['path']
+ path_str = ' '.join(path + found_path)
+ print(f'pki: Updating config: {path_str} {found_name}')
+
+ if path[0] == 'interfaces':
+ ifname = found_path[0]
+ set_dependents(path[1], conf, ifname)
+ else:
+ set_dependents(path[1], conf)
+
return pki
def is_valid_certificate(raw_data):
@@ -259,37 +288,7 @@ def apply(pki):
return None
if 'changed' in pki:
- for search in sync_search:
- for key in search['keys']:
- changed_key = sync_translate[key]
-
- if changed_key not in pki['changed']:
- continue
-
- for item_name in pki['changed'][changed_key]:
- node_present = False
- if changed_key == 'openvpn':
- node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
- else:
- node_present = dict_search_args(pki, changed_key, item_name)
-
- if node_present:
- search_dict = dict_search_args(pki['system'], *search['path'])
-
- if not search_dict:
- continue
-
- for found_name, found_path in dict_search_recursive(search_dict, key):
- if found_name == item_name:
- path_str = ' '.join(search['path'] + found_path)
- print(f'pki: Updating config: {path_str} {found_name}')
-
- script = search['script']
- if found_path[0] == 'interfaces':
- ifname = found_path[2]
- call(f'VYOS_TAGNODE_VALUE={ifname} {script}')
- else:
- call(script)
+ call_dependents()
return None