summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/build-command-op-templates225
-rwxr-xr-xscripts/build-command-templates306
-rwxr-xr-xscripts/build-component-versions47
-rw-r--r--scripts/cli/base_interfaces_test.py269
-rwxr-xr-xscripts/cli/test_interfaces_bonding.py61
-rwxr-xr-xscripts/cli/test_interfaces_bridge.py70
-rwxr-xr-xscripts/cli/test_interfaces_dummy.py28
-rwxr-xr-xscripts/cli/test_interfaces_ethernet.py68
-rwxr-xr-xscripts/cli/test_interfaces_geneve.py37
-rwxr-xr-xscripts/cli/test_interfaces_l2tpv3.py59
-rwxr-xr-xscripts/cli/test_interfaces_loopback.py41
-rwxr-xr-xscripts/cli/test_interfaces_macsec.py102
-rwxr-xr-xscripts/cli/test_interfaces_pppoe.py162
-rwxr-xr-xscripts/cli/test_interfaces_pseudo_ethernet.py39
-rwxr-xr-xscripts/cli/test_interfaces_tunnel.py109
-rwxr-xr-xscripts/cli/test_interfaces_vxlan.py35
-rwxr-xr-xscripts/cli/test_interfaces_wireguard.py68
-rwxr-xr-xscripts/cli/test_interfaces_wireless.py58
-rwxr-xr-xscripts/cli/test_interfaces_wirelessmodem.py82
-rwxr-xr-xscripts/cli/test_nat.py63
-rwxr-xr-xscripts/cli/test_service_bcast-relay.py71
-rwxr-xr-xscripts/cli/test_service_dns_dynamic.py141
-rwxr-xr-xscripts/cli/test_service_mdns-repeater.py51
-rwxr-xr-xscripts/cli/test_service_pppoe-server.py167
-rwxr-xr-xscripts/cli/test_service_router-advert.py99
-rwxr-xr-xscripts/cli/test_service_snmp.py155
-rwxr-xr-xscripts/cli/test_service_ssh.py116
-rwxr-xr-xscripts/cli/test_system_lcd.py54
-rwxr-xr-xscripts/cli/test_system_login.py67
-rwxr-xr-xscripts/cli/test_system_nameserver.py66
-rwxr-xr-xscripts/cli/test_system_ntp.py108
-rwxr-xr-xscripts/cli/test_vpn_anyconnect.py58
-rwxr-xr-xscripts/cli/test_vrf.py52
-rwxr-xr-xscripts/import-conf-mode-commands255
-rwxr-xr-xscripts/system/test_module_load.py42
35 files changed, 833 insertions, 2598 deletions
diff --git a/scripts/build-command-op-templates b/scripts/build-command-op-templates
new file mode 100755
index 000000000..c60b32a1e
--- /dev/null
+++ b/scripts/build-command-op-templates
@@ -0,0 +1,225 @@
+#!/usr/bin/env python3
+#
+# build-command-template: converts new style command definitions in XML
+# to the old style (bunch of dirs and node.def's) command templates
+#
+# Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+
+import sys
+import os
+import argparse
+import copy
+import functools
+
+from lxml import etree as ET
+
+# Defaults
+
+validator_dir = "/opt/vyatta/libexec/validators"
+default_constraint_err_msg = "Invalid value"
+
+
+## Get arguments
+
+parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates')
+parser.add_argument('--debug', help='Enable debug information output', action='store_true')
+parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file")
+parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file")
+parser.add_argument('OUTPUT_DIR', type=str, help="Output directory")
+
+args = parser.parse_args()
+
+input_file = args.INPUT_FILE
+schema_file = args.SCHEMA_FILE
+output_dir = args.OUTPUT_DIR
+debug = args.debug
+
+## Load and validate the inputs
+
+try:
+ xml = ET.parse(input_file)
+except Exception as e:
+ print("Failed to load interface definition file {0}".format(input_file))
+ print(e)
+ sys.exit(1)
+
+try:
+ relaxng_xml = ET.parse(schema_file)
+ validator = ET.RelaxNG(relaxng_xml)
+
+ if not validator.validate(xml):
+ print(validator.error_log)
+ print("Interface definition file {0} does not match the schema!".format(input_file))
+ sys.exit(1)
+except Exception as e:
+ print("Failed to load the XML schema {0}".format(schema_file))
+ print(e)
+ sys.exit(1)
+
+if not os.access(output_dir, os.W_OK):
+ print("The output directory {0} is not writeable".format(output_dir))
+ sys.exit(1)
+
+## If we got this far, everything must be ok and we can convert the file
+
+def make_path(l):
+ path = functools.reduce(os.path.join, l)
+ if debug:
+ print(path)
+ return path
+
+def get_properties(p):
+ props = {}
+
+ if p is None:
+ return props
+
+ # Get the help string
+ try:
+ props["help"] = p.find("help").text
+ except:
+ props["help"] = "No help available"
+
+
+ # Get the completion help strings
+ try:
+ che = p.findall("completionHelp")
+ ch = ""
+ for c in che:
+ scripts = c.findall("script")
+ paths = c.findall("path")
+ lists = c.findall("list")
+
+ # Current backend doesn't support multiple allowed: tags
+ # so we get to emulate it
+ comp_exprs = []
+ for i in lists:
+ comp_exprs.append("echo \"{0}\"".format(i.text))
+ for i in paths:
+ comp_exprs.append("/bin/cli-shell-api listActiveNodes {0} | sed -e \"s/'//g\" && echo".format(i.text))
+ for i in scripts:
+ comp_exprs.append("{0}".format(i.text))
+ comp_help = " && ".join(comp_exprs)
+ props["comp_help"] = comp_help
+ except:
+ props["comp_help"] = []
+
+ return props
+
+
+def make_node_def(props, command):
+ # XXX: replace with a template processor if it grows
+ # out of control
+
+ node_def = ""
+
+ if "help" in props:
+ node_def += "help: {0}\n".format(props["help"])
+
+
+ if "comp_help" in props:
+ node_def += "allowed: {0}\n".format(props["comp_help"])
+
+
+ if command is not None:
+ node_def += "run: {0}\n".format(command.text)
+
+
+ if debug:
+ print("The contents of the node.def file:\n", node_def)
+
+ return node_def
+
+def process_node(n, tmpl_dir):
+ # Avoid mangling the path from the outer call
+ my_tmpl_dir = copy.copy(tmpl_dir)
+
+ props_elem = n.find("properties")
+ children = n.find("children")
+ command = n.find("command")
+
+ name = n.get("name")
+
+ node_type = n.tag
+
+ my_tmpl_dir.append(name)
+
+ if debug:
+ print("Name of the node: {};\n Created directory: ".format(name), end="")
+ os.makedirs(make_path(my_tmpl_dir), exist_ok=True)
+
+ props = get_properties(props_elem)
+
+ if node_type == "node":
+ if debug:
+ print("Processing node {}".format(name))
+
+ nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def")
+ if not os.path.exists(nodedef_path):
+ with open(nodedef_path, "w") as f:
+ f.write(make_node_def(props, command))
+ else:
+ # Something has already generated this file
+ pass
+
+ if children is not None:
+ inner_nodes = children.iterfind("*")
+ for inner_n in inner_nodes:
+ process_node(inner_n, my_tmpl_dir)
+ if node_type == "tagNode":
+ if debug:
+ print("Processing tag node {}".format(name))
+
+ os.makedirs(make_path(my_tmpl_dir), exist_ok=True)
+
+ nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def")
+ if not os.path.exists(nodedef_path):
+ with open(nodedef_path, "w") as f:
+ f.write('help: {0}\n'.format(props['help']))
+ else:
+ # Something has already generated this file
+ pass
+
+ # Create the inner node.tag part
+ my_tmpl_dir.append("node.tag")
+ os.makedirs(make_path(my_tmpl_dir), exist_ok=True)
+ if debug:
+ print("Created path for the tagNode: {}".format(make_path(my_tmpl_dir)), end="")
+
+ # Not sure if we want partially defined tag nodes, write the file unconditionally
+ with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f:
+ f.write(make_node_def(props, command))
+
+ if children is not None:
+ inner_nodes = children.iterfind("*")
+ for inner_n in inner_nodes:
+ process_node(inner_n, my_tmpl_dir)
+ else:
+ # This is a leaf node
+ if debug:
+ print("Processing leaf node {}".format(name))
+
+ with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f:
+ f.write(make_node_def(props, command))
+
+
+root = xml.getroot()
+
+nodes = root.iterfind("*")
+for n in nodes:
+ process_node(n, [output_dir])
diff --git a/scripts/build-command-templates b/scripts/build-command-templates
new file mode 100755
index 000000000..457adbec2
--- /dev/null
+++ b/scripts/build-command-templates
@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+#
+# build-command-template: converts new style command definitions in XML
+# to the old style (bunch of dirs and node.def's) command templates
+#
+# Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+
+import sys
+import os
+import argparse
+import copy
+import functools
+
+from lxml import etree as ET
+
+# Defaults
+
+#validator_dir = "/usr/libexec/vyos/validators"
+validator_dir = "${vyos_validators_dir}"
+default_constraint_err_msg = "Invalid value"
+
+
+## Get arguments
+
+parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates')
+parser.add_argument('--debug', help='Enable debug information output', action='store_true')
+parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file")
+parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file")
+parser.add_argument('OUTPUT_DIR', type=str, help="Output directory")
+
+args = parser.parse_args()
+
+input_file = args.INPUT_FILE
+schema_file = args.SCHEMA_FILE
+output_dir = args.OUTPUT_DIR
+debug = args.debug
+
+#debug = True
+
+## Load and validate the inputs
+
+try:
+ xml = ET.parse(input_file)
+except Exception as e:
+ print("Failed to load interface definition file {0}".format(input_file))
+ print(e)
+ sys.exit(1)
+
+try:
+ relaxng_xml = ET.parse(schema_file)
+ validator = ET.RelaxNG(relaxng_xml)
+
+ if not validator.validate(xml):
+ print(validator.error_log)
+ print("Interface definition file {0} does not match the schema!".format(input_file))
+ sys.exit(1)
+except Exception as e:
+ print("Failed to load the XML schema {0}".format(schema_file))
+ print(e)
+ sys.exit(1)
+
+if not os.access(output_dir, os.W_OK):
+ print("The output directory {0} is not writeable".format(output_dir))
+ sys.exit(1)
+
+## If we got this far, everything must be ok and we can convert the file
+
+def make_path(l):
+ path = functools.reduce(os.path.join, l)
+ if debug:
+ print(path)
+ return path
+
+def get_properties(p):
+ props = {}
+
+ if p is None:
+ return props
+
+ # Get the help string
+ try:
+ props["help"] = p.find("help").text
+ except:
+ pass
+
+ # Get value help strings
+ try:
+ vhe = p.findall("valueHelp")
+ vh = []
+ for v in vhe:
+ vh.append( (v.find("format").text, v.find("description").text) )
+ props["val_help"] = vh
+ except:
+ props["val_help"] = []
+
+ # Get the constraint statements
+ error_msg = default_constraint_err_msg
+ # Get the error message if it's there
+ try:
+ error_msg = p.find("constraintErrorMessage").text
+ except:
+ pass
+
+ vce = p.find("constraint")
+ vc = []
+ if vce is not None:
+ # The old backend doesn't support multiple validators in OR mode
+ # so we emulate it
+
+ regexes = []
+ regex_elements = vce.findall("regex")
+ if regex_elements is not None:
+ regexes = list(map(lambda e: e.text.strip().replace('\\','\\\\'), regex_elements))
+ if "" in regexes:
+ print("Warning: empty regex, node will be accepting any value")
+
+ validator_elements = vce.findall("validator")
+ validators = []
+ if validator_elements is not None:
+ for v in validator_elements:
+ v_name = os.path.join(validator_dir, v.get("name"))
+
+ # XXX: lxml returns None for empty arguments
+ v_argument = None
+ try:
+ v_argument = v.get("argument")
+ except:
+ pass
+ if v_argument is None:
+ v_argument = ""
+
+ validators.append("{0} {1}".format(v_name, v_argument))
+
+
+ regex_args = " ".join(map(lambda s: "--regex \\\'{0}\\\'".format(s), regexes))
+ validator_args = " ".join(map(lambda s: "--exec \\\"{0}\\\"".format(s), validators))
+ validator_script = '${vyos_libexec_dir}/validate-value'
+ validator_string = "exec \"{0} {1} {2} --value \\\'$VAR(@)\\\'\"; \"{3}\"".format(validator_script, regex_args, validator_args, error_msg)
+
+ props["constraint"] = validator_string
+
+ # Get the completion help strings
+ try:
+ che = p.findall("completionHelp")
+ ch = ""
+ for c in che:
+ scripts = c.findall("script")
+ paths = c.findall("path")
+ lists = c.findall("list")
+
+ # Current backend doesn't support multiple allowed: tags
+ # so we get to emulate it
+ comp_exprs = []
+ for i in lists:
+ comp_exprs.append("echo \"{0}\"".format(i.text))
+ for i in paths:
+ comp_exprs.append("/bin/cli-shell-api listNodes {0}".format(i.text))
+ for i in scripts:
+ comp_exprs.append("sh -c \"{0}\"".format(i.text))
+ comp_help = " && ".join(comp_exprs)
+ props["comp_help"] = comp_help
+ except:
+ props["comp_help"] = []
+
+ # Get priority
+ try:
+ props["priority"] = p.find("priority").text
+ except:
+ pass
+
+ # Get "multi"
+ if p.find("multi") is not None:
+ props["multi"] = True
+
+ # Get "valueless"
+ if p.find("valueless") is not None:
+ props["valueless"] = True
+
+ return props
+
+def make_node_def(props):
+ # XXX: replace with a template processor if it grows
+ # out of control
+
+ node_def = ""
+
+ if "tag" in props:
+ node_def += "tag:\n"
+
+ if "multi" in props:
+ node_def += "multi:\n"
+
+ if "type" in props:
+ # Will always be txt in practice if it's set
+ node_def += "type: {0}\n".format(props["type"])
+
+ if "priority" in props:
+ node_def += "priority: {0}\n".format(props["priority"])
+
+ if "help" in props:
+ node_def += "help: {0}\n".format(props["help"])
+
+ if "val_help" in props:
+ for v in props["val_help"]:
+ node_def += "val_help: {0}; {1}\n".format(v[0], v[1])
+
+ if "comp_help" in props:
+ node_def += "allowed: {0}\n".format(props["comp_help"])
+
+ if "constraint" in props:
+ node_def += "syntax:expression: {0}\n".format(props["constraint"])
+
+ if "owner" in props:
+ if "tag" in props:
+ node_def += "end: sudo sh -c \"VYOS_TAGNODE_VALUE='$VAR(@)' {0}\"\n".format(props["owner"])
+ else:
+ node_def += "end: sudo sh -c \"{0}\"\n".format(props["owner"])
+
+ if debug:
+ print("The contents of the node.def file:\n", node_def)
+
+ return node_def
+
+def process_node(n, tmpl_dir):
+ # Avoid mangling the path from the outer call
+ my_tmpl_dir = copy.copy(tmpl_dir)
+
+ props_elem = n.find("properties")
+ children = n.find("children")
+
+ name = n.get("name")
+ owner = n.get("owner")
+ node_type = n.tag
+
+ my_tmpl_dir.append(name)
+
+ if debug:
+ print("Name of the node: {0}. Created directory: {1}\n".format(name, "/".join(my_tmpl_dir)), end="")
+ os.makedirs(make_path(my_tmpl_dir), exist_ok=True)
+
+ props = get_properties(props_elem)
+ if owner:
+ props["owner"] = owner
+ # Type should not be set for non-tag, non-leaf nodes
+ # For non-valueless leaf nodes, set the type to txt: to make them have some type,
+ # actual value validation is handled by constraints translated to syntax:expression:
+ if node_type != "node":
+ if "valueless" not in props.keys():
+ props["type"] = "txt"
+ if node_type == "tagNode":
+ props["tag"] = "True"
+
+ if node_type != "leafNode":
+ if "multi" in props:
+ raise ValueError("<multi/> tag is only allowed in <leafNode>")
+ if "valueless" in props:
+ raise ValueError("<valueless/> is only allowed in <leafNode>")
+
+ nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def")
+ if not os.path.exists(nodedef_path):
+ with open(nodedef_path, "w") as f:
+ f.write(make_node_def(props))
+ else:
+ # Something has already generated that file
+ pass
+
+
+ if node_type == "node":
+ inner_nodes = children.iterfind("*")
+ for inner_n in inner_nodes:
+ process_node(inner_n, my_tmpl_dir)
+ if node_type == "tagNode":
+ my_tmpl_dir.append("node.tag")
+ if debug:
+ print("Created path for the tagNode:", end="")
+ os.makedirs(make_path(my_tmpl_dir), exist_ok=True)
+ inner_nodes = children.iterfind("*")
+ for inner_n in inner_nodes:
+ process_node(inner_n, my_tmpl_dir)
+ else:
+ # This is a leaf node
+ pass
+
+
+root = xml.getroot()
+
+nodes = root.iterfind("*")
+for n in nodes:
+ if n.tag == "syntaxVersion":
+ continue
+ process_node(n, [output_dir])
diff --git a/scripts/build-component-versions b/scripts/build-component-versions
new file mode 100755
index 000000000..5362dbdd4
--- /dev/null
+++ b/scripts/build-component-versions
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import argparse
+import json
+
+from lxml import etree as ET
+
+parser = argparse.ArgumentParser()
+parser.add_argument('INPUT_DIR', type=str,
+ help="Directory containing XML interface definition files")
+parser.add_argument('OUTPUT_DIR', type=str,
+ help="Output directory for JSON file")
+
+args = parser.parse_args()
+
+input_dir = args.INPUT_DIR
+output_dir = args.OUTPUT_DIR
+
+version_dict = {}
+
+for filename in os.listdir(input_dir):
+ filepath = os.path.join(input_dir, filename)
+ print(filepath)
+ try:
+ xml = ET.parse(filepath)
+ except Exception as e:
+ print("Failed to load interface definition file {0}".format(filename))
+ print(e)
+ sys.exit(1)
+
+ root = xml.getroot()
+ version_data = root.iterfind("syntaxVersion")
+ for ver in version_data:
+ component = ver.get("component")
+ version = int(ver.get("version"))
+
+ v = version_dict.get(component)
+ if v is None:
+ version_dict[component] = version
+ elif version > v:
+ version_dict[component] = version
+
+out_file = os.path.join(output_dir, 'component-versions.json')
+with open(out_file, 'w') as f:
+ json.dump(version_dict, f, indent=4, sort_keys=True)
diff --git a/scripts/cli/base_interfaces_test.py b/scripts/cli/base_interfaces_test.py
deleted file mode 100644
index 14ec7e137..000000000
--- a/scripts/cli/base_interfaces_test.py
+++ /dev/null
@@ -1,269 +0,0 @@
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from netifaces import ifaddresses, AF_INET, AF_INET6
-
-from vyos.configsession import ConfigSession
-from vyos.ifconfig import Interface
-from vyos.util import read_file
-from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local
-
-class BasicInterfaceTest:
- class BaseTest(unittest.TestCase):
- _test_ip = False
- _test_mtu = False
- _test_vlan = False
- _test_qinq = False
- _test_ipv6 = False
- _base_path = []
-
- _options = {}
- _interfaces = []
- _qinq_range = ['10', '20', '30']
- _vlan_range = ['100', '200', '300', '2000']
- # choose IPv6 minimum MTU value for tests - this must always work
- _mtu = '1280'
-
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
- '2001:db8:1::ffff/64', '2001:db8:101::1/112']
- self._test_mtu = False
- self._options = {}
-
- def tearDown(self):
- # we should not remove ethernet from the overall CLI
- if 'ethernet' in self._base_path:
- for interface in self._interfaces:
- # when using a dedicated interface to test via TEST_ETH environment
- # variable only this one will be cleared in the end - usable to test
- # ethernet interfaces via SSH
- self.session.delete(self._base_path + [interface])
- self.session.set(self._base_path + [interface, 'duplex', 'auto'])
- self.session.set(self._base_path + [interface, 'speed', 'auto'])
- self.session.set(self._base_path + [interface, 'smp-affinity', 'auto'])
- else:
- self.session.delete(self._base_path)
-
- self.session.commit()
- del self.session
-
- def test_add_description(self):
- """
- Check if description can be added to interface
- """
- for intf in self._interfaces:
- test_string='Description-Test-{}'.format(intf)
- self.session.set(self._base_path + [intf, 'description', test_string])
- for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
-
- self.session.commit()
-
- # Validate interface description
- for intf in self._interfaces:
- test_string='Description-Test-{}'.format(intf)
- with open('/sys/class/net/{}/ifalias'.format(intf), 'r') as f:
- tmp = f.read().rstrip()
- self.assertTrue(tmp, test_string)
-
- def test_add_address_single(self):
- """
- Check if a single address can be added to interface.
- """
- addr = '192.0.2.0/31'
- for intf in self._interfaces:
- self.session.set(self._base_path + [intf, 'address', addr])
- for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
-
- self.session.commit()
-
- for intf in self._interfaces:
- self.assertTrue(is_intf_addr_assigned(intf, addr))
-
- def test_add_address_multi(self):
- """
- Check if IPv4/IPv6 addresses can be added to interface.
- """
-
- # Add address
- for intf in self._interfaces:
- for addr in self._test_addr:
- self.session.set(self._base_path + [intf, 'address', addr])
- for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
-
- self.session.commit()
-
- # Validate address
- for intf in self._interfaces:
- for af in AF_INET, AF_INET6:
- for addr in ifaddresses(intf)[af]:
- # checking link local addresses makes no sense
- if is_ipv6_link_local(addr['addr']):
- continue
-
- self.assertTrue(is_intf_addr_assigned(intf, addr['addr']))
-
- def test_ipv6_link_local(self):
- """ Common function for IPv6 link-local address assignemnts """
- if not self._test_ipv6:
- return None
-
- for interface in self._interfaces:
- base = self._base_path + [interface]
- for option in self._options.get(interface, []):
- self.session.set(base + option.split())
-
- # after commit we must have an IPv6 link-local address
- self.session.commit()
-
- for interface in self._interfaces:
- for addr in ifaddresses(interface)[AF_INET6]:
- self.assertTrue(is_ipv6_link_local(addr['addr']))
-
- # disable IPv6 link-local address assignment
- for interface in self._interfaces:
- base = self._base_path + [interface]
- self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
-
- # after commit we must have no IPv6 link-local address
- self.session.commit()
-
- for interface in self._interfaces:
- self.assertTrue(AF_INET6 not in ifaddresses(interface))
-
- def _mtu_test(self, intf):
- """ helper function to verify MTU size """
- with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f:
- tmp = f.read().rstrip()
- self.assertEqual(tmp, self._mtu)
-
- def test_change_mtu(self):
- """ Testcase if MTU can be changed on interface """
- if not self._test_mtu:
- return None
- for intf in self._interfaces:
- base = self._base_path + [intf]
- self.session.set(base + ['mtu', self._mtu])
- for option in self._options.get(intf, []):
- self.session.set(base + option.split())
-
- self.session.commit()
- for intf in self._interfaces:
- self._mtu_test(intf)
-
- def test_8021q_vlan(self):
- """ Testcase for 802.1q VLAN interfaces """
- if not self._test_vlan:
- return None
-
- for interface in self._interfaces:
- base = self._base_path + [interface]
- for option in self._options.get(interface, []):
- self.session.set(base + option.split())
-
- for vlan in self._vlan_range:
- base = self._base_path + [interface, 'vif', vlan]
- self.session.set(base + ['mtu', self._mtu])
- for address in self._test_addr:
- self.session.set(base + ['address', address])
-
- self.session.commit()
- for intf in self._interfaces:
- for vlan in self._vlan_range:
- vif = f'{intf}.{vlan}'
- for address in self._test_addr:
- self.assertTrue(is_intf_addr_assigned(vif, address))
- self._mtu_test(vif)
-
-
- def test_8021ad_qinq_vlan(self):
- """ Testcase for 802.1ad Q-in-Q VLAN interfaces """
- if not self._test_qinq:
- return None
-
- for interface in self._interfaces:
- base = self._base_path + [interface]
- for option in self._options.get(interface, []):
- self.session.set(base + option.split())
-
- for vif_s in self._qinq_range:
- for vif_c in self._vlan_range:
- base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
- self.session.set(base + ['mtu', self._mtu])
- for address in self._test_addr:
- self.session.set(base + ['address', address])
-
- self.session.commit()
- for interface in self._interfaces:
- for vif_s in self._qinq_range:
- for vif_c in self._vlan_range:
- vif = f'{interface}.{vif_s}.{vif_c}'
- for address in self._test_addr:
- self.assertTrue(is_intf_addr_assigned(vif, address))
- self._mtu_test(vif)
-
- def test_ip_options(self):
- """ test IP options like arp """
- if not self._test_ip:
- return None
-
- for interface in self._interfaces:
- arp_tmo = '300'
- path = self._base_path + [interface]
- for option in self._options.get(interface, []):
- self.session.set(path + option.split())
-
- # Options
- self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
- self.session.set(path + ['ip', 'disable-arp-filter'])
- self.session.set(path + ['ip', 'enable-arp-accept'])
- self.session.set(path + ['ip', 'enable-arp-announce'])
- self.session.set(path + ['ip', 'enable-arp-ignore'])
- self.session.set(path + ['ip', 'enable-proxy-arp'])
- self.session.set(path + ['ip', 'proxy-arp-pvlan'])
- self.session.set(path + ['ip', 'source-validation', 'loose'])
-
- self.session.commit()
-
- for interface in self._interfaces:
- tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
- self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter')
- self.assertEqual('0', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept')
- self.assertEqual('1', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce')
- self.assertEqual('1', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore')
- self.assertEqual('1', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp')
- self.assertEqual('1', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan')
- self.assertEqual('1', tmp)
-
- tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter')
- self.assertEqual('2', tmp)
diff --git a/scripts/cli/test_interfaces_bonding.py b/scripts/cli/test_interfaces_bonding.py
deleted file mode 100755
index e3d3b25ee..000000000
--- a/scripts/cli/test_interfaces_bonding.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-
-from vyos.ifconfig import Section
-from vyos.configsession import ConfigSessionError
-from vyos.util import read_file
-
-class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._base_path = ['interfaces', 'bonding']
- self._interfaces = ['bond0']
- self._test_mtu = True
- self._test_vlan = True
- self._test_qinq = True
- self._test_ipv6 = True
-
- self._members = []
- # we need to filter out VLAN interfaces identified by a dot (.)
- # in their name - just in case!
- if 'TEST_ETH' in os.environ:
- self._members = os.environ['TEST_ETH'].split()
- else:
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- self._members.append(tmp)
-
- self._options['bond0'] = []
- for member in self._members:
- self._options['bond0'].append(f'member interface {member}')
-
-
- def test_add_address_single(self):
- """ derived method to check if member interfaces are enslaved properly """
- super().test_add_address_single()
-
- for interface in self._interfaces:
- slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split()
- self.assertListEqual(slaves, self._members)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_bridge.py b/scripts/cli/test_interfaces_bridge.py
deleted file mode 100755
index bc0bb69c6..000000000
--- a/scripts/cli/test_interfaces_bridge.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-from vyos.ifconfig import Section
-
-class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._test_ipv6 = True
-
- self._base_path = ['interfaces', 'bridge']
- self._interfaces = ['br0']
-
- self._members = []
- # we need to filter out VLAN interfaces identified by a dot (.)
- # in their name - just in case!
- if 'TEST_ETH' in os.environ:
- self._members = os.environ['TEST_ETH'].split()
- else:
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- self._members.append(tmp)
-
- self._options['br0'] = []
- for member in self._members:
- self._options['br0'].append(f'member interface {member}')
-
- def test_add_remove_member(self):
- for interface in self._interfaces:
- base = self._base_path + [interface]
- self.session.set(base + ['stp'])
- self.session.set(base + ['address', '192.0.2.1/24'])
-
- cost = 1000
- priority = 10
- # assign members to bridge interface
- for member in self._members:
- base_member = base + ['member', 'interface', member]
- self.session.set(base_member + ['cost', str(cost)])
- self.session.set(base_member + ['priority', str(priority)])
- cost += 1
- priority += 1
-
- self.session.commit()
-
- for interface in self._interfaces:
- self.session.delete(self._base_path + [interface, 'member'])
-
- self.session.commit()
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_dummy.py b/scripts/cli/test_interfaces_dummy.py
deleted file mode 100755
index 01942fc89..000000000
--- a/scripts/cli/test_interfaces_dummy.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-
-class DummyInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
- self._base_path = ['interfaces', 'dummy']
- self._interfaces = ['dum0', 'dum1', 'dum2']
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_ethernet.py b/scripts/cli/test_interfaces_ethernet.py
deleted file mode 100755
index 761ec7506..000000000
--- a/scripts/cli/test_interfaces_ethernet.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-from vyos.ifconfig import Section
-
-class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._base_path = ['interfaces', 'ethernet']
- self._test_ip = True
- self._test_mtu = True
- self._test_vlan = True
- self._test_qinq = True
- self._test_ipv6 = True
- self._interfaces = []
-
- # we need to filter out VLAN interfaces identified by a dot (.)
- # in their name - just in case!
- if 'TEST_ETH' in os.environ:
- tmp = os.environ['TEST_ETH'].split()
- self._interfaces = tmp
- else:
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- self._interfaces.append(tmp)
-
- def test_dhcp_disable(self):
- """
- When interface is configured as admin down, it must be admin down even
- """
- for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'disable'])
- for option in self._options.get(interface, []):
- self.session.set(self._base_path + [interface] + option.split())
-
- # Also enable DHCP (ISC DHCP always places interface in admin up
- # state so we check that we do not start DHCP client.
- # https://phabricator.vyos.net/T2767
- self.session.set(self._base_path + [interface, 'address', 'dhcp'])
-
- self.session.commit()
-
- # Validate interface state
- for interface in self._interfaces:
- with open(f'/sys/class/net/{interface}/flags', 'r') as f:
- flags = f.read()
- self.assertEqual(int(flags, 16) & 1, 0)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_geneve.py b/scripts/cli/test_interfaces_geneve.py
deleted file mode 100755
index f84a55f86..000000000
--- a/scripts/cli/test_interfaces_geneve.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from base_interfaces_test import BasicInterfaceTest
-
-
-class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._base_path = ['interfaces', 'geneve']
- self._options = {
- 'gnv0': ['vni 10', 'remote 127.0.1.1'],
- 'gnv1': ['vni 20', 'remote 127.0.1.2'],
- }
- self._interfaces = list(self._options)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_l2tpv3.py b/scripts/cli/test_interfaces_l2tpv3.py
deleted file mode 100755
index d8655d157..000000000
--- a/scripts/cli/test_interfaces_l2tpv3.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-import jmespath
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-from vyos.util import cmd
-
-class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._base_path = ['interfaces', 'l2tpv3']
- self._options = {
- 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10',
- 'tunnel-id 100', 'peer-tunnel-id 10',
- 'session-id 100', 'peer-session-id 10',
- 'source-port 1010', 'destination-port 10101'],
- 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20',
- 'peer-tunnel-id 200', 'remote-ip 127.20.20.20',
- 'session-id 20', 'tunnel-id 200',
- 'source-port 2020', 'destination-port 20202'],
- }
- self._interfaces = list(self._options)
-
- def test_add_address_single(self):
- super().test_add_address_single()
-
- command = 'sudo ip -j l2tp show session'
- json_out = json.loads(cmd(command))
- for interface in self._options:
- for config in json_out:
- if config['interface'] == interface:
- # convert list with configuration items into a dict
- dict = {}
- for opt in self._options[interface]:
- dict.update({opt.split()[0].replace('-','_'): opt.split()[1]})
-
- for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']:
- self.assertEqual(str(config[key]), dict[key])
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_loopback.py b/scripts/cli/test_interfaces_loopback.py
deleted file mode 100755
index ba428b5d3..000000000
--- a/scripts/cli/test_interfaces_loopback.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-from vyos.validate import is_intf_addr_assigned
-
-class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
- # these addresses are never allowed to be removed from the system
- self._loopback_addresses = ['127.0.0.1', '::1']
- self._base_path = ['interfaces', 'loopback']
- self._interfaces = ['lo']
-
- def test_add_address_single(self):
- super().test_add_address_single()
- for addr in self._loopback_addresses:
- self.assertTrue(is_intf_addr_assigned('lo', addr))
-
- def test_add_address_multi(self):
- super().test_add_address_multi()
- for addr in self._loopback_addresses:
- self.assertTrue(is_intf_addr_assigned('lo', addr))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_macsec.py b/scripts/cli/test_interfaces_macsec.py
deleted file mode 100755
index 0f1b6486d..000000000
--- a/scripts/cli/test_interfaces_macsec.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import unittest
-from psutil import process_iter
-
-from vyos.ifconfig import Section
-from base_interfaces_test import BasicInterfaceTest
-from vyos.configsession import ConfigSessionError
-from vyos.util import read_file
-
-def get_config_value(intf, key):
- tmp = read_file(f'/run/wpa_supplicant/{intf}.conf')
- tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
- return tmp[0]
-
-class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
- self._base_path = ['interfaces', 'macsec']
- self._options = {
- 'macsec0': ['source-interface eth0',
- 'security cipher gcm-aes-128']
- }
-
- # if we have a physical eth1 interface, add a second macsec instance
- if 'eth1' in Section.interfaces("ethernet"):
- macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] }
- self._options.update(macsec)
-
- self._interfaces = list(self._options)
-
- def test_encryption(self):
- """ MACsec can be operating in authentication and encryption
- mode - both using different mandatory settings, lets test
- encryption as the basic authentication test has been performed
- using the base class tests """
- intf = 'macsec0'
- src_intf = 'eth0'
- mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4'
- mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836'
- replay_window = '64'
- self.session.set(self._base_path + [intf, 'security', 'encrypt'])
-
- # check validate() - Cipher suite must be set for MACsec
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128'])
-
- # check validate() - Physical source interface must be set for MACsec
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [intf, 'source-interface', src_intf])
-
- # check validate() - MACsec security keys mandartory when encryption is enabled
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak])
-
- # check validate() - MACsec security keys mandartory when encryption is enabled
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn])
-
- self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window])
- self.session.commit()
-
- tmp = get_config_value(src_intf, 'macsec_integ_only')
- self.assertTrue("0" in tmp)
-
- tmp = get_config_value(src_intf, 'mka_cak')
- self.assertTrue(mak_cak in tmp)
-
- tmp = get_config_value(src_intf, 'mka_ckn')
- self.assertTrue(mak_ckn in tmp)
-
- # check that the default priority of 255 is programmed
- tmp = get_config_value(src_intf, 'mka_priority')
- self.assertTrue("255" in tmp)
-
- tmp = get_config_value(src_intf, 'macsec_replay_window')
- self.assertTrue(replay_window in tmp)
-
- # Check for running process
- self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_pppoe.py b/scripts/cli/test_interfaces_pppoe.py
deleted file mode 100755
index 822f05de6..000000000
--- a/scripts/cli/test_interfaces_pppoe.py
+++ /dev/null
@@ -1,162 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-config_file = '/etc/ppp/peers/{}'
-dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf'
-base_path = ['interfaces', 'pppoe']
-
-def get_config_value(interface, key):
- with open(config_file.format(interface), 'r') as f:
- for line in f:
- if line.startswith(key):
- return list(line.split())
- return []
-
-def get_dhcp6c_config_value(interface, key):
- tmp = read_file(dhcp6c_config_file.format(interface))
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
-
- out = []
- for item in tmp:
- out.append(item.replace(';',''))
- return out
-
-class PPPoEInterfaceTest(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self._interfaces = ['pppoe0', 'pppoe50']
- self._source_interface = 'eth0'
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_pppoe(self):
- """ Check if PPPoE dialer can be configured and runs """
- for interface in self._interfaces:
- user = 'VyOS-user-' + interface
- passwd = 'VyOS-passwd-' + interface
- mtu = '1400'
-
- self.session.set(base_path + [interface, 'authentication', 'user', user])
- self.session.set(base_path + [interface, 'authentication', 'password', passwd])
- self.session.set(base_path + [interface, 'default-route', 'auto'])
- self.session.set(base_path + [interface, 'mtu', mtu])
- self.session.set(base_path + [interface, 'no-peer-dns'])
-
- # check validate() - a source-interface is required
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + [interface, 'source-interface', self._source_interface])
-
- # commit changes
- self.session.commit()
-
- # verify configuration file(s)
- for interface in self._interfaces:
- user = 'VyOS-user-' + interface
- password = 'VyOS-passwd-' + interface
-
- tmp = get_config_value(interface, 'mtu')[1]
- self.assertEqual(tmp, mtu)
- tmp = get_config_value(interface, 'user')[1].replace('"', '')
- self.assertEqual(tmp, user)
- tmp = get_config_value(interface, 'password')[1].replace('"', '')
- self.assertEqual(tmp, password)
- tmp = get_config_value(interface, 'ifname')[1]
- self.assertEqual(tmp, interface)
-
- # Check if ppp process is running in the interface in question
- running = False
- for p in process_iter():
- if "pppd" in p.name():
- if interface in p.cmdline():
- running = True
-
- self.assertTrue(running)
-
- def test_pppoe_dhcpv6pd(self):
- """ Check if PPPoE dialer can be configured with DHCPv6-PD """
- address = '1'
- sla_id = '0'
- sla_len = '8'
- for interface in self._interfaces:
- self.session.set(base_path + [interface, 'authentication', 'user', 'vyos'])
- self.session.set(base_path + [interface, 'authentication', 'password', 'vyos'])
- self.session.set(base_path + [interface, 'default-route', 'none'])
- self.session.set(base_path + [interface, 'no-peer-dns'])
- self.session.set(base_path + [interface, 'source-interface', self._source_interface])
- self.session.set(base_path + [interface, 'ipv6', 'enable'])
-
- # prefix delegation stuff
- dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0']
- self.session.set(dhcpv6_pd_base + ['length', '56'])
- self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address])
- self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id])
-
- # commit changes
- self.session.commit()
-
- # verify "normal" PPPoE value - 1492 is default MTU
- tmp = get_config_value(interface, 'mtu')[1]
- self.assertEqual(tmp, '1492')
- tmp = get_config_value(interface, 'user')[1].replace('"', '')
- self.assertEqual(tmp, 'vyos')
- tmp = get_config_value(interface, 'password')[1].replace('"', '')
- self.assertEqual(tmp, 'vyos')
-
- for param in ['+ipv6', 'ipv6cp-use-ipaddr']:
- tmp = get_config_value(interface, param)[0]
- self.assertEqual(tmp, param)
-
- # verify DHCPv6 prefix delegation
- # will return: ['delegation', '::/56 infinity;']
- tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
- self.assertEqual(tmp, '::/56')
- tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
- self.assertEqual(tmp, self._source_interface)
- tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
- self.assertEqual(tmp, address)
- tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
- self.assertEqual(tmp, sla_id)
- tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
- self.assertEqual(tmp, sla_len)
-
- # Check if ppp process is running in the interface in question
- running = False
- for p in process_iter():
- if "pppd" in p.name():
- running = True
- self.assertTrue(running)
-
- # We can not check if wide-dhcpv6 process is running as it is started
- # after the PPP interface gets a link to the ISP - but we can see if
- # it would be started by the scripts
- tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}')
- tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp)
- self.assertTrue(tmp)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_pseudo_ethernet.py b/scripts/cli/test_interfaces_pseudo_ethernet.py
deleted file mode 100755
index bc2e6e7eb..000000000
--- a/scripts/cli/test_interfaces_pseudo_ethernet.py
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-
-class PEthInterfaceTest(BasicInterfaceTest.BaseTest):
-
- def setUp(self):
- super().setUp()
- self._base_path = ['interfaces', 'pseudo-ethernet']
-
- self._test_ip = True
- self._test_mtu = True
- self._test_vlan = True
- self._test_qinq = True
-
- self._options = {
- 'peth0': ['source-interface eth1'],
- 'peth1': ['source-interface eth1'],
- }
- self._interfaces = list(self._options)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_tunnel.py b/scripts/cli/test_interfaces_tunnel.py
deleted file mode 100755
index 7611ffe26..000000000
--- a/scripts/cli/test_interfaces_tunnel.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from vyos.configsession import ConfigSession
-
-from base_interfaces_test import BasicInterfaceTest
-
-class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
- # encoding, tunnel endpoint (v4/v6), address (v4/v6)
- _valid = [
- ('gre', 4, 4),
- ('gre', 4, 6),
- ('ip6gre', 6, 4),
- ('ip6gre', 6, 6),
- ('gre-bridge', 4, 4),
- ('ipip', 4, 4),
- ('ipip', 4, 6),
- ('ipip6', 6, 4),
- ('ipip6', 6, 6),
- ('ip6ip6', 6, 6),
- ('sit', 4, 6),
- ]
-
- local = {
- 4: '10.100.{}.1/24',
- 6: '2001:db8:{}::1/64',
- }
-
- remote = {
- 4: '192.0.{}.1',
- 6: '2002::{}:1',
- }
-
- address = {
- 4: '10.100.{}.1/24',
- 6: '2001:db8:{}::1/64',
- }
-
- def setUp(self):
- local = {}
- remote = {}
- address = {}
-
- self._intf_dummy = ['interfaces', 'dummy']
- self._base_path = ['interfaces', 'tunnel']
- self._interfaces = ['tun{}'.format(n) for n in range(len(self._valid))]
-
- self._test_mtu = True
- super().setUp()
-
- for number in range(len(self._valid)):
- dum4 = 'dum4{}'.format(number)
- dum6 = 'dum6{}'.format(number)
-
- ipv4 = self.local[4].format(number)
- ipv6 = self.local[6].format(number)
-
- local.setdefault(4, {})[number] = ipv4
- local.setdefault(6, {})[number] = ipv6
-
- ipv4 = self.remote[4].format(number)
- ipv6 = self.remote[6].format(number)
-
- remote.setdefault(4, {})[number] = ipv4
- remote.setdefault(6, {})[number] = ipv6
-
- ipv4 = self.address[4].format(number)
- ipv6 = self.address[6].format(number)
-
- address.setdefault(4, {})[number] = ipv4
- address.setdefault(6, {})[number] = ipv6
-
- self.session.set(self._intf_dummy + [dum4, 'address', ipv4])
- self.session.set(self._intf_dummy + [dum6, 'address', ipv6])
- self.session.commit()
-
- for number, (encap, p2p, addr) in enumerate(self._valid):
- intf = 'tun%d' % number
- tunnel = {}
- tunnel['encapsulation'] = encap
- tunnel['local-ip'] = local[p2p][number].split('/')[0]
- tunnel['remote-ip'] = remote[p2p][number].split('/')[0]
- tunnel['address'] = address[addr][number]
- for name in tunnel:
- self.session.set(self._base_path + [intf, name, tunnel[name]])
-
- def tearDown(self):
- self.session.delete(self._intf_dummy)
- super().tearDown()
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_vxlan.py b/scripts/cli/test_interfaces_vxlan.py
deleted file mode 100755
index 2628e0285..000000000
--- a/scripts/cli/test_interfaces_vxlan.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from base_interfaces_test import BasicInterfaceTest
-
-class VXLANInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._test_mtu = True
- self._base_path = ['interfaces', 'vxlan']
- self._options = {
- 'vxlan0': ['vni 10', 'remote 127.0.0.2'],
- 'vxlan1': ['vni 20', 'group 239.1.1.1', 'source-interface eth0'],
- }
- self._interfaces = list(self._options)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_wireguard.py b/scripts/cli/test_interfaces_wireguard.py
deleted file mode 100755
index 0c32a4696..000000000
--- a/scripts/cli/test_interfaces_wireguard.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from base_interfaces_test import BasicInterfaceTest
-
-# Generate WireGuard default keypair
-if not os.path.isdir('/config/auth/wireguard/default'):
- os.system('sudo /usr/libexec/vyos/op_mode/wireguard.py --genkey')
-
-base_path = ['interfaces', 'wireguard']
-
-class WireGuardInterfaceTest(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
- '2001:db8:1::ffff/64', '2001:db8:101::1/112']
- self._interfaces = ['wg0', 'wg1']
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_peer_setup(self):
- """
- Create WireGuard interfaces with associated peers
- """
- for intf in self._interfaces:
- peer = 'foo-' + intf
- psk = 'u2xdA70hkz0S1CG0dZlOh0aq2orwFXRIVrKo4DCvHgM='
- pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A='
-
- for addr in self._test_addr:
- self.session.set(base_path + [intf, 'address', addr])
-
- self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1'])
- self.session.set(base_path + [intf, 'peer', peer, 'port', '1337'])
-
- # Allow different prefixes to traverse the tunnel
- allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
- for ip in allowed_ips:
- self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip])
-
- self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk])
- self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey])
- self.session.commit()
-
- self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}'))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_interfaces_wireless.py b/scripts/cli/test_interfaces_wireless.py
deleted file mode 100755
index fae233244..000000000
--- a/scripts/cli/test_interfaces_wireless.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from base_interfaces_test import BasicInterfaceTest
-from psutil import process_iter
-from vyos.util import check_kmod
-
-class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
- def setUp(self):
- super().setUp()
-
- self._base_path = ['interfaces', 'wireless']
- self._options = {
- 'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0',
- 'type station', 'address 192.0.2.1/30'],
- 'wlan1': ['physical-device phy0', 'ssid VyOS-WIFI-1',
- 'type access-point', 'address 192.0.2.5/30', 'channel 0'],
- 'wlan10': ['physical-device phy1', 'ssid VyOS-WIFI-2',
- 'type station', 'address 192.0.2.9/30'],
- 'wlan11': ['physical-device phy1', 'ssid VyOS-WIFI-3',
- 'type access-point', 'address 192.0.2.13/30', 'channel 0'],
- }
- self._interfaces = list(self._options)
- self.session.set(['system', 'wifi-regulatory-domain', 'SE'])
-
- def test_add_address_single(self):
- """ derived method to check if member interfaces are enslaved properly """
- super().test_add_address_single()
-
- for option, option_value in self._options.items():
- if 'type access-point' in option_value:
- # Check for running process
- self.assertIn('hostapd', (p.name() for p in process_iter()))
- elif 'type station' in option_value:
- # Check for running process
- self.assertIn('wpa_supplicant', (p.name() for p in process_iter()))
- else:
- self.assertTrue(False)
-
-if __name__ == '__main__':
- check_kmod('mac80211_hwsim')
- unittest.main()
diff --git a/scripts/cli/test_interfaces_wirelessmodem.py b/scripts/cli/test_interfaces_wirelessmodem.py
deleted file mode 100755
index 40cd03b93..000000000
--- a/scripts/cli/test_interfaces_wirelessmodem.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-
-config_file = '/etc/ppp/peers/{}'
-base_path = ['interfaces', 'wirelessmodem']
-
-def get_config_value(interface, key):
- with open(config_file.format(interface), 'r') as f:
- for line in f:
- if line.startswith(key):
- return list(line.split())
- return []
-
-class WWANInterfaceTest(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self._interfaces = ['wlm0', 'wlm1']
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_wlm_1(self):
- for interface in self._interfaces:
- self.session.set(base_path + [interface, 'no-peer-dns'])
- self.session.set(base_path + [interface, 'ondemand'])
-
- # check validate() - APN must be configure
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + [interface, 'apn', 'vyos.net'])
-
- # check validate() - device must be configure
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + [interface, 'device', 'ttyS0'])
-
- # commit changes
- self.session.commit()
-
- # verify configuration file(s)
- for interface in self._interfaces:
- tmp = get_config_value(interface, 'ifname')[1]
- self.assertTrue(interface in tmp)
-
- tmp = get_config_value(interface, 'demand')[0]
- self.assertTrue('demand' in tmp)
-
- tmp = os.path.isfile(f'/etc/ppp/peers/chat.{interface}')
- self.assertTrue(tmp)
-
- # Check if ppp process is running in the interface in question
- running = False
- for p in process_iter():
- if "pppd" in p.name():
- if interface in p.cmdline():
- running = True
-
- self.assertTrue(running)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_nat.py b/scripts/cli/test_nat.py
deleted file mode 100755
index 416810e40..000000000
--- a/scripts/cli/test_nat.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import jmespath
-import json
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import cmd
-
-base_path = ['nat']
-snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}'
-
-class TestNAT(unittest.TestCase):
- def setUp(self):
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- self.session = ConfigSession(os.getpid())
- self.session.delete(base_path)
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
-
- def test_source_nat(self):
- """ Configure and validate source NAT rule(s) """
-
- path = base_path + ['source']
- network = '192.168.0.0/16'
- self.session.set(path + ['rule', '1', 'destination', 'address', network])
- self.session.set(path + ['rule', '1', 'exclude'])
-
- # check validate() - outbound-interface must be defined
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(path + ['rule', '1', 'outbound-interface', 'any'])
- self.session.commit()
-
- tmp = cmd('sudo nft -j list table nat')
- nftable_json = json.loads(tmp)
- condensed_json = jmespath.search(snat_pattern, nftable_json)[0]
-
- self.assertEqual(condensed_json['comment'], 'DST-NAT-1')
- self.assertEqual(condensed_json['address']['network'], network.split('/')[0])
- self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1])
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_bcast-relay.py b/scripts/cli/test_service_bcast-relay.py
deleted file mode 100755
index fe4531c3b..000000000
--- a/scripts/cli/test_service_bcast-relay.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-
-base_path = ['service', 'broadcast-relay']
-
-class TestServiceBroadcastRelay(unittest.TestCase):
- _address1 = '192.0.2.1/24'
- _address2 = '192.0.2.1/24'
-
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1])
- self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2])
- self.session.commit()
-
- def tearDown(self):
- self.session.delete(['interfaces', 'dummy', 'dum1001'])
- self.session.delete(['interfaces', 'dummy', 'dum1002'])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_service(self):
- """ Check if broadcast relay service can be configured and runs """
- ids = range(1, 5)
- for id in ids:
- base = base_path + ['id', str(id)]
- self.session.set(base + ['description', 'vyos'])
- self.session.set(base + ['port', str(10000 + id)])
-
- # check validate() - two interfaces must be present
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(base + ['interface', 'dum1001'])
- self.session.set(base + ['interface', 'dum1002'])
- self.session.set(base + ['address', self._address1.split('/')[0]])
-
- self.session.commit()
-
- for id in ids:
- # check if process is running
- running = False
- for p in process_iter():
- if "udp-broadcast-relay" in p.name():
- if p.cmdline()[3] == str(id):
- running = True
- break
- self.assertTrue(running)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_dns_dynamic.py b/scripts/cli/test_service_dns_dynamic.py
deleted file mode 100755
index be52360ed..000000000
--- a/scripts/cli/test_service_dns_dynamic.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from getpass import getuser
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-DDCLIENT_CONF = '/run/ddclient/ddclient.conf'
-base_path = ['service', 'dns', 'dynamic']
-
-def get_config_value(key):
- tmp = read_file(DDCLIENT_CONF)
- tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp)
- tmp = tmp[0].rstrip(',')
- return tmp
-
-def check_process():
- """
- Check for running process, process name changes dynamically e.g.
- "ddclient - sleeping for 270 seconds", thus we need a different approach
- """
- running = False
- for p in process_iter():
- if "ddclient" in p.name():
- running = True
- return running
-
-class TestServiceDDNS(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- # Delete DDNS configuration
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
-
- def test_service(self):
- """ Check individual DDNS service providers """
- ddns = ['interface', 'eth0', 'service']
- services = ['cloudflare', 'afraid', 'dyndns', 'zoneedit']
-
- for service in services:
- user = 'vyos_user'
- password = 'vyos_pass'
- zone = 'vyos.io'
- self.session.delete(base_path)
- self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io'])
- self.session.set(base_path + ddns + [service, 'login', user])
- self.session.set(base_path + ddns + [service, 'password', password])
- self.session.set(base_path + ddns + [service, 'zone', zone])
-
- # commit changes
- if service == 'cloudflare':
- self.session.commit()
- else:
- # zone option only works on cloudflare, an exception is raised
- # for all others
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io'])
- # commit changes again - now it should work
- self.session.commit()
-
- # we can only read the configuration file when we operate as 'root'
- if getuser() == 'root':
- protocol = get_config_value('protocol')
- login = get_config_value('login')
- pwd = get_config_value('password')
-
- # some services need special treatment
- protoname = service
- if service == 'cloudflare':
- tmp = get_config_value('zone')
- self.assertTrue(tmp == zone)
- elif service == 'afraid':
- protoname = 'freedns'
- elif service == 'dyndns':
- protoname = 'dyndns2'
- elif service == 'zoneedit':
- protoname = 'zoneedit1'
-
- self.assertTrue(protocol == protoname)
- self.assertTrue(login == user)
- self.assertTrue(pwd == "'" + password + "'")
-
- # Check for running process
- self.assertTrue(check_process())
-
-
- def test_rfc2136(self):
- """ Check if DDNS service can be configured and runs """
- ddns = ['interface', 'eth0', 'rfc2136', 'vyos']
- ddns_key_file = '/config/auth/my.key'
-
- self.session.set(base_path + ddns + ['key', ddns_key_file])
- self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io'])
- self.session.set(base_path + ddns + ['server', 'ns1.vyos.io'])
- self.session.set(base_path + ddns + ['ttl', '300'])
- self.session.set(base_path + ddns + ['zone', 'vyos.io'])
-
- # ensure an exception will be raised as no key is present
- if os.path.exists(ddns_key_file):
- os.unlink(ddns_key_file)
-
- # check validate() - the key file does not exist yet
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- with open(ddns_key_file, 'w') as f:
- f.write('S3cretKey')
-
- # commit changes
- self.session.commit()
-
- # TODO: inspect generated configuration file
-
- # Check for running process
- self.assertTrue(check_process())
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_mdns-repeater.py b/scripts/cli/test_service_mdns-repeater.py
deleted file mode 100755
index 18900b6d2..000000000
--- a/scripts/cli/test_service_mdns-repeater.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession
-
-base_path = ['service', 'mdns', 'repeater']
-intf_base = ['interfaces', 'dummy']
-
-class TestServiceMDNSrepeater(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(intf_base + ['dum10'])
- self.session.delete(intf_base + ['dum20'])
- self.session.commit()
- del self.session
-
- def test_service(self):
- # Service required a configured IP address on the interface
-
- self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30'])
- self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30'])
-
- self.session.set(base_path + ['interface', 'dum10'])
- self.session.set(base_path + ['interface', 'dum20'])
- self.session.commit()
-
- # Check for running process
- self.assertTrue("mdns-repeater" in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_pppoe-server.py b/scripts/cli/test_service_pppoe-server.py
deleted file mode 100755
index 901ca792d..000000000
--- a/scripts/cli/test_service_pppoe-server.py
+++ /dev/null
@@ -1,167 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from configparser import ConfigParser
-from psutil import process_iter
-from vyos.configsession import ConfigSession
-from vyos.configsession import ConfigSessionError
-
-base_path = ['service', 'pppoe-server']
-local_if = ['interfaces', 'dummy', 'dum667']
-pppoe_conf = '/run/accel-pppd/pppoe.conf'
-
-ac_name = 'ACN'
-subnet = '172.18.0.0/24'
-gateway = '192.0.2.1'
-nameserver = '9.9.9.9'
-mtu = '1492'
-interface = 'eth0'
-
-class TestServicePPPoEServer(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- self.session.delete(base_path)
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(local_if)
- self.session.commit()
- del self.session
-
- def verify(self, conf):
- # validate some common values in the configuration
- for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool',
- 'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1',
- 'auth_chap_md5', 'auth_pap', 'shaper']:
- # Settings without values provide None
- self.assertEqual(conf['modules'][tmp], None)
-
- # check Access Concentrator setting
- self.assertTrue(conf['pppoe']['ac-name'] == ac_name)
- self.assertTrue(conf['pppoe'].getboolean('verbose'))
- self.assertTrue(conf['pppoe']['interface'], interface)
-
- # check configured subnet
- self.assertEqual(conf['ip-pool'][subnet], None)
- self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway)
-
- # check ppp
- self.assertTrue(conf['ppp'].getboolean('verbose'))
- self.assertTrue(conf['ppp'].getboolean('check-ip'))
- self.assertEqual(conf['ppp']['min-mtu'], mtu)
- self.assertEqual(conf['ppp']['mtu'], mtu)
- self.assertEqual(conf['ppp']['lcp-echo-interval'], '30')
- self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0')
- self.assertEqual(conf['ppp']['lcp-echo-failure'], '3')
-
- def basic_config(self):
- self.session.set(local_if + ['address', '192.0.2.1/32'])
-
- self.session.set(base_path + ['access-concentrator', ac_name])
- self.session.set(base_path + ['authentication', 'mode', 'local'])
- self.session.set(base_path + ['client-ip-pool', 'subnet', subnet])
- self.session.set(base_path + ['name-server', nameserver])
- self.session.set(base_path + ['interface', interface])
- self.session.set(base_path + ['local-ip', gateway])
-
- def test_local_auth(self):
- """ Test configuration of local authentication for PPPoE server """
- self.basic_config()
- # authentication
- self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos'])
- self.session.set(base_path + ['authentication', 'mode', 'local'])
- # other settings
- self.session.set(base_path + ['ppp-options', 'ccp'])
- self.session.set(base_path + ['ppp-options', 'mppe', 'require'])
- self.session.set(base_path + ['limits', 'connection-limit', '20/min'])
-
- # commit changes
- self.session.commit()
-
- # Validate configuration values
- conf = ConfigParser(allow_no_value=True)
- conf.read(pppoe_conf)
-
- # basic verification
- self.verify(conf)
-
- # check auth
- self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets')
- self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway)
-
- # check pado
- self.assertEqual(conf['ppp']['mppe'], 'require')
- self.assertTrue(conf['ppp'].getboolean('ccp'))
-
- # check other settings
- self.assertEqual(conf['connlimit']['limit'], '20/min')
-
- # Check for running process
- self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
-
- def test_radius_auth(self):
- """ Test configuration of RADIUS authentication for PPPoE server """
- radius_server = '192.0.2.22'
- radius_key = 'secretVyOS'
- radius_port = '2000'
- radius_port_acc = '3000'
-
- self.basic_config()
- self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key])
- self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port])
- self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc])
- self.session.set(base_path + ['authentication', 'mode', 'radius'])
-
- # commit changes
- self.session.commit()
-
- # Validate configuration values
- conf = ConfigParser(allow_no_value=True)
- conf.read(pppoe_conf)
-
- # basic verification
- self.verify(conf)
-
- # check auth
- self.assertTrue(conf['radius'].getboolean('verbose'))
- self.assertTrue(conf['radius']['acct-timeout'], '3')
- self.assertTrue(conf['radius']['timeout'], '3')
- self.assertTrue(conf['radius']['max-try'], '3')
- self.assertTrue(conf['radius']['gw-ip-address'], gateway)
-
- server = conf['radius']['server'].split(',')
- self.assertEqual(radius_server, server[0])
- self.assertEqual(radius_key, server[1])
- self.assertEqual(f'auth-port={radius_port}', server[2])
- self.assertEqual(f'acct-port={radius_port_acc}', server[3])
- self.assertEqual(f'req-limit=0', server[4])
- self.assertEqual(f'fail-time=0', server[5])
-
- # check defaults
- self.assertEqual(conf['ppp']['mppe'], 'prefer')
- self.assertFalse(conf['ppp'].getboolean('ccp'))
-
- # Check for running process
- self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_router-advert.py b/scripts/cli/test_service_router-advert.py
deleted file mode 100755
index ec2110c8a..000000000
--- a/scripts/cli/test_service_router-advert.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession
-from vyos.util import read_file
-
-RADVD_CONF = '/run/radvd/radvd.conf'
-
-interface = 'eth1'
-base_path = ['service', 'router-advert', 'interface', interface]
-address_base = ['interfaces', 'ethernet', interface, 'address']
-
-def get_config_value(key):
- tmp = read_file(RADVD_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp[0].split()[0].replace(';','')
-
-class TestServiceRADVD(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(address_base + ['2001:db8::1/64'])
-
- def tearDown(self):
- self.session.delete(address_base)
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_single(self):
- self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag'])
- self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag'])
- self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity'])
- self.session.set(base_path + ['dnssl', '2001:db8::1234'])
- self.session.set(base_path + ['other-config-flag'])
-
- # commit changes
- self.session.commit()
-
- # verify values
- tmp = get_config_value('interface')
- self.assertEqual(tmp, interface)
-
- tmp = get_config_value('prefix')
- self.assertEqual(tmp, '::/64')
-
- tmp = get_config_value('AdvOtherConfigFlag')
- self.assertEqual(tmp, 'on')
-
- # this is a default value
- tmp = get_config_value('AdvRetransTimer')
- self.assertEqual(tmp, '0')
-
- # this is a default value
- tmp = get_config_value('AdvCurHopLimit')
- self.assertEqual(tmp, '64')
-
- # this is a default value
- tmp = get_config_value('AdvDefaultPreference')
- self.assertEqual(tmp, 'medium')
-
- tmp = get_config_value('AdvAutonomous')
- self.assertEqual(tmp, 'off')
-
- # this is a default value
- tmp = get_config_value('AdvValidLifetime')
- self.assertEqual(tmp, 'infinity')
-
- # this is a default value
- tmp = get_config_value('AdvPreferredLifetime')
- self.assertEqual(tmp, '14400')
-
- tmp = get_config_value('AdvOnLink')
- self.assertEqual(tmp, 'off')
-
-
-
- # Check for running process
- self.assertTrue('radvd' in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_service_snmp.py b/scripts/cli/test_service_snmp.py
deleted file mode 100755
index fb5f5393f..000000000
--- a/scripts/cli/test_service_snmp.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import re
-import unittest
-
-from vyos.validate import is_ipv4
-from psutil import process_iter
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-SNMPD_CONF = '/etc/snmp/snmpd.conf'
-base_path = ['service', 'snmp']
-
-def get_config_value(key):
- tmp = read_file(SNMPD_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp[0]
-
-class TestSNMPService(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- self.session.delete(base_path)
-
- def tearDown(self):
- del self.session
-
- def test_snmp(self):
- """ Check if SNMP can be configured and service runs """
- clients = ['192.0.2.1', '2001:db8::1']
- networks = ['192.0.2.128/25', '2001:db8:babe::/48']
- listen = ['127.0.0.1', '::1']
-
- for auth in ['ro', 'rw']:
- community = 'VyOS' + auth
- self.session.set(base_path + ['community', community, 'authorization', auth])
- for client in clients:
- self.session.set(base_path + ['community', community, 'client', client])
- for network in networks:
- self.session.set(base_path + ['community', community, 'network', network])
-
- for addr in listen:
- self.session.set(base_path + ['listen-address', addr])
-
- self.session.set(base_path + ['contact', 'maintainers@vyos.io'])
- self.session.set(base_path + ['location', 'qemu'])
-
- self.session.commit()
-
- # verify listen address, it will be returned as
- # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161']
- # thus we need to transfor this into a proper list
- config = get_config_value('agentaddress')
- expected = 'unix:/run/snmpd.socket'
- for addr in listen:
- if is_ipv4(addr):
- expected += ',udp:{}:161'.format(addr)
- else:
- expected += ',udp6:[{}]:161'.format(addr)
-
- self.assertTrue(expected in config)
-
- # Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
-
-
- def test_snmpv3_sha(self):
- """ Check if SNMPv3 can be configured with SHA authentication and service runs"""
-
- self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
- self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
- # check validate() - a view must be created before this can be comitted
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
- self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
-
- # create user
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
-
- self.session.commit()
-
- # commit will alter the CLI values - check if they have been updated:
- hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe'
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
- self.assertEqual(tmp, hashed_password)
-
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
- self.assertEqual(tmp, hashed_password)
-
- # TODO: read in config file and check values
-
- # Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
-
- def test_snmpv3_md5(self):
- """ Check if SNMPv3 can be configured with MD5 authentication and service runs"""
-
- self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
- self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
- # check validate() - a view must be created before this can be comitted
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
- self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
-
- # create user
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
-
- self.session.commit()
-
- # commit will alter the CLI values - check if they have been updated:
- hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad'
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
- self.assertEqual(tmp, hashed_password)
-
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
- self.assertEqual(tmp, hashed_password)
-
- # TODO: read in config file and check values
-
- # Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
-
-
-if __name__ == '__main__':
- unittest.main()
-
diff --git a/scripts/cli/test_service_ssh.py b/scripts/cli/test_service_ssh.py
deleted file mode 100755
index 3ee498f3d..000000000
--- a/scripts/cli/test_service_ssh.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-SSHD_CONF = '/run/ssh/sshd_config'
-base_path = ['service', 'ssh']
-
-def get_config_value(key):
- tmp = read_file(SSHD_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp
-
-class TestServiceSSH(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- self.session.delete(base_path)
-
- def tearDown(self):
- # delete testing SSH config
- self.session.delete(base_path)
- # restore "plain" SSH access
- self.session.set(base_path)
-
- self.session.commit()
- del self.session
-
- def test_ssh_single(self):
- """ Check if SSH service can be configured and runs """
- self.session.set(base_path + ['port', '1234'])
- self.session.set(base_path + ['disable-host-validation'])
- self.session.set(base_path + ['disable-password-authentication'])
- self.session.set(base_path + ['loglevel', 'verbose'])
- self.session.set(base_path + ['client-keepalive-interval', '100'])
- self.session.set(base_path + ['listen-address', '127.0.0.1'])
-
- # commit changes
- self.session.commit()
-
- # Check configured port
- port = get_config_value('Port')[0]
- self.assertTrue("1234" in port)
-
- # Check DNS usage
- dns = get_config_value('UseDNS')[0]
- self.assertTrue("no" in dns)
-
- # Check PasswordAuthentication
- pwd = get_config_value('PasswordAuthentication')[0]
- self.assertTrue("no" in pwd)
-
- # Check loglevel
- loglevel = get_config_value('LogLevel')[0]
- self.assertTrue("VERBOSE" in loglevel)
-
- # Check listen address
- address = get_config_value('ListenAddress')[0]
- self.assertTrue("127.0.0.1" in address)
-
- # Check keepalive
- keepalive = get_config_value('ClientAliveInterval')[0]
- self.assertTrue("100" in keepalive)
-
- # Check for running process
- self.assertTrue("sshd" in (p.name() for p in process_iter()))
-
- def test_ssh_multi(self):
- """ Check if SSH service can be configured and runs with multiple
- listen ports and listen-addresses """
- ports = ['22', '2222']
- for port in ports:
- self.session.set(base_path + ['port', port])
-
- addresses = ['127.0.0.1', '::1']
- for address in addresses:
- self.session.set(base_path + ['listen-address', address])
-
- # commit changes
- self.session.commit()
-
- # Check configured port
- tmp = get_config_value('Port')
- for port in ports:
- self.assertIn(port, tmp)
-
- # Check listen address
- tmp = get_config_value('ListenAddress')
- for address in addresses:
- self.assertIn(address, tmp)
-
- # Check for running process
- self.assertTrue("sshd" in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_system_lcd.py b/scripts/cli/test_system_lcd.py
deleted file mode 100755
index 931a91c53..000000000
--- a/scripts/cli/test_system_lcd.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 Francois Mertz fireboxled@gmail.com
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from configparser import ConfigParser
-from psutil import process_iter
-from vyos.configsession import ConfigSession
-
-base_path = ['system', 'lcd']
-
-class TestSystemLCD(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_system_display(self):
- # configure some system display
- self.session.set(base_path + ['device', 'ttyS1'])
- self.session.set(base_path + ['model', 'cfa-533'])
-
- # commit changes
- self.session.commit()
-
- # load up ini-styled LCDd.conf
- conf = ConfigParser()
- conf.read('/run/LCDd/LCDd.conf')
-
- self.assertEqual(conf['CFontzPacket']['Model'], '533')
- self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1')
-
- # both processes running
- self.assertTrue('LCDd' in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_system_login.py b/scripts/cli/test_system_login.py
deleted file mode 100755
index 3c4b1fa28..000000000
--- a/scripts/cli/test_system_login.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import re
-import unittest
-
-from subprocess import Popen, PIPE
-from vyos.configsession import ConfigSession, ConfigSessionError
-import vyos.util as util
-
-base_path = ['system', 'login']
-users = ['vyos1', 'vyos2']
-
-class TestSystemLogin(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- # Delete individual users from configuration
- for user in users:
- self.session.delete(base_path + ['user', user])
-
- self.session.commit()
- del self.session
-
- def test_user(self):
- """ Check if user can be created and we can SSH to localhost """
- self.session.set(['service', 'ssh', 'port', '22'])
-
- for user in users:
- name = "VyOS Roxx " + user
- home_dir = "/tmp/" + user
-
- self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user])
- self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx'])
- self.session.set(base_path + ['user', user, 'home-directory', home_dir])
-
- self.session.commit()
-
- for user in users:
- cmd = ['su','-', user]
- proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- tmp = "{}\nuname -a".format(user)
- proc.stdin.write(tmp.encode())
- proc.stdin.flush()
- (stdout, stderr) = proc.communicate()
-
- # stdout is something like this:
- # b'Linux vyos 4.19.101-amd64-vyos #1 SMP Sun Feb 2 10:18:07 UTC 2020 x86_64 GNU/Linux\n'
- self.assertTrue(len(stdout) > 40)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_system_nameserver.py b/scripts/cli/test_system_nameserver.py
deleted file mode 100755
index 9040be072..000000000
--- a/scripts/cli/test_system_nameserver.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import re
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-import vyos.util as util
-
-RESOLV_CONF = '/etc/resolv.conf'
-
-test_servers = ['192.0.2.10', '2001:db8:1::100']
-base_path = ['system', 'name-server']
-
-def get_name_servers():
- resolv_conf = util.read_file(RESOLV_CONF)
- return re.findall(r'\n?nameserver\s+(.*)', resolv_conf)
-
-class TestSystemNameServer(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- # Delete existing name servers
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
-
- def test_add_server(self):
- """ Check if server is added to resolv.conf """
- for s in test_servers:
- self.session.set(base_path + [s])
- self.session.commit()
-
- servers = get_name_servers()
- for s in servers:
- self.assertTrue(s in servers)
-
- def test_delete_server(self):
- """ Test if a deleted server disappears from resolv.conf """
- for s in test_servers:
- self.session.delete(base_path + [s])
- self.session.commit()
-
- servers = get_name_servers()
- for s in servers:
- self.assertTrue(test_server_1 not in servers)
-
-if __name__ == '__main__':
- unittest.main()
-
diff --git a/scripts/cli/test_system_ntp.py b/scripts/cli/test_system_ntp.py
deleted file mode 100755
index 856a28916..000000000
--- a/scripts/cli/test_system_ntp.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr
-from vyos.util import read_file
-
-NTP_CONF = '/etc/ntp.conf'
-base_path = ['system', 'ntp']
-
-def get_config_value(key):
- tmp = read_file(NTP_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- # remove possible trailing whitespaces
- return [item.strip() for item in tmp]
-
-class TestSystemNTP(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- self.session.delete(base_path)
-
- def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
-
- def test_ntp_options(self):
- """ Test basic NTP support with multiple servers and their options """
- servers = ['192.0.2.1', '192.0.2.2']
- options = ['noselect', 'preempt', 'prefer']
-
- for server in servers:
- for option in options:
- self.session.set(base_path + ['server', server, option])
-
- # commit changes
- self.session.commit()
-
- # Check generated configuration
- tmp = get_config_value('server')
- for server in servers:
- test = f'{server} iburst ' + ' '.join(options)
- self.assertTrue(test in tmp)
-
- # Check for running process
- self.assertTrue("ntpd" in (p.name() for p in process_iter()))
-
- def test_ntp_clients(self):
- """ Test the allowed-networks statement """
- listen_address = ['127.0.0.1', '::1']
- for listen in listen_address:
- self.session.set(base_path + ['listen-address', listen])
-
- networks = ['192.0.2.0/24', '2001:db8:1000::/64']
- for network in networks:
- self.session.set(base_path + ['allow-clients', 'address', network])
-
- # Verify "NTP server not configured" verify() statement
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- servers = ['192.0.2.1', '192.0.2.2']
- for server in servers:
- self.session.set(base_path + ['server', server])
-
- self.session.commit()
-
- # Check generated client address configuration
- for network in networks:
- network_address = vyos_address_from_cidr(network)
- network_netmask = vyos_netmask_from_cidr(network)
-
- tmp = get_config_value(f'restrict {network_address}')[0]
- test = f'mask {network_netmask} nomodify notrap nopeer'
- self.assertTrue(tmp in test)
-
- # Check listen address
- tmp = get_config_value('interface')
- test = ['ignore wildcard']
- for listen in listen_address:
- test.append(f'listen {listen}')
- self.assertEqual(tmp, test)
-
- # Check for running process
- self.assertTrue("ntpd" in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_vpn_anyconnect.py b/scripts/cli/test_vpn_anyconnect.py
deleted file mode 100755
index dd8ab1609..000000000
--- a/scripts/cli/test_vpn_anyconnect.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import os
-import unittest
-
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-OCSERV_CONF = '/run/ocserv/ocserv.conf'
-base_path = ['vpn', 'anyconnect']
-cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem'
-cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key'
-
-class TestVpnAnyconnect(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- def tearDown(self):
- # Delete vpn anyconnect configuration
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
-
- def test_vpn(self):
- user = 'vyos_user'
- password = 'vyos_pass'
- self.session.delete(base_path)
- self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password])
- self.session.set(base_path + ["authentication", "mode", "local"])
- self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"])
- self.session.set(base_path + ["ssl", "ca-cert-file", cert])
- self.session.set(base_path + ["ssl", "cert-file", cert])
- self.session.set(base_path + ["ssl", "key-file", cert_key])
-
- self.session.commit()
-
- # Check for running process
- self.assertTrue("ocserv-main" in (p.name() for p in process_iter()))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/cli/test_vrf.py b/scripts/cli/test_vrf.py
deleted file mode 100755
index efa095b30..000000000
--- a/scripts/cli/test_vrf.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
-
-class VRFTest(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self._vrfs = ['red', 'green', 'blue']
-
- def tearDown(self):
- # delete all VRFs
- self.session.delete(['vrf'])
- self.session.commit()
- del self.session
-
- def test_table_id(self):
- table = 1000
- for vrf in self._vrfs:
- base = ['vrf', 'name', vrf]
- description = "VyOS-VRF-" + vrf
- self.session.set(base + ['description', description])
-
- # check validate() - a table ID is mandatory
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(base + ['table', str(table)])
- table += 1
-
- # commit changes
- self.session.commit()
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/scripts/import-conf-mode-commands b/scripts/import-conf-mode-commands
new file mode 100755
index 000000000..996b31c9c
--- /dev/null
+++ b/scripts/import-conf-mode-commands
@@ -0,0 +1,255 @@
+#!/usr/bin/env python3
+#
+# build-command-template: converts old style commands definitions to XML
+#
+# Copyright (C) 2019 VyOS maintainers <maintainers@vyos.net>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+
+
+import os
+import re
+import sys
+
+from lxml import etree
+
+
+# Node types
+NODE = 0
+LEAF_NODE = 1
+TAG_NODE = 2
+
+def parse_command_data(t):
+ regs = {
+ 'help': r'\bhelp:(.*)(?:\n|$)',
+ 'priority': r'\bpriority:(.*)(?:\n|$)',
+ 'type': r'\btype:(.*)(?:\n|$)',
+ 'syntax_expression_var': r'\bsyntax:expression: \$VAR\(\@\) in (.*)'
+ }
+
+ data = {'multi': False, 'help': ""}
+
+ for r in regs:
+ try:
+ data[r] = re.search(regs[r], t).group(1).strip()
+ except:
+ data[r] = None
+
+ # val_help is special: there can be multiple instances
+ val_help_strings = re.findall(r'\bval_help:(.*)(?:\n|$)', t)
+ val_help = []
+ for v in val_help_strings:
+ try:
+ fmt, msg = re.match(r'\s*(.*)\s*;\s*(.*)\s*(?:\n|$)', v).groups()
+ except:
+ fmt = "<text>"
+ msg = v
+ val_help.append((fmt, msg))
+ data['val_help'] = val_help
+
+ # multi is on/off
+ if re.match(r'\bmulti:', t):
+ data['multi'] = True
+
+ return(data)
+
+def walk(tree, base_path, name):
+ path = os.path.join(base_path, name)
+
+ contents = os.listdir(path)
+
+ # Determine node type and create XML element for the node
+ # Tag node dirs will always have 'node.tag' subdir and 'node.def' file
+ # Leaf node dirs have nothing but a 'node.def' file
+ # Everything that doesn't match either of these patterns is a normal node
+ if 'node.tag' in contents:
+ print("Creating a tag node from {0}".format(path))
+ elem = etree.Element('tagNode')
+ node_type = TAG_NODE
+ elif contents == ['node.def']:
+ print("Creating a leaf node from {0}".format(path))
+ elem = etree.Element('leafNode')
+ node_type = LEAF_NODE
+ else:
+ print("Creating a node from {0}".format(path))
+ elem = etree.Element('node')
+ node_type = NODE
+
+ # Read and parse the command definition data (the 'node.def' file)
+ with open(os.path.join(path, 'node.def'), 'r') as f:
+ node_def = f.read()
+ data = parse_command_data(node_def)
+
+ # Import the data into the properties element
+ props_elem = etree.Element('properties')
+
+ if data['priority']:
+ # Priority values sometimes come with comments that explain the value choice
+ try:
+ prio, prio_comment = re.match(r'\s*(\d+)\s*#(.*)', data['priority']).groups()
+ except:
+ prio = data['priority'].strip()
+ prio_comment = None
+ prio_elem = etree.Element('priority')
+ prio_elem.text = prio
+ props_elem.append(prio_elem)
+ if prio_comment:
+ prio_comment_elem = etree.Comment(prio_comment)
+ props_elem.append(prio_comment_elem)
+
+ if data['multi']:
+ multi_elem = etree.Element('multi')
+ props_elem.append(multi_elem)
+
+ if data['help']:
+ help_elem = etree.Element('help')
+ help_elem.text = data['help']
+ props_elem.append(help_elem)
+
+ # For leaf nodes, absense of a type: tag means they take no values
+ # For any other nodes, it doesn't mean anything
+ if not data['type'] and (node_type == LEAF_NODE):
+ valueless = etree.Element('valueless')
+ props_elem.append(valueless)
+
+ # There can be only one constraint element in the definition
+ # Create it now, we'll modify it in the next two cases, then append
+ constraint_elem = etree.Element('constraint')
+ has_constraint = False
+
+ # Add regexp field for multiple options
+ if data['syntax_expression_var']:
+ regex = etree.Element('regex')
+ constraint_error=etree.Element('constraintErrorMessage')
+ values = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(1)
+ message = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(2)
+ values = re.findall(r'\"(.+?)\"', values)
+ regex.text = '|'.join(values)
+ constraint_error.text = re.sub('\".*?VAR.*?\"', '', message)
+ constraint_error.text = re.sub(r'[\"|\\]', '', message)
+ constraint_elem.append(regex)
+ props_elem.append(constraint_elem)
+ props_elem.append(constraint_error)
+
+ if data['val_help']:
+ for vh in data['val_help']:
+ vh_elem = etree.Element('valueHelp')
+
+ vh_fmt_elem = etree.Element('format')
+ # Many commands use special "u32:<start>-<end>" format for ranges
+ if re.match(r'u32:', vh[0]):
+ vh_fmt = re.match(r'u32:(.*)', vh[0]).group(1).strip()
+
+ # If valid range of values is specified in val_help, we can automatically
+ # create a constraint for it
+ # Extracting it from syntax:expression: would be much more complicated
+ vh_validator = etree.Element('validator')
+ vh_validator.set("name", "numeric")
+ vh_validator.set("argument", "--range {0}".format(vh_fmt))
+ constraint_elem.append(vh_validator)
+ has_constraint = True
+ else:
+ vh_fmt = vh[0]
+ vh_fmt_elem.text = vh_fmt
+
+ vh_help_elem = etree.Element('description')
+ vh_help_elem.text = vh[1]
+
+ vh_elem.append(vh_fmt_elem)
+ vh_elem.append(vh_help_elem)
+ props_elem.append(vh_elem)
+
+ # Translate the "type:" to the new validator system
+ if data['type']:
+ t = data['type']
+ if t == 'txt':
+ # Can't infer anything from the generic "txt" type
+ pass
+ else:
+ validator = etree.Element('validator')
+ if t == 'u32':
+ validator.set('name', 'numeric')
+ validator.set('argument', '--non-negative')
+ elif t == 'ipv4':
+ validator.set('name', 'ipv4-address')
+ elif t == 'ipv4net':
+ validator.set('name', 'ipv4-prefix')
+ elif t == 'ipv6':
+ validator.set('name', 'ipv6-address')
+ elif t == 'ipv6net':
+ validator.set('name', 'ipv6-prefix')
+ elif t == 'macaddr':
+ validator.set('name', 'mac-address')
+ else:
+ print("Warning: unsupported type \'{0}\'".format(t))
+ validator = None
+
+ if (validator is not None) and (not has_constraint):
+ # If has_constraint is true, it means a more specific validator
+ # was already extracted from another option
+ constraint_elem.append(validator)
+ has_constraint = True
+
+ if has_constraint:
+ props_elem.append(constraint_elem)
+
+ elem.append(props_elem)
+
+ elem.set("name", name)
+
+ if node_type != LEAF_NODE:
+ children = etree.Element('children')
+
+ # Create the next level dir path,
+ # accounting for the "virtual" node.tag subdir for tag nodes
+ next_level = path
+ if node_type == TAG_NODE:
+ next_level = os.path.join(path, 'node.tag')
+
+ # Walk the subdirs of the next level
+ for d in os.listdir(next_level):
+ dp = os.path.join(next_level, d)
+ if os.path.isdir(dp):
+ walk(children, next_level, d)
+
+ elem.append(children)
+
+ tree.append(elem)
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print("Usage: {0} <base path>".format(sys.argv[0]))
+ sys.exit(1)
+ else:
+ base_path = sys.argv[1]
+
+ root = etree.Element('interfaceDefinition')
+ contents = os.listdir(base_path)
+ elem = etree.Element('node')
+ elem.set('name', os.path.basename(base_path))
+ children = etree.Element('children')
+
+ for c in contents:
+ path = os.path.join(base_path, c)
+ if os.path.isdir(path):
+ walk(children, base_path, c)
+
+ elem.append(children)
+ root.append(elem)
+
+ xml_data = etree.tostring(root, pretty_print=True).decode()
+ with open('output.xml', 'w') as f:
+ f.write(xml_data)
diff --git a/scripts/system/test_module_load.py b/scripts/system/test_module_load.py
deleted file mode 100755
index 59c3e0b6a..000000000
--- a/scripts/system/test_module_load.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-modules = {
- "intel": ["e1000", "e1000e", "igb", "ixgb", "ixgbe", "ixgbevf", "i40e", "i40evf", "iavf"],
- "accel_ppp": ["ipoe", "vlan_mon"],
- "misc": ["wireguard"]
-}
-
-class TestKernelModules(unittest.TestCase):
- def test_load_modules(self):
- success = True
- for msk in modules:
- ms = modules[msk]
- for m in ms:
- # We want to uncover all modules that fail,
- # not fail at the first one
- try:
- os.system("modprobe {0}".format(m))
- except:
- success = False
-
- self.assertTrue(success)
-
-if __name__ == '__main__':
- unittest.main()