diff options
Diffstat (limited to 'python/vyos')
-rw-r--r-- | python/vyos/config.py | 102 | ||||
-rw-r--r-- | python/vyos/configtree.py | 18 | ||||
-rw-r--r-- | python/vyos/nat.py | 2 | ||||
-rw-r--r-- | python/vyos/pki.py | 14 | ||||
-rw-r--r-- | python/vyos/xml_ref/__init__.py | 28 | ||||
-rw-r--r-- | python/vyos/xml_ref/definition.py | 99 |
6 files changed, 152 insertions, 111 deletions
diff --git a/python/vyos/config.py b/python/vyos/config.py index 179f60c43..6fececd76 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -66,17 +66,31 @@ In operational mode, all functions return values from the running config. import re import json from copy import deepcopy +from typing import Union import vyos.configtree -from vyos.xml_ref import multi_to_list, from_source -from vyos.xml_ref import merge_defaults, relative_defaults -from vyos.utils.dict import get_sub_dict, mangle_dict_keys -from vyos.configsource import ConfigSource, ConfigSourceSession +from vyos.xml_ref import multi_to_list +from vyos.xml_ref import from_source +from vyos.xml_ref import ext_dict_merge +from vyos.xml_ref import relative_defaults +from vyos.utils.dict import get_sub_dict +from vyos.utils.dict import mangle_dict_keys +from vyos.configsource import ConfigSource +from vyos.configsource import ConfigSourceSession class ConfigDict(dict): _from_defaults = {} - def from_defaults(self, path: list[str]): + _dict_kwargs = {} + def from_defaults(self, path: list[str]) -> bool: return from_source(self._from_defaults, path) + @property + def kwargs(self) -> dict: + return self._dict_kwargs + +def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict: + if not isinstance(dest, ConfigDict): + dest = ConfigDict(dest) + return ext_dict_merge(src, dest) class Config(object): """ @@ -229,6 +243,13 @@ class Config(object): return config_dict + def verify_mangling(self, key_mangling): + if not (isinstance(key_mangling, tuple) and \ + (len(key_mangling) == 2) and \ + isinstance(key_mangling[0], str) and \ + isinstance(key_mangling[1], str)): + raise ValueError("key_mangling must be a tuple of two strings") + def get_config_dict(self, path=[], effective=False, key_mangling=None, get_first_key=False, no_multi_convert=False, no_tag_node_value_mangle=False, @@ -243,44 +264,37 @@ class Config(object): Returns: a dict representation of the config under path """ + kwargs = locals().copy() + del kwargs['self'] + del kwargs['no_multi_convert'] + del kwargs['with_defaults'] + del kwargs['with_recursive_defaults'] + lpath = self._make_path(path) root_dict = self.get_cached_root_dict(effective) conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key) - if key_mangling is None and no_multi_convert and not (with_defaults or with_recursive_defaults): - return deepcopy(conf_dict) - rpath = lpath if get_first_key else lpath[:-1] if not no_multi_convert: conf_dict = multi_to_list(rpath, conf_dict) + if key_mangling is not None: + self.verify_mangling(key_mangling) + conf_dict = mangle_dict_keys(conf_dict, + key_mangling[0], key_mangling[1], + abs_path=rpath, + no_tag_node_value_mangle=no_tag_node_value_mangle) + if with_defaults or with_recursive_defaults: + defaults = self.get_config_defaults(**kwargs, + recursive=with_recursive_defaults) + conf_dict = config_dict_merge(defaults, conf_dict) + else: conf_dict = ConfigDict(conf_dict) - conf_dict = merge_defaults(lpath, conf_dict, - get_first_key=get_first_key, - recursive=with_recursive_defaults) - if key_mangling is None: - return conf_dict - - if not (isinstance(key_mangling, tuple) and \ - (len(key_mangling) == 2) and \ - isinstance(key_mangling[0], str) and \ - isinstance(key_mangling[1], str)): - raise ValueError("key_mangling must be a tuple of two strings") - - def mangle(obj): - return mangle_dict_keys(obj, key_mangling[0], key_mangling[1], - abs_path=rpath, - no_tag_node_value_mangle=no_tag_node_value_mangle) - - if isinstance(conf_dict, ConfigDict): - from_defaults = mangle(conf_dict._from_defaults) - conf_dict = mangle(conf_dict) - conf_dict._from_defaults = from_defaults - else: - conf_dict = mangle(conf_dict) + # save optional args for a call to get_config_defaults + setattr(conf_dict, '_dict_kwargs', kwargs) return conf_dict @@ -294,21 +308,29 @@ class Config(object): defaults = relative_defaults(lpath, conf_dict, get_first_key=get_first_key, recursive=recursive) - if key_mangling is None: - return defaults rpath = lpath if get_first_key else lpath[:-1] - if not (isinstance(key_mangling, tuple) and \ - (len(key_mangling) == 2) and \ - isinstance(key_mangling[0], str) and \ - isinstance(key_mangling[1], str)): - raise ValueError("key_mangling must be a tuple of two strings") - - defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) + if key_mangling is not None: + self.verify_mangling(key_mangling) + defaults = mangle_dict_keys(defaults, + key_mangling[0], key_mangling[1], + abs_path=rpath, + no_tag_node_value_mangle=no_tag_node_value_mangle) return defaults + def merge_defaults(self, config_dict: ConfigDict, recursive=False): + if not isinstance(config_dict, ConfigDict): + raise TypeError('argument is not of type ConfigDict') + if not config_dict.kwargs: + raise ValueError('argument missing metadata') + + args = config_dict.kwargs + d = self.get_config_defaults(**args, recursive=recursive) + config_dict = config_dict_merge(d, config_dict) + return config_dict + def is_multi(self, path): """ Args: diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py index e18d9817d..09cfd43d3 100644 --- a/python/vyos/configtree.py +++ b/python/vyos/configtree.py @@ -383,14 +383,16 @@ def union(left, right, libpath=LIBPATH): return tree def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH): - __lib = cdll.LoadLibrary(libpath) - __reference_tree_to_json = __lib.reference_tree_to_json - __reference_tree_to_json.argtypes = [c_char_p, c_char_p] - __get_error = __lib.get_error - __get_error.argtypes = [] - __get_error.restype = c_char_p - - res = __reference_tree_to_json(from_dir.encode(), to_file.encode()) + try: + __lib = cdll.LoadLibrary(libpath) + __reference_tree_to_json = __lib.reference_tree_to_json + __reference_tree_to_json.argtypes = [c_char_p, c_char_p] + __get_error = __lib.get_error + __get_error.argtypes = [] + __get_error.restype = c_char_p + res = __reference_tree_to_json(from_dir.encode(), to_file.encode()) + except Exception as e: + raise ConfigTreeError(e) if res == 1: msg = __get_error().decode() raise ConfigTreeError(msg) diff --git a/python/vyos/nat.py b/python/vyos/nat.py index 418efe649..b6702f7e2 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -94,7 +94,7 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False): if options: translation_str += f' {",".join(options)}' - if 'backend' in rule_conf['load_balance']: + if not ipv6 and 'backend' in rule_conf['load_balance']: hash_input_items = [] current_prob = 0 nat_map = [] diff --git a/python/vyos/pki.py b/python/vyos/pki.py index cd15e3878..792e24b76 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -63,6 +63,18 @@ private_format_map = { 'OpenSSH': serialization.PrivateFormat.OpenSSH } +hash_map = { + 'sha256': hashes.SHA256, + 'sha384': hashes.SHA384, + 'sha512': hashes.SHA512, +} + +def get_certificate_fingerprint(cert, hash): + hash_algorithm = hash_map[hash]() + fp = cert.fingerprint(hash_algorithm) + + return fp.hex(':').upper() + def encode_certificate(cert): return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index ad2130dca..bf434865d 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -13,8 +13,12 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. +from typing import Optional, Union, TYPE_CHECKING from vyos.xml_ref import definition +if TYPE_CHECKING: + from vyos.config import ConfigDict + def load_reference(cache=[]): if cache: return cache[0] @@ -23,11 +27,15 @@ def load_reference(cache=[]): try: from vyos.xml_ref.cache import reference - xml.define(reference) - cache.append(xml) except Exception: raise ImportError('no xml reference cache !!') + if not reference: + raise ValueError('empty xml reference cache !!') + + xml.define(reference) + cache.append(xml) + return xml def is_tag(path: list) -> bool: @@ -48,12 +56,12 @@ def is_leaf(path: list) -> bool: def cli_defined(path: list, node: str, non_local=False) -> bool: return load_reference().cli_defined(path, node, non_local=non_local) -def from_source(d: dict, path: list) -> bool: - return load_reference().from_source(d, path) - def component_version() -> dict: return load_reference().component_version() +def default_value(path: list) -> Optional[Union[str, list]]: + return load_reference().default_value(path) + def multi_to_list(rpath: list, conf: dict) -> dict: return load_reference().multi_to_list(rpath, conf) @@ -68,8 +76,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False, get_first_key=get_first_key, recursive=recursive) -def merge_defaults(path: list, conf: dict, get_first_key=False, - recursive=False) -> dict: - return load_reference().merge_defaults(path, conf, - get_first_key=get_first_key, - recursive=recursive) +def from_source(d: dict, path: list) -> bool: + return definition.from_source(d, path) + +def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']): + return definition.ext_dict_merge(source, destination) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py index d95d580e2..38e07f0a7 100644 --- a/python/vyos/xml_ref/definition.py +++ b/python/vyos/xml_ref/definition.py @@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING if TYPE_CHECKING: from vyos.config import ConfigDict +def set_source_recursive(o: Union[dict, str, list], b: bool): + d = {} + if not isinstance(o, dict): + d = {'_source': b} + else: + for k, v in o.items(): + d[k] = set_source_recursive(v, b) + d |= {'_source': b} + return d + +def source_dict_merge(src: dict, dest: dict): + from copy import deepcopy + dst = deepcopy(dest) + from_src = {} + + for key, value in src.items(): + if key not in dst: + dst[key] = value + from_src[key] = set_source_recursive(value, True) + elif isinstance(src[key], dict): + dst[key], f = source_dict_merge(src[key], dst[key]) + f |= {'_source': False} + from_src[key] = f + + return dst, from_src + +def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']): + d, f = source_dict_merge(src, dest) + if hasattr(d, '_from_defaults'): + setattr(d, '_from_defaults', f) + return d + +def from_source(d: dict, path: list) -> bool: + for key in path: + d = d[key] if key in d else {} + if not d or not isinstance(d, dict): + return False + return d.get('_source', False) + class Xml: def __init__(self): self.ref = {} @@ -153,6 +192,15 @@ class Xml: return default.split() return default + def default_value(self, path: list) -> Optional[Union[str, list]]: + d = self._get_ref_path(path) + default = self._get_default_value(d) + if default is None: + return None + if self._is_multi_node(d) or self._is_tag_node(d): + return default.split() + return default + def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict: """Return dict containing default values below path @@ -212,43 +260,6 @@ class Xml: return False return True - def _set_source_recursive(self, o: Union[dict, str, list], b: bool): - d = {} - if not isinstance(o, dict): - d = {'_source': b} - else: - for k, v in o.items(): - d[k] = self._set_source_recursive(v, b) - d |= {'_source': b} - return d - - # use local copy of function in module configdict, to avoid circular - # import - # - # extend dict_merge to keep track of keys only in source - def _dict_merge(self, source, destination): - from copy import deepcopy - dest = deepcopy(destination) - from_source = {} - - for key, value in source.items(): - if key not in dest: - dest[key] = value - from_source[key] = self._set_source_recursive(value, True) - elif isinstance(source[key], dict): - dest[key], f = self._dict_merge(source[key], dest[key]) - f |= {'_source': False} - from_source[key] = f - - return dest, from_source - - def from_source(self, d: dict, path: list) -> bool: - for key in path: - d = d[key] if key in d else {} - if not d or not isinstance(d, dict): - return False - return d.get('_source', False) - def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict: res: dict = {} res = self.get_defaults(rpath, recursive=recursive, @@ -289,17 +300,3 @@ class Xml: res = {} return res - - def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'], - get_first_key=False, recursive=False) -> dict: - """Return config dict with defaults non-destructively merged - - This merges non-recursive defaults relative to the config dict. - """ - d = self.relative_defaults(path, conf, get_first_key=get_first_key, - recursive=recursive) - d, f = self._dict_merge(d, conf) - d = type(conf)(d) - if hasattr(d, '_from_defaults'): - setattr(d, '_from_defaults', f) - return d |