summaryrefslogtreecommitdiff
path: root/python/vyos
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos')
-rw-r--r--python/vyos/ifconfig/wireguard.py58
-rw-r--r--python/vyos/xml/.gitignore1
-rw-r--r--python/vyos/xml/__init__.py39
-rw-r--r--python/vyos/xml/cache/__init__.py0
-rw-r--r--python/vyos/xml/definition.py301
-rwxr-xr-xpython/vyos/xml/generate.py70
-rw-r--r--python/vyos/xml/kw.py83
-rw-r--r--python/vyos/xml/load.py290
-rw-r--r--python/vyos/xml/test_xml.py279
9 files changed, 1092 insertions, 29 deletions
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index 027b5ea8c..a90a66ac3 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -149,10 +149,10 @@ class WireGuardIf(Interface):
default = {
'type': 'wireguard',
'port': 0,
- 'private-key': None,
+ 'private_key': None,
'pubkey': None,
- 'psk': '/dev/null',
- 'allowed-ips': [],
+ 'psk': '',
+ 'allowed_ips': [],
'fwmark': 0x00,
'endpoint': None,
'keepalive': 0
@@ -166,8 +166,8 @@ class WireGuardIf(Interface):
}
}
options = Interface.options + \
- ['port', 'private-key', 'pubkey', 'psk',
- 'allowed-ips', 'fwmark', 'endpoint', 'keepalive']
+ ['port', 'private_key', 'pubkey', 'psk',
+ 'allowed_ips', 'fwmark', 'endpoint', 'keepalive']
"""
Wireguard interface class, contains a comnfig dictionary since
@@ -180,44 +180,44 @@ class WireGuardIf(Interface):
>>> from vyos.ifconfig import WireGuardIf as wg_if
>>> wg_intfc = wg_if("wg01")
>>> print (wg_intfc.wg_config)
- {'private-key': None, 'keepalive': 0, 'endpoint': None, 'port': 0,
- 'allowed-ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'}
+ {'private_key': None, 'keepalive': 0, 'endpoint': None, 'port': 0,
+ 'allowed_ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'}
>>> wg_intfc.wg_config['keepalive'] = 100
>>> print (wg_intfc.wg_config)
- {'private-key': None, 'keepalive': 100, 'endpoint': None, 'port': 0,
- 'allowed-ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'}
+ {'private_key': None, 'keepalive': 100, 'endpoint': None, 'port': 0,
+ 'allowed_ips': [], 'pubkey': None, 'fwmark': 0, 'psk': '/dev/null'}
"""
def update(self):
- if not self.config['private-key']:
+ if not self.config['private_key']:
raise ValueError("private key required")
else:
# fmask permission check?
pass
- cmd = "wg set {} ".format(self.config['ifname'])
- cmd += "listen-port {} ".format(self.config['port'])
- cmd += "fwmark {} ".format(str(self.config['fwmark']))
- cmd += "private-key {} ".format(self.config['private-key'])
- cmd += "peer {} ".format(self.config['pubkey'])
- cmd += " preshared-key {} ".format(self.config['psk'])
- cmd += " allowed-ips "
- for aip in self.config['allowed-ips']:
- if aip != self.config['allowed-ips'][-1]:
- cmd += aip + ","
- else:
- cmd += aip
+ cmd = 'wg set {ifname}'.format(**self.config)
+ cmd += ' listen-port {port}'.format(**self.config)
+ cmd += ' fwmark "{fwmark}" '.format(**self.config)
+ cmd += ' private-key {private_key}'.format(**self.config)
+ cmd += ' peer {pubkey}'.format(**self.config)
+ cmd += ' persistent-keepalive {keepalive}'.format(**self.config)
+ cmd += ' allowed-ips {}'.format(', '.join(self.config['allowed-ips']))
+
if self.config['endpoint']:
- cmd += " endpoint '{}'".format(self.config['endpoint'])
- cmd += " persistent-keepalive {}".format(self.config['keepalive'])
+ cmd += ' endpoint "{endpoint}"'.format(**self.config)
+
+ psk_file = ''
+ if self.config['psk']:
+ psk_file = '/tmp/{ifname}.psk'.format(**self.config)
+ with open(psk_file, 'w') as f:
+ f.write(self.config['psk'])
+ cmd += f' preshared-key {psk_file}'
self._cmd(cmd)
- # remove psk since it isn't required anymore and is saved in the cli
- # config only !!
- if self.config['psk'] != '/dev/null':
- if os.path.exists(self.config['psk']):
- os.remove(self.config['psk'])
+ # PSK key file is not required to be stored persistently as its backed by CLI
+ if os.path.exists(psk_file):
+ os.remove(psk_file)
def remove_peer(self, peerkey):
"""
diff --git a/python/vyos/xml/.gitignore b/python/vyos/xml/.gitignore
new file mode 100644
index 000000000..e934adfd1
--- /dev/null
+++ b/python/vyos/xml/.gitignore
@@ -0,0 +1 @@
+cache/
diff --git a/python/vyos/xml/__init__.py b/python/vyos/xml/__init__.py
new file mode 100644
index 000000000..52f5bfb38
--- /dev/null
+++ b/python/vyos/xml/__init__.py
@@ -0,0 +1,39 @@
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This library is free software; you can redistribute it and/or modify it under the terms of
+# the GNU Lesser General Public License as published by the Free Software Foundation;
+# either version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along with this library;
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
+from vyos.xml import definition
+from vyos.xml import load
+from vyos.xml import kw
+
+
+def load_configuration(cache=[]):
+ if cache:
+ return cache[0]
+
+ xml = definition.XML()
+
+ try:
+ from vyos.xml.cache import configuration
+ xml.update(configuration.definition)
+ cache.append(xml)
+ except Exception:
+ xml = definition.XML()
+ print('no xml configuration cache')
+ xml.update(load.xml(load.configuration_definition))
+
+ return xml
+
+
+def defaults(lpath):
+ return load_configuration().defaults(lpath)
diff --git a/python/vyos/xml/cache/__init__.py b/python/vyos/xml/cache/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/vyos/xml/cache/__init__.py
diff --git a/python/vyos/xml/definition.py b/python/vyos/xml/definition.py
new file mode 100644
index 000000000..c5f6b0fc7
--- /dev/null
+++ b/python/vyos/xml/definition.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This library is free software; you can redistribute it and/or modify it under the terms of
+# the GNU Lesser General Public License as published by the Free Software Foundation;
+# either version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along with this library;
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
+from vyos.xml import kw
+
+# As we index by key, the name is first and then the data:
+# {'dummy': {
+# '[node]': '[tagNode]',
+# 'address': { ... }
+# } }
+
+# so when we encounter a tagNode, we are really encountering
+# the tagNode data.
+
+
+class XML(dict):
+ def __init__(self):
+ self[kw.tree] = {}
+ self[kw.priorities] = {}
+ self[kw.owners] = {}
+ self[kw.default] = {}
+ self[kw.tags] = []
+
+ dict.__init__(self)
+
+ self.tree = self[kw.tree]
+ # the options which matched the last incomplete world we had
+ # or the last word in a list
+ self.options = []
+ # store all the part of the command we processed
+ self.inside = []
+ # should we check the data pass with the constraints
+ self.check = False
+ # are we still typing a word
+ self.filling = False
+ # do what have the tagNode value ?
+ self.filled = False
+ # last word seen
+ self.word = ''
+ # do we have all the data we want ?
+ self.final = False
+ # do we have too much data ?
+ self.extra = False
+ # what kind of node are we in plain vs data not
+ self.plain = True
+
+ def reset(self):
+ self.tree = self[kw.tree]
+ self.options = []
+ self.inside = []
+ self.check = False
+ self.filling = False
+ self.filled = False
+ self.word = ''
+ self.final = False
+ self.extra = False
+ self.plain = True
+
+ # from functools import lru_cache
+ # @lru_cache(maxsize=100)
+ # XXX: need to use cachetool instead - for later
+
+ def traverse(self, cmd):
+ self.reset()
+
+ # using split() intead of split(' ') eats the final ' '
+ words = cmd.split(' ')
+ passed = []
+ word = ''
+ data_node = False
+ space = False
+
+ while words:
+ word = words.pop(0)
+ space = word == ''
+ perfect = False
+ if word in self.tree:
+ passed = []
+ perfect = True
+ self.tree = self.tree[word]
+ data_node = self.tree[kw.node]
+ self.inside.append(word)
+ word = ''
+ continue
+ if word and data_node:
+ passed.append(word)
+
+ is_valueless = self.tree.get(kw.valueless, False)
+ is_leafNode = data_node == kw.leafNode
+ is_dataNode = data_node in (kw.leafNode, kw.tagNode)
+ named_options = [_ for _ in self.tree if not kw.found(_)]
+
+ if is_leafNode:
+ self.final = is_valueless or len(passed) > 0
+ self.extra = is_valueless and len(passed) > 0
+ self.check = len(passed) >= 1
+ else:
+ self.final = False
+ self.extra = False
+ self.check = len(passed) == 1 and not space
+
+ if self.final:
+ self.word = ' '.join(passed)
+ else:
+ self.word = word
+
+ if self.final:
+ self.filling = True
+ else:
+ self.filling = not perfect and bool(cmd and word != '')
+
+ self.filled = self.final or (is_dataNode and len(passed) > 0 and word == '')
+
+ if is_dataNode and len(passed) == 0:
+ self.options = []
+ elif word:
+ if data_node != kw.plainNode or len(passed) == 1:
+ self.options = [_ for _ in self.tree if _.startswith(word)]
+ else:
+ self.options = []
+ else:
+ self.options = named_options
+
+ self.plain = not is_dataNode
+
+ # self.debug()
+
+ return self.word
+
+ def speculate(self):
+ if len(self.options) == 1:
+ self.tree = self.tree[self.options[0]]
+ self.word = ''
+ if self.tree.get(kw.node,'') not in (kw.tagNode, kw.leafNode):
+ self.options = [_ for _ in self.tree if not kw.found(_)]
+
+ def checks(self, cmd):
+ # as we move thought the named node twice
+ # the first time we get the data with the node
+ # and the second with the pass parameters
+ xml = self[kw.tree]
+
+ words = cmd.split(' ')
+ send = True
+ last = []
+ while words:
+ word = words.pop(0)
+ if word in xml:
+ xml = xml[word]
+ send = True
+ last = []
+ continue
+ if xml[kw.node] in (kw.tagNode, kw.leafNode):
+ if kw.constraint in xml:
+ if send:
+ yield (word, xml[kw.constraint])
+ send = False
+ else:
+ last.append((word, None))
+ if len(last) >= 2:
+ yield last[0]
+
+ def summary(self):
+ yield ('enter', '[ summary ]', str(self.inside))
+
+ if kw.help not in self.tree:
+ yield ('skip', '[ summary ]', str(self.inside))
+ return
+
+ if self.filled:
+ return
+
+ yield('', '', '\nHelp:')
+
+ if kw.help in self.tree:
+ summary = self.tree[kw.help].get(kw.summary)
+ values = self.tree[kw.help].get(kw.valuehelp, [])
+ if summary:
+ yield(summary, '', '')
+ for value in values:
+ yield(value[kw.format], value[kw.description], '')
+
+ def constraint(self):
+ yield ('enter', '[ constraint ]', str(self.inside))
+
+ if kw.help in self.tree:
+ yield ('skip', '[ constraint ]', str(self.inside))
+ return
+ if kw.error not in self.tree:
+ yield ('skip', '[ constraint ]', str(self.inside))
+ return
+ if not self.word or self.filling:
+ yield ('skip', '[ constraint ]', str(self.inside))
+ return
+
+ yield('', '', '\nData Constraint:')
+
+ yield('', 'constraint', str(self.tree[kw.error]))
+
+ def listing(self):
+ yield ('enter', '[ listing ]', str(self.inside))
+
+ # only show the details when we passed the tagNode data
+ if not self.plain and not self.filled:
+ yield ('skip', '[ listing ]', str(self.inside))
+ return
+
+ yield('', '', '\nPossible completions:')
+
+ options = list(self.tree.keys())
+ options.sort()
+ for option in options:
+ if kw.found(option):
+ continue
+ if not option.startswith(self.word):
+ continue
+ inner = self.tree[option]
+ prefix = '+> ' if inner.get(kw.node, '') != kw.leafNode else ' '
+ if kw.help in inner:
+ h = inner[kw.help]
+ yield (prefix + option, h.get(kw.summary), '')
+
+ def debug(self):
+ print('------')
+ print("word '%s'" % self.word)
+ print("filling " + str(self.filling))
+ print("filled " + str(self.filled))
+ print("final " + str(self.final))
+ print("extra " + str(self.extra))
+ print("plain " + str(self.plain))
+ print("options " + str(self.options))
+
+ # from functools import lru_cache
+ # @lru_cache(maxsize=100)
+ # XXX: need to use cachetool instead - for later
+
+ def defaults(self, lpath):
+ d = self[kw.default]
+ for k in lpath:
+ d = d[k]
+ r = {}
+
+ def _flatten(inside, index, d, r):
+ local = inside[index:]
+ prefix = '_'.join(_.replace('-','_') for _ in local) + '_' if local else ''
+ for k in d:
+ under = prefix + k.replace('-','_')
+ level = inside + [k]
+ if isinstance(d[k],dict):
+ _flatten(level, index, d[k], r)
+ continue
+ if self.is_multi(level):
+ r[under] = [_.strip() for _ in d[k].split(',')]
+ continue
+ r[under] = d[k]
+
+ _flatten(lpath, len(lpath), d, r)
+ return r
+
+ # from functools import lru_cache
+ # @lru_cache(maxsize=100)
+ # XXX: need to use cachetool instead - for later
+
+ def _tree(self, lpath):
+ """
+ returns the part of the tree searched or None if it does not exists
+ """
+ tree = self[kw.tree]
+ spath = lpath.copy()
+ while spath:
+ p = spath.pop(0)
+ if p not in tree:
+ return None
+ tree = tree[p]
+ return tree
+
+ def _get(self, lpath, tag):
+ return self._tree(lpath + [tag])
+
+ def is_multi(self, lpath):
+ return self._get(lpath, kw.multi) is True
+
+ def is_tag(self, lpath):
+ return self._get(lpath, kw.node) == kw.tagNode
+
+ def is_leaf(self, lpath):
+ return self._get(lpath, kw.node) == kw.leafNode
+
+ def exists(self, lpath):
+ return self._get(lpath, kw.node) is not None
diff --git a/python/vyos/xml/generate.py b/python/vyos/xml/generate.py
new file mode 100755
index 000000000..dfbbadd74
--- /dev/null
+++ b/python/vyos/xml/generate.py
@@ -0,0 +1,70 @@
+
+#!/usr/bin/env python3
+
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This library is free software; you can redistribute it and/or modify it under the terms of
+# the GNU Lesser General Public License as published by the Free Software Foundation;
+# either version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along with this library;
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import os
+import sys
+import pprint
+import argparse
+
+from vyos.xml import kw
+from vyos.xml import load
+
+
+# import json
+# def save_json(fname, loaded):
+# with open(fname, 'w') as w:
+# print(f'saving {fname}')
+# w.write(json.dumps(loaded))
+
+
+def save_dict(fname, loaded):
+ with open(fname, 'w') as w:
+ print(f'saving {fname}')
+ w.write(f'# generated by {__file__}\n\n')
+ w.write('definition = ')
+ w.write(str(loaded))
+
+
+def main():
+ parser = argparse.ArgumentParser(description='generate python file from xml defintions')
+ parser.add_argument('--conf-folder', type=str, default=load.configuration_definition, help='XML interface definition folder')
+ parser.add_argument('--conf-cache', type=str, default=load.configuration_cache, help='python file with the conf mode dict')
+
+ # parser.add_argument('--op-folder', type=str, default=load.operational_definition, help='XML interface definition folder')
+ # parser.add_argument('--op-cache', type=str, default=load.operational_cache, help='python file with the conf mode dict')
+
+ parser.add_argument('--dry', action='store_true', help='dry run, print to screen')
+
+ args = parser.parse_args()
+
+ if os.path.exists(load.configuration_cache):
+ os.remove(load.configuration_cache)
+ # if os.path.exists(load.operational_cache):
+ # os.remove(load.operational_cache)
+
+ conf = load.xml(args.conf_folder)
+ # op = load.xml(args.op_folder)
+
+ if args.dry:
+ pprint.pprint(conf)
+ return
+
+ save_dict(args.conf_cache, conf)
+ # save_dict(args.op_cache, op)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/vyos/xml/kw.py b/python/vyos/xml/kw.py
new file mode 100644
index 000000000..c85d9e0fd
--- /dev/null
+++ b/python/vyos/xml/kw.py
@@ -0,0 +1,83 @@
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This library is free software; you can redistribute it and/or modify it under the terms of
+# the GNU Lesser General Public License as published by the Free Software Foundation;
+# either version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along with this library;
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+# all named used as key (keywords) in this module are defined here.
+# using variable name will allow the linter to warn on typos
+# it separates our dict syntax from the xmldict one, making it easy to change
+
+# we are redefining a python keyword "list" for ease
+
+
+def found(word):
+ """
+ is the word following the format for a keyword
+ """
+ return word and word[0] == '[' and word[-1] == ']'
+
+
+# root
+
+version = '(version)'
+tree = '(tree)'
+priorities = '(priorities)'
+owners = '(owners)'
+tags = '(tags)'
+default = '(default)'
+
+# nodes
+
+node = '[node]'
+
+plainNode = '[plainNode]'
+leafNode = '[leafNode]'
+tagNode = '[tagNode]'
+
+owner = '[owner]'
+
+valueless = '[valueless]'
+multi = '[multi]'
+hidden = '[hidden]'
+
+# properties
+
+priority = '[priority]'
+
+completion = '[completion]'
+list = '[list]'
+script = '[script]'
+path = '[path]'
+
+# help
+
+help = '[help]'
+
+summary = '[summary]'
+
+valuehelp = '[valuehelp]'
+format = 'format'
+description = 'description'
+
+# constraint
+
+constraint = '[constraint]'
+name = '[name]'
+
+regex = '[regex]'
+validator = '[validator]'
+argument = '[argument]'
+
+error = '[error]'
+
+# created
+
+node = '[node]'
diff --git a/python/vyos/xml/load.py b/python/vyos/xml/load.py
new file mode 100644
index 000000000..1f463a5b7
--- /dev/null
+++ b/python/vyos/xml/load.py
@@ -0,0 +1,290 @@
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This library is free software; you can redistribute it and/or modify it under the terms of
+# the GNU Lesser General Public License as published by the Free Software Foundation;
+# either version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along with this library;
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import glob
+
+from os.path import join
+from os.path import abspath
+from os.path import dirname
+
+import xmltodict
+
+from vyos import debug
+from vyos.xml import kw
+from vyos.xml import definition
+
+
+# where the files are located
+
+_here = dirname(__file__)
+
+configuration_definition = abspath(join(_here, '..', '..' ,'..', 'interface-definitions'))
+configuration_cache = abspath(join(_here, 'cache', 'configuration.py'))
+
+operational_definition = abspath(join(_here, '..', '..' ,'..', 'op-mode-definitions'))
+operational_cache = abspath(join(_here, 'cache', 'operational.py'))
+
+
+# This code is only ran during the creation of the debian package
+# therefore we accept that failure can be fatal and not handled
+# gracefully.
+
+
+def _fatal(debug_info=''):
+ """
+ raise a RuntimeError or if in developer mode stop the code
+ """
+ if not debug.enabled('developer'):
+ raise RuntimeError(str(debug_info))
+
+ if debug_info:
+ print(debug_info)
+ breakpoint()
+
+
+def _safe_update(dict1, dict2):
+ """
+ return a dict made of two, raise if any root key would be overwritten
+ """
+ if set(dict1).intersection(dict2):
+ raise RuntimeError('overlapping configuration')
+ return {**dict1, **dict2}
+
+
+def _merge(dict1, dict2):
+ """
+ merge dict2 in to dict1 and return it
+ """
+ for k in list(dict2):
+ if k not in dict1:
+ dict1[k] = dict2[k]
+ continue
+ if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+ dict1[k] = _merge(dict1[k], dict2[k])
+ elif isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+ dict1[k].extend(dict2[k])
+ elif dict1[k] == dict2[k]:
+ # A definition shared between multiple files
+ if k in (kw.valueless, kw.multi, kw.hidden, kw.node, kw.summary, kw.owner, kw.priority):
+ continue
+ _fatal()
+ raise RuntimeError('parsing issue - undefined leaf?')
+ else:
+ raise RuntimeError('parsing issue - we messed up?')
+ return dict1
+
+
+def _include(fname, folder=''):
+ """
+ return the content of a file, including any file referenced with a #include
+ """
+ if not folder:
+ folder = dirname(fname)
+ content = ''
+ with open(fname, 'r') as r:
+ for line in r.readlines():
+ if '#include' in line:
+ content += _include(join(folder,line.strip()[10:-1]), folder)
+ continue
+ content += line
+ return content
+
+
+def _format_nodes(inside, conf, xml):
+ r = {}
+ while conf:
+ nodetype = ''
+ nodename = ''
+ if 'node' in conf.keys():
+ nodetype = 'node'
+ nodename = kw.plainNode
+ elif 'leafNode' in conf.keys():
+ nodetype = 'leafNode'
+ nodename = kw.leafNode
+ elif 'tagNode' in conf.keys():
+ nodetype = 'tagNode'
+ nodename = kw.tagNode
+ elif 'syntaxVersion' in conf.keys():
+ r[kw.version] = conf.pop('syntaxVersion')['@version']
+ continue
+ else:
+ _fatal(conf.keys())
+
+ nodes = conf.pop(nodetype)
+ if isinstance(nodes, list):
+ for node in nodes:
+ name = node.pop('@name')
+ into = inside + [name]
+ r[name] = _format_node(into, node, xml)
+ r[name][kw.node] = nodename
+ xml[kw.tags].append(' '.join(into))
+ else:
+ node = nodes
+ name = node.pop('@name')
+ into = inside + [name]
+ r[name] = _format_node(inside + [name], node, xml)
+ r[name][kw.node] = nodename
+ xml[kw.tags].append(' '.join(into))
+ return r
+
+
+def _set_validator(r, validator):
+ v = {}
+ while validator:
+ if '@name' in validator:
+ v[kw.name] = validator.pop('@name')
+ elif '@argument' in validator:
+ v[kw.argument] = validator.pop('@argument')
+ else:
+ _fatal(validator)
+ r[kw.constraint][kw.validator].append(v)
+
+
+def _format_node(inside, conf, xml):
+ r = {
+ kw.valueless: False,
+ kw.multi: False,
+ kw.hidden: False,
+ }
+
+ if '@owner' in conf:
+ owner = conf.pop('@owner', '')
+ r[kw.owner] = owner
+ xml[kw.owners][' '.join(inside)] = owner
+
+ while conf:
+ keys = conf.keys()
+ if 'children' in keys:
+ children = conf.pop('children')
+
+ if isinstance(conf, list):
+ for child in children:
+ r = _safe_update(r, _format_nodes(inside, child, xml))
+ else:
+ child = children
+ r = _safe_update(r, _format_nodes(inside, child, xml))
+
+ elif 'properties' in keys:
+ properties = conf.pop('properties')
+
+ while properties:
+ if 'help' in properties:
+ helpname = properties.pop('help')
+ r[kw.help] = {}
+ r[kw.help][kw.summary] = helpname
+
+ elif 'valueHelp' in properties:
+ valuehelps = properties.pop('valueHelp')
+ if kw.valuehelp in r[kw.help]:
+ _fatal(valuehelps)
+ r[kw.help][kw.valuehelp] = []
+ if isinstance(valuehelps, list):
+ for valuehelp in valuehelps:
+ r[kw.help][kw.valuehelp].append(dict(valuehelp))
+ else:
+ valuehelp = valuehelps
+ r[kw.help][kw.valuehelp].append(dict(valuehelp))
+
+ elif 'constraint' in properties:
+ constraint = properties.pop('constraint')
+ r[kw.constraint] = {}
+ while constraint:
+ if 'regex' in constraint:
+ regexes = constraint.pop('regex')
+ if kw.regex in kw.constraint:
+ _fatal(regexes)
+ r[kw.constraint][kw.regex] = []
+ if isinstance(regexes, list):
+ r[kw.constraint][kw.regex] = []
+ for regex in regexes:
+ r[kw.constraint][kw.regex].append(regex)
+ else:
+ regex = regexes
+ r[kw.constraint][kw.regex].append(regex)
+ elif 'validator' in constraint:
+ validators = constraint.pop('validator')
+ if kw.validator in r[kw.constraint]:
+ _fatal(validators)
+ r[kw.constraint][kw.validator] = []
+ if isinstance(validators, list):
+ for validator in validators:
+ _set_validator(r, validator)
+ else:
+ validator = validators
+ _set_validator(r, validator)
+ else:
+ _fatal(constraint)
+
+ elif 'constraintErrorMessage' in properties:
+ r[kw.error] = properties.pop('constraintErrorMessage')
+
+ elif 'valueless' in properties:
+ properties.pop('valueless')
+ r[kw.valueless] = True
+
+ elif 'multi' in properties:
+ properties.pop('multi')
+ r[kw.multi] = True
+
+ elif 'hidden' in properties:
+ properties.pop('hidden')
+ r[kw.hidden] = True
+
+ elif 'completionHelp' in properties:
+ completionHelp = properties.pop('completionHelp')
+ r[kw.completion] = {}
+ while completionHelp:
+ if 'list' in completionHelp:
+ r[kw.completion][kw.list] = completionHelp.pop('list')
+ elif 'script' in completionHelp:
+ r[kw.completion][kw.script] = completionHelp.pop('script')
+ elif 'path' in completionHelp:
+ r[kw.completion][kw.path] = completionHelp.pop('path')
+ else:
+ _fatal(completionHelp.keys())
+
+ elif 'priority' in properties:
+ priority = int(properties.pop('priority'))
+ r[kw.priority] = priority
+ xml[kw.priorities].setdefault(priority, []).append(' '.join(inside))
+
+ else:
+ _fatal(properties.keys())
+
+ elif 'defaultValue' in keys:
+ default = conf.pop('defaultValue')
+ x = xml[kw.default]
+ for k in inside[:-1]:
+ x = x.setdefault(k,{})
+ x[inside[-1]] = '' if default is None else default
+
+ else:
+ _fatal(conf)
+
+ return r
+
+
+def xml(folder):
+ """
+ read all the xml in the folder
+ """
+ xml = definition.XML()
+ for fname in glob.glob(f'{folder}/*.xml.in'):
+ parsed = xmltodict.parse(_include(fname))
+ formated = _format_nodes([], parsed['interfaceDefinition'], xml)
+ _merge(xml[kw.tree], formated)
+ # fix the configuration root node for completion
+ # as we moved all the name "up" the chain to use them as index.
+ xml[kw.tree][kw.node] = kw.plainNode
+ # XXX: do the others
+ return xml
diff --git a/python/vyos/xml/test_xml.py b/python/vyos/xml/test_xml.py
new file mode 100644
index 000000000..ac0620d99
--- /dev/null
+++ b/python/vyos/xml/test_xml.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+
+import os
+import unittest
+from unittest import TestCase, mock
+
+from vyos.xml import load_configuration
+
+import sys
+
+
+class TestSearch(TestCase):
+ def setUp(self):
+ self.xml = load_configuration()
+
+ def test_(self):
+ last = self.xml.traverse("")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, [])
+ self.assertEqual(self.xml.options, ['protocols', 'service', 'system', 'firewall', 'interfaces', 'vpn', 'nat', 'vrf', 'high-availability'])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, True)
+
+ def test_i(self):
+ last = self.xml.traverse("i")
+ self.assertEqual(last, 'i')
+ self.assertEqual(self.xml.inside, [])
+ self.assertEqual(self.xml.options, ['interfaces'])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, True)
+
+ def test_interfaces(self):
+ last = self.xml.traverse("interfaces")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces'])
+ self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wirelessmodem'])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, '')
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, True)
+
+ def test_interfaces_space(self):
+ last = self.xml.traverse("interfaces ")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces'])
+ self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wirelessmodem'])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, True)
+
+ def test_interfaces_w(self):
+ last = self.xml.traverse("interfaces w")
+ self.assertEqual(last, 'w')
+ self.assertEqual(self.xml.inside, ['interfaces'])
+ self.assertEqual(self.xml.options, ['wireguard', 'wireless', 'wirelessmodem'])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, True)
+
+ def test_interfaces_ethernet(self):
+ last = self.xml.traverse("interfaces ethernet")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, '')
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_space(self):
+ last = self.xml.traverse("interfaces ethernet ")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, '')
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_e(self):
+ last = self.xml.traverse("interfaces ethernet e")
+ self.assertEqual(last, 'e')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_la(self):
+ last = self.xml.traverse("interfaces ethernet la")
+ self.assertEqual(last, 'la')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0(self):
+ last = self.xml.traverse("interfaces ethernet lan0")
+ self.assertEqual(last, 'lan0')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_space(self):
+ last = self.xml.traverse("interfaces ethernet lan0 ")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(len(self.xml.options), 19)
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_ad(self):
+ last = self.xml.traverse("interfaces ethernet lan0 ad")
+ self.assertEqual(last, 'ad')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet'])
+ self.assertEqual(self.xml.options, ['address'])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address ")
+ self.assertEqual(last, '')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, False)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, False)
+ self.assertEqual(self.xml.final, False)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, False)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space_11(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address 1.1")
+ self.assertEqual(last, '1.1')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, True)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space_1111_32(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32")
+ self.assertEqual(last, '1.1.1.1/32')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, True)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space_1111_32_space(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 ")
+ self.assertEqual(last, '1.1.1.1/32')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, True)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space_1111_32_space_text(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text")
+ self.assertEqual(last, '1.1.1.1/32 text')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, True)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ def test_interfaces_ethernet_lan0_address_space_1111_32_space_text_space(self):
+ last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text ")
+ self.assertEqual(last, '1.1.1.1/32 text')
+ self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address'])
+ self.assertEqual(self.xml.options, [])
+ self.assertEqual(self.xml.filling, True)
+ self.assertEqual(self.xml.word, last)
+ self.assertEqual(self.xml.check, True)
+ self.assertEqual(self.xml.final, True)
+ self.assertEqual(self.xml.extra, False)
+ self.assertEqual(self.xml.filled, True)
+ self.assertEqual(self.xml.plain, False)
+
+ # Need to add a check for a valuless leafNode \ No newline at end of file