summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Estabrook <jestabro@vyos.io>2023-08-05 19:16:23 -0500
committerJohn Estabrook <jestabro@vyos.io>2023-08-07 00:48:31 -0500
commit0e4d09aea4caf13e0f994cbb042a0dbf98719820 (patch)
treef7c4526938127ebaa17960b1c1fcdd4c0091c3c6
parentecc00da8c5e303af006052a00f904d187c90bb4b (diff)
downloadvyos-1x-0e4d09aea4caf13e0f994cbb042a0dbf98719820.tar.gz
vyos-1x-0e4d09aea4caf13e0f994cbb042a0dbf98719820.zip
config: T5443: add config merge_defaults method
Drop low-level merge_defaults function in favor of Config method for a middle-grained level of control when merging defaults.
-rw-r--r--python/vyos/config.py102
-rw-r--r--python/vyos/xml_ref/__init__.py18
-rw-r--r--python/vyos/xml_ref/definition.py90
3 files changed, 110 insertions, 100 deletions
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 179f60c43..6fececd76 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -66,17 +66,31 @@ In operational mode, all functions return values from the running config.
import re
import json
from copy import deepcopy
+from typing import Union
import vyos.configtree
-from vyos.xml_ref import multi_to_list, from_source
-from vyos.xml_ref import merge_defaults, relative_defaults
-from vyos.utils.dict import get_sub_dict, mangle_dict_keys
-from vyos.configsource import ConfigSource, ConfigSourceSession
+from vyos.xml_ref import multi_to_list
+from vyos.xml_ref import from_source
+from vyos.xml_ref import ext_dict_merge
+from vyos.xml_ref import relative_defaults
+from vyos.utils.dict import get_sub_dict
+from vyos.utils.dict import mangle_dict_keys
+from vyos.configsource import ConfigSource
+from vyos.configsource import ConfigSourceSession
class ConfigDict(dict):
_from_defaults = {}
- def from_defaults(self, path: list[str]):
+ _dict_kwargs = {}
+ def from_defaults(self, path: list[str]) -> bool:
return from_source(self._from_defaults, path)
+ @property
+ def kwargs(self) -> dict:
+ return self._dict_kwargs
+
+def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict:
+ if not isinstance(dest, ConfigDict):
+ dest = ConfigDict(dest)
+ return ext_dict_merge(src, dest)
class Config(object):
"""
@@ -229,6 +243,13 @@ class Config(object):
return config_dict
+ def verify_mangling(self, key_mangling):
+ if not (isinstance(key_mangling, tuple) and \
+ (len(key_mangling) == 2) and \
+ isinstance(key_mangling[0], str) and \
+ isinstance(key_mangling[1], str)):
+ raise ValueError("key_mangling must be a tuple of two strings")
+
def get_config_dict(self, path=[], effective=False, key_mangling=None,
get_first_key=False, no_multi_convert=False,
no_tag_node_value_mangle=False,
@@ -243,44 +264,37 @@ class Config(object):
Returns: a dict representation of the config under path
"""
+ kwargs = locals().copy()
+ del kwargs['self']
+ del kwargs['no_multi_convert']
+ del kwargs['with_defaults']
+ del kwargs['with_recursive_defaults']
+
lpath = self._make_path(path)
root_dict = self.get_cached_root_dict(effective)
conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key)
- if key_mangling is None and no_multi_convert and not (with_defaults or with_recursive_defaults):
- return deepcopy(conf_dict)
-
rpath = lpath if get_first_key else lpath[:-1]
if not no_multi_convert:
conf_dict = multi_to_list(rpath, conf_dict)
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ conf_dict = mangle_dict_keys(conf_dict,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
+
if with_defaults or with_recursive_defaults:
+ defaults = self.get_config_defaults(**kwargs,
+ recursive=with_recursive_defaults)
+ conf_dict = config_dict_merge(defaults, conf_dict)
+ else:
conf_dict = ConfigDict(conf_dict)
- conf_dict = merge_defaults(lpath, conf_dict,
- get_first_key=get_first_key,
- recursive=with_recursive_defaults)
- if key_mangling is None:
- return conf_dict
-
- if not (isinstance(key_mangling, tuple) and \
- (len(key_mangling) == 2) and \
- isinstance(key_mangling[0], str) and \
- isinstance(key_mangling[1], str)):
- raise ValueError("key_mangling must be a tuple of two strings")
-
- def mangle(obj):
- return mangle_dict_keys(obj, key_mangling[0], key_mangling[1],
- abs_path=rpath,
- no_tag_node_value_mangle=no_tag_node_value_mangle)
-
- if isinstance(conf_dict, ConfigDict):
- from_defaults = mangle(conf_dict._from_defaults)
- conf_dict = mangle(conf_dict)
- conf_dict._from_defaults = from_defaults
- else:
- conf_dict = mangle(conf_dict)
+ # save optional args for a call to get_config_defaults
+ setattr(conf_dict, '_dict_kwargs', kwargs)
return conf_dict
@@ -294,21 +308,29 @@ class Config(object):
defaults = relative_defaults(lpath, conf_dict,
get_first_key=get_first_key,
recursive=recursive)
- if key_mangling is None:
- return defaults
rpath = lpath if get_first_key else lpath[:-1]
- if not (isinstance(key_mangling, tuple) and \
- (len(key_mangling) == 2) and \
- isinstance(key_mangling[0], str) and \
- isinstance(key_mangling[1], str)):
- raise ValueError("key_mangling must be a tuple of two strings")
-
- defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ defaults = mangle_dict_keys(defaults,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
return defaults
+ def merge_defaults(self, config_dict: ConfigDict, recursive=False):
+ if not isinstance(config_dict, ConfigDict):
+ raise TypeError('argument is not of type ConfigDict')
+ if not config_dict.kwargs:
+ raise ValueError('argument missing metadata')
+
+ args = config_dict.kwargs
+ d = self.get_config_defaults(**args, recursive=recursive)
+ config_dict = config_dict_merge(d, config_dict)
+ return config_dict
+
def is_multi(self, path):
"""
Args:
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index 9f7a31e2c..02bbaffd8 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -13,9 +13,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
-from typing import Optional, Union
+from typing import Optional, Union, TYPE_CHECKING
from vyos.xml_ref import definition
+if TYPE_CHECKING:
+ from vyos.config import ConfigDict
+
def load_reference(cache=[]):
if cache:
return cache[0]
@@ -49,9 +52,6 @@ def is_leaf(path: list) -> bool:
def cli_defined(path: list, node: str, non_local=False) -> bool:
return load_reference().cli_defined(path, node, non_local=non_local)
-def from_source(d: dict, path: list) -> bool:
- return load_reference().from_source(d, path)
-
def component_version() -> dict:
return load_reference().component_version()
@@ -72,8 +72,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False,
get_first_key=get_first_key,
recursive=recursive)
-def merge_defaults(path: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
- return load_reference().merge_defaults(path, conf,
- get_first_key=get_first_key,
- recursive=recursive)
+def from_source(d: dict, path: list) -> bool:
+ return definition.from_source(d, path)
+
+def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
+ return definition.ext_dict_merge(source, destination)
diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py
index b0b8a2605..38e07f0a7 100644
--- a/python/vyos/xml_ref/definition.py
+++ b/python/vyos/xml_ref/definition.py
@@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING
if TYPE_CHECKING:
from vyos.config import ConfigDict
+def set_source_recursive(o: Union[dict, str, list], b: bool):
+ d = {}
+ if not isinstance(o, dict):
+ d = {'_source': b}
+ else:
+ for k, v in o.items():
+ d[k] = set_source_recursive(v, b)
+ d |= {'_source': b}
+ return d
+
+def source_dict_merge(src: dict, dest: dict):
+ from copy import deepcopy
+ dst = deepcopy(dest)
+ from_src = {}
+
+ for key, value in src.items():
+ if key not in dst:
+ dst[key] = value
+ from_src[key] = set_source_recursive(value, True)
+ elif isinstance(src[key], dict):
+ dst[key], f = source_dict_merge(src[key], dst[key])
+ f |= {'_source': False}
+ from_src[key] = f
+
+ return dst, from_src
+
+def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']):
+ d, f = source_dict_merge(src, dest)
+ if hasattr(d, '_from_defaults'):
+ setattr(d, '_from_defaults', f)
+ return d
+
+def from_source(d: dict, path: list) -> bool:
+ for key in path:
+ d = d[key] if key in d else {}
+ if not d or not isinstance(d, dict):
+ return False
+ return d.get('_source', False)
+
class Xml:
def __init__(self):
self.ref = {}
@@ -221,43 +260,6 @@ class Xml:
return False
return True
- def _set_source_recursive(self, o: Union[dict, str, list], b: bool):
- d = {}
- if not isinstance(o, dict):
- d = {'_source': b}
- else:
- for k, v in o.items():
- d[k] = self._set_source_recursive(v, b)
- d |= {'_source': b}
- return d
-
- # use local copy of function in module configdict, to avoid circular
- # import
- #
- # extend dict_merge to keep track of keys only in source
- def _dict_merge(self, source, destination):
- from copy import deepcopy
- dest = deepcopy(destination)
- from_source = {}
-
- for key, value in source.items():
- if key not in dest:
- dest[key] = value
- from_source[key] = self._set_source_recursive(value, True)
- elif isinstance(source[key], dict):
- dest[key], f = self._dict_merge(source[key], dest[key])
- f |= {'_source': False}
- from_source[key] = f
-
- return dest, from_source
-
- def from_source(self, d: dict, path: list) -> bool:
- for key in path:
- d = d[key] if key in d else {}
- if not d or not isinstance(d, dict):
- return False
- return d.get('_source', False)
-
def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:
res: dict = {}
res = self.get_defaults(rpath, recursive=recursive,
@@ -298,17 +300,3 @@ class Xml:
res = {}
return res
-
- def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'],
- get_first_key=False, recursive=False) -> dict:
- """Return config dict with defaults non-destructively merged
-
- This merges non-recursive defaults relative to the config dict.
- """
- d = self.relative_defaults(path, conf, get_first_key=get_first_key,
- recursive=recursive)
- d, f = self._dict_merge(d, conf)
- d = type(conf)(d)
- if hasattr(d, '_from_defaults'):
- setattr(d, '_from_defaults', f)
- return d