summaryrefslogtreecommitdiff
path: root/python/vyos/xml_ref
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/xml_ref')
-rw-r--r--python/vyos/xml_ref/__init__.py28
-rw-r--r--python/vyos/xml_ref/definition.py101
-rwxr-xr-xpython/vyos/xml_ref/generate_cache.py68
-rw-r--r--python/vyos/xml_ref/pkg_cache/__init__.py0
-rwxr-xr-xpython/vyos/xml_ref/update_cache.py51
5 files changed, 165 insertions, 83 deletions
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index ad2130dca..bf434865d 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -13,8 +13,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Optional, Union, TYPE_CHECKING
from vyos.xml_ref import definition
+if TYPE_CHECKING:
+ from vyos.config import ConfigDict
+
def load_reference(cache=[]):
if cache:
return cache[0]
@@ -23,11 +27,15 @@ def load_reference(cache=[]):
try:
from vyos.xml_ref.cache import reference
- xml.define(reference)
- cache.append(xml)
except Exception:
raise ImportError('no xml reference cache !!')
+ if not reference:
+ raise ValueError('empty xml reference cache !!')
+
+ xml.define(reference)
+ cache.append(xml)
+
return xml
def is_tag(path: list) -> bool:
@@ -48,12 +56,12 @@ def is_leaf(path: list) -> bool:
def cli_defined(path: list, node: str, non_local=False) -> bool:
return load_reference().cli_defined(path, node, non_local=non_local)
-def from_source(d: dict, path: list) -> bool:
- return load_reference().from_source(d, path)
-
def component_version() -> dict:
return load_reference().component_version()
+def default_value(path: list) -> Optional[Union[str, list]]:
+ return load_reference().default_value(path)
+
def multi_to_list(rpath: list, conf: dict) -> dict:
return load_reference().multi_to_list(rpath, conf)
@@ -68,8 +76,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False,
get_first_key=get_first_key,
recursive=recursive)
-def merge_defaults(path: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
- return load_reference().merge_defaults(path, conf,
- get_first_key=get_first_key,
- recursive=recursive)
+def from_source(d: dict, path: list) -> bool:
+ return definition.from_source(d, path)
+
+def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
+ return definition.ext_dict_merge(source, destination)
diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py
index d95d580e2..c90c5ddbc 100644
--- a/python/vyos/xml_ref/definition.py
+++ b/python/vyos/xml_ref/definition.py
@@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING
if TYPE_CHECKING:
from vyos.config import ConfigDict
+def set_source_recursive(o: Union[dict, str, list], b: bool):
+ d = {}
+ if not isinstance(o, dict):
+ d = {'_source': b}
+ else:
+ for k, v in o.items():
+ d[k] = set_source_recursive(v, b)
+ d |= {'_source': b}
+ return d
+
+def source_dict_merge(src: dict, dest: dict):
+ from copy import deepcopy
+ dst = deepcopy(dest)
+ from_src = {}
+
+ for key, value in src.items():
+ if key not in dst:
+ dst[key] = value
+ from_src[key] = set_source_recursive(value, True)
+ elif isinstance(src[key], dict):
+ dst[key], f = source_dict_merge(src[key], dst[key])
+ f |= {'_source': False}
+ from_src[key] = f
+
+ return dst, from_src
+
+def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']):
+ d, f = source_dict_merge(src, dest)
+ if hasattr(d, '_from_defaults'):
+ setattr(d, '_from_defaults', f)
+ return d
+
+def from_source(d: dict, path: list) -> bool:
+ for key in path:
+ d = d[key] if key in d else {}
+ if not d or not isinstance(d, dict):
+ return False
+ return d.get('_source', False)
+
class Xml:
def __init__(self):
self.ref = {}
@@ -123,7 +162,7 @@ class Xml:
def component_version(self) -> dict:
d = {}
- for k, v in self.ref['component_version']:
+ for k, v in self.ref['component_version'].items():
d[k] = int(v)
return d
@@ -153,6 +192,15 @@ class Xml:
return default.split()
return default
+ def default_value(self, path: list) -> Optional[Union[str, list]]:
+ d = self._get_ref_path(path)
+ default = self._get_default_value(d)
+ if default is None:
+ return None
+ if self._is_multi_node(d) or self._is_tag_node(d):
+ return default.split()
+ return default
+
def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:
"""Return dict containing default values below path
@@ -212,43 +260,6 @@ class Xml:
return False
return True
- def _set_source_recursive(self, o: Union[dict, str, list], b: bool):
- d = {}
- if not isinstance(o, dict):
- d = {'_source': b}
- else:
- for k, v in o.items():
- d[k] = self._set_source_recursive(v, b)
- d |= {'_source': b}
- return d
-
- # use local copy of function in module configdict, to avoid circular
- # import
- #
- # extend dict_merge to keep track of keys only in source
- def _dict_merge(self, source, destination):
- from copy import deepcopy
- dest = deepcopy(destination)
- from_source = {}
-
- for key, value in source.items():
- if key not in dest:
- dest[key] = value
- from_source[key] = self._set_source_recursive(value, True)
- elif isinstance(source[key], dict):
- dest[key], f = self._dict_merge(source[key], dest[key])
- f |= {'_source': False}
- from_source[key] = f
-
- return dest, from_source
-
- def from_source(self, d: dict, path: list) -> bool:
- for key in path:
- d = d[key] if key in d else {}
- if not d or not isinstance(d, dict):
- return False
- return d.get('_source', False)
-
def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:
res: dict = {}
res = self.get_defaults(rpath, recursive=recursive,
@@ -289,17 +300,3 @@ class Xml:
res = {}
return res
-
- def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'],
- get_first_key=False, recursive=False) -> dict:
- """Return config dict with defaults non-destructively merged
-
- This merges non-recursive defaults relative to the config dict.
- """
- d = self.relative_defaults(path, conf, get_first_key=get_first_key,
- recursive=recursive)
- d, f = self._dict_merge(d, conf)
- d = type(conf)(d)
- if hasattr(d, '_from_defaults'):
- setattr(d, '_from_defaults', f)
- return d
diff --git a/python/vyos/xml_ref/generate_cache.py b/python/vyos/xml_ref/generate_cache.py
index 792c6eea7..6a05d4608 100755
--- a/python/vyos/xml_ref/generate_cache.py
+++ b/python/vyos/xml_ref/generate_cache.py
@@ -18,10 +18,14 @@
import sys
import json
-import argparse
+from argparse import ArgumentParser
+from argparse import ArgumentTypeError
+from os import getcwd
+from os import makedirs
from os.path import join
from os.path import abspath
from os.path import dirname
+from os.path import basename
from xmltodict import parse
_here = dirname(__file__)
@@ -29,9 +33,10 @@ _here = dirname(__file__)
sys.path.append(join(_here, '..'))
from configtree import reference_tree_to_json, ConfigTreeError
-xml_cache = abspath(join(_here, 'cache.py'))
xml_cache_json = 'xml_cache.json'
xml_tmp = join('/tmp', xml_cache_json)
+pkg_cache = abspath(join(_here, 'pkg_cache'))
+ref_cache = abspath(join(_here, 'cache.py'))
node_data_fields = ("node_type", "multi", "valueless", "default_value")
@@ -45,16 +50,26 @@ def trim_node_data(cache: dict):
if isinstance(cache[k], dict):
trim_node_data(cache[k])
+def non_trivial(s):
+ if not s:
+ raise ArgumentTypeError("Argument must be non empty string")
+ return s
+
def main():
- parser = argparse.ArgumentParser(description='generate and save dict from xml defintions')
+ parser = ArgumentParser(description='generate and save dict from xml defintions')
parser.add_argument('--xml-dir', type=str, required=True,
help='transcluded xml interface-definition directory')
- parser.add_argument('--save-json-dir', type=str,
- help='directory to save json cache if needed')
- args = parser.parse_args()
-
- xml_dir = abspath(args.xml_dir)
- save_dir = abspath(args.save_json_dir) if args.save_json_dir else None
+ parser.add_argument('--package-name', type=non_trivial, default='vyos-1x',
+ help='name of current package')
+ parser.add_argument('--output-path', help='path to generated cache')
+ args = vars(parser.parse_args())
+
+ xml_dir = abspath(args['xml_dir'])
+ pkg_name = args['package_name'].replace('-','_')
+ cache_name = pkg_name + '_cache.py'
+ out_path = args['output_path']
+ path = out_path if out_path is not None else pkg_cache
+ xml_cache = abspath(join(path, cache_name))
try:
reference_tree_to_json(xml_dir, xml_tmp)
@@ -67,21 +82,30 @@ def main():
trim_node_data(d)
- if save_dir is not None:
- save_file = join(save_dir, xml_cache_json)
- with open(save_file, 'w') as f:
- f.write(json.dumps(d))
-
syntax_version = join(xml_dir, 'xml-component-version.xml')
- with open(syntax_version) as f:
- content = f.read()
+ try:
+ with open(syntax_version) as f:
+ component = f.read()
+ except FileNotFoundError:
+ if pkg_name != 'vyos_1x':
+ component = ''
+ else:
+ print("\nWARNING: missing xml-component-version.xml\n")
+ sys.exit(1)
- parsed = parse(content)
- converted = parsed['interfaceDefinition']['syntaxVersion']
+ if component:
+ parsed = parse(component)
+ else:
+ parsed = None
version = {}
- for i in converted:
- tmp = {i['@component']: i['@version']}
- version |= tmp
+ # addon package definitions may have empty (== 0) version info
+ if parsed is not None and parsed['interfaceDefinition'] is not None:
+ converted = parsed['interfaceDefinition']['syntaxVersion']
+ if not isinstance(converted, list):
+ converted = [converted]
+ for i in converted:
+ tmp = {i['@component']: i['@version']}
+ version |= tmp
version = {"component_version": version}
@@ -90,5 +114,7 @@ def main():
with open(xml_cache, 'w') as f:
f.write(f'reference = {str(d)}')
+ print(cache_name)
+
if __name__ == '__main__':
main()
diff --git a/python/vyos/xml_ref/pkg_cache/__init__.py b/python/vyos/xml_ref/pkg_cache/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/vyos/xml_ref/pkg_cache/__init__.py
diff --git a/python/vyos/xml_ref/update_cache.py b/python/vyos/xml_ref/update_cache.py
new file mode 100755
index 000000000..0842bcbe9
--- /dev/null
+++ b/python/vyos/xml_ref/update_cache.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+import os
+from copy import deepcopy
+from generate_cache import pkg_cache
+from generate_cache import ref_cache
+
+def dict_merge(source, destination):
+ dest = deepcopy(destination)
+
+ for key, value in source.items():
+ if key not in dest:
+ dest[key] = value
+ elif isinstance(source[key], dict):
+ dest[key] = dict_merge(source[key], dest[key])
+
+ return dest
+
+def main():
+ res = {}
+ cache_dir = os.path.basename(pkg_cache)
+ for mod in os.listdir(pkg_cache):
+ mod = os.path.splitext(mod)[0]
+ if not mod.endswith('_cache'):
+ continue
+ d = getattr(__import__(f'{cache_dir}.{mod}', fromlist=[mod]), 'reference')
+ if mod == 'vyos_1x_cache':
+ res = dict_merge(res, d)
+ else:
+ res = dict_merge(d, res)
+
+ with open(ref_cache, 'w') as f:
+ f.write(f'reference = {str(res)}')
+
+if __name__ == '__main__':
+ main()