diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/setup.py | 10 | ||||
-rw-r--r-- | python/vyos/ifconfig/wireguard.py | 58 | ||||
-rw-r--r-- | python/vyos/xml/.gitignore | 1 | ||||
-rw-r--r-- | python/vyos/xml/__init__.py | 39 | ||||
-rw-r--r-- | python/vyos/xml/cache/__init__.py | 0 | ||||
-rw-r--r-- | python/vyos/xml/definition.py | 301 | ||||
-rwxr-xr-x | python/vyos/xml/generate.py | 70 | ||||
-rw-r--r-- | python/vyos/xml/kw.py | 83 | ||||
-rw-r--r-- | python/vyos/xml/load.py | 290 | ||||
-rw-r--r-- | python/vyos/xml/test_xml.py | 279 |
10 files changed, 1100 insertions, 31 deletions
diff --git a/python/setup.py b/python/setup.py index 9440e7fe7..e2d28bd6b 100644 --- a/python/setup.py +++ b/python/setup.py @@ -1,6 +1,13 @@ import os from setuptools import setup +def packages(directory): + return [ + _[0].replace('/','.') + for _ in os.walk(directory) + if os.path.isfile(os.path.join(_[0], '__init__.py')) + ] + setup( name = "vyos", version = "1.3.0", @@ -10,7 +17,7 @@ setup( license = "LGPLv2+", keywords = "vyos", url = "http://www.vyos.io", - packages=["vyos","vyos.ifconfig"], + packages = packages('vyos'), long_description="VyOS configuration libraries", classifiers=[ "Development Status :: 4 - Beta", @@ -18,4 +25,3 @@ setup( "License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)", ], ) - diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py index 027b5ea8c..a90a66ac3 100644 --- a/python/vyos/ifconfig/wireguard.py +++ b/python/vyos/ifconfig/wireguard.py @@ -149,10 +149,10 @@ class WireGuardIf(Interface): default = { 'type': 'wireguard', 'port': 0, - 'private-key': None, + 'private_key': None, 'pubkey': None, - 'psk': '/dev/null', - 'allowed-ips': [], + 'psk': '', + 'allowed_ips': [], 'fwmark': 0x00, 'endpoint': None, 'keepalive': 0 @@ -166,8 +166,8 @@ class WireGuardIf(Interface): } } options = Interface.options + \ - ['port', 'private-key', 'pubkey', 'psk', - 'allowed-ips', 'fwmark', 'endpoint', 'keepalive'] + ['port', 'private_key', 'pubkey', 'psk', + 'allowed_ips', 'fwmark', 'endpoint', 'keepalive'] """ Wireguard interface class, contains a comnfig dictionary since @@ -180,44 +180,44 @@ class WireGuardIf(Interface): >>> from vyos.ifconfig import WireGuardIf as wg_if >>> wg_intfc = wg_if("wg01") >>> print (wg_intfc.wg_config) - {'private-key': None, 'keepalive': 0, 'endpoint': None, 'port': 0, - 'allowed-ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'} + {'private_key': None, 'keepalive': 0, 'endpoint': None, 'port': 0, + 'allowed_ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'} >>> wg_intfc.wg_config['keepalive'] = 100 >>> print (wg_intfc.wg_config) - {'private-key': None, 'keepalive': 100, 'endpoint': None, 'port': 0, - 'allowed-ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'} + {'private_key': None, 'keepalive': 100, 'endpoint': None, 'port': 0, + 'allowed_ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'} """ def update(self): - if not self.config['private-key']: + if not self.config['private_key']: raise ValueError("private key required") else: # fmask permission check? pass - cmd = "wg set {} ".format(self.config['ifname']) - cmd += "listen-port {} ".format(self.config['port']) - cmd += "fwmark {} ".format(str(self.config['fwmark'])) - cmd += "private-key {} ".format(self.config['private-key']) - cmd += "peer {} ".format(self.config['pubkey']) - cmd += " preshared-key {} ".format(self.config['psk']) - cmd += " allowed-ips " - for aip in self.config['allowed-ips']: - if aip != self.config['allowed-ips'][-1]: - cmd += aip + "," - else: - cmd += aip + cmd = 'wg set {ifname}'.format(**self.config) + cmd += ' listen-port {port}'.format(**self.config) + cmd += ' fwmark "{fwmark}" '.format(**self.config) + cmd += ' private-key {private_key}'.format(**self.config) + cmd += ' peer {pubkey}'.format(**self.config) + cmd += ' persistent-keepalive {keepalive}'.format(**self.config) + cmd += ' allowed-ips {}'.format(', '.join(self.config['allowed-ips'])) + if self.config['endpoint']: - cmd += " endpoint '{}'".format(self.config['endpoint']) - cmd += " persistent-keepalive {}".format(self.config['keepalive']) + cmd += ' endpoint "{endpoint}"'.format(**self.config) + + psk_file = '' + if self.config['psk']: + psk_file = '/tmp/{ifname}.psk'.format(**self.config) + with open(psk_file, 'w') as f: + f.write(self.config['psk']) + cmd += f' preshared-key {psk_file}' self._cmd(cmd) - # remove psk since it isn't required anymore and is saved in the cli - # config only !! - if self.config['psk'] != '/dev/null': - if os.path.exists(self.config['psk']): - os.remove(self.config['psk']) + # PSK key file is not required to be stored persistently as its backed by CLI + if os.path.exists(psk_file): + os.remove(psk_file) def remove_peer(self, peerkey): """ diff --git a/python/vyos/xml/.gitignore b/python/vyos/xml/.gitignore new file mode 100644 index 000000000..e934adfd1 --- /dev/null +++ b/python/vyos/xml/.gitignore @@ -0,0 +1 @@ +cache/ diff --git a/python/vyos/xml/__init__.py b/python/vyos/xml/__init__.py new file mode 100644 index 000000000..52f5bfb38 --- /dev/null +++ b/python/vyos/xml/__init__.py @@ -0,0 +1,39 @@ +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +from vyos.xml import definition +from vyos.xml import load +from vyos.xml import kw + + +def load_configuration(cache=[]): + if cache: + return cache[0] + + xml = definition.XML() + + try: + from vyos.xml.cache import configuration + xml.update(configuration.definition) + cache.append(xml) + except Exception: + xml = definition.XML() + print('no xml configuration cache') + xml.update(load.xml(load.configuration_definition)) + + return xml + + +def defaults(lpath): + return load_configuration().defaults(lpath) diff --git a/python/vyos/xml/cache/__init__.py b/python/vyos/xml/cache/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python/vyos/xml/cache/__init__.py diff --git a/python/vyos/xml/definition.py b/python/vyos/xml/definition.py new file mode 100644 index 000000000..c5f6b0fc7 --- /dev/null +++ b/python/vyos/xml/definition.py @@ -0,0 +1,301 @@ +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +from vyos.xml import kw + +# As we index by key, the name is first and then the data: +# {'dummy': { +# '[node]': '[tagNode]', +# 'address': { ... } +# } } + +# so when we encounter a tagNode, we are really encountering +# the tagNode data. + + +class XML(dict): + def __init__(self): + self[kw.tree] = {} + self[kw.priorities] = {} + self[kw.owners] = {} + self[kw.default] = {} + self[kw.tags] = [] + + dict.__init__(self) + + self.tree = self[kw.tree] + # the options which matched the last incomplete world we had + # or the last word in a list + self.options = [] + # store all the part of the command we processed + self.inside = [] + # should we check the data pass with the constraints + self.check = False + # are we still typing a word + self.filling = False + # do what have the tagNode value ? + self.filled = False + # last word seen + self.word = '' + # do we have all the data we want ? + self.final = False + # do we have too much data ? + self.extra = False + # what kind of node are we in plain vs data not + self.plain = True + + def reset(self): + self.tree = self[kw.tree] + self.options = [] + self.inside = [] + self.check = False + self.filling = False + self.filled = False + self.word = '' + self.final = False + self.extra = False + self.plain = True + + # from functools import lru_cache + # @lru_cache(maxsize=100) + # XXX: need to use cachetool instead - for later + + def traverse(self, cmd): + self.reset() + + # using split() intead of split(' ') eats the final ' ' + words = cmd.split(' ') + passed = [] + word = '' + data_node = False + space = False + + while words: + word = words.pop(0) + space = word == '' + perfect = False + if word in self.tree: + passed = [] + perfect = True + self.tree = self.tree[word] + data_node = self.tree[kw.node] + self.inside.append(word) + word = '' + continue + if word and data_node: + passed.append(word) + + is_valueless = self.tree.get(kw.valueless, False) + is_leafNode = data_node == kw.leafNode + is_dataNode = data_node in (kw.leafNode, kw.tagNode) + named_options = [_ for _ in self.tree if not kw.found(_)] + + if is_leafNode: + self.final = is_valueless or len(passed) > 0 + self.extra = is_valueless and len(passed) > 0 + self.check = len(passed) >= 1 + else: + self.final = False + self.extra = False + self.check = len(passed) == 1 and not space + + if self.final: + self.word = ' '.join(passed) + else: + self.word = word + + if self.final: + self.filling = True + else: + self.filling = not perfect and bool(cmd and word != '') + + self.filled = self.final or (is_dataNode and len(passed) > 0 and word == '') + + if is_dataNode and len(passed) == 0: + self.options = [] + elif word: + if data_node != kw.plainNode or len(passed) == 1: + self.options = [_ for _ in self.tree if _.startswith(word)] + else: + self.options = [] + else: + self.options = named_options + + self.plain = not is_dataNode + + # self.debug() + + return self.word + + def speculate(self): + if len(self.options) == 1: + self.tree = self.tree[self.options[0]] + self.word = '' + if self.tree.get(kw.node,'') not in (kw.tagNode, kw.leafNode): + self.options = [_ for _ in self.tree if not kw.found(_)] + + def checks(self, cmd): + # as we move thought the named node twice + # the first time we get the data with the node + # and the second with the pass parameters + xml = self[kw.tree] + + words = cmd.split(' ') + send = True + last = [] + while words: + word = words.pop(0) + if word in xml: + xml = xml[word] + send = True + last = [] + continue + if xml[kw.node] in (kw.tagNode, kw.leafNode): + if kw.constraint in xml: + if send: + yield (word, xml[kw.constraint]) + send = False + else: + last.append((word, None)) + if len(last) >= 2: + yield last[0] + + def summary(self): + yield ('enter', '[ summary ]', str(self.inside)) + + if kw.help not in self.tree: + yield ('skip', '[ summary ]', str(self.inside)) + return + + if self.filled: + return + + yield('', '', '\nHelp:') + + if kw.help in self.tree: + summary = self.tree[kw.help].get(kw.summary) + values = self.tree[kw.help].get(kw.valuehelp, []) + if summary: + yield(summary, '', '') + for value in values: + yield(value[kw.format], value[kw.description], '') + + def constraint(self): + yield ('enter', '[ constraint ]', str(self.inside)) + + if kw.help in self.tree: + yield ('skip', '[ constraint ]', str(self.inside)) + return + if kw.error not in self.tree: + yield ('skip', '[ constraint ]', str(self.inside)) + return + if not self.word or self.filling: + yield ('skip', '[ constraint ]', str(self.inside)) + return + + yield('', '', '\nData Constraint:') + + yield('', 'constraint', str(self.tree[kw.error])) + + def listing(self): + yield ('enter', '[ listing ]', str(self.inside)) + + # only show the details when we passed the tagNode data + if not self.plain and not self.filled: + yield ('skip', '[ listing ]', str(self.inside)) + return + + yield('', '', '\nPossible completions:') + + options = list(self.tree.keys()) + options.sort() + for option in options: + if kw.found(option): + continue + if not option.startswith(self.word): + continue + inner = self.tree[option] + prefix = '+> ' if inner.get(kw.node, '') != kw.leafNode else ' ' + if kw.help in inner: + h = inner[kw.help] + yield (prefix + option, h.get(kw.summary), '') + + def debug(self): + print('------') + print("word '%s'" % self.word) + print("filling " + str(self.filling)) + print("filled " + str(self.filled)) + print("final " + str(self.final)) + print("extra " + str(self.extra)) + print("plain " + str(self.plain)) + print("options " + str(self.options)) + + # from functools import lru_cache + # @lru_cache(maxsize=100) + # XXX: need to use cachetool instead - for later + + def defaults(self, lpath): + d = self[kw.default] + for k in lpath: + d = d[k] + r = {} + + def _flatten(inside, index, d, r): + local = inside[index:] + prefix = '_'.join(_.replace('-','_') for _ in local) + '_' if local else '' + for k in d: + under = prefix + k.replace('-','_') + level = inside + [k] + if isinstance(d[k],dict): + _flatten(level, index, d[k], r) + continue + if self.is_multi(level): + r[under] = [_.strip() for _ in d[k].split(',')] + continue + r[under] = d[k] + + _flatten(lpath, len(lpath), d, r) + return r + + # from functools import lru_cache + # @lru_cache(maxsize=100) + # XXX: need to use cachetool instead - for later + + def _tree(self, lpath): + """ + returns the part of the tree searched or None if it does not exists + """ + tree = self[kw.tree] + spath = lpath.copy() + while spath: + p = spath.pop(0) + if p not in tree: + return None + tree = tree[p] + return tree + + def _get(self, lpath, tag): + return self._tree(lpath + [tag]) + + def is_multi(self, lpath): + return self._get(lpath, kw.multi) is True + + def is_tag(self, lpath): + return self._get(lpath, kw.node) == kw.tagNode + + def is_leaf(self, lpath): + return self._get(lpath, kw.node) == kw.leafNode + + def exists(self, lpath): + return self._get(lpath, kw.node) is not None diff --git a/python/vyos/xml/generate.py b/python/vyos/xml/generate.py new file mode 100755 index 000000000..dfbbadd74 --- /dev/null +++ b/python/vyos/xml/generate.py @@ -0,0 +1,70 @@ + +#!/usr/bin/env python3 + +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import os +import sys +import pprint +import argparse + +from vyos.xml import kw +from vyos.xml import load + + +# import json +# def save_json(fname, loaded): +# with open(fname, 'w') as w: +# print(f'saving {fname}') +# w.write(json.dumps(loaded)) + + +def save_dict(fname, loaded): + with open(fname, 'w') as w: + print(f'saving {fname}') + w.write(f'# generated by {__file__}\n\n') + w.write('definition = ') + w.write(str(loaded)) + + +def main(): + parser = argparse.ArgumentParser(description='generate python file from xml defintions') + parser.add_argument('--conf-folder', type=str, default=load.configuration_definition, help='XML interface definition folder') + parser.add_argument('--conf-cache', type=str, default=load.configuration_cache, help='python file with the conf mode dict') + + # parser.add_argument('--op-folder', type=str, default=load.operational_definition, help='XML interface definition folder') + # parser.add_argument('--op-cache', type=str, default=load.operational_cache, help='python file with the conf mode dict') + + parser.add_argument('--dry', action='store_true', help='dry run, print to screen') + + args = parser.parse_args() + + if os.path.exists(load.configuration_cache): + os.remove(load.configuration_cache) + # if os.path.exists(load.operational_cache): + # os.remove(load.operational_cache) + + conf = load.xml(args.conf_folder) + # op = load.xml(args.op_folder) + + if args.dry: + pprint.pprint(conf) + return + + save_dict(args.conf_cache, conf) + # save_dict(args.op_cache, op) + + +if __name__ == '__main__': + main() diff --git a/python/vyos/xml/kw.py b/python/vyos/xml/kw.py new file mode 100644 index 000000000..c85d9e0fd --- /dev/null +++ b/python/vyos/xml/kw.py @@ -0,0 +1,83 @@ +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +# all named used as key (keywords) in this module are defined here. +# using variable name will allow the linter to warn on typos +# it separates our dict syntax from the xmldict one, making it easy to change + +# we are redefining a python keyword "list" for ease + + +def found(word): + """ + is the word following the format for a keyword + """ + return word and word[0] == '[' and word[-1] == ']' + + +# root + +version = '(version)' +tree = '(tree)' +priorities = '(priorities)' +owners = '(owners)' +tags = '(tags)' +default = '(default)' + +# nodes + +node = '[node]' + +plainNode = '[plainNode]' +leafNode = '[leafNode]' +tagNode = '[tagNode]' + +owner = '[owner]' + +valueless = '[valueless]' +multi = '[multi]' +hidden = '[hidden]' + +# properties + +priority = '[priority]' + +completion = '[completion]' +list = '[list]' +script = '[script]' +path = '[path]' + +# help + +help = '[help]' + +summary = '[summary]' + +valuehelp = '[valuehelp]' +format = 'format' +description = 'description' + +# constraint + +constraint = '[constraint]' +name = '[name]' + +regex = '[regex]' +validator = '[validator]' +argument = '[argument]' + +error = '[error]' + +# created + +node = '[node]' diff --git a/python/vyos/xml/load.py b/python/vyos/xml/load.py new file mode 100644 index 000000000..1f463a5b7 --- /dev/null +++ b/python/vyos/xml/load.py @@ -0,0 +1,290 @@ +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import glob + +from os.path import join +from os.path import abspath +from os.path import dirname + +import xmltodict + +from vyos import debug +from vyos.xml import kw +from vyos.xml import definition + + +# where the files are located + +_here = dirname(__file__) + +configuration_definition = abspath(join(_here, '..', '..' ,'..', 'interface-definitions')) +configuration_cache = abspath(join(_here, 'cache', 'configuration.py')) + +operational_definition = abspath(join(_here, '..', '..' ,'..', 'op-mode-definitions')) +operational_cache = abspath(join(_here, 'cache', 'operational.py')) + + +# This code is only ran during the creation of the debian package +# therefore we accept that failure can be fatal and not handled +# gracefully. + + +def _fatal(debug_info=''): + """ + raise a RuntimeError or if in developer mode stop the code + """ + if not debug.enabled('developer'): + raise RuntimeError(str(debug_info)) + + if debug_info: + print(debug_info) + breakpoint() + + +def _safe_update(dict1, dict2): + """ + return a dict made of two, raise if any root key would be overwritten + """ + if set(dict1).intersection(dict2): + raise RuntimeError('overlapping configuration') + return {**dict1, **dict2} + + +def _merge(dict1, dict2): + """ + merge dict2 in to dict1 and return it + """ + for k in list(dict2): + if k not in dict1: + dict1[k] = dict2[k] + continue + if isinstance(dict1[k], dict) and isinstance(dict2[k], dict): + dict1[k] = _merge(dict1[k], dict2[k]) + elif isinstance(dict1[k], dict) and isinstance(dict2[k], dict): + dict1[k].extend(dict2[k]) + elif dict1[k] == dict2[k]: + # A definition shared between multiple files + if k in (kw.valueless, kw.multi, kw.hidden, kw.node, kw.summary, kw.owner, kw.priority): + continue + _fatal() + raise RuntimeError('parsing issue - undefined leaf?') + else: + raise RuntimeError('parsing issue - we messed up?') + return dict1 + + +def _include(fname, folder=''): + """ + return the content of a file, including any file referenced with a #include + """ + if not folder: + folder = dirname(fname) + content = '' + with open(fname, 'r') as r: + for line in r.readlines(): + if '#include' in line: + content += _include(join(folder,line.strip()[10:-1]), folder) + continue + content += line + return content + + +def _format_nodes(inside, conf, xml): + r = {} + while conf: + nodetype = '' + nodename = '' + if 'node' in conf.keys(): + nodetype = 'node' + nodename = kw.plainNode + elif 'leafNode' in conf.keys(): + nodetype = 'leafNode' + nodename = kw.leafNode + elif 'tagNode' in conf.keys(): + nodetype = 'tagNode' + nodename = kw.tagNode + elif 'syntaxVersion' in conf.keys(): + r[kw.version] = conf.pop('syntaxVersion')['@version'] + continue + else: + _fatal(conf.keys()) + + nodes = conf.pop(nodetype) + if isinstance(nodes, list): + for node in nodes: + name = node.pop('@name') + into = inside + [name] + r[name] = _format_node(into, node, xml) + r[name][kw.node] = nodename + xml[kw.tags].append(' '.join(into)) + else: + node = nodes + name = node.pop('@name') + into = inside + [name] + r[name] = _format_node(inside + [name], node, xml) + r[name][kw.node] = nodename + xml[kw.tags].append(' '.join(into)) + return r + + +def _set_validator(r, validator): + v = {} + while validator: + if '@name' in validator: + v[kw.name] = validator.pop('@name') + elif '@argument' in validator: + v[kw.argument] = validator.pop('@argument') + else: + _fatal(validator) + r[kw.constraint][kw.validator].append(v) + + +def _format_node(inside, conf, xml): + r = { + kw.valueless: False, + kw.multi: False, + kw.hidden: False, + } + + if '@owner' in conf: + owner = conf.pop('@owner', '') + r[kw.owner] = owner + xml[kw.owners][' '.join(inside)] = owner + + while conf: + keys = conf.keys() + if 'children' in keys: + children = conf.pop('children') + + if isinstance(conf, list): + for child in children: + r = _safe_update(r, _format_nodes(inside, child, xml)) + else: + child = children + r = _safe_update(r, _format_nodes(inside, child, xml)) + + elif 'properties' in keys: + properties = conf.pop('properties') + + while properties: + if 'help' in properties: + helpname = properties.pop('help') + r[kw.help] = {} + r[kw.help][kw.summary] = helpname + + elif 'valueHelp' in properties: + valuehelps = properties.pop('valueHelp') + if kw.valuehelp in r[kw.help]: + _fatal(valuehelps) + r[kw.help][kw.valuehelp] = [] + if isinstance(valuehelps, list): + for valuehelp in valuehelps: + r[kw.help][kw.valuehelp].append(dict(valuehelp)) + else: + valuehelp = valuehelps + r[kw.help][kw.valuehelp].append(dict(valuehelp)) + + elif 'constraint' in properties: + constraint = properties.pop('constraint') + r[kw.constraint] = {} + while constraint: + if 'regex' in constraint: + regexes = constraint.pop('regex') + if kw.regex in kw.constraint: + _fatal(regexes) + r[kw.constraint][kw.regex] = [] + if isinstance(regexes, list): + r[kw.constraint][kw.regex] = [] + for regex in regexes: + r[kw.constraint][kw.regex].append(regex) + else: + regex = regexes + r[kw.constraint][kw.regex].append(regex) + elif 'validator' in constraint: + validators = constraint.pop('validator') + if kw.validator in r[kw.constraint]: + _fatal(validators) + r[kw.constraint][kw.validator] = [] + if isinstance(validators, list): + for validator in validators: + _set_validator(r, validator) + else: + validator = validators + _set_validator(r, validator) + else: + _fatal(constraint) + + elif 'constraintErrorMessage' in properties: + r[kw.error] = properties.pop('constraintErrorMessage') + + elif 'valueless' in properties: + properties.pop('valueless') + r[kw.valueless] = True + + elif 'multi' in properties: + properties.pop('multi') + r[kw.multi] = True + + elif 'hidden' in properties: + properties.pop('hidden') + r[kw.hidden] = True + + elif 'completionHelp' in properties: + completionHelp = properties.pop('completionHelp') + r[kw.completion] = {} + while completionHelp: + if 'list' in completionHelp: + r[kw.completion][kw.list] = completionHelp.pop('list') + elif 'script' in completionHelp: + r[kw.completion][kw.script] = completionHelp.pop('script') + elif 'path' in completionHelp: + r[kw.completion][kw.path] = completionHelp.pop('path') + else: + _fatal(completionHelp.keys()) + + elif 'priority' in properties: + priority = int(properties.pop('priority')) + r[kw.priority] = priority + xml[kw.priorities].setdefault(priority, []).append(' '.join(inside)) + + else: + _fatal(properties.keys()) + + elif 'defaultValue' in keys: + default = conf.pop('defaultValue') + x = xml[kw.default] + for k in inside[:-1]: + x = x.setdefault(k,{}) + x[inside[-1]] = '' if default is None else default + + else: + _fatal(conf) + + return r + + +def xml(folder): + """ + read all the xml in the folder + """ + xml = definition.XML() + for fname in glob.glob(f'{folder}/*.xml.in'): + parsed = xmltodict.parse(_include(fname)) + formated = _format_nodes([], parsed['interfaceDefinition'], xml) + _merge(xml[kw.tree], formated) + # fix the configuration root node for completion + # as we moved all the name "up" the chain to use them as index. + xml[kw.tree][kw.node] = kw.plainNode + # XXX: do the others + return xml diff --git a/python/vyos/xml/test_xml.py b/python/vyos/xml/test_xml.py new file mode 100644 index 000000000..ac0620d99 --- /dev/null +++ b/python/vyos/xml/test_xml.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# + +import os +import unittest +from unittest import TestCase, mock + +from vyos.xml import load_configuration + +import sys + + +class TestSearch(TestCase): + def setUp(self): + self.xml = load_configuration() + + def test_(self): + last = self.xml.traverse("") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, []) + self.assertEqual(self.xml.options, ['protocols', 'service', 'system', 'firewall', 'interfaces', 'vpn', 'nat', 'vrf', 'high-availability']) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, True) + + def test_i(self): + last = self.xml.traverse("i") + self.assertEqual(last, 'i') + self.assertEqual(self.xml.inside, []) + self.assertEqual(self.xml.options, ['interfaces']) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, True) + + def test_interfaces(self): + last = self.xml.traverse("interfaces") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces']) + self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wirelessmodem']) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, '') + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, True) + + def test_interfaces_space(self): + last = self.xml.traverse("interfaces ") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces']) + self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wirelessmodem']) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, True) + + def test_interfaces_w(self): + last = self.xml.traverse("interfaces w") + self.assertEqual(last, 'w') + self.assertEqual(self.xml.inside, ['interfaces']) + self.assertEqual(self.xml.options, ['wireguard', 'wireless', 'wirelessmodem']) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, True) + + def test_interfaces_ethernet(self): + last = self.xml.traverse("interfaces ethernet") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, '') + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_space(self): + last = self.xml.traverse("interfaces ethernet ") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, '') + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_e(self): + last = self.xml.traverse("interfaces ethernet e") + self.assertEqual(last, 'e') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_la(self): + last = self.xml.traverse("interfaces ethernet la") + self.assertEqual(last, 'la') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0(self): + last = self.xml.traverse("interfaces ethernet lan0") + self.assertEqual(last, 'lan0') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_space(self): + last = self.xml.traverse("interfaces ethernet lan0 ") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(len(self.xml.options), 19) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_ad(self): + last = self.xml.traverse("interfaces ethernet lan0 ad") + self.assertEqual(last, 'ad') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) + self.assertEqual(self.xml.options, ['address']) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address(self): + last = self.xml.traverse("interfaces ethernet lan0 address") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space(self): + last = self.xml.traverse("interfaces ethernet lan0 address ") + self.assertEqual(last, '') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, False) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, False) + self.assertEqual(self.xml.final, False) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, False) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space_11(self): + last = self.xml.traverse("interfaces ethernet lan0 address 1.1") + self.assertEqual(last, '1.1') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, True) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space_1111_32(self): + last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32") + self.assertEqual(last, '1.1.1.1/32') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, True) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space_1111_32_space(self): + last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 ") + self.assertEqual(last, '1.1.1.1/32') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, True) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space_1111_32_space_text(self): + last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text") + self.assertEqual(last, '1.1.1.1/32 text') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, True) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + def test_interfaces_ethernet_lan0_address_space_1111_32_space_text_space(self): + last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text ") + self.assertEqual(last, '1.1.1.1/32 text') + self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) + self.assertEqual(self.xml.options, []) + self.assertEqual(self.xml.filling, True) + self.assertEqual(self.xml.word, last) + self.assertEqual(self.xml.check, True) + self.assertEqual(self.xml.final, True) + self.assertEqual(self.xml.extra, False) + self.assertEqual(self.xml.filled, True) + self.assertEqual(self.xml.plain, False) + + # Need to add a check for a valuless leafNode
\ No newline at end of file |