summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/xml_ref/__init__.py28
-rwxr-xr-xpython/vyos/xml_ref/generate_op_cache.py134
-rw-r--r--python/vyos/xml_ref/op_definition.py181
3 files changed, 294 insertions, 49 deletions
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index 99d8432d2..cd50a3ec2 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -14,6 +14,8 @@
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from typing import Optional, Union, TYPE_CHECKING
+from typing import Callable
+from typing import Any
from vyos.xml_ref import definition
from vyos.xml_ref import op_definition
@@ -89,6 +91,7 @@ def from_source(d: dict, path: list) -> bool:
def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
return definition.ext_dict_merge(source, destination)
+
def load_op_reference(op_cache=[]):
if op_cache:
return op_cache[0]
@@ -108,5 +111,26 @@ def load_op_reference(op_cache=[]):
return op_xml
-def get_op_ref_path(path: list) -> list[op_definition.PathData]:
- return load_op_reference()._get_op_ref_path(path)
+
+def walk_op_data(func: Callable[[tuple, dict], Any]):
+ return load_op_reference().walk(func)
+
+
+def walk_op_node_data():
+ return load_op_reference().walk_node_data()
+
+
+def lookup_op_data(
+ path: list, tag_values: bool = False, last_node_type: str = ''
+) -> (dict, list[str]):
+ return load_op_reference().lookup(
+ path, tag_values=tag_values, last_node_type=last_node_type
+ )
+
+
+def lookup_op_node_data(
+ path: list, tag_values: bool = False, last_node_type: str = ''
+) -> list[op_definition.NodeData]:
+ return load_op_reference().lookup_node_data(
+ path, tag_values=tag_values, last_node_type=last_node_type
+ )
diff --git a/python/vyos/xml_ref/generate_op_cache.py b/python/vyos/xml_ref/generate_op_cache.py
index 95779d066..c00e676df 100755
--- a/python/vyos/xml_ref/generate_op_cache.py
+++ b/python/vyos/xml_ref/generate_op_cache.py
@@ -14,10 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import io
import re
import sys
-import json
import glob
+import atexit
from argparse import ArgumentParser
from os.path import join
@@ -25,23 +26,40 @@ from os.path import abspath
from os.path import dirname
from xml.etree import ElementTree as ET
from xml.etree.ElementTree import Element
+from functools import cmp_to_key
from typing import TypeAlias
from typing import Optional
+from op_definition import NodeData
+from op_definition import OpKey # pylint: disable=unused-import # noqa: F401
+from op_definition import OpData # pylint: disable=unused-import # noqa: F401
+from op_definition import key_name
+from op_definition import key_type
+
_here = dirname(__file__)
sys.path.append(join(_here, '..'))
-from defaults import directories
-
-from op_definition import PathData
+# pylint: disable=wrong-import-position,wrong-import-order
+from defaults import directories # noqa: E402
-xml_op_cache_json = 'xml_op_cache.json'
-xml_op_tmp = join('/tmp', xml_op_cache_json)
op_ref_cache = abspath(join(_here, 'op_cache.py'))
OptElement: TypeAlias = Optional[Element]
-DEBUG = False
+
+
+# It is expected that the node_data help txt contained in top-level nodes,
+# shared across files, e.g.'show', will reveal inconsistencies; to list
+# differences, use --check-xml-consistency
+CHECK_XML_CONSISTENCY = False
+err_buf = io.StringIO()
+
+
+def write_err_buf():
+ err_buf.seek(0)
+ out = err_buf.read()
+ print(out)
+ err_buf.close()
def translate_exec(s: str) -> str:
@@ -74,8 +92,44 @@ def translate_op_script(s: str) -> str:
return s
-def insert_node(n: Element, l: list[PathData], path=None) -> None:
- # pylint: disable=too-many-locals,too-many-branches
+def compare_keys(a, b):
+ match key_type(a), key_type(b):
+ case None, None:
+ if key_name(a) == key_name(b):
+ return 0
+ return -1 if key_name(a) < key_name(b) else 1
+ case None, _:
+ return -1
+ case _, None:
+ return 1
+ case _, _:
+ if key_name(a) == key_name(b):
+ if key_type(a) == key_type(b):
+ return 0
+ return -1 if key_type(a) < key_type(b) else 1
+ return -1 if key_name(a) < key_name(b) else 1
+
+
+def sort_func(obj: dict, key_func):
+ if not obj or not isinstance(obj, dict):
+ return obj
+ k_list = list(obj.keys())
+ if not isinstance(k_list[0], tuple):
+ return obj
+ k_list = sorted(k_list, key=key_func)
+ v_list = map(lambda t: sort_func(obj[t], key_func), k_list)
+ return dict(zip(k_list, v_list))
+
+
+def sort_op_data(obj):
+ key_func = cmp_to_key(compare_keys)
+ return sort_func(obj, key_func)
+
+
+def insert_node(
+ n: Element, d: dict, path: list[str] = None, parent: NodeData = None
+) -> None:
+ # pylint: disable=too-many-locals,too-many-branches,too-many-statements
prop: OptElement = n.find('properties')
children: OptElement = n.find('children')
command: OptElement = n.find('command')
@@ -124,31 +178,48 @@ def insert_node(n: Element, l: list[PathData], path=None) -> None:
if comp_scripts:
comp_help['script'] = comp_scripts
- cur_node_dict = {}
- cur_node_dict['name'] = name
- cur_node_dict['type'] = node_type
- cur_node_dict['comp_help'] = comp_help
- cur_node_dict['help'] = help_text
- cur_node_dict['command'] = command_text
- cur_node_dict['path'] = path
- cur_node_dict['children'] = []
- l.append(cur_node_dict)
+ cur_node_data = NodeData()
+ cur_node_data.name = name
+ cur_node_data.node_type = node_type
+ cur_node_data.comp_help = comp_help
+ cur_node_data.help_text = help_text
+ cur_node_data.command = command_text
+ cur_node_data.path = path
+
+ value = {('node_data', None): cur_node_data}
+ key = (name, node_type)
+
+ cur_value = d.setdefault(key, value)
+
+ if parent and key not in parent.children:
+ parent.children.append(key)
+
+ if (
+ CHECK_XML_CONSISTENCY
+ and cur_value[('node_data', None)] != value[('node_data', None)]
+ ):
+ err_buf.write(
+ f"prev: {cur_value[('node_data', None)]}; new: {value[('node_data', None)]}\n"
+ )
if children is not None:
inner_nodes = children.iterfind('*')
for inner_n in inner_nodes:
inner_path = path[:]
- insert_node(inner_n, cur_node_dict['children'], inner_path)
+ insert_node(inner_n, d[key], inner_path, cur_node_data)
-def parse_file(file_path, l):
+def parse_file(file_path, d):
tree = ET.parse(file_path)
root = tree.getroot()
for n in root.iterfind('*'):
- insert_node(n, l)
+ insert_node(n, d)
def main():
+ # pylint: disable=global-statement
+ global CHECK_XML_CONSISTENCY
+
parser = ArgumentParser(description='generate dict from xml defintions')
parser.add_argument(
'--xml-dir',
@@ -156,21 +227,30 @@ def main():
required=True,
help='transcluded xml op-mode-definition file',
)
+ parser.add_argument(
+ '--check-xml-consistency',
+ action='store_true',
+ help='check consistency of node data across files',
+ )
args = vars(parser.parse_args())
+ if args['check_xml_consistency']:
+ CHECK_XML_CONSISTENCY = True
+ atexit.register(write_err_buf)
+
xml_dir = abspath(args['xml_dir'])
- l = []
+ d = {}
- for fname in glob.glob(f'{xml_dir}/*.xml'):
- parse_file(fname, l)
+ for fname in sorted(glob.glob(f'{xml_dir}/*.xml')):
+ parse_file(fname, d)
- with open(xml_op_tmp, 'w') as f:
- json.dump(l, f, indent=2)
+ d = sort_op_data(d)
with open(op_ref_cache, 'w') as f:
- f.write(f'op_reference = {str(l)}')
+ f.write('from vyos.xml_ref.op_definition import NodeData\n')
+ f.write(f'op_reference = {str(d)}')
if __name__ == '__main__':
diff --git a/python/vyos/xml_ref/op_definition.py b/python/vyos/xml_ref/op_definition.py
index 914f3a105..f61181262 100644
--- a/python/vyos/xml_ref/op_definition.py
+++ b/python/vyos/xml_ref/op_definition.py
@@ -13,37 +13,178 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
-from typing import TypedDict
from typing import TypeAlias
-from typing import Optional
from typing import Union
+from typing import Iterator
+from dataclasses import dataclass
+from dataclasses import field
+from itertools import filterfalse
-class NodeData(TypedDict):
- node_type: Optional[str]
- help_text: Optional[str]
- comp_help: Optional[dict[str, list]]
- command: Optional[str]
- path: Optional[list[str]]
+@dataclass
+class NodeData:
+ name: str = ''
+ node_type: str = 'node'
+ help_text: str = ''
+ comp_help: dict[str, list] = field(default_factory=dict)
+ command: str = ''
+ path: list[str] = field(default_factory=list)
+ children: list[tuple] = field(default_factory=list)
-PathData: TypeAlias = dict[str, Union[NodeData|list['PathData']]]
+OpKey: TypeAlias = tuple[str, str]
+OpData: TypeAlias = dict[OpKey, Union[NodeData, 'OpData']]
+
+
+def key_name(k: OpKey):
+ return k[0]
+
+
+def key_type(k: OpKey):
+ return k[1]
+
+
+def key_names(l: list): # noqa: E741
+ return list(map(lambda t: t[0], l))
+
+
+def keys_of_name(s: str, l: list): # noqa: E741
+ filter(lambda t: t[0] == s, l)
+
+
+def is_tag_node(t: tuple):
+ return t[1] == 'tagNode'
+
+
+def subdict_of_name(s: str, d: dict) -> dict:
+ res = {}
+ for t, v in d.items():
+ if not isinstance(t, tuple):
+ break
+ if key_name(t) == s:
+ res[t] = v
+
+ return res
+
+
+def next_keys(d: dict) -> list:
+ key_set = set()
+ for k in list(d.keys()):
+ if isinstance(d[k], dict):
+ key_set |= set(d[k].keys())
+ return list(key_set)
+
+
+def tuple_paths(d: dict) -> Iterator[list[tuple]]:
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ if isinstance(k, tuple) and key_name(k) != 'node_data':
+ for r in func(v, path + [k]):
+ yield r
+ else:
+ yield path
+ else:
+ yield path
+
+ for r in func(d, []):
+ yield r
+
+
+def match_tuple_paths(
+ path: list[str], paths: list[list[tuple[str, str]]]
+) -> list[list[tuple[str, str]]]:
+ return list(filter(lambda p: key_names(p) == path, paths))
+
+
+def get_node_data_at_path(d: dict, tpath):
+ if not tpath:
+ return {}
+ # operates on actual paths, not names:
+ if not isinstance(tpath[0], tuple):
+ raise ValueError('must be path of tuples')
+ while tpath and d:
+ d = d.get(tpath[0], {})
+ tpath = tpath[1:]
+
+ return d.get(('node_data', None), {})
class OpXml:
def __init__(self):
self.op_ref = {}
- def define(self, op_ref: list[PathData]) -> None:
+ def define(self, op_ref: dict) -> None:
self.op_ref = op_ref
- def _get_op_ref_path(self, path: list[str]) -> list[PathData]:
- def _get_path_list(path: list[str], l: list[PathData]) -> list[PathData]:
- if not path:
- return l
- for d in l:
- if path[0] in list(d):
- return _get_path_list(path[1:], d[path[0]])
- return []
- l = self.op_ref
- return _get_path_list(path, l)
+ def walk(self, func):
+ def walk_op_data(obj, func):
+ if isinstance(obj, dict):
+ for k, v in obj.items():
+ if isinstance(k, tuple):
+ res = func(k, v)
+ yield res
+ yield from walk_op_data(v, func)
+
+ return walk_op_data(self.op_ref, func)
+
+ @staticmethod
+ def get_node_data_func(k, v):
+ if key_name(k) == 'node_data':
+ return v
+ return None
+
+ def walk_node_data(self):
+ return filterfalse(lambda x: x is None, self.walk(self.get_node_data_func))
+
+ def lookup(
+ self, path: list[str], tag_values: bool = False, last_node_type: str = ''
+ ) -> (OpData, list[str]):
+ path = path[:]
+
+ ref_path = []
+
+ def prune_tree(d: dict, p: list[str]):
+ p = p[:]
+ if not d or not isinstance(d, dict) or not p:
+ return d
+ op_data: dict = subdict_of_name(p[0], d)
+ op_keys = list(op_data.keys())
+ ref_path.append(p[0])
+ if len(p) < 2:
+ # check last node_type
+ if last_node_type:
+ keys = list(filter(lambda t: t[1] == last_node_type, op_keys))
+ values = list(map(lambda t: op_data[t], keys))
+ return dict(zip(keys, values))
+ return op_data
+
+ if p[1] not in key_names(next_keys(op_data)):
+ # check if tag_values
+ if tag_values:
+ p = p[2:]
+ keys = list(filter(is_tag_node, op_keys))
+ values = list(map(lambda t: prune_tree(op_data[t], p), keys))
+ return dict(zip(keys, values))
+ return {}
+
+ p = p[1:]
+ op_data = list(map(lambda t: prune_tree(op_data[t], p), op_keys))
+
+ return dict(zip(op_keys, op_data))
+
+ return prune_tree(self.op_ref, path), ref_path
+
+ def lookup_node_data(
+ self, path: list[str], tag_values: bool = False, last_node_type: str = ''
+ ) -> list[NodeData]:
+ res = []
+ d, ref_path = self.lookup(path, tag_values, last_node_type)
+ paths = list(tuple_paths(d))
+ paths = match_tuple_paths(ref_path, paths)
+ for p in paths:
+ res.append(get_node_data_at_path(d, p))
+
+ return res