diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/vyos/configtree.py | 13 | ||||
| -rw-r--r-- | python/vyos/xml_ref/__init__.py | 66 | ||||
| -rw-r--r-- | python/vyos/xml_ref/definition.py | 231 | ||||
| -rwxr-xr-x | python/vyos/xml_ref/generate_cache.py | 94 | 
4 files changed, 404 insertions, 0 deletions
| diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py index 9308bdde4..19b9838d4 100644 --- a/python/vyos/configtree.py +++ b/python/vyos/configtree.py @@ -373,6 +373,19 @@ def union(left, right, libpath=LIBPATH):      return tree +def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH): +    __lib = cdll.LoadLibrary(libpath) +    __reference_tree_to_json = __lib.reference_tree_to_json +    __reference_tree_to_json.argtypes = [c_char_p, c_char_p] +    __get_error = __lib.get_error +    __get_error.argtypes = [] +    __get_error.restype = c_char_p + +    res = __reference_tree_to_json(from_dir.encode(), to_file.encode()) +    if res == 1: +        msg = __get_error().decode() +        raise ConfigTreeError(msg) +  class DiffTree:      def __init__(self, left, right, path=[], libpath=LIBPATH):          if left is None: diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py new file mode 100644 index 000000000..ae5184746 --- /dev/null +++ b/python/vyos/xml_ref/__init__.py @@ -0,0 +1,66 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library.  If not, see <http://www.gnu.org/licenses/>. + +from vyos.xml_ref import definition + +def load_reference(cache=[]): +    if cache: +        return cache[0] + +    xml = definition.Xml() + +    try: +        from vyos.xml_ref.cache import reference +        xml.define(reference) +        cache.append(xml) +    except Exception: +        raise ImportError('no xml reference cache !!') + +    return xml + +def is_tag(path: list) -> bool: +    return load_reference().is_tag(path) + +def is_tag_value(path: list) -> bool: +    return load_reference().is_tag_value(path) + +def is_multi(path: list) -> bool: +    return load_reference().is_multi(path) + +def is_valueless(path: list) -> bool: +    return load_reference().is_valueless(path) + +def is_leaf(path: list) -> bool: +    return load_reference().is_leaf(path) + +def component_version() -> dict: +    return load_reference().component_version() + +def multi_to_list(rpath: list, conf: dict) -> dict: +    return load_reference().multi_to_list(rpath, conf) + +def get_defaults(path: list, get_first_key=False, recursive=False) -> dict: +    return load_reference().get_defaults(path, get_first_key=get_first_key, +                                         recursive=recursive) + +def get_config_defaults(rpath: list, conf: dict, get_first_key=False, +                        recursive=False) -> dict: + +    return load_reference().relative_defaults(rpath, conf=conf, +                                              get_first_key=get_first_key, +                                              recursive=recursive) + +def merge_defaults(path: list, conf: dict) -> dict: +    return load_reference().merge_defaults(path, conf) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py new file mode 100644 index 000000000..429331577 --- /dev/null +++ b/python/vyos/xml_ref/definition.py @@ -0,0 +1,231 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library.  If not, see <http://www.gnu.org/licenses/>. + +from typing import Union, Any +from vyos.configdict import dict_merge + +class Xml: +    def __init__(self): +        self.ref = {} + +    def define(self, ref: dict): +        self.ref = ref + +    def _get_ref_node_data(self, node: dict, data: str) -> Union[bool, str]: +        res = node.get('node_data', {}) +        if not res: +            raise ValueError("non-existent node data") +        if data not in res: +            raise ValueError("non-existent data field") + +        return res.get(data) + +    def _get_ref_path(self, path: list) -> dict: +        ref_path = path.copy() +        d = self.ref +        while ref_path and d: +            d = d.get(ref_path[0], {}) +            ref_path.pop(0) +            if self._is_tag_node(d) and ref_path: +                ref_path.pop(0) + +        return d + +    def _is_tag_node(self, node: dict) -> bool: +        res = self._get_ref_node_data(node, 'node_type') +        return res == 'tag' + +    def is_tag(self, path: list) -> bool: +        ref_path = path.copy() +        d = self.ref +        while ref_path and d: +            d = d.get(ref_path[0], {}) +            ref_path.pop(0) +            if self._is_tag_node(d) and ref_path: +                if len(ref_path) == 1: +                    return False +                ref_path.pop(0) + +        return self._is_tag_node(d) + +    def is_tag_value(self, path: list) -> bool: +        if len(path) < 2: +            return False + +        return self.is_tag(path[:-1]) + +    def _is_multi_node(self, node: dict) -> bool: +        b = self._get_ref_node_data(node, 'multi') +        assert isinstance(b, bool) +        return b + +    def is_multi(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return  self._is_multi_node(d) + +    def _is_valueless_node(self, node: dict) -> bool: +        b = self._get_ref_node_data(node, 'valueless') +        assert isinstance(b, bool) +        return b + +    def is_valueless(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return  self._is_valueless_node(d) + +    def _is_leaf_node(self, node: dict) -> bool: +        res = self._get_ref_node_data(node, 'node_type') +        return res == 'leaf' + +    def is_leaf(self, path: list) -> bool: +        d = self._get_ref_path(path) +        return self._is_leaf_node(d) + +    def component_version(self) -> dict: +        d = {} +        for k, v in self.ref['component_version']: +            d[k] = int(v) +        return d + +    def multi_to_list(self, rpath: list, conf: dict) -> dict: +        if rpath and rpath[-1] in list(conf): +            raise ValueError('rpath should be disjoint from conf keys') + +        res: Any = {} + +        for k in list(conf): +            d = self._get_ref_path(rpath + [k]) +            if self._is_leaf_node(d): +                if self._is_multi_node(d) and not isinstance(conf[k], list): +                    res[k] = [conf[k]] +                else: +                    res[k] = conf[k] +            else: +                res[k] = self.multi_to_list(rpath + [k], conf[k]) + +        return res + +    def _get_default_value(self, node: dict): +        return self._get_ref_node_data(node, "default_value") + +    def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict: +        """Return dict containing default values below path + +        Note that descent below path will not proceed beyond an encountered +        tag node, as no tag node value is known. For a default dict relative +        to an existing config dict containing tag node values, see function: +        'relative_defaults' +        """ +        res: dict = {} +        d = self._get_ref_path(path) +        for k in list(d): +            if k in ('node_data', 'component_version') : +                continue +            d_k = d[k] +            if self._is_leaf_node(d_k): +                default_value = self._get_default_value(d_k) +                if default_value is not None: +                    pos = default_value +                    if self._is_multi_node(d_k) and not isinstance(pos, list): +                        pos = [pos] +                    res |= {k: pos} +            elif self.is_tag(path + [k]): +                # tag node defaults are used as suggestion, not default value; +                # should this change, append to path and continue if recursive +                pass +            else: +                if recursive: +                    pos = self.get_defaults(path + [k], recursive=True) +                    res |= pos +        if res: +            if get_first_key or not path: +                if not isinstance(res, dict): +                    raise TypeError("Cannot get_first_key as data under node is not of type dict") +                return res +            return {path[-1]: res} + +        return {} + +    def _well_defined(self, path: list, conf: dict) -> bool: +        # test disjoint path + conf for sensible config paths +        def step(c): +            return [next(iter(c.keys()))] if c else [] +        try: +            tmp = step(conf) +            if self.is_tag_value(path + tmp): +                c = conf[tmp[0]] +                if not isinstance(c, dict): +                    raise ValueError +                tmp = tmp + step(c) +                self._get_ref_path(path + tmp) +            else: +                self._get_ref_path(path + tmp) +        except ValueError: +            return False +        return True + +    def relative_defaults(self, rpath: list, conf: dict, get_first_key=False, +                          recursive=False) -> dict: +        """Return dict containing defaults along paths of a config dict +        """ +        if not conf: +            return self.get_defaults(rpath, get_first_key=get_first_key, +                                     recursive=recursive) +        if rpath and rpath[-1] in list(conf): +            conf = conf[rpath[-1]] +            if not isinstance(conf, dict): +                raise TypeError('conf at path is not of type dict') + +        if not self._well_defined(rpath, conf): +            print('path to config dict does not define full config paths') +            return {} + +        res: dict = {} +        for k in list(conf): +            pos = self.get_defaults(rpath + [k], recursive=recursive) +            res |= pos + +            if isinstance(conf[k], dict): +                step = self.relative_defaults(rpath + [k], conf=conf[k], +                                              recursive=recursive) +                res |= step + +        if res: +            if get_first_key: +                return res +            return {rpath[-1]: res} if rpath else res + +        return {} + +    def merge_defaults(self, path: list, conf: dict) -> dict: +        """Return config dict with defaults non-destructively merged + +        This merges non-recursive defaults relative to the config dict. +        """ +        if path[-1] in list(conf): +            config = conf[path[-1]] +            if not isinstance(config, dict): +                raise TypeError('conf at path is not of type dict') +            shift = False +        else: +            config = conf +            shift = True + +        if not self._well_defined(path, config): +            print('path to config dict does not define config paths; conf returned unchanged') +            return conf + +        d = self.relative_defaults(path, conf=config, get_first_key=shift) +        d = dict_merge(d, conf) +        return d diff --git a/python/vyos/xml_ref/generate_cache.py b/python/vyos/xml_ref/generate_cache.py new file mode 100755 index 000000000..792c6eea7 --- /dev/null +++ b/python/vyos/xml_ref/generate_cache.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +# +# + +import sys +import json +import argparse +from os.path import join +from os.path import abspath +from os.path import dirname +from xmltodict import parse + +_here = dirname(__file__) + +sys.path.append(join(_here, '..')) +from configtree import reference_tree_to_json, ConfigTreeError + +xml_cache = abspath(join(_here, 'cache.py')) +xml_cache_json = 'xml_cache.json' +xml_tmp = join('/tmp', xml_cache_json) + +node_data_fields = ("node_type", "multi", "valueless", "default_value") + +def trim_node_data(cache: dict): +    for k in list(cache): +        if k == "node_data": +            for l in list(cache[k]): +                if l not in node_data_fields: +                    del cache[k][l] +        else: +            if isinstance(cache[k], dict): +                trim_node_data(cache[k]) + +def main(): +    parser = argparse.ArgumentParser(description='generate and save dict from xml defintions') +    parser.add_argument('--xml-dir', type=str, required=True, +                        help='transcluded xml interface-definition directory') +    parser.add_argument('--save-json-dir', type=str, +                        help='directory to save json cache if needed') +    args = parser.parse_args() + +    xml_dir = abspath(args.xml_dir) +    save_dir = abspath(args.save_json_dir) if args.save_json_dir else None + +    try: +        reference_tree_to_json(xml_dir, xml_tmp) +    except ConfigTreeError as e: +        print(e) +        sys.exit(1) + +    with open(xml_tmp) as f: +        d = json.loads(f.read()) + +    trim_node_data(d) + +    if save_dir is not None: +        save_file = join(save_dir, xml_cache_json) +        with open(save_file, 'w') as f: +            f.write(json.dumps(d)) + +    syntax_version = join(xml_dir, 'xml-component-version.xml') +    with open(syntax_version) as f: +        content = f.read() + +    parsed = parse(content) +    converted = parsed['interfaceDefinition']['syntaxVersion'] +    version = {} +    for i in converted: +        tmp = {i['@component']: i['@version']} +        version |= tmp + +    version = {"component_version": version} + +    d |= version + +    with open(xml_cache, 'w') as f: +        f.write(f'reference = {str(d)}') + +if __name__ == '__main__': +    main() | 
