Diffstat (limited to 'python')
-rw-r--r--  python/vyos/defaults.py                      4
-rwxr-xr-x  python/vyos/firewall.py                      4
-rw-r--r--  python/vyos/ifconfig/interface.py            4
-rwxr-xr-x  python/vyos/template.py                      4
-rw-r--r--  python/vyos/xml_ref/__init__.py             28
-rwxr-xr-x  python/vyos/xml_ref/generate_op_cache.py   191
-rw-r--r--  python/vyos/xml_ref/op_definition.py        246
7 files changed, 424 insertions, 57 deletions
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index f84b14040..63f3b5358 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -15,10 +15,10 @@
import os
-base_dir = '/usr/libexec/vyos/'
+base_dir = '/usr/libexec/vyos'
directories = {
- 'base' : base_dir,
+ 'base' : f'{base_dir}',
'data' : '/usr/share/vyos/',
'conf_mode' : f'{base_dir}/conf_mode',
'op_mode' : f'{base_dir}/op_mode',
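
A minimal sketch (not part of the commit; values mirror defaults.py) of why the trailing slash is dropped: paths composed with f-strings no longer contain a double slash.

    # illustrative only
    base_dir = '/usr/libexec/vyos'
    directories = {
        'base'      : f'{base_dir}',            # '/usr/libexec/vyos'
        'conf_mode' : f'{base_dir}/conf_mode',  # '/usr/libexec/vyos/conf_mode'
    }
    assert '//' not in directories['conf_mode']
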
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 64022db84..0643107a9 100755
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -361,7 +361,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if iiface[0] == '!':
operator = '!='
iiface = iiface[1:]
- output.append(f'iifname {operator} {{{iiface}}}')
+ output.append(f'iifname {operator} {{"{iiface}"}}')
elif 'group' in rule_conf['inbound_interface']:
iiface = rule_conf['inbound_interface']['group']
if iiface[0] == '!':
@@ -376,7 +376,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if oiface[0] == '!':
operator = '!='
oiface = oiface[1:]
- output.append(f'oifname {operator} {{{oiface}}}')
+ output.append(f'oifname {operator} {{"{oiface}"}}')
elif 'group' in rule_conf['outbound_interface']:
oiface = rule_conf['outbound_interface']['group']
if oiface[0] == '!':
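
A hedged illustration (not from the commit; interface name is an assumption) of the rendered nft fragment: with the added inner quotes, the interface name is emitted as a quoted string inside the anonymous set.

    # assumes a negated inbound interface 'eth0'
    operator, iiface = '!=', 'eth0'
    print(f'iifname {operator} {{"{iiface}"}}')
    # -> iifname != {"eth0"}
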
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 91b3a0c28..33c6830bc 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -423,11 +423,11 @@ class Interface(Control):
self._cmd(f'nft {nft_command}')
def _del_interface_from_ct_iface_map(self):
- nft_command = f'delete element inet vrf_zones ct_iface_map {{ "{self.ifname}" }}'
+ nft_command = f'delete element inet vrf_zones ct_iface_map {{ \'"{self.ifname}"\' }}'
self._nft_check_and_run(nft_command)
def _add_interface_to_ct_iface_map(self, vrf_table_id: int):
- nft_command = f'add element inet vrf_zones ct_iface_map {{ "{self.ifname}" : {vrf_table_id} }}'
+ nft_command = f'add element inet vrf_zones ct_iface_map {{ \'"{self.ifname}"\' : {vrf_table_id} }}'
self._nft_check_and_run(nft_command)
def get_ifindex(self):
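
A minimal sketch (ifname and table id are illustrative) of the command string now handed to the shell: the outer single quotes survive shell word splitting, so nft itself still receives the double-quoted interface name.

    ifname, vrf_table_id = 'eth0', 100
    nft_command = f'add element inet vrf_zones ct_iface_map {{ \'"{ifname}"\' : {vrf_table_id} }}'
    print(f'nft {nft_command}')
    # -> nft add element inet vrf_zones ct_iface_map { '"eth0"' : 100 }
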
diff --git a/python/vyos/template.py b/python/vyos/template.py
index bf2f13183..c6e35e9c7 100755
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -582,6 +582,10 @@ def snmp_auth_oid(type):
}
return OIDs[type]
+@register_filter('quoted_join')
+def quoted_join(input_list, join_str, quote='"'):
+ return str(join_str).join(f'{quote}{elem}{quote}' for elem in input_list)
+
@register_filter('nft_action')
def nft_action(vyos_action):
if vyos_action == 'accept':
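
A usage sketch of the new quoted_join filter (the template variable name is hypothetical):

    # Jinja2:  {{ interfaces | quoted_join(", ") }}
    # with interfaces = ['eth0', 'eth1'] this renders:  "eth0", "eth1"
    def quoted_join(input_list, join_str, quote='"'):
        return str(join_str).join(f'{quote}{elem}{quote}' for elem in input_list)

    assert quoted_join(['eth0', 'eth1'], ', ') == '"eth0", "eth1"'
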
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index 99d8432d2..cd50a3ec2 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -14,6 +14,8 @@
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from typing import Optional, Union, TYPE_CHECKING
+from typing import Callable
+from typing import Any
from vyos.xml_ref import definition
from vyos.xml_ref import op_definition
@@ -89,6 +91,7 @@ def from_source(d: dict, path: list) -> bool:
def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
return definition.ext_dict_merge(source, destination)
+
def load_op_reference(op_cache=[]):
if op_cache:
return op_cache[0]
@@ -108,5 +111,26 @@ def load_op_reference(op_cache=[]):
return op_xml
-def get_op_ref_path(path: list) -> list[op_definition.PathData]:
- return load_op_reference()._get_op_ref_path(path)
+
+def walk_op_data(func: Callable[[tuple, dict], Any]):
+ return load_op_reference().walk(func)
+
+
+def walk_op_node_data():
+ return load_op_reference().walk_node_data()
+
+
+def lookup_op_data(
+ path: list, tag_values: bool = False, last_node_type: str = ''
+) -> (dict, list[str]):
+ return load_op_reference().lookup(
+ path, tag_values=tag_values, last_node_type=last_node_type
+ )
+
+
+def lookup_op_node_data(
+ path: list, tag_values: bool = False, last_node_type: str = ''
+) -> list[op_definition.NodeData]:
+ return load_op_reference().lookup_node_data(
+ path, tag_values=tag_values, last_node_type=last_node_type
+ )
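
A hedged usage sketch of the new module-level helpers (the path is illustrative and assumes a generated op_cache is installed):

    from vyos.xml_ref import lookup_op_node_data

    # returns a list of NodeData matching the op-mode path
    for node in lookup_op_node_data(['show', 'interfaces']):
        print(node.node_type, node.help_text)
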
diff --git a/python/vyos/xml_ref/generate_op_cache.py b/python/vyos/xml_ref/generate_op_cache.py
index 95779d066..0c4ae7182 100755
--- a/python/vyos/xml_ref/generate_op_cache.py
+++ b/python/vyos/xml_ref/generate_op_cache.py
@@ -14,10 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import io
import re
import sys
-import json
import glob
+import json
+import atexit
from argparse import ArgumentParser
from os.path import join
@@ -25,23 +28,44 @@ from os.path import abspath
from os.path import dirname
from xml.etree import ElementTree as ET
from xml.etree.ElementTree import Element
+from functools import cmp_to_key
from typing import TypeAlias
from typing import Optional
+from op_definition import NodeData
+from op_definition import OpKey # pylint: disable=unused-import # noqa: F401
+from op_definition import OpData # pylint: disable=unused-import # noqa: F401
+from op_definition import key_name
+from op_definition import key_type
+from op_definition import node_data_difference
+from op_definition import get_node_data
+from op_definition import collapse
+
_here = dirname(__file__)
sys.path.append(join(_here, '..'))
-from defaults import directories
-
-from op_definition import PathData
+# pylint: disable=wrong-import-position,wrong-import-order
+from defaults import directories # noqa: E402
-xml_op_cache_json = 'xml_op_cache.json'
-xml_op_tmp = join('/tmp', xml_op_cache_json)
op_ref_cache = abspath(join(_here, 'op_cache.py'))
+op_ref_json = abspath(join(_here, 'op_cache.json'))
OptElement: TypeAlias = Optional[Element]
-DEBUG = False
+
+
+# It is expected that the node_data help txt contained in top-level nodes,
+# shared across files, e.g.'show', will reveal inconsistencies; to list
+# differences, use --check-xml-consistency
+CHECK_XML_CONSISTENCY = False
+err_buf = io.StringIO()
+
+
+def write_err_buf():
+ err_buf.seek(0)
+ out = err_buf.read()
+ print(out)
+ err_buf.close()
def translate_exec(s: str) -> str:
@@ -74,14 +98,58 @@ def translate_op_script(s: str) -> str:
return s
-def insert_node(n: Element, l: list[PathData], path=None) -> None:
- # pylint: disable=too-many-locals,too-many-branches
+def compare_keys(a, b):
+ # pylint: disable=too-many-return-statements
+ match key_type(a), key_type(b):
+ case None, None:
+ if key_name(a) == key_name(b):
+ return 0
+ return -1 if key_name(a) < key_name(b) else 1
+ case None, _:
+ return -1
+ case _, None:
+ return 1
+ case _, _:
+ if key_name(a) == key_name(b):
+ if key_type(a) == key_type(b):
+ return 0
+ return -1 if key_type(a) < key_type(b) else 1
+ return -1 if key_name(a) < key_name(b) else 1
+
+
+def sort_func(obj: dict, key_func):
+ if not obj or not isinstance(obj, dict):
+ return obj
+ k_list = list(obj.keys())
+ if not isinstance(k_list[0], tuple):
+ return obj
+ k_list = sorted(k_list, key=key_func)
+ v_list = map(lambda t: sort_func(obj[t], key_func), k_list)
+ return dict(zip(k_list, v_list))
+
+
+def sort_op_data(obj):
+ key_func = cmp_to_key(compare_keys)
+ return sort_func(obj, key_func)
+
+
+def insert_node(
+ n: Element, d: dict, path: list[str] = None, parent: NodeData = None, file: str = ''
+) -> None:
+ # pylint: disable=too-many-locals,too-many-branches,too-many-statements
prop: OptElement = n.find('properties')
children: OptElement = n.find('children')
command: OptElement = n.find('command')
- # name is not None as required by schema
- name: str = n.get('name', 'schema_error')
+ standalone: OptElement = n.find('standalone')
node_type: str = n.tag
+
+ if node_type == 'virtualTagNode':
+ name = '__virtual_tag'
+ else:
+ name = n.get('name')
+ if not name:
+ raise ValueError("Node name is required for all node types except <virtualTagNode>")
+
if path is None:
path = []
@@ -95,6 +163,16 @@ def insert_node(n: Element, l: list[PathData], path=None) -> None:
if command_text is not None:
command_text = translate_command(command_text, path)
+ try:
+ standalone_command = translate_command(standalone.find('command').text, path)
+ except AttributeError:
+ standalone_command = None
+
+ try:
+ standalone_help_text = translate_command(standalone.find('help').text, path)
+ except AttributeError:
+ standalone_help_text = None
+
comp_help = {}
if prop is not None:
che = prop.findall('completionHelp')
@@ -124,31 +202,49 @@ def insert_node(n: Element, l: list[PathData], path=None) -> None:
if comp_scripts:
comp_help['script'] = comp_scripts
- cur_node_dict = {}
- cur_node_dict['name'] = name
- cur_node_dict['type'] = node_type
- cur_node_dict['comp_help'] = comp_help
- cur_node_dict['help'] = help_text
- cur_node_dict['command'] = command_text
- cur_node_dict['path'] = path
- cur_node_dict['children'] = []
- l.append(cur_node_dict)
+ cur_node_data = NodeData()
+ cur_node_data.name = name
+ cur_node_data.node_type = node_type
+ cur_node_data.comp_help = comp_help
+ cur_node_data.help_text = help_text
+ cur_node_data.command = command_text
+ cur_node_data.standalone_help_text = standalone_help_text
+ cur_node_data.standalone_command = standalone_command
+ cur_node_data.path = path
+ cur_node_data.file = file
+
+ value = {('__node_data', None): cur_node_data}
+ key = (name, node_type)
+
+ cur_value = d.setdefault(key, value)
+
+ if parent and key not in parent.children:
+ parent.children.append(key)
+
+ if CHECK_XML_CONSISTENCY:
+ out = node_data_difference(get_node_data(cur_value), get_node_data(value))
+ if out:
+ err_buf.write(out)
if children is not None:
inner_nodes = children.iterfind('*')
for inner_n in inner_nodes:
inner_path = path[:]
- insert_node(inner_n, cur_node_dict['children'], inner_path)
+ insert_node(inner_n, d[key], inner_path, cur_node_data, file)
-def parse_file(file_path, l):
+def parse_file(file_path, d):
tree = ET.parse(file_path)
root = tree.getroot()
+ file = os.path.basename(file_path)
for n in root.iterfind('*'):
- insert_node(n, l)
+ insert_node(n, d, file=file)
def main():
+ # pylint: disable=global-statement
+ global CHECK_XML_CONSISTENCY
+
parser = ArgumentParser(description='generate dict from xml defintions')
parser.add_argument(
'--xml-dir',
@@ -156,21 +252,58 @@ def main():
required=True,
help='transcluded xml op-mode-definition file',
)
+ parser.add_argument(
+ '--check-xml-consistency',
+ action='store_true',
+ help='check consistency of node data across files',
+ )
+ parser.add_argument(
+ '--check-path-ambiguity',
+ action='store_true',
+ help='attempt to reduce to unique paths, reporting if error',
+ )
+ parser.add_argument(
+ '--select',
+ type=str,
+ help='limit cache to a subset of XML files: "power_ctl | multicast-group | ..."',
+ )
args = vars(parser.parse_args())
+ if args['check_xml_consistency']:
+ CHECK_XML_CONSISTENCY = True
+ atexit.register(write_err_buf)
+
xml_dir = abspath(args['xml_dir'])
- l = []
+ d = {}
+
+ select = args['select']
+ if select:
+ select = [item.strip() for item in select.split('|')]
+
+ for fname in sorted(glob.glob(f'{xml_dir}/*.xml')):
+ file = os.path.basename(fname)
+ if not select or os.path.splitext(file)[0] in select:
+ parse_file(fname, d)
- for fname in glob.glob(f'{xml_dir}/*.xml'):
- parse_file(fname, l)
+ d = sort_op_data(d)
- with open(xml_op_tmp, 'w') as f:
- json.dump(l, f, indent=2)
+ if args['check_path_ambiguity']:
+ # when the following passes without error, return value will be the
+ # full dictionary indexed by str, not tuple
+ res, out, err = collapse(d)
+ if not err:
+ with open(op_ref_json, 'w') as f:
+ json.dump(res, f, indent=2)
+ else:
+ print('Found the following duplicate paths:\n')
+ print(out)
+ sys.exit(1)
with open(op_ref_cache, 'w') as f:
- f.write(f'op_reference = {str(l)}')
+ f.write('from vyos.xml_ref.op_definition import NodeData\n')
+ f.write(f'op_reference = {str(d)}')
if __name__ == '__main__':
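
A sketch (values abridged, names illustrative) of the structure insert_node now builds: each node is keyed by a (name, node_type) tuple, and its own metadata sits under the reserved ('__node_data', None) key.

    from vyos.xml_ref.op_definition import NodeData

    op_reference = {
        ('show', 'node'): {
            ('__node_data', None): NodeData(name='show', node_type='node'),
            ('interfaces', 'node'): {
                ('__node_data', None): NodeData(name='interfaces', node_type='node'),
            },
        },
    }
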
diff --git a/python/vyos/xml_ref/op_definition.py b/python/vyos/xml_ref/op_definition.py
index 914f3a105..6a8368118 100644
--- a/python/vyos/xml_ref/op_definition.py
+++ b/python/vyos/xml_ref/op_definition.py
@@ -13,37 +13,243 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
-from typing import TypedDict
from typing import TypeAlias
-from typing import Optional
from typing import Union
+from typing import Optional
+from typing import Iterator
+from dataclasses import dataclass
+from dataclasses import field
+from dataclasses import fields
+from dataclasses import asdict
+from itertools import filterfalse
+
+
+@dataclass
+class NodeData:
+ # pylint: disable=too-many-instance-attributes
+ name: str = ''
+ node_type: str = 'node'
+ help_text: str = ''
+ comp_help: dict[str, list] = field(default_factory=dict)
+ command: str = ''
+ standalone_help_text: Optional[str] = None
+ standalone_command: Optional[str] = None
+ path: list[str] = field(default_factory=list)
+ file: str = ''
+ children: list[tuple] = field(default_factory=list)
+
+
+OpKey: TypeAlias = tuple[str, str]
+OpData: TypeAlias = dict[OpKey, Union[NodeData, 'OpData']]
+
+
+def key_name(k: OpKey):
+ return k[0]
+
+
+def key_type(k: OpKey):
+ return k[1]
+
+
+def key_names(l: list): # noqa: E741
+ return list(map(lambda t: t[0], l))
+
+
+def keys_of_name(s: str, l: list): # noqa: E741
+ filter(lambda t: t[0] == s, l)
+
+
+def is_tag_node(t: tuple):
+ return t[1] == 'tagNode'
+
+
+def subdict_of_name(s: str, d: dict) -> dict:
+ res = {}
+ for t, v in d.items():
+ if not isinstance(t, tuple):
+ break
+ if key_name(t) == s:
+ res[t] = v
+
+ return res
+
+
+def next_keys(d: dict) -> list:
+ key_set = set()
+ for k in list(d.keys()):
+ if isinstance(d[k], dict):
+ key_set |= set(d[k].keys())
+ return list(key_set)
+
+
+def tuple_paths(d: dict) -> Iterator[list[tuple]]:
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ if isinstance(k, tuple) and key_name(k) != '__node_data':
+ for r in func(v, path + [k]):
+ yield r
+ else:
+ yield path
+ else:
+ yield path
+ for r in func(d, []):
+ yield r
-class NodeData(TypedDict):
- node_type: Optional[str]
- help_text: Optional[str]
- comp_help: Optional[dict[str, list]]
- command: Optional[str]
- path: Optional[list[str]]
+def match_tuple_paths(
+ path: list[str], paths: list[list[tuple[str, str]]]
+) -> list[list[tuple[str, str]]]:
+ return list(filter(lambda p: key_names(p) == path, paths))
-PathData: TypeAlias = dict[str, Union[NodeData|list['PathData']]]
+
+def get_node_data(d: dict) -> NodeData:
+ return d.get(('__node_data', None), {})
+
+
+def get_node_data_at_path(d: dict, tpath):
+ if not tpath:
+ return {}
+ # operates on actual paths, not names:
+ if not isinstance(tpath[0], tuple):
+ raise ValueError('must be path of tuples')
+ while tpath and d:
+ d = d.get(tpath[0], {})
+ tpath = tpath[1:]
+
+ return get_node_data(d)
+
+
+def node_data_difference(a: NodeData, b: NodeData):
+ out = ''
+ for fld in fields(NodeData):
+ if fld.name in ('children', 'file'):
+ continue
+ a_fld = getattr(a, fld.name)
+ b_fld = getattr(b, fld.name)
+ if a_fld != b_fld:
+ out += f'prev: {a.file} {a.path} {fld.name}: {a_fld}\n'
+ out += f'new: {b.file} {b.path} {fld.name}: {b_fld}\n'
+ out += '\n'
+
+ return out
+
+
+def collapse(d: OpData, acc: dict = None) -> tuple[dict, str, bool]:
+ err = False
+ inner_err = False
+ out = ''
+ inner_out = ''
+ if acc is None:
+ acc = {}
+ if not isinstance(d, dict):
+ return d
+ for k, v in d.items():
+ if isinstance(k, tuple):
+ name = key_name(k)
+ if name != '__node_data':
+ new_data = get_node_data(v)
+ if name in list(acc.keys()):
+ err = True
+ prev_data = acc[name].get('__node_data', {})
+ if prev_data:
+ out += f'prev: {prev_data["file"]} {prev_data["path"]}\n'
+ else:
+ out += '\n'
+ out += f'new: {new_data.file} {new_data.path}\n\n'
+ else:
+ acc[name] = {}
+ acc[name]['__node_data'] = asdict(new_data)
+ inner, o, e = collapse(v)
+ inner_err |= e
+ inner_out += o
+ acc[name].update(inner)
+ else:
+ name = k
+ acc[name] = v
+
+ err |= inner_err
+ out += inner_out
+
+ return acc, out, err
class OpXml:
def __init__(self):
self.op_ref = {}
- def define(self, op_ref: list[PathData]) -> None:
+ def define(self, op_ref: dict) -> None:
self.op_ref = op_ref
- def _get_op_ref_path(self, path: list[str]) -> list[PathData]:
- def _get_path_list(path: list[str], l: list[PathData]) -> list[PathData]:
- if not path:
- return l
- for d in l:
- if path[0] in list(d):
- return _get_path_list(path[1:], d[path[0]])
- return []
- l = self.op_ref
- return _get_path_list(path, l)
+ def walk(self, func):
+ def walk_op_data(obj, func):
+ if isinstance(obj, dict):
+ for k, v in obj.items():
+ if isinstance(k, tuple):
+ res = func(k, v)
+ yield res
+ yield from walk_op_data(v, func)
+
+ return walk_op_data(self.op_ref, func)
+
+ @staticmethod
+ def get_node_data_func(k, v):
+ if key_name(k) == '__node_data':
+ return v
+ return None
+
+ def walk_node_data(self):
+ return filterfalse(lambda x: x is None, self.walk(self.get_node_data_func))
+
+ def lookup(
+ self, path: list[str], tag_values: bool = False, last_node_type: str = ''
+ ) -> (OpData, list[str]):
+ path = path[:]
+
+ ref_path = []
+
+ def prune_tree(d: dict, p: list[str]):
+ p = p[:]
+ if not d or not isinstance(d, dict) or not p:
+ return d
+ op_data: dict = subdict_of_name(p[0], d)
+ op_keys = list(op_data.keys())
+ ref_path.append(p[0])
+ if len(p) < 2:
+ # check last node_type
+ if last_node_type:
+ keys = list(filter(lambda t: t[1] == last_node_type, op_keys))
+ values = list(map(lambda t: op_data[t], keys))
+ return dict(zip(keys, values))
+ return op_data
+
+ if p[1] not in key_names(next_keys(op_data)):
+ # check if tag_values
+ if tag_values:
+ p = p[2:]
+ keys = list(filter(is_tag_node, op_keys))
+ values = list(map(lambda t: prune_tree(op_data[t], p), keys))
+ return dict(zip(keys, values))
+ return {}
+
+ p = p[1:]
+ op_data = list(map(lambda t: prune_tree(op_data[t], p), op_keys))
+
+ return dict(zip(op_keys, op_data))
+
+ return prune_tree(self.op_ref, path), ref_path
+
+ def lookup_node_data(
+ self, path: list[str], tag_values: bool = False, last_node_type: str = ''
+ ) -> list[NodeData]:
+ res = []
+ d, ref_path = self.lookup(path, tag_values, last_node_type)
+ paths = list(tuple_paths(d))
+ paths = match_tuple_paths(ref_path, paths)
+ for p in paths:
+ res.append(get_node_data_at_path(d, p))
+
+ return res
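
A hedged usage sketch of the reworked OpXml API (data and paths are illustrative): lookup() prunes op_reference to the requested path and also returns the reference path, while lookup_node_data() extracts the matching NodeData entries.

    op_xml = OpXml()
    op_xml.define(op_reference)   # dict produced by generate_op_cache.py
    subtree, ref_path = op_xml.lookup(['show', 'interfaces'])
    for node in op_xml.lookup_node_data(['show', 'interfaces']):
        print(node.help_text)
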