diff options
-rw-r--r-- | python/vyos/xml/.gitignore | 1 | ||||
-rw-r--r-- | python/vyos/xml/__init__.py | 65 | ||||
-rw-r--r-- | python/vyos/xml/cache/__init__.py | 0 | ||||
-rw-r--r-- | python/vyos/xml/definition.py | 360 | ||||
-rwxr-xr-x | python/vyos/xml/generate.py | 67 | ||||
-rw-r--r-- | python/vyos/xml/kw.py | 83 | ||||
-rw-r--r-- | python/vyos/xml/load.py | 300 | ||||
-rw-r--r-- | python/vyos/xml/test_xml.py | 271 | ||||
-rwxr-xr-x | src/op_mode/otp.py | 18 |
9 files changed, 7 insertions, 1158 deletions
diff --git a/python/vyos/xml/.gitignore b/python/vyos/xml/.gitignore deleted file mode 100644 index e934adfd1..000000000 --- a/python/vyos/xml/.gitignore +++ /dev/null @@ -1 +0,0 @@ -cache/ diff --git a/python/vyos/xml/__init__.py b/python/vyos/xml/__init__.py deleted file mode 100644 index 6db446a40..000000000 --- a/python/vyos/xml/__init__.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -from vyos.xml import definition -from vyos.xml import load -from vyos.xml import kw - - -def load_configuration(cache=[]): - if cache: - return cache[0] - - xml = definition.XML() - - try: - from vyos.xml.cache import configuration - xml.update(configuration.definition) - cache.append(xml) - except Exception: - xml = definition.XML() - print('no xml configuration cache') - xml.update(load.xml(load.configuration_definition)) - - return xml - - -# def is_multi(lpath): -# return load_configuration().is_multi(lpath) - - -def is_tag(lpath): - return load_configuration().is_tag(lpath) - - -def is_leaf(lpath, flat=True): - return load_configuration().is_leaf(lpath, flat) - -def component_version(): - return load_configuration().component_version() - -def defaults(lpath, flat=False): - return load_configuration().defaults(lpath, flat) - - -def multi_to_list(lpath, conf): - return load_configuration().multi_to_list(lpath, conf) - - -if __name__ == '__main__': - print(defaults(['service'], flat=True)) - print(defaults(['service'], flat=False)) - - print(is_tag(["system", "login", "user", "vyos", "authentication", "public-keys"])) - print(is_tag(['protocols', 'static', 'multicast', 'route', '0.0.0.0/0', 'next-hop'])) diff --git a/python/vyos/xml/cache/__init__.py b/python/vyos/xml/cache/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/python/vyos/xml/cache/__init__.py +++ /dev/null diff --git a/python/vyos/xml/definition.py b/python/vyos/xml/definition.py deleted file mode 100644 index bc3892b42..000000000 --- a/python/vyos/xml/definition.py +++ /dev/null @@ -1,360 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -from vyos.xml import kw - -# As we index by key, the name is first and then the data: -# {'dummy': { -# '[node]': '[tagNode]', -# 'address': { ... } -# } } - -# so when we encounter a tagNode, we are really encountering -# the tagNode data. - - -class XML(dict): - def __init__(self): - self[kw.tree] = {} - self[kw.priorities] = {} - self[kw.owners] = {} - self[kw.default] = {} - self[kw.tags] = [] - self[kw.component_version] = {} - - dict.__init__(self) - - self.tree = self[kw.tree] - # the options which matched the last incomplete world we had - # or the last word in a list - self.options = [] - # store all the part of the command we processed - self.inside = [] - # should we check the data pass with the constraints - self.check = False - # are we still typing a word - self.filling = False - # do what have the tagNode value ? - self.filled = False - # last word seen - self.word = '' - # do we have all the data we want ? - self.final = False - # do we have too much data ? - self.extra = False - # what kind of node are we in plain vs data not - self.plain = True - - def reset(self): - self.tree = self[kw.tree] - self.options = [] - self.inside = [] - self.check = False - self.filling = False - self.filled = False - self.word = '' - self.final = False - self.extra = False - self.plain = True - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def traverse(self, cmd): - self.reset() - - # using split() intead of split(' ') eats the final ' ' - words = cmd.split(' ') - passed = [] - word = '' - data_node = False - space = False - - while words: - word = words.pop(0) - space = word == '' - perfect = False - if word in self.tree: - passed = [] - perfect = True - self.tree = self.tree[word] - data_node = self.tree[kw.node] - self.inside.append(word) - word = '' - continue - if word and data_node: - passed.append(word) - - is_valueless = self.tree.get(kw.valueless, False) - is_leafNode = data_node == kw.leafNode - is_dataNode = data_node in (kw.leafNode, kw.tagNode) - named_options = [_ for _ in self.tree if not kw.found(_)] - - if is_leafNode: - self.final = is_valueless or len(passed) > 0 - self.extra = is_valueless and len(passed) > 0 - self.check = len(passed) >= 1 - else: - self.final = False - self.extra = False - self.check = len(passed) == 1 and not space - - if self.final: - self.word = ' '.join(passed) - else: - self.word = word - - if self.final: - self.filling = True - else: - self.filling = not perfect and bool(cmd and word != '') - - self.filled = self.final or (is_dataNode and len(passed) > 0 and word == '') - - if is_dataNode and len(passed) == 0: - self.options = [] - elif word: - if data_node != kw.plainNode or len(passed) == 1: - self.options = [_ for _ in self.tree if _.startswith(word)] - self.options.sort() - else: - self.options = [] - else: - self.options = named_options - self.options.sort() - - self.plain = not is_dataNode - - # self.debug() - - return self.word - - def speculate(self): - if len(self.options) == 1: - self.tree = self.tree[self.options[0]] - self.word = '' - if self.tree.get(kw.node,'') not in (kw.tagNode, kw.leafNode): - self.options = [_ for _ in self.tree if not kw.found(_)] - self.options.sort() - - def checks(self, cmd): - # as we move thought the named node twice - # the first time we get the data with the node - # and the second with the pass parameters - xml = self[kw.tree] - - words = cmd.split(' ') - send = True - last = [] - while words: - word = words.pop(0) - if word in xml: - xml = xml[word] - send = True - last = [] - continue - if xml[kw.node] in (kw.tagNode, kw.leafNode): - if kw.constraint in xml: - if send: - yield (word, xml[kw.constraint]) - send = False - else: - last.append((word, None)) - if len(last) >= 2: - yield last[0] - - def summary(self): - yield ('enter', '[ summary ]', str(self.inside)) - - if kw.help not in self.tree: - yield ('skip', '[ summary ]', str(self.inside)) - return - - if self.filled: - return - - yield('', '', '\nHelp:') - - if kw.help in self.tree: - summary = self.tree[kw.help].get(kw.summary) - values = self.tree[kw.help].get(kw.valuehelp, []) - if summary: - yield(summary, '', '') - for value in values: - yield(value[kw.format], value[kw.description], '') - - def constraint(self): - yield ('enter', '[ constraint ]', str(self.inside)) - - if kw.help in self.tree: - yield ('skip', '[ constraint ]', str(self.inside)) - return - if kw.error not in self.tree: - yield ('skip', '[ constraint ]', str(self.inside)) - return - if not self.word or self.filling: - yield ('skip', '[ constraint ]', str(self.inside)) - return - - yield('', '', '\nData Constraint:') - - yield('', 'constraint', str(self.tree[kw.error])) - - def listing(self): - yield ('enter', '[ listing ]', str(self.inside)) - - # only show the details when we passed the tagNode data - if not self.plain and not self.filled: - yield ('skip', '[ listing ]', str(self.inside)) - return - - yield('', '', '\nPossible completions:') - - options = list(self.tree.keys()) - options.sort() - for option in options: - if kw.found(option): - continue - if not option.startswith(self.word): - continue - inner = self.tree[option] - prefix = '+> ' if inner.get(kw.node, '') != kw.leafNode else ' ' - if kw.help in inner: - yield (prefix + option, inner[kw.help].get(kw.summary), '') - else: - yield (prefix + option, '(no help available)', '') - - def debug(self): - print('------') - print("word '%s'" % self.word) - print("filling " + str(self.filling)) - print("filled " + str(self.filled)) - print("final " + str(self.final)) - print("extra " + str(self.extra)) - print("plain " + str(self.plain)) - print("options " + str(self.options)) - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def component_version(self) -> dict: - d = {} - for k in sorted(self[kw.component_version]): - d[k] = int(self[kw.component_version][k]) - return d - - def defaults(self, lpath, flat): - d = self[kw.default] - for k in lpath: - d = d.get(k, {}) - - if not flat: - # _flatten will make this conversion - d = self.multi_to_list(lpath, d, defaults=True) - - r = {} - for k in d: - under = k.replace('-','_') - if isinstance(d[k],dict): - r[under] = self.defaults(lpath + [k], flat) - continue - r[under] = d[k] - return r - - def _flatten(inside, index, d): - r = {} - local = inside[index:] - prefix = '_'.join(_.replace('-','_') for _ in local) + '_' if local else '' - for k in d: - under = prefix + k.replace('-','_') - level = inside + [k] - if isinstance(d[k],dict): - r.update(_flatten(level, index, d[k])) - continue - if self.is_multi(level, with_tag=False): - r[under] = [_.strip() for _ in d[k].split(',')] - continue - r[under] = d[k] - return r - - return _flatten(lpath, len(lpath), d) - - def multi_to_list(self, lpath, conf, defaults=False): - r = {} - for k in conf: - # key mangling could also be done here - # it would prevent two parsing of the config tree - # under = k.replace('-','_') - under = k - fpath = lpath + [k] - if isinstance(conf[k],dict): - r[under] = self.multi_to_list(fpath, conf[k], defaults) - continue - value = conf[k] - if self.is_multi(fpath) and not isinstance(value, list): - if not defaults: - value = [value] - else: - value = value.split(' ') - r[under] = value - return r - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def _tree(self, lpath, with_tag=True): - """ - returns the part of the tree searched or None if it does not exists - if with_tag is set, this is a configuration path (with tagNode names) - and tag name will be removed from the path when traversing the tree - """ - tree = self[kw.tree] - spath = lpath.copy() - while spath: - p = spath.pop(0) - if p not in tree: - return None - tree = tree[p] - if with_tag and spath and tree[kw.node] == kw.tagNode: - spath.pop(0) - return tree - - def _get(self, lpath, tag, with_tag=True): - tree = self._tree(lpath, with_tag) - if tree is None: - return None - return tree.get(tag, None) - - def is_multi(self, lpath, with_tag=True): - tree = self._get(lpath, kw.multi, with_tag) - if tree is None: - return None - return tree is True - - def is_tag(self, lpath, with_tag=True): - tree = self._get(lpath, kw.node, with_tag) - if tree is None: - return None - return tree == kw.tagNode - - def is_leaf(self, lpath, with_tag=True): - tree = self._get(lpath, kw.node, with_tag) - if tree is None: - return None - return tree == kw.leafNode - - def exists(self, lpath, with_tag=True): - return self._get(lpath, kw.node, with_tag) is not None diff --git a/python/vyos/xml/generate.py b/python/vyos/xml/generate.py deleted file mode 100755 index 267cb84f6..000000000 --- a/python/vyos/xml/generate.py +++ /dev/null @@ -1,67 +0,0 @@ - -#!/usr/bin/env python3 - -# Copyright (C) 2020-2024 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import os -import pprint -import argparse - -from vyos.xml import load - -# import json -# def save_json(fname, loaded): -# with open(fname, 'w') as w: -# print(f'saving {fname}') -# w.write(json.dumps(loaded)) - - -def save_dict(fname, loaded): - with open(fname, 'w') as w: - print(f'saving {fname}') - w.write(f'# generated by {__file__}\n\n') - w.write('definition = ') - w.write(str(loaded)) - - -def main(): - parser = argparse.ArgumentParser(description='generate python file from xml defintions') - parser.add_argument('--conf-folder', type=str, default=load.configuration_definition, help='XML interface definition folder') - parser.add_argument('--conf-cache', type=str, default=load.configuration_cache, help='python file with the conf mode dict') - - # parser.add_argument('--op-folder', type=str, default=load.operational_definition, help='XML interface definition folder') - # parser.add_argument('--op-cache', type=str, default=load.operational_cache, help='python file with the conf mode dict') - - parser.add_argument('--dry', action='store_true', help='dry run, print to screen') - - args = parser.parse_args() - - if os.path.exists(load.configuration_cache): - os.remove(load.configuration_cache) - # if os.path.exists(load.operational_cache): - # os.remove(load.operational_cache) - - conf = load.xml(args.conf_folder) - # op = load.xml(args.op_folder) - - if args.dry: - pprint.pprint(conf) - return - - save_dict(args.conf_cache, conf) - # save_dict(args.op_cache, op) - - -if __name__ == '__main__': - main() diff --git a/python/vyos/xml/kw.py b/python/vyos/xml/kw.py deleted file mode 100644 index 48226ce96..000000000 --- a/python/vyos/xml/kw.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -# all named used as key (keywords) in this module are defined here. -# using variable name will allow the linter to warn on typos -# it separates our dict syntax from the xmldict one, making it easy to change - -# we are redefining a python keyword "list" for ease - - -def found(word): - """ - is the word following the format for a keyword - """ - return word and word[0] == '[' and word[-1] == ']' - - -# root - -tree = '[tree]' -priorities = '[priorities]' -owners = '[owners]' -tags = '[tags]' -default = '[default]' -component_version = '[component_version]' - -# nodes - -node = '[node]' - -plainNode = '[plainNode]' -leafNode = '[leafNode]' -tagNode = '[tagNode]' - -owner = '[owner]' - -valueless = '[valueless]' -multi = '[multi]' -hidden = '[hidden]' - -# properties - -priority = '[priority]' - -completion = '[completion]' -list = '[list]' -script = '[script]' -path = '[path]' - -# help - -help = '[help]' - -summary = '[summary]' - -valuehelp = '[valuehelp]' -format = 'format' -description = 'description' - -# constraint - -constraint = '[constraint]' -name = '[name]' - -regex = '[regex]' -validator = '[validator]' -argument = '[argument]' - -error = '[error]' - -# created - -node = '[node]' diff --git a/python/vyos/xml/load.py b/python/vyos/xml/load.py deleted file mode 100644 index f842ff9ce..000000000 --- a/python/vyos/xml/load.py +++ /dev/null @@ -1,300 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import glob - -from os.path import join -from os.path import abspath -from os.path import dirname - -import xmltodict - -from vyos import debug -from vyos.xml import kw -from vyos.xml import definition - - -# where the files are located - -_here = dirname(__file__) - -configuration_definition = abspath(join(_here, '..', '..' ,'..', 'interface-definitions')) -configuration_cache = abspath(join(_here, 'cache', 'configuration.py')) - -operational_definition = abspath(join(_here, '..', '..' ,'..', 'op-mode-definitions')) -operational_cache = abspath(join(_here, 'cache', 'operational.py')) - - -# This code is only ran during the creation of the debian package -# therefore we accept that failure can be fatal and not handled -# gracefully. - - -def _fatal(debug_info=''): - """ - raise a RuntimeError or if in developer mode stop the code - """ - if not debug.enabled('developer'): - raise RuntimeError(str(debug_info)) - - if debug_info: - print(debug_info) - breakpoint() - - -def _safe_update(dict1, dict2): - """ - return a dict made of two, raise if any root key would be overwritten - """ - if set(dict1).intersection(dict2): - raise RuntimeError('overlapping configuration') - return {**dict1, **dict2} - - -def _merge(dict1, dict2): - """ - merge dict2 in to dict1 and return it - """ - for k in list(dict2): - if k not in dict1: - dict1[k] = dict2[k] - continue - if isinstance(dict1[k], dict) and isinstance(dict2[k], dict): - dict1[k] = _merge(dict1[k], dict2[k]) - elif isinstance(dict1[k], list) and isinstance(dict2[k], list): - dict1[k].extend(dict2[k]) - elif dict1[k] == dict2[k]: - continue - else: - dict1[k] = dict2[k] - return dict1 - - -def _include(fname, folder=''): - """ - return the content of a file, including any file referenced with a #include - """ - if not folder: - folder = dirname(fname) - content = '' - with open(fname, 'r') as r: - for line in r.readlines(): - if '#include' in line: - content += _include(join(folder,line.strip()[10:-1]), folder) - continue - content += line - return content - - -def _format_nodes(inside, conf, xml): - r = {} - while conf: - nodetype = '' - nodename = '' - if 'node' in conf.keys(): - nodetype = 'node' - nodename = kw.plainNode - elif 'leafNode' in conf.keys(): - nodetype = 'leafNode' - nodename = kw.leafNode - elif 'tagNode' in conf.keys(): - nodetype = 'tagNode' - nodename = kw.tagNode - elif 'syntaxVersion' in conf.keys(): - sv = conf.pop('syntaxVersion') - if isinstance(sv, list): - for v in sv: - xml[kw.component_version][v['@component']] = v['@version'] - else: - xml[kw.component_version][sv['@component']] = sv['@version'] - continue - else: - _fatal(conf.keys()) - - nodes = conf.pop(nodetype) - if isinstance(nodes, list): - for node in nodes: - name = node.pop('@name') - into = inside + [name] - if name in r: - _merge(r[name], _format_node(into, node, xml)) - else: - r[name] = _format_node(into, node, xml) - r[name][kw.node] = nodename - xml[kw.tags].append(' '.join(into)) - else: - node = nodes - name = node.pop('@name') - into = inside + [name] - if name in r: - _merge(r[name], _format_node(inside + [name], node, xml)) - else: - r[name] = _format_node(inside + [name], node, xml) - r[name][kw.node] = nodename - xml[kw.tags].append(' '.join(into)) - return r - - -def _set_validator(r, validator): - v = {} - while validator: - if '@name' in validator: - v[kw.name] = validator.pop('@name') - elif '@argument' in validator: - v[kw.argument] = validator.pop('@argument') - else: - _fatal(validator) - r[kw.constraint][kw.validator].append(v) - - -def _format_node(inside, conf, xml): - r = { - kw.valueless: False, - kw.multi: False, - kw.hidden: False, - } - - if '@owner' in conf: - owner = conf.pop('@owner', '') - r[kw.owner] = owner - xml[kw.owners][' '.join(inside)] = owner - - while conf: - keys = conf.keys() - if 'children' in keys: - children = conf.pop('children') - - if isinstance(conf, list): - for child in children: - _merge(r, _format_nodes(inside, child, xml)) - else: - child = children - _merge(r, _format_nodes(inside, child, xml)) - - elif 'properties' in keys: - properties = conf.pop('properties') - - while properties: - if 'help' in properties: - helpname = properties.pop('help') - r[kw.help] = {} - r[kw.help][kw.summary] = helpname - - elif 'valueHelp' in properties: - valuehelps = properties.pop('valueHelp') - if kw.valuehelp in r[kw.help]: - _fatal(valuehelps) - r[kw.help][kw.valuehelp] = [] - if isinstance(valuehelps, list): - for valuehelp in valuehelps: - r[kw.help][kw.valuehelp].append(dict(valuehelp)) - else: - valuehelp = valuehelps - r[kw.help][kw.valuehelp].append(dict(valuehelp)) - - elif 'constraint' in properties: - constraint = properties.pop('constraint') - r[kw.constraint] = {} - while constraint: - if 'regex' in constraint: - regexes = constraint.pop('regex') - if kw.regex in kw.constraint: - _fatal(regexes) - r[kw.constraint][kw.regex] = [] - if isinstance(regexes, list): - r[kw.constraint][kw.regex] = [] - for regex in regexes: - r[kw.constraint][kw.regex].append(regex) - else: - regex = regexes - r[kw.constraint][kw.regex].append(regex) - elif 'validator' in constraint: - validators = constraint.pop('validator') - if kw.validator in r[kw.constraint]: - _fatal(validators) - r[kw.constraint][kw.validator] = [] - if isinstance(validators, list): - for validator in validators: - _set_validator(r, validator) - else: - validator = validators - _set_validator(r, validator) - else: - _fatal(constraint) - - elif 'constraintGroup' in properties: - properties.pop('constraintGroup') - - elif 'constraintErrorMessage' in properties: - r[kw.error] = properties.pop('constraintErrorMessage') - - elif 'valueless' in properties: - properties.pop('valueless') - r[kw.valueless] = True - - elif 'multi' in properties: - properties.pop('multi') - r[kw.multi] = True - - elif 'hidden' in properties: - properties.pop('hidden') - r[kw.hidden] = True - - elif 'completionHelp' in properties: - completionHelp = properties.pop('completionHelp') - r[kw.completion] = {} - while completionHelp: - if 'list' in completionHelp: - r[kw.completion][kw.list] = completionHelp.pop('list') - elif 'script' in completionHelp: - r[kw.completion][kw.script] = completionHelp.pop('script') - elif 'path' in completionHelp: - r[kw.completion][kw.path] = completionHelp.pop('path') - else: - _fatal(completionHelp.keys()) - - elif 'priority' in properties: - priority = int(properties.pop('priority')) - r[kw.priority] = priority - xml[kw.priorities].setdefault(priority, []).append(' '.join(inside)) - - else: - _fatal(properties.keys()) - - elif 'defaultValue' in keys: - default = conf.pop('defaultValue') - x = xml[kw.default] - for k in inside[:-1]: - x = x.setdefault(k,{}) - x[inside[-1]] = '' if default is None else default - - else: - _fatal(conf) - - return r - - -def xml(folder): - """ - read all the xml in the folder - """ - xml = definition.XML() - for fname in glob.glob(f'{folder}/*.xml.in'): - parsed = xmltodict.parse(_include(fname)) - formated = _format_nodes([], parsed['interfaceDefinition'], xml) - _merge(xml[kw.tree], formated) - # fix the configuration root node for completion - # as we moved all the name "up" the chain to use them as index. - xml[kw.tree][kw.node] = kw.plainNode - # XXX: do the others - return xml diff --git a/python/vyos/xml/test_xml.py b/python/vyos/xml/test_xml.py deleted file mode 100644 index 50fdc7470..000000000 --- a/python/vyos/xml/test_xml.py +++ /dev/null @@ -1,271 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020-2024 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from unittest import TestCase -from vyos.xml import load_configuration - -class TestSearch(TestCase): - def setUp(self): - self.xml = load_configuration() - - def test_(self): - last = self.xml.traverse("") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, []) - self.assertEqual(self.xml.options, ['firewall', 'high-availability', 'interfaces', 'nat', 'protocols', 'service', 'system', 'vpn', 'vrf']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_i(self): - last = self.xml.traverse("i") - self.assertEqual(last, 'i') - self.assertEqual(self.xml.inside, []) - self.assertEqual(self.xml.options, ['interfaces']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces(self): - last = self.xml.traverse("interfaces") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_space(self): - last = self.xml.traverse("interfaces ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_w(self): - last = self.xml.traverse("interfaces w") - self.assertEqual(last, 'w') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_ethernet(self): - last = self.xml.traverse("interfaces ethernet") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_space(self): - last = self.xml.traverse("interfaces ethernet ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_e(self): - last = self.xml.traverse("interfaces ethernet e") - self.assertEqual(last, 'e') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_la(self): - last = self.xml.traverse("interfaces ethernet la") - self.assertEqual(last, 'la') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0(self): - last = self.xml.traverse("interfaces ethernet lan0") - self.assertEqual(last, 'lan0') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_space(self): - last = self.xml.traverse("interfaces ethernet lan0 ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(len(self.xml.options), 19) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_ad(self): - last = self.xml.traverse("interfaces ethernet lan0 ad") - self.assertEqual(last, 'ad') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, ['address']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address(self): - last = self.xml.traverse("interfaces ethernet lan0 address") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_11(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1") - self.assertEqual(last, '1.1') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32") - self.assertEqual(last, '1.1.1.1/32') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 ") - self.assertEqual(last, '1.1.1.1/32') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space_text(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text") - self.assertEqual(last, '1.1.1.1/32 text') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space_text_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text ") - self.assertEqual(last, '1.1.1.1/32 text') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - # Need to add a check for a valuless leafNode diff --git a/src/op_mode/otp.py b/src/op_mode/otp.py index 6d4298894..a4ab9b22b 100755 --- a/src/op_mode/otp.py +++ b/src/op_mode/otp.py @@ -20,9 +20,7 @@ import sys import os import vyos.opmode from jinja2 import Template -from vyos.configquery import ConfigTreeQuery -from vyos.xml import defaults -from vyos.configdict import dict_merge +from vyos.config import Config from vyos.utils.process import popen @@ -61,7 +59,7 @@ def _check_uname_otp(username:str): """ Check if "username" exists and have an OTP key """ - config = ConfigTreeQuery() + config = Config() base_key = ['system', 'login', 'user', username, 'authentication', 'otp', 'key'] if not config.exists(base_key): return None @@ -71,15 +69,13 @@ def _get_login_otp(username: str, info:str): """ Retrieve user settings from configuration and set some defaults """ - config = ConfigTreeQuery() + config = Config() base = ['system', 'login', 'user', username] if not config.exists(base): return None - user_otp = config.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True) - # We have gathered the dict representation of the CLI, but there are default - # options which we need to update into the dictionary retrived. - default_values = defaults(['system', 'login', 'user']) - user_otp = dict_merge(default_values, user_otp) + user_otp = config.get_config_dict(base, key_mangling=('-', '_'), + get_first_key=True, + with_recursive_defaults=True) result = user_otp['authentication']['otp'] # Filling in the system and default options result['info'] = info @@ -94,7 +90,7 @@ def _get_login_otp(username: str, info:str): result['otp_url'] = ''.join(["otpauth://",token_type_acrn,"/",username,"@",\ result['hostname'],"?secret=",result['key_base32'],"&digits=",\ result['otp_length'],"&period=",result['interval']]) - result['qrcode'],err = popen('qrencode -t ansiutf8', input=result['otp_url']) + result['qrcode'],_ = popen('qrencode -t ansiutf8', input=result['otp_url']) return result def show_login(raw: bool, username: str, info:str): |