diff options
author | John Estabrook <jestabro@vyos.io> | 2025-06-13 07:31:40 -0500 |
---|---|---|
committer | John Estabrook <jestabro@vyos.io> | 2025-06-20 09:58:46 -0500 |
commit | e5f3f1b2bb6cadaa7586beff100f83e4ff9159db (patch) | |
tree | bf7fc2872f01cec31fccf1e0f3f52cb1dd3e31c3 /python | |
parent | bdb5846987fa21f410204f0aa15670f37339da8c (diff) | |
download | vyos-1x-e5f3f1b2bb6cadaa7586beff100f83e4ff9159db.tar.gz vyos-1x-e5f3f1b2bb6cadaa7586beff100f83e4ff9159db.zip |
T7561: simplify op-mode-definitions XML cache and add interface methods
The original implementation of the op-mode XML cache generation resulted
in a structure that was difficult to use, for example, in documentation
generation. The source of complication is that, unlike the XML of
interface-definitions, path names are not unique: the same path may
occur as both a regular node and as a tag node. Here we simplify the
underlying structure by enriching path names with type information, thus
disambiguating paths. An interface to the cache is provided by explicit
generator and lookup functions.
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/xml_ref/__init__.py | 28 | ||||
-rwxr-xr-x | python/vyos/xml_ref/generate_op_cache.py | 134 | ||||
-rw-r--r-- | python/vyos/xml_ref/op_definition.py | 181 |
3 files changed, 294 insertions, 49 deletions
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index 99d8432d2..cd50a3ec2 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -14,6 +14,8 @@ # along with this library. If not, see <http://www.gnu.org/licenses/>. from typing import Optional, Union, TYPE_CHECKING +from typing import Callable +from typing import Any from vyos.xml_ref import definition from vyos.xml_ref import op_definition @@ -89,6 +91,7 @@ def from_source(d: dict, path: list) -> bool: def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']): return definition.ext_dict_merge(source, destination) + def load_op_reference(op_cache=[]): if op_cache: return op_cache[0] @@ -108,5 +111,26 @@ def load_op_reference(op_cache=[]): return op_xml -def get_op_ref_path(path: list) -> list[op_definition.PathData]: - return load_op_reference()._get_op_ref_path(path) + +def walk_op_data(func: Callable[[tuple, dict], Any]): + return load_op_reference().walk(func) + + +def walk_op_node_data(): + return load_op_reference().walk_node_data() + + +def lookup_op_data( + path: list, tag_values: bool = False, last_node_type: str = '' +) -> (dict, list[str]): + return load_op_reference().lookup( + path, tag_values=tag_values, last_node_type=last_node_type + ) + + +def lookup_op_node_data( + path: list, tag_values: bool = False, last_node_type: str = '' +) -> list[op_definition.NodeData]: + return load_op_reference().lookup_node_data( + path, tag_values=tag_values, last_node_type=last_node_type + ) diff --git a/python/vyos/xml_ref/generate_op_cache.py b/python/vyos/xml_ref/generate_op_cache.py index 95779d066..c00e676df 100755 --- a/python/vyos/xml_ref/generate_op_cache.py +++ b/python/vyos/xml_ref/generate_op_cache.py @@ -14,10 +14,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import io import re import sys -import json import glob +import atexit from argparse import ArgumentParser from os.path import join @@ -25,23 +26,40 @@ from os.path import abspath from os.path import dirname from xml.etree import ElementTree as ET from xml.etree.ElementTree import Element +from functools import cmp_to_key from typing import TypeAlias from typing import Optional +from op_definition import NodeData +from op_definition import OpKey # pylint: disable=unused-import # noqa: F401 +from op_definition import OpData # pylint: disable=unused-import # noqa: F401 +from op_definition import key_name +from op_definition import key_type + _here = dirname(__file__) sys.path.append(join(_here, '..')) -from defaults import directories - -from op_definition import PathData +# pylint: disable=wrong-import-position,wrong-import-order +from defaults import directories # noqa: E402 -xml_op_cache_json = 'xml_op_cache.json' -xml_op_tmp = join('/tmp', xml_op_cache_json) op_ref_cache = abspath(join(_here, 'op_cache.py')) OptElement: TypeAlias = Optional[Element] -DEBUG = False + + +# It is expected that the node_data help txt contained in top-level nodes, +# shared across files, e.g.'show', will reveal inconsistencies; to list +# differences, use --check-xml-consistency +CHECK_XML_CONSISTENCY = False +err_buf = io.StringIO() + + +def write_err_buf(): + err_buf.seek(0) + out = err_buf.read() + print(out) + err_buf.close() def translate_exec(s: str) -> str: @@ -74,8 +92,44 @@ def translate_op_script(s: str) -> str: return s -def insert_node(n: Element, l: list[PathData], path=None) -> None: - # pylint: disable=too-many-locals,too-many-branches +def compare_keys(a, b): + match key_type(a), key_type(b): + case None, None: + if key_name(a) == key_name(b): + return 0 + return -1 if key_name(a) < key_name(b) else 1 + case None, _: + return -1 + case _, None: + return 1 + case _, _: + if key_name(a) == key_name(b): + if key_type(a) == key_type(b): + return 0 + return -1 if key_type(a) < key_type(b) else 1 + return -1 if key_name(a) < key_name(b) else 1 + + +def sort_func(obj: dict, key_func): + if not obj or not isinstance(obj, dict): + return obj + k_list = list(obj.keys()) + if not isinstance(k_list[0], tuple): + return obj + k_list = sorted(k_list, key=key_func) + v_list = map(lambda t: sort_func(obj[t], key_func), k_list) + return dict(zip(k_list, v_list)) + + +def sort_op_data(obj): + key_func = cmp_to_key(compare_keys) + return sort_func(obj, key_func) + + +def insert_node( + n: Element, d: dict, path: list[str] = None, parent: NodeData = None +) -> None: + # pylint: disable=too-many-locals,too-many-branches,too-many-statements prop: OptElement = n.find('properties') children: OptElement = n.find('children') command: OptElement = n.find('command') @@ -124,31 +178,48 @@ def insert_node(n: Element, l: list[PathData], path=None) -> None: if comp_scripts: comp_help['script'] = comp_scripts - cur_node_dict = {} - cur_node_dict['name'] = name - cur_node_dict['type'] = node_type - cur_node_dict['comp_help'] = comp_help - cur_node_dict['help'] = help_text - cur_node_dict['command'] = command_text - cur_node_dict['path'] = path - cur_node_dict['children'] = [] - l.append(cur_node_dict) + cur_node_data = NodeData() + cur_node_data.name = name + cur_node_data.node_type = node_type + cur_node_data.comp_help = comp_help + cur_node_data.help_text = help_text + cur_node_data.command = command_text + cur_node_data.path = path + + value = {('node_data', None): cur_node_data} + key = (name, node_type) + + cur_value = d.setdefault(key, value) + + if parent and key not in parent.children: + parent.children.append(key) + + if ( + CHECK_XML_CONSISTENCY + and cur_value[('node_data', None)] != value[('node_data', None)] + ): + err_buf.write( + f"prev: {cur_value[('node_data', None)]}; new: {value[('node_data', None)]}\n" + ) if children is not None: inner_nodes = children.iterfind('*') for inner_n in inner_nodes: inner_path = path[:] - insert_node(inner_n, cur_node_dict['children'], inner_path) + insert_node(inner_n, d[key], inner_path, cur_node_data) -def parse_file(file_path, l): +def parse_file(file_path, d): tree = ET.parse(file_path) root = tree.getroot() for n in root.iterfind('*'): - insert_node(n, l) + insert_node(n, d) def main(): + # pylint: disable=global-statement + global CHECK_XML_CONSISTENCY + parser = ArgumentParser(description='generate dict from xml defintions') parser.add_argument( '--xml-dir', @@ -156,21 +227,30 @@ def main(): required=True, help='transcluded xml op-mode-definition file', ) + parser.add_argument( + '--check-xml-consistency', + action='store_true', + help='check consistency of node data across files', + ) args = vars(parser.parse_args()) + if args['check_xml_consistency']: + CHECK_XML_CONSISTENCY = True + atexit.register(write_err_buf) + xml_dir = abspath(args['xml_dir']) - l = [] + d = {} - for fname in glob.glob(f'{xml_dir}/*.xml'): - parse_file(fname, l) + for fname in sorted(glob.glob(f'{xml_dir}/*.xml')): + parse_file(fname, d) - with open(xml_op_tmp, 'w') as f: - json.dump(l, f, indent=2) + d = sort_op_data(d) with open(op_ref_cache, 'w') as f: - f.write(f'op_reference = {str(l)}') + f.write('from vyos.xml_ref.op_definition import NodeData\n') + f.write(f'op_reference = {str(d)}') if __name__ == '__main__': diff --git a/python/vyos/xml_ref/op_definition.py b/python/vyos/xml_ref/op_definition.py index 914f3a105..f61181262 100644 --- a/python/vyos/xml_ref/op_definition.py +++ b/python/vyos/xml_ref/op_definition.py @@ -13,37 +13,178 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. -from typing import TypedDict from typing import TypeAlias -from typing import Optional from typing import Union +from typing import Iterator +from dataclasses import dataclass +from dataclasses import field +from itertools import filterfalse -class NodeData(TypedDict): - node_type: Optional[str] - help_text: Optional[str] - comp_help: Optional[dict[str, list]] - command: Optional[str] - path: Optional[list[str]] +@dataclass +class NodeData: + name: str = '' + node_type: str = 'node' + help_text: str = '' + comp_help: dict[str, list] = field(default_factory=dict) + command: str = '' + path: list[str] = field(default_factory=list) + children: list[tuple] = field(default_factory=list) -PathData: TypeAlias = dict[str, Union[NodeData|list['PathData']]] +OpKey: TypeAlias = tuple[str, str] +OpData: TypeAlias = dict[OpKey, Union[NodeData, 'OpData']] + + +def key_name(k: OpKey): + return k[0] + + +def key_type(k: OpKey): + return k[1] + + +def key_names(l: list): # noqa: E741 + return list(map(lambda t: t[0], l)) + + +def keys_of_name(s: str, l: list): # noqa: E741 + filter(lambda t: t[0] == s, l) + + +def is_tag_node(t: tuple): + return t[1] == 'tagNode' + + +def subdict_of_name(s: str, d: dict) -> dict: + res = {} + for t, v in d.items(): + if not isinstance(t, tuple): + break + if key_name(t) == s: + res[t] = v + + return res + + +def next_keys(d: dict) -> list: + key_set = set() + for k in list(d.keys()): + if isinstance(d[k], dict): + key_set |= set(d[k].keys()) + return list(key_set) + + +def tuple_paths(d: dict) -> Iterator[list[tuple]]: + def func(d, path): + if isinstance(d, dict): + if not d: + yield path + for k, v in d.items(): + if isinstance(k, tuple) and key_name(k) != 'node_data': + for r in func(v, path + [k]): + yield r + else: + yield path + else: + yield path + + for r in func(d, []): + yield r + + +def match_tuple_paths( + path: list[str], paths: list[list[tuple[str, str]]] +) -> list[list[tuple[str, str]]]: + return list(filter(lambda p: key_names(p) == path, paths)) + + +def get_node_data_at_path(d: dict, tpath): + if not tpath: + return {} + # operates on actual paths, not names: + if not isinstance(tpath[0], tuple): + raise ValueError('must be path of tuples') + while tpath and d: + d = d.get(tpath[0], {}) + tpath = tpath[1:] + + return d.get(('node_data', None), {}) class OpXml: def __init__(self): self.op_ref = {} - def define(self, op_ref: list[PathData]) -> None: + def define(self, op_ref: dict) -> None: self.op_ref = op_ref - def _get_op_ref_path(self, path: list[str]) -> list[PathData]: - def _get_path_list(path: list[str], l: list[PathData]) -> list[PathData]: - if not path: - return l - for d in l: - if path[0] in list(d): - return _get_path_list(path[1:], d[path[0]]) - return [] - l = self.op_ref - return _get_path_list(path, l) + def walk(self, func): + def walk_op_data(obj, func): + if isinstance(obj, dict): + for k, v in obj.items(): + if isinstance(k, tuple): + res = func(k, v) + yield res + yield from walk_op_data(v, func) + + return walk_op_data(self.op_ref, func) + + @staticmethod + def get_node_data_func(k, v): + if key_name(k) == 'node_data': + return v + return None + + def walk_node_data(self): + return filterfalse(lambda x: x is None, self.walk(self.get_node_data_func)) + + def lookup( + self, path: list[str], tag_values: bool = False, last_node_type: str = '' + ) -> (OpData, list[str]): + path = path[:] + + ref_path = [] + + def prune_tree(d: dict, p: list[str]): + p = p[:] + if not d or not isinstance(d, dict) or not p: + return d + op_data: dict = subdict_of_name(p[0], d) + op_keys = list(op_data.keys()) + ref_path.append(p[0]) + if len(p) < 2: + # check last node_type + if last_node_type: + keys = list(filter(lambda t: t[1] == last_node_type, op_keys)) + values = list(map(lambda t: op_data[t], keys)) + return dict(zip(keys, values)) + return op_data + + if p[1] not in key_names(next_keys(op_data)): + # check if tag_values + if tag_values: + p = p[2:] + keys = list(filter(is_tag_node, op_keys)) + values = list(map(lambda t: prune_tree(op_data[t], p), keys)) + return dict(zip(keys, values)) + return {} + + p = p[1:] + op_data = list(map(lambda t: prune_tree(op_data[t], p), op_keys)) + + return dict(zip(op_keys, op_data)) + + return prune_tree(self.op_ref, path), ref_path + + def lookup_node_data( + self, path: list[str], tag_values: bool = False, last_node_type: str = '' + ) -> list[NodeData]: + res = [] + d, ref_path = self.lookup(path, tag_values, last_node_type) + paths = list(tuple_paths(d)) + paths = match_tuple_paths(ref_path, paths) + for p in paths: + res.append(get_node_data_at_path(d, p)) + + return res |