diff options
| -rw-r--r-- | python/vyos/xml_ref/__init__.py | 58 | ||||
| -rw-r--r-- | python/vyos/xml_ref/definition.py | 234 | 
2 files changed, 292 insertions, 0 deletions
| diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py new file mode 100644 index 000000000..66525865c --- /dev/null +++ b/python/vyos/xml_ref/__init__.py @@ -0,0 +1,58 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library.  If not, see <http://www.gnu.org/licenses/>. + +from vyos.xml_ref import definition + +def load_reference(cache=[]): +    if cache: +        return cache[0] + +    xml = definition.Xml() + +    try: +        from vyos.xml_ref.cache import reference +        xml.define(reference) +        cache.append(xml) +    except Exception: +        raise ImportError('no xml reference cache !!') + +    return xml + +def is_tag(path: list) -> bool: +    return load_reference().is_tag(path) + +def is_tag_value(path: list) -> bool: +    return load_reference().is_tag_value(path) + +def is_multi(path: list) -> bool: +    return load_reference().is_multi(path) + +def is_valueless(path: list) -> bool: +    return load_reference().is_valueless(path) + +def is_leaf(path: list) -> bool: +    return load_reference().is_leaf(path) + +def component_version() -> dict: +    return load_reference().component_version() + +def multi_to_list(rpath: list, conf: dict) -> dict: +    return load_reference().multi_to_list(rpath, conf) + +def get_defaults(path: list, get_first_key=False) -> dict: +    return load_reference().get_defaults(path, get_first_key=get_first_key) + +def merge_defaults(path: list, conf: dict) -> dict: +    return load_reference().merge_defaults(path, conf) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py new file mode 100644 index 000000000..64a33e4d0 --- /dev/null +++ b/python/vyos/xml_ref/definition.py @@ -0,0 +1,234 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library.  If not, see <http://www.gnu.org/licenses/>. + +from typing import Union, Optional, Any +from vyos.configdict import dict_merge + +class Xml: +    def __init__(self): +        self.ref = {} + +    def define(self, ref: dict): +        self.ref = ref + +    def _get_ref_node_data(self, node: dict, data: str) -> Union[bool, str]: +        res = node.get('node_data', {}) +        if not res: +            raise ValueError("non-existent node data") +        if data not in res: +            raise ValueError("non-existent data field") + +        return res.get(data) + +    def _get_ref_path(self, path: list) -> dict: +        ref_path = path.copy() +        d = self.ref +        while ref_path and d: +            d = d.get(ref_path[0], {}) +            ref_path.pop(0) +            if self._is_tag_node(d) and ref_path: +                ref_path.pop(0) + +        return d + +    def _is_tag_node(self, node: dict) -> bool: +        res = self._get_ref_node_data(node, 'node_type') +        return res == 'tag' + +    def is_tag(self, path: list) -> bool: +        ref_path = path.copy() +        d = self.ref +        while ref_path and d: +            d = d.get(ref_path[0], {}) +            ref_path.pop(0) +            if self._is_tag_node(d) and ref_path: +                if len(ref_path) == 1: +                    return False +                ref_path.pop(0) + +        return self._is_tag_node(d) + +    def is_tag_value(self, path: list) -> bool: +        if len(path) < 2: +            return False + +        return self.is_tag(path[:-1]) + +    def _is_multi_node(self, node: dict) -> bool: +        b = self._get_ref_node_data(node, 'multi') +        assert isinstance(b, bool) +        return b + +    def is_multi(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return  self._is_multi_node(d) + +    def _is_valueless_node(self, node: dict) -> bool: +        b = self._get_ref_node_data(node, 'valueless') +        assert isinstance(b, bool) +        return b + +    def is_valueless(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return  self._is_valueless_node(d) + +    def _is_leaf_node(self, node: dict) -> bool: +        res = self._get_ref_node_data(node, 'node_type') +        return res == 'leaf' + +    def is_leaf(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return self._is_leaf_node(d) + +    def component_version(self) -> dict: +        d = {} +        for k, v in self.ref['component_version']: +            d[k] = int(v) +        return d + +    def multi_to_list(self, rpath: list, conf: dict) -> dict: +        if rpath and rpath[-1] in list(conf): +            raise ValueError('rpath should be disjoint from conf keys') + +        res = {} + +        for k in list(conf): +            d = self._get_ref_path(rpath + [k]) +            if self._is_leaf_node(d): +                if self._is_multi_node(d) and not isinstance(conf[k], list): +                    res[k] = [conf[k]] +                else: +                    res[k] = conf[k] +            else: +                res[k] = self.multi_to_list(rpath + [k], conf[k]) + +        return res + +    def _get_default_value(self, node: dict): +        return self._get_ref_node_data(node, "default_value") + +    def get_defaults(self, path: list, get_first_key=False) -> dict: +        """Return dict containing default values below path + +        Note that descent below path will not proceed beyond an encountered +        tag node, as no tag node value is known. For a default dict relative +        to an existing config dict containing tag node values, see function: +        'relative_defaults' +        """ +        res: Any = {} +        d = self._get_ref_path(path) +        if self._is_leaf_node(d): +            default_value = self._get_default_value(d) +            if default_value is not None: +                res = default_value +                if self._is_multi_node(d) and not isinstance(res, list): +                    res = [res] +        elif self.is_tag(path): +            # tag node defaults are used as suggestion, not default value; +            # should this change, append to path and continue recursion +            pass +        else: +            for k in list(d): +                if k in ('node_data', 'component_version') : +                    continue +                pos = self.get_defaults(path + [k]) +                res |= pos +        if res: +            if get_first_key or not path: +                if not isinstance(res, dict): +                    raise TypeError("Cannot get_first_key as data under node is not of type dict") +                return res +            return {path[-1]: res} + +        return {} + +    def _relative_defaults(self, rpath: list, conf: Optional[dict] = None, +                          get_first_key=False) -> dict: +        # Return dict containing defaults along paths of a config dict + +        # Note that for conf argument {} or None, this function returns the +        # same result as 'get_defaults' above; for clarity, the functions +        # are individually defined. + +        if conf is None: +            conf = {} +        if rpath and rpath[-1] in list(conf): +            raise ValueError('rpath should be disjoint from conf keys') +        res: Any = {} +        d = self._get_ref_path(rpath) +        if self._is_leaf_node(d): +            default_value = self._get_default_value(d) +            if default_value is not None: +                res = default_value +                if self._is_multi_node(d) and not isinstance(res, list): +                    res = [res] +        elif self.is_tag(rpath): +            for k in list(conf): +                pos = self._relative_defaults(rpath + [k], conf[k]) +                res |= pos +        else: +            for k in list(d): +                if k in ('node_data', 'component_version') : +                    continue +                pos = self._relative_defaults(rpath + [k], conf.get(k, {})) +                res |= pos +        if res: +            if get_first_key or not rpath: +                if not isinstance(res, dict): +                    raise TypeError("Cannot get_first_key as data under node is not of type dict") +                return res +            return {rpath[-1]: res} + +        return {} + +    def _defines_config_path(self, path: list, conf: dict) -> bool: +        # test disjoint path + conf for sensible config paths +        def walk(c): +            return [next(iter(c.keys()))] if c else [] +        try: +            tmp = walk(conf) +            if self.is_tag_value(path + tmp): +                c = conf[tmp[0]] +                if not isinstance(c, dict): +                    raise ValueError +                tmp = tmp + walk(c) +                self._get_ref_path(path + tmp) +            else: +                self._get_ref_path(path + tmp) +        except ValueError: +            return False +        return True + +    def merge_defaults(self, path: list, conf: dict) -> dict: +        """Return config dict with defaults non-destructively merged +        """ +        if not path: +            path = [next(iter(conf.keys()))] +        if path[-1] in list(conf): +            config = conf[path[-1]] +            if not isinstance(config, dict): +                raise ValueError('conf at path is not of type dict') +            first = False +        else: +            config = conf +            first = True + +        if not self._defines_config_path(path, config): +            print('path + conf do not define config paths; conf returned unchanged') +            return conf + +        d = self._relative_defaults(path, conf=config, get_first_key=first) +        d = dict_merge(d, conf) +        return d | 
