diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/vyos/config.py | 102 | ||||
| -rw-r--r-- | python/vyos/xml_ref/__init__.py | 20 | ||||
| -rw-r--r-- | python/vyos/xml_ref/definition.py | 99 | 
3 files changed, 122 insertions, 99 deletions
| diff --git a/python/vyos/config.py b/python/vyos/config.py index 179f60c43..6fececd76 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -66,17 +66,31 @@ In operational mode, all functions return values from the running config.  import re  import json  from copy import deepcopy +from typing import Union  import vyos.configtree -from vyos.xml_ref import multi_to_list, from_source -from vyos.xml_ref import merge_defaults, relative_defaults -from vyos.utils.dict import get_sub_dict, mangle_dict_keys -from vyos.configsource import ConfigSource, ConfigSourceSession +from vyos.xml_ref import multi_to_list +from vyos.xml_ref import from_source +from vyos.xml_ref import ext_dict_merge +from vyos.xml_ref import relative_defaults +from vyos.utils.dict import get_sub_dict +from vyos.utils.dict import mangle_dict_keys +from vyos.configsource import ConfigSource +from vyos.configsource import ConfigSourceSession  class ConfigDict(dict):      _from_defaults = {} -    def from_defaults(self, path: list[str]): +    _dict_kwargs = {} +    def from_defaults(self, path: list[str]) -> bool:          return from_source(self._from_defaults, path) +    @property +    def kwargs(self) -> dict: +        return self._dict_kwargs + +def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict: +    if not isinstance(dest, ConfigDict): +        dest = ConfigDict(dest) +    return ext_dict_merge(src, dest)  class Config(object):      """ @@ -229,6 +243,13 @@ class Config(object):          return config_dict +    def verify_mangling(self, key_mangling): +        if not (isinstance(key_mangling, tuple) and \ +                (len(key_mangling) == 2) and \ +                isinstance(key_mangling[0], str) and \ +                isinstance(key_mangling[1], str)): +            raise ValueError("key_mangling must be a tuple of two strings") +      def get_config_dict(self, path=[], effective=False, key_mangling=None,                          get_first_key=False, no_multi_convert=False,                          no_tag_node_value_mangle=False, @@ -243,44 +264,37 @@ class Config(object):          Returns: a dict representation of the config under path          """ +        kwargs = locals().copy() +        del kwargs['self'] +        del kwargs['no_multi_convert'] +        del kwargs['with_defaults'] +        del kwargs['with_recursive_defaults'] +          lpath = self._make_path(path)          root_dict = self.get_cached_root_dict(effective)          conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key) -        if key_mangling is None and no_multi_convert and not (with_defaults or with_recursive_defaults): -            return deepcopy(conf_dict) -          rpath = lpath if get_first_key else lpath[:-1]          if not no_multi_convert:              conf_dict = multi_to_list(rpath, conf_dict) +        if key_mangling is not None: +            self.verify_mangling(key_mangling) +            conf_dict = mangle_dict_keys(conf_dict, +                                         key_mangling[0], key_mangling[1], +                                         abs_path=rpath, +                                         no_tag_node_value_mangle=no_tag_node_value_mangle) +          if with_defaults or with_recursive_defaults: +            defaults = self.get_config_defaults(**kwargs, +                                                recursive=with_recursive_defaults) +            conf_dict = config_dict_merge(defaults, conf_dict) +        else:              conf_dict = ConfigDict(conf_dict) -            conf_dict = merge_defaults(lpath, conf_dict, -                                       get_first_key=get_first_key, -                                       recursive=with_recursive_defaults) -        if key_mangling is None: -            return conf_dict - -        if not (isinstance(key_mangling, tuple) and \ -                (len(key_mangling) == 2) and \ -                isinstance(key_mangling[0], str) and \ -                isinstance(key_mangling[1], str)): -            raise ValueError("key_mangling must be a tuple of two strings") - -        def mangle(obj): -            return mangle_dict_keys(obj, key_mangling[0], key_mangling[1], -                                    abs_path=rpath, -                                    no_tag_node_value_mangle=no_tag_node_value_mangle) - -        if isinstance(conf_dict, ConfigDict): -            from_defaults = mangle(conf_dict._from_defaults) -            conf_dict = mangle(conf_dict) -            conf_dict._from_defaults = from_defaults -        else: -            conf_dict = mangle(conf_dict) +        # save optional args for a call to get_config_defaults +        setattr(conf_dict, '_dict_kwargs', kwargs)          return conf_dict @@ -294,21 +308,29 @@ class Config(object):          defaults = relative_defaults(lpath, conf_dict,                                       get_first_key=get_first_key,                                       recursive=recursive) -        if key_mangling is None: -            return defaults          rpath = lpath if get_first_key else lpath[:-1] -        if not (isinstance(key_mangling, tuple) and \ -                (len(key_mangling) == 2) and \ -                isinstance(key_mangling[0], str) and \ -                isinstance(key_mangling[1], str)): -            raise ValueError("key_mangling must be a tuple of two strings") - -        defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) +        if key_mangling is not None: +            self.verify_mangling(key_mangling) +            defaults = mangle_dict_keys(defaults, +                                        key_mangling[0], key_mangling[1], +                                        abs_path=rpath, +                                        no_tag_node_value_mangle=no_tag_node_value_mangle)          return defaults +    def merge_defaults(self, config_dict: ConfigDict, recursive=False): +        if not isinstance(config_dict, ConfigDict): +            raise TypeError('argument is not of type ConfigDict') +        if not config_dict.kwargs: +            raise ValueError('argument missing metadata') + +        args = config_dict.kwargs +        d = self.get_config_defaults(**args, recursive=recursive) +        config_dict = config_dict_merge(d, config_dict) +        return config_dict +      def is_multi(self, path):          """          Args: diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index ad2130dca..02bbaffd8 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -13,8 +13,12 @@  # You should have received a copy of the GNU Lesser General Public License  # along with this library.  If not, see <http://www.gnu.org/licenses/>. +from typing import Optional, Union, TYPE_CHECKING  from vyos.xml_ref import definition +if TYPE_CHECKING: +    from vyos.config import ConfigDict +  def load_reference(cache=[]):      if cache:          return cache[0] @@ -48,12 +52,12 @@ def is_leaf(path: list) -> bool:  def cli_defined(path: list, node: str, non_local=False) -> bool:      return load_reference().cli_defined(path, node, non_local=non_local) -def from_source(d: dict, path: list) -> bool: -    return load_reference().from_source(d, path) -  def component_version() -> dict:      return load_reference().component_version() +def default_value(path: list) -> Optional[Union[str, list]]: +    return load_reference().default_value(path) +  def multi_to_list(rpath: list, conf: dict) -> dict:      return load_reference().multi_to_list(rpath, conf) @@ -68,8 +72,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False,                                                get_first_key=get_first_key,                                                recursive=recursive) -def merge_defaults(path: list, conf: dict, get_first_key=False, -                   recursive=False) -> dict: -    return load_reference().merge_defaults(path, conf, -                                           get_first_key=get_first_key, -                                           recursive=recursive) +def from_source(d: dict, path: list) -> bool: +    return definition.from_source(d, path) + +def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']): +    return definition.ext_dict_merge(source, destination) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py index d95d580e2..38e07f0a7 100644 --- a/python/vyos/xml_ref/definition.py +++ b/python/vyos/xml_ref/definition.py @@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING  if TYPE_CHECKING:      from vyos.config import ConfigDict +def set_source_recursive(o: Union[dict, str, list], b: bool): +    d = {} +    if not isinstance(o, dict): +        d = {'_source': b} +    else: +        for k, v in o.items(): +            d[k] = set_source_recursive(v, b) +        d |= {'_source': b} +    return d + +def source_dict_merge(src: dict, dest: dict): +    from copy import deepcopy +    dst = deepcopy(dest) +    from_src = {} + +    for key, value in src.items(): +        if key not in dst: +            dst[key] = value +            from_src[key] = set_source_recursive(value, True) +        elif isinstance(src[key], dict): +            dst[key], f = source_dict_merge(src[key], dst[key]) +            f |= {'_source': False} +            from_src[key] = f + +    return dst, from_src + +def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']): +    d, f = source_dict_merge(src, dest) +    if hasattr(d, '_from_defaults'): +        setattr(d, '_from_defaults', f) +    return d + +def from_source(d: dict, path: list) -> bool: +    for key in path: +        d  = d[key] if key in d else {} +        if not d or not isinstance(d, dict): +            return False +    return d.get('_source', False) +  class Xml:      def __init__(self):          self.ref = {} @@ -153,6 +192,15 @@ class Xml:              return default.split()          return default +    def default_value(self, path: list) -> Optional[Union[str, list]]: +        d = self._get_ref_path(path) +        default = self._get_default_value(d) +        if default is None: +            return None +        if self._is_multi_node(d) or self._is_tag_node(d): +            return default.split() +        return default +      def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:          """Return dict containing default values below path @@ -212,43 +260,6 @@ class Xml:              return False          return True -    def _set_source_recursive(self, o: Union[dict, str, list], b: bool): -        d = {} -        if not isinstance(o, dict): -            d = {'_source': b} -        else: -            for k, v in o.items(): -                d[k] = self._set_source_recursive(v, b) -            d |= {'_source': b} -        return d - -    # use local copy of function in module configdict, to avoid circular -    # import -    # -    # extend dict_merge to keep track of keys only in source -    def _dict_merge(self, source, destination): -        from copy import deepcopy -        dest = deepcopy(destination) -        from_source = {} - -        for key, value in source.items(): -            if key not in dest: -                dest[key] = value -                from_source[key] = self._set_source_recursive(value, True) -            elif isinstance(source[key], dict): -                dest[key], f = self._dict_merge(source[key], dest[key]) -                f |= {'_source': False} -                from_source[key] = f - -        return dest, from_source - -    def from_source(self, d: dict, path: list) -> bool: -        for key in path: -            d  = d[key] if key in d else {} -            if not d or not isinstance(d, dict): -                return False -        return d.get('_source', False) -      def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:          res: dict = {}          res = self.get_defaults(rpath, recursive=recursive, @@ -289,17 +300,3 @@ class Xml:                  res = {}          return res - -    def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'], -                       get_first_key=False, recursive=False) -> dict: -        """Return config dict with defaults non-destructively merged - -        This merges non-recursive defaults relative to the config dict. -        """ -        d = self.relative_defaults(path, conf, get_first_key=get_first_key, -                                   recursive=recursive) -        d, f = self._dict_merge(d, conf) -        d = type(conf)(d) -        if hasattr(d, '_from_defaults'): -            setattr(d, '_from_defaults', f) -        return d | 
