diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/xml_ref/__init__.py | 28 | ||||
-rwxr-xr-x | python/vyos/xml_ref/generate_op_cache.py | 134 | ||||
-rw-r--r-- | python/vyos/xml_ref/op_definition.py | 181 |
3 files changed, 294 insertions, 49 deletions
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index 99d8432d2..cd50a3ec2 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -14,6 +14,8 @@ # along with this library. If not, see <http://www.gnu.org/licenses/>. from typing import Optional, Union, TYPE_CHECKING +from typing import Callable +from typing import Any from vyos.xml_ref import definition from vyos.xml_ref import op_definition @@ -89,6 +91,7 @@ def from_source(d: dict, path: list) -> bool: def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']): return definition.ext_dict_merge(source, destination) + def load_op_reference(op_cache=[]): if op_cache: return op_cache[0] @@ -108,5 +111,26 @@ def load_op_reference(op_cache=[]): return op_xml -def get_op_ref_path(path: list) -> list[op_definition.PathData]: - return load_op_reference()._get_op_ref_path(path) + +def walk_op_data(func: Callable[[tuple, dict], Any]): + return load_op_reference().walk(func) + + +def walk_op_node_data(): + return load_op_reference().walk_node_data() + + +def lookup_op_data( + path: list, tag_values: bool = False, last_node_type: str = '' +) -> (dict, list[str]): + return load_op_reference().lookup( + path, tag_values=tag_values, last_node_type=last_node_type + ) + + +def lookup_op_node_data( + path: list, tag_values: bool = False, last_node_type: str = '' +) -> list[op_definition.NodeData]: + return load_op_reference().lookup_node_data( + path, tag_values=tag_values, last_node_type=last_node_type + ) diff --git a/python/vyos/xml_ref/generate_op_cache.py b/python/vyos/xml_ref/generate_op_cache.py index 95779d066..c00e676df 100755 --- a/python/vyos/xml_ref/generate_op_cache.py +++ b/python/vyos/xml_ref/generate_op_cache.py @@ -14,10 +14,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import io import re import sys -import json import glob +import atexit from argparse import ArgumentParser from os.path import join @@ -25,23 +26,40 @@ from os.path import abspath from os.path import dirname from xml.etree import ElementTree as ET from xml.etree.ElementTree import Element +from functools import cmp_to_key from typing import TypeAlias from typing import Optional +from op_definition import NodeData +from op_definition import OpKey # pylint: disable=unused-import # noqa: F401 +from op_definition import OpData # pylint: disable=unused-import # noqa: F401 +from op_definition import key_name +from op_definition import key_type + _here = dirname(__file__) sys.path.append(join(_here, '..')) -from defaults import directories - -from op_definition import PathData +# pylint: disable=wrong-import-position,wrong-import-order +from defaults import directories # noqa: E402 -xml_op_cache_json = 'xml_op_cache.json' -xml_op_tmp = join('/tmp', xml_op_cache_json) op_ref_cache = abspath(join(_here, 'op_cache.py')) OptElement: TypeAlias = Optional[Element] -DEBUG = False + + +# It is expected that the node_data help txt contained in top-level nodes, +# shared across files, e.g.'show', will reveal inconsistencies; to list +# differences, use --check-xml-consistency +CHECK_XML_CONSISTENCY = False +err_buf = io.StringIO() + + +def write_err_buf(): + err_buf.seek(0) + out = err_buf.read() + print(out) + err_buf.close() def translate_exec(s: str) -> str: @@ -74,8 +92,44 @@ def translate_op_script(s: str) -> str: return s -def insert_node(n: Element, l: list[PathData], path=None) -> None: - # pylint: disable=too-many-locals,too-many-branches +def compare_keys(a, b): + match key_type(a), key_type(b): + case None, None: + if key_name(a) == key_name(b): + return 0 + return -1 if key_name(a) < key_name(b) else 1 + case None, _: + return -1 + case _, None: + return 1 + case _, _: + if key_name(a) == key_name(b): + if key_type(a) == key_type(b): + return 0 + return -1 if key_type(a) < key_type(b) else 1 + return -1 if key_name(a) < key_name(b) else 1 + + +def sort_func(obj: dict, key_func): + if not obj or not isinstance(obj, dict): + return obj + k_list = list(obj.keys()) + if not isinstance(k_list[0], tuple): + return obj + k_list = sorted(k_list, key=key_func) + v_list = map(lambda t: sort_func(obj[t], key_func), k_list) + return dict(zip(k_list, v_list)) + + +def sort_op_data(obj): + key_func = cmp_to_key(compare_keys) + return sort_func(obj, key_func) + + +def insert_node( + n: Element, d: dict, path: list[str] = None, parent: NodeData = None +) -> None: + # pylint: disable=too-many-locals,too-many-branches,too-many-statements prop: OptElement = n.find('properties') children: OptElement = n.find('children') command: OptElement = n.find('command') @@ -124,31 +178,48 @@ def insert_node(n: Element, l: list[PathData], path=None) -> None: if comp_scripts: comp_help['script'] = comp_scripts - cur_node_dict = {} - cur_node_dict['name'] = name - cur_node_dict['type'] = node_type - cur_node_dict['comp_help'] = comp_help - cur_node_dict['help'] = help_text - cur_node_dict['command'] = command_text - cur_node_dict['path'] = path - cur_node_dict['children'] = [] - l.append(cur_node_dict) + cur_node_data = NodeData() + cur_node_data.name = name + cur_node_data.node_type = node_type + cur_node_data.comp_help = comp_help + cur_node_data.help_text = help_text + cur_node_data.command = command_text + cur_node_data.path = path + + value = {('node_data', None): cur_node_data} + key = (name, node_type) + + cur_value = d.setdefault(key, value) + + if parent and key not in parent.children: + parent.children.append(key) + + if ( + CHECK_XML_CONSISTENCY + and cur_value[('node_data', None)] != value[('node_data', None)] + ): + err_buf.write( + f"prev: {cur_value[('node_data', None)]}; new: {value[('node_data', None)]}\n" + ) if children is not None: inner_nodes = children.iterfind('*') for inner_n in inner_nodes: inner_path = path[:] - insert_node(inner_n, cur_node_dict['children'], inner_path) + insert_node(inner_n, d[key], inner_path, cur_node_data) -def parse_file(file_path, l): +def parse_file(file_path, d): tree = ET.parse(file_path) root = tree.getroot() for n in root.iterfind('*'): - insert_node(n, l) + insert_node(n, d) def main(): + # pylint: disable=global-statement + global CHECK_XML_CONSISTENCY + parser = ArgumentParser(description='generate dict from xml defintions') parser.add_argument( '--xml-dir', @@ -156,21 +227,30 @@ def main(): required=True, help='transcluded xml op-mode-definition file', ) + parser.add_argument( + '--check-xml-consistency', + action='store_true', + help='check consistency of node data across files', + ) args = vars(parser.parse_args()) + if args['check_xml_consistency']: + CHECK_XML_CONSISTENCY = True + atexit.register(write_err_buf) + xml_dir = abspath(args['xml_dir']) - l = [] + d = {} - for fname in glob.glob(f'{xml_dir}/*.xml'): - parse_file(fname, l) + for fname in sorted(glob.glob(f'{xml_dir}/*.xml')): + parse_file(fname, d) - with open(xml_op_tmp, 'w') as f: - json.dump(l, f, indent=2) + d = sort_op_data(d) with open(op_ref_cache, 'w') as f: - f.write(f'op_reference = {str(l)}') + f.write('from vyos.xml_ref.op_definition import NodeData\n') + f.write(f'op_reference = {str(d)}') if __name__ == '__main__': diff --git a/python/vyos/xml_ref/op_definition.py b/python/vyos/xml_ref/op_definition.py index 914f3a105..f61181262 100644 --- a/python/vyos/xml_ref/op_definition.py +++ b/python/vyos/xml_ref/op_definition.py @@ -13,37 +13,178 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. -from typing import TypedDict from typing import TypeAlias -from typing import Optional from typing import Union +from typing import Iterator +from dataclasses import dataclass +from dataclasses import field +from itertools import filterfalse -class NodeData(TypedDict): - node_type: Optional[str] - help_text: Optional[str] - comp_help: Optional[dict[str, list]] - command: Optional[str] - path: Optional[list[str]] +@dataclass +class NodeData: + name: str = '' + node_type: str = 'node' + help_text: str = '' + comp_help: dict[str, list] = field(default_factory=dict) + command: str = '' + path: list[str] = field(default_factory=list) + children: list[tuple] = field(default_factory=list) -PathData: TypeAlias = dict[str, Union[NodeData|list['PathData']]] +OpKey: TypeAlias = tuple[str, str] +OpData: TypeAlias = dict[OpKey, Union[NodeData, 'OpData']] + + +def key_name(k: OpKey): + return k[0] + + +def key_type(k: OpKey): + return k[1] + + +def key_names(l: list): # noqa: E741 + return list(map(lambda t: t[0], l)) + + +def keys_of_name(s: str, l: list): # noqa: E741 + filter(lambda t: t[0] == s, l) + + +def is_tag_node(t: tuple): + return t[1] == 'tagNode' + + +def subdict_of_name(s: str, d: dict) -> dict: + res = {} + for t, v in d.items(): + if not isinstance(t, tuple): + break + if key_name(t) == s: + res[t] = v + + return res + + +def next_keys(d: dict) -> list: + key_set = set() + for k in list(d.keys()): + if isinstance(d[k], dict): + key_set |= set(d[k].keys()) + return list(key_set) + + +def tuple_paths(d: dict) -> Iterator[list[tuple]]: + def func(d, path): + if isinstance(d, dict): + if not d: + yield path + for k, v in d.items(): + if isinstance(k, tuple) and key_name(k) != 'node_data': + for r in func(v, path + [k]): + yield r + else: + yield path + else: + yield path + + for r in func(d, []): + yield r + + +def match_tuple_paths( + path: list[str], paths: list[list[tuple[str, str]]] +) -> list[list[tuple[str, str]]]: + return list(filter(lambda p: key_names(p) == path, paths)) + + +def get_node_data_at_path(d: dict, tpath): + if not tpath: + return {} + # operates on actual paths, not names: + if not isinstance(tpath[0], tuple): + raise ValueError('must be path of tuples') + while tpath and d: + d = d.get(tpath[0], {}) + tpath = tpath[1:] + + return d.get(('node_data', None), {}) class OpXml: def __init__(self): self.op_ref = {} - def define(self, op_ref: list[PathData]) -> None: + def define(self, op_ref: dict) -> None: self.op_ref = op_ref - def _get_op_ref_path(self, path: list[str]) -> list[PathData]: - def _get_path_list(path: list[str], l: list[PathData]) -> list[PathData]: - if not path: - return l - for d in l: - if path[0] in list(d): - return _get_path_list(path[1:], d[path[0]]) - return [] - l = self.op_ref - return _get_path_list(path, l) + def walk(self, func): + def walk_op_data(obj, func): + if isinstance(obj, dict): + for k, v in obj.items(): + if isinstance(k, tuple): + res = func(k, v) + yield res + yield from walk_op_data(v, func) + + return walk_op_data(self.op_ref, func) + + @staticmethod + def get_node_data_func(k, v): + if key_name(k) == 'node_data': + return v + return None + + def walk_node_data(self): + return filterfalse(lambda x: x is None, self.walk(self.get_node_data_func)) + + def lookup( + self, path: list[str], tag_values: bool = False, last_node_type: str = '' + ) -> (OpData, list[str]): + path = path[:] + + ref_path = [] + + def prune_tree(d: dict, p: list[str]): + p = p[:] + if not d or not isinstance(d, dict) or not p: + return d + op_data: dict = subdict_of_name(p[0], d) + op_keys = list(op_data.keys()) + ref_path.append(p[0]) + if len(p) < 2: + # check last node_type + if last_node_type: + keys = list(filter(lambda t: t[1] == last_node_type, op_keys)) + values = list(map(lambda t: op_data[t], keys)) + return dict(zip(keys, values)) + return op_data + + if p[1] not in key_names(next_keys(op_data)): + # check if tag_values + if tag_values: + p = p[2:] + keys = list(filter(is_tag_node, op_keys)) + values = list(map(lambda t: prune_tree(op_data[t], p), keys)) + return dict(zip(keys, values)) + return {} + + p = p[1:] + op_data = list(map(lambda t: prune_tree(op_data[t], p), op_keys)) + + return dict(zip(op_keys, op_data)) + + return prune_tree(self.op_ref, path), ref_path + + def lookup_node_data( + self, path: list[str], tag_values: bool = False, last_node_type: str = '' + ) -> list[NodeData]: + res = [] + d, ref_path = self.lookup(path, tag_values, last_node_type) + paths = list(tuple_paths(d)) + paths = match_tuple_paths(ref_path, paths) + for p in paths: + res.append(get_node_data_at_path(d, p)) + + return res |