diff options
Diffstat (limited to 'scripts')
35 files changed, 833 insertions, 2598 deletions
diff --git a/scripts/build-command-op-templates b/scripts/build-command-op-templates new file mode 100755 index 000000000..c60b32a1e --- /dev/null +++ b/scripts/build-command-op-templates @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +# +# build-command-template: converts new style command definitions in XML +# to the old style (bunch of dirs and node.def's) command templates +# +# Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 +# USA + +import sys +import os +import argparse +import copy +import functools + +from lxml import etree as ET + +# Defaults + +validator_dir = "/opt/vyatta/libexec/validators" +default_constraint_err_msg = "Invalid value" + + +## Get arguments + +parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates') +parser.add_argument('--debug', help='Enable debug information output', action='store_true') +parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file") +parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file") +parser.add_argument('OUTPUT_DIR', type=str, help="Output directory") + +args = parser.parse_args() + +input_file = args.INPUT_FILE +schema_file = args.SCHEMA_FILE +output_dir = args.OUTPUT_DIR +debug = args.debug + +## Load and validate the inputs + +try: + xml = ET.parse(input_file) +except Exception as e: + print("Failed to load interface definition file {0}".format(input_file)) + print(e) + sys.exit(1) + +try: + relaxng_xml = ET.parse(schema_file) + validator = ET.RelaxNG(relaxng_xml) + + if not validator.validate(xml): + print(validator.error_log) + print("Interface definition file {0} does not match the schema!".format(input_file)) + sys.exit(1) +except Exception as e: + print("Failed to load the XML schema {0}".format(schema_file)) + print(e) + sys.exit(1) + +if not os.access(output_dir, os.W_OK): + print("The output directory {0} is not writeable".format(output_dir)) + sys.exit(1) + +## If we got this far, everything must be ok and we can convert the file + +def make_path(l): + path = functools.reduce(os.path.join, l) + if debug: + print(path) + return path + +def get_properties(p): + props = {} + + if p is None: + return props + + # Get the help string + try: + props["help"] = p.find("help").text + except: + props["help"] = "No help available" + + + # Get the completion help strings + try: + che = p.findall("completionHelp") + ch = "" + for c in che: + scripts = c.findall("script") + paths = c.findall("path") + lists = c.findall("list") + + # Current backend doesn't support multiple allowed: tags + # so we get to emulate it + comp_exprs = [] + for i in lists: + comp_exprs.append("echo \"{0}\"".format(i.text)) + for i in paths: + comp_exprs.append("/bin/cli-shell-api listActiveNodes {0} | sed -e \"s/'//g\" && echo".format(i.text)) + for i in scripts: + comp_exprs.append("{0}".format(i.text)) + comp_help = " && ".join(comp_exprs) + props["comp_help"] = comp_help + except: + props["comp_help"] = [] + + return props + + +def make_node_def(props, command): + # XXX: replace with a template processor if it grows + # out of control + + node_def = "" + + if "help" in props: + node_def += "help: {0}\n".format(props["help"]) + + + if "comp_help" in props: + node_def += "allowed: {0}\n".format(props["comp_help"]) + + + if command is not None: + node_def += "run: {0}\n".format(command.text) + + + if debug: + print("The contents of the node.def file:\n", node_def) + + return node_def + +def process_node(n, tmpl_dir): + # Avoid mangling the path from the outer call + my_tmpl_dir = copy.copy(tmpl_dir) + + props_elem = n.find("properties") + children = n.find("children") + command = n.find("command") + + name = n.get("name") + + node_type = n.tag + + my_tmpl_dir.append(name) + + if debug: + print("Name of the node: {};\n Created directory: ".format(name), end="") + os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + + props = get_properties(props_elem) + + if node_type == "node": + if debug: + print("Processing node {}".format(name)) + + nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") + if not os.path.exists(nodedef_path): + with open(nodedef_path, "w") as f: + f.write(make_node_def(props, command)) + else: + # Something has already generated this file + pass + + if children is not None: + inner_nodes = children.iterfind("*") + for inner_n in inner_nodes: + process_node(inner_n, my_tmpl_dir) + if node_type == "tagNode": + if debug: + print("Processing tag node {}".format(name)) + + os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + + nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") + if not os.path.exists(nodedef_path): + with open(nodedef_path, "w") as f: + f.write('help: {0}\n'.format(props['help'])) + else: + # Something has already generated this file + pass + + # Create the inner node.tag part + my_tmpl_dir.append("node.tag") + os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + if debug: + print("Created path for the tagNode: {}".format(make_path(my_tmpl_dir)), end="") + + # Not sure if we want partially defined tag nodes, write the file unconditionally + with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f: + f.write(make_node_def(props, command)) + + if children is not None: + inner_nodes = children.iterfind("*") + for inner_n in inner_nodes: + process_node(inner_n, my_tmpl_dir) + else: + # This is a leaf node + if debug: + print("Processing leaf node {}".format(name)) + + with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f: + f.write(make_node_def(props, command)) + + +root = xml.getroot() + +nodes = root.iterfind("*") +for n in nodes: + process_node(n, [output_dir]) diff --git a/scripts/build-command-templates b/scripts/build-command-templates new file mode 100755 index 000000000..457adbec2 --- /dev/null +++ b/scripts/build-command-templates @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +# +# build-command-template: converts new style command definitions in XML +# to the old style (bunch of dirs and node.def's) command templates +# +# Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 +# USA + +import sys +import os +import argparse +import copy +import functools + +from lxml import etree as ET + +# Defaults + +#validator_dir = "/usr/libexec/vyos/validators" +validator_dir = "${vyos_validators_dir}" +default_constraint_err_msg = "Invalid value" + + +## Get arguments + +parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates') +parser.add_argument('--debug', help='Enable debug information output', action='store_true') +parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file") +parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file") +parser.add_argument('OUTPUT_DIR', type=str, help="Output directory") + +args = parser.parse_args() + +input_file = args.INPUT_FILE +schema_file = args.SCHEMA_FILE +output_dir = args.OUTPUT_DIR +debug = args.debug + +#debug = True + +## Load and validate the inputs + +try: + xml = ET.parse(input_file) +except Exception as e: + print("Failed to load interface definition file {0}".format(input_file)) + print(e) + sys.exit(1) + +try: + relaxng_xml = ET.parse(schema_file) + validator = ET.RelaxNG(relaxng_xml) + + if not validator.validate(xml): + print(validator.error_log) + print("Interface definition file {0} does not match the schema!".format(input_file)) + sys.exit(1) +except Exception as e: + print("Failed to load the XML schema {0}".format(schema_file)) + print(e) + sys.exit(1) + +if not os.access(output_dir, os.W_OK): + print("The output directory {0} is not writeable".format(output_dir)) + sys.exit(1) + +## If we got this far, everything must be ok and we can convert the file + +def make_path(l): + path = functools.reduce(os.path.join, l) + if debug: + print(path) + return path + +def get_properties(p): + props = {} + + if p is None: + return props + + # Get the help string + try: + props["help"] = p.find("help").text + except: + pass + + # Get value help strings + try: + vhe = p.findall("valueHelp") + vh = [] + for v in vhe: + vh.append( (v.find("format").text, v.find("description").text) ) + props["val_help"] = vh + except: + props["val_help"] = [] + + # Get the constraint statements + error_msg = default_constraint_err_msg + # Get the error message if it's there + try: + error_msg = p.find("constraintErrorMessage").text + except: + pass + + vce = p.find("constraint") + vc = [] + if vce is not None: + # The old backend doesn't support multiple validators in OR mode + # so we emulate it + + regexes = [] + regex_elements = vce.findall("regex") + if regex_elements is not None: + regexes = list(map(lambda e: e.text.strip().replace('\\','\\\\'), regex_elements)) + if "" in regexes: + print("Warning: empty regex, node will be accepting any value") + + validator_elements = vce.findall("validator") + validators = [] + if validator_elements is not None: + for v in validator_elements: + v_name = os.path.join(validator_dir, v.get("name")) + + # XXX: lxml returns None for empty arguments + v_argument = None + try: + v_argument = v.get("argument") + except: + pass + if v_argument is None: + v_argument = "" + + validators.append("{0} {1}".format(v_name, v_argument)) + + + regex_args = " ".join(map(lambda s: "--regex \\\'{0}\\\'".format(s), regexes)) + validator_args = " ".join(map(lambda s: "--exec \\\"{0}\\\"".format(s), validators)) + validator_script = '${vyos_libexec_dir}/validate-value' + validator_string = "exec \"{0} {1} {2} --value \\\'$VAR(@)\\\'\"; \"{3}\"".format(validator_script, regex_args, validator_args, error_msg) + + props["constraint"] = validator_string + + # Get the completion help strings + try: + che = p.findall("completionHelp") + ch = "" + for c in che: + scripts = c.findall("script") + paths = c.findall("path") + lists = c.findall("list") + + # Current backend doesn't support multiple allowed: tags + # so we get to emulate it + comp_exprs = [] + for i in lists: + comp_exprs.append("echo \"{0}\"".format(i.text)) + for i in paths: + comp_exprs.append("/bin/cli-shell-api listNodes {0}".format(i.text)) + for i in scripts: + comp_exprs.append("sh -c \"{0}\"".format(i.text)) + comp_help = " && ".join(comp_exprs) + props["comp_help"] = comp_help + except: + props["comp_help"] = [] + + # Get priority + try: + props["priority"] = p.find("priority").text + except: + pass + + # Get "multi" + if p.find("multi") is not None: + props["multi"] = True + + # Get "valueless" + if p.find("valueless") is not None: + props["valueless"] = True + + return props + +def make_node_def(props): + # XXX: replace with a template processor if it grows + # out of control + + node_def = "" + + if "tag" in props: + node_def += "tag:\n" + + if "multi" in props: + node_def += "multi:\n" + + if "type" in props: + # Will always be txt in practice if it's set + node_def += "type: {0}\n".format(props["type"]) + + if "priority" in props: + node_def += "priority: {0}\n".format(props["priority"]) + + if "help" in props: + node_def += "help: {0}\n".format(props["help"]) + + if "val_help" in props: + for v in props["val_help"]: + node_def += "val_help: {0}; {1}\n".format(v[0], v[1]) + + if "comp_help" in props: + node_def += "allowed: {0}\n".format(props["comp_help"]) + + if "constraint" in props: + node_def += "syntax:expression: {0}\n".format(props["constraint"]) + + if "owner" in props: + if "tag" in props: + node_def += "end: sudo sh -c \"VYOS_TAGNODE_VALUE='$VAR(@)' {0}\"\n".format(props["owner"]) + else: + node_def += "end: sudo sh -c \"{0}\"\n".format(props["owner"]) + + if debug: + print("The contents of the node.def file:\n", node_def) + + return node_def + +def process_node(n, tmpl_dir): + # Avoid mangling the path from the outer call + my_tmpl_dir = copy.copy(tmpl_dir) + + props_elem = n.find("properties") + children = n.find("children") + + name = n.get("name") + owner = n.get("owner") + node_type = n.tag + + my_tmpl_dir.append(name) + + if debug: + print("Name of the node: {0}. Created directory: {1}\n".format(name, "/".join(my_tmpl_dir)), end="") + os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + + props = get_properties(props_elem) + if owner: + props["owner"] = owner + # Type should not be set for non-tag, non-leaf nodes + # For non-valueless leaf nodes, set the type to txt: to make them have some type, + # actual value validation is handled by constraints translated to syntax:expression: + if node_type != "node": + if "valueless" not in props.keys(): + props["type"] = "txt" + if node_type == "tagNode": + props["tag"] = "True" + + if node_type != "leafNode": + if "multi" in props: + raise ValueError("<multi/> tag is only allowed in <leafNode>") + if "valueless" in props: + raise ValueError("<valueless/> is only allowed in <leafNode>") + + nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") + if not os.path.exists(nodedef_path): + with open(nodedef_path, "w") as f: + f.write(make_node_def(props)) + else: + # Something has already generated that file + pass + + + if node_type == "node": + inner_nodes = children.iterfind("*") + for inner_n in inner_nodes: + process_node(inner_n, my_tmpl_dir) + if node_type == "tagNode": + my_tmpl_dir.append("node.tag") + if debug: + print("Created path for the tagNode:", end="") + os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + inner_nodes = children.iterfind("*") + for inner_n in inner_nodes: + process_node(inner_n, my_tmpl_dir) + else: + # This is a leaf node + pass + + +root = xml.getroot() + +nodes = root.iterfind("*") +for n in nodes: + if n.tag == "syntaxVersion": + continue + process_node(n, [output_dir]) diff --git a/scripts/build-component-versions b/scripts/build-component-versions new file mode 100755 index 000000000..5362dbdd4 --- /dev/null +++ b/scripts/build-component-versions @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +import sys +import os +import argparse +import json + +from lxml import etree as ET + +parser = argparse.ArgumentParser() +parser.add_argument('INPUT_DIR', type=str, + help="Directory containing XML interface definition files") +parser.add_argument('OUTPUT_DIR', type=str, + help="Output directory for JSON file") + +args = parser.parse_args() + +input_dir = args.INPUT_DIR +output_dir = args.OUTPUT_DIR + +version_dict = {} + +for filename in os.listdir(input_dir): + filepath = os.path.join(input_dir, filename) + print(filepath) + try: + xml = ET.parse(filepath) + except Exception as e: + print("Failed to load interface definition file {0}".format(filename)) + print(e) + sys.exit(1) + + root = xml.getroot() + version_data = root.iterfind("syntaxVersion") + for ver in version_data: + component = ver.get("component") + version = int(ver.get("version")) + + v = version_dict.get(component) + if v is None: + version_dict[component] = version + elif version > v: + version_dict[component] = version + +out_file = os.path.join(output_dir, 'component-versions.json') +with open(out_file, 'w') as f: + json.dump(version_dict, f, indent=4, sort_keys=True) diff --git a/scripts/cli/base_interfaces_test.py b/scripts/cli/base_interfaces_test.py deleted file mode 100644 index 14ec7e137..000000000 --- a/scripts/cli/base_interfaces_test.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from netifaces import ifaddresses, AF_INET, AF_INET6 - -from vyos.configsession import ConfigSession -from vyos.ifconfig import Interface -from vyos.util import read_file -from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local - -class BasicInterfaceTest: - class BaseTest(unittest.TestCase): - _test_ip = False - _test_mtu = False - _test_vlan = False - _test_qinq = False - _test_ipv6 = False - _base_path = [] - - _options = {} - _interfaces = [] - _qinq_range = ['10', '20', '30'] - _vlan_range = ['100', '200', '300', '2000'] - # choose IPv6 minimum MTU value for tests - this must always work - _mtu = '1280' - - def setUp(self): - self.session = ConfigSession(os.getpid()) - - self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', - '2001:db8:1::ffff/64', '2001:db8:101::1/112'] - self._test_mtu = False - self._options = {} - - def tearDown(self): - # we should not remove ethernet from the overall CLI - if 'ethernet' in self._base_path: - for interface in self._interfaces: - # when using a dedicated interface to test via TEST_ETH environment - # variable only this one will be cleared in the end - usable to test - # ethernet interfaces via SSH - self.session.delete(self._base_path + [interface]) - self.session.set(self._base_path + [interface, 'duplex', 'auto']) - self.session.set(self._base_path + [interface, 'speed', 'auto']) - self.session.set(self._base_path + [interface, 'smp-affinity', 'auto']) - else: - self.session.delete(self._base_path) - - self.session.commit() - del self.session - - def test_add_description(self): - """ - Check if description can be added to interface - """ - for intf in self._interfaces: - test_string='Description-Test-{}'.format(intf) - self.session.set(self._base_path + [intf, 'description', test_string]) - for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) - - self.session.commit() - - # Validate interface description - for intf in self._interfaces: - test_string='Description-Test-{}'.format(intf) - with open('/sys/class/net/{}/ifalias'.format(intf), 'r') as f: - tmp = f.read().rstrip() - self.assertTrue(tmp, test_string) - - def test_add_address_single(self): - """ - Check if a single address can be added to interface. - """ - addr = '192.0.2.0/31' - for intf in self._interfaces: - self.session.set(self._base_path + [intf, 'address', addr]) - for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) - - self.session.commit() - - for intf in self._interfaces: - self.assertTrue(is_intf_addr_assigned(intf, addr)) - - def test_add_address_multi(self): - """ - Check if IPv4/IPv6 addresses can be added to interface. - """ - - # Add address - for intf in self._interfaces: - for addr in self._test_addr: - self.session.set(self._base_path + [intf, 'address', addr]) - for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) - - self.session.commit() - - # Validate address - for intf in self._interfaces: - for af in AF_INET, AF_INET6: - for addr in ifaddresses(intf)[af]: - # checking link local addresses makes no sense - if is_ipv6_link_local(addr['addr']): - continue - - self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) - - def test_ipv6_link_local(self): - """ Common function for IPv6 link-local address assignemnts """ - if not self._test_ipv6: - return None - - for interface in self._interfaces: - base = self._base_path + [interface] - for option in self._options.get(interface, []): - self.session.set(base + option.split()) - - # after commit we must have an IPv6 link-local address - self.session.commit() - - for interface in self._interfaces: - for addr in ifaddresses(interface)[AF_INET6]: - self.assertTrue(is_ipv6_link_local(addr['addr'])) - - # disable IPv6 link-local address assignment - for interface in self._interfaces: - base = self._base_path + [interface] - self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) - - # after commit we must have no IPv6 link-local address - self.session.commit() - - for interface in self._interfaces: - self.assertTrue(AF_INET6 not in ifaddresses(interface)) - - def _mtu_test(self, intf): - """ helper function to verify MTU size """ - with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f: - tmp = f.read().rstrip() - self.assertEqual(tmp, self._mtu) - - def test_change_mtu(self): - """ Testcase if MTU can be changed on interface """ - if not self._test_mtu: - return None - for intf in self._interfaces: - base = self._base_path + [intf] - self.session.set(base + ['mtu', self._mtu]) - for option in self._options.get(intf, []): - self.session.set(base + option.split()) - - self.session.commit() - for intf in self._interfaces: - self._mtu_test(intf) - - def test_8021q_vlan(self): - """ Testcase for 802.1q VLAN interfaces """ - if not self._test_vlan: - return None - - for interface in self._interfaces: - base = self._base_path + [interface] - for option in self._options.get(interface, []): - self.session.set(base + option.split()) - - for vlan in self._vlan_range: - base = self._base_path + [interface, 'vif', vlan] - self.session.set(base + ['mtu', self._mtu]) - for address in self._test_addr: - self.session.set(base + ['address', address]) - - self.session.commit() - for intf in self._interfaces: - for vlan in self._vlan_range: - vif = f'{intf}.{vlan}' - for address in self._test_addr: - self.assertTrue(is_intf_addr_assigned(vif, address)) - self._mtu_test(vif) - - - def test_8021ad_qinq_vlan(self): - """ Testcase for 802.1ad Q-in-Q VLAN interfaces """ - if not self._test_qinq: - return None - - for interface in self._interfaces: - base = self._base_path + [interface] - for option in self._options.get(interface, []): - self.session.set(base + option.split()) - - for vif_s in self._qinq_range: - for vif_c in self._vlan_range: - base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] - self.session.set(base + ['mtu', self._mtu]) - for address in self._test_addr: - self.session.set(base + ['address', address]) - - self.session.commit() - for interface in self._interfaces: - for vif_s in self._qinq_range: - for vif_c in self._vlan_range: - vif = f'{interface}.{vif_s}.{vif_c}' - for address in self._test_addr: - self.assertTrue(is_intf_addr_assigned(vif, address)) - self._mtu_test(vif) - - def test_ip_options(self): - """ test IP options like arp """ - if not self._test_ip: - return None - - for interface in self._interfaces: - arp_tmo = '300' - path = self._base_path + [interface] - for option in self._options.get(interface, []): - self.session.set(path + option.split()) - - # Options - self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) - self.session.set(path + ['ip', 'disable-arp-filter']) - self.session.set(path + ['ip', 'enable-arp-accept']) - self.session.set(path + ['ip', 'enable-arp-announce']) - self.session.set(path + ['ip', 'enable-arp-ignore']) - self.session.set(path + ['ip', 'enable-proxy-arp']) - self.session.set(path + ['ip', 'proxy-arp-pvlan']) - self.session.set(path + ['ip', 'source-validation', 'loose']) - - self.session.commit() - - for interface in self._interfaces: - tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') - self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter') - self.assertEqual('0', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept') - self.assertEqual('1', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce') - self.assertEqual('1', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore') - self.assertEqual('1', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp') - self.assertEqual('1', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan') - self.assertEqual('1', tmp) - - tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') - self.assertEqual('2', tmp) diff --git a/scripts/cli/test_interfaces_bonding.py b/scripts/cli/test_interfaces_bonding.py deleted file mode 100755 index e3d3b25ee..000000000 --- a/scripts/cli/test_interfaces_bonding.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest - -from vyos.ifconfig import Section -from vyos.configsession import ConfigSessionError -from vyos.util import read_file - -class BondingInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._base_path = ['interfaces', 'bonding'] - self._interfaces = ['bond0'] - self._test_mtu = True - self._test_vlan = True - self._test_qinq = True - self._test_ipv6 = True - - self._members = [] - # we need to filter out VLAN interfaces identified by a dot (.) - # in their name - just in case! - if 'TEST_ETH' in os.environ: - self._members = os.environ['TEST_ETH'].split() - else: - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - self._members.append(tmp) - - self._options['bond0'] = [] - for member in self._members: - self._options['bond0'].append(f'member interface {member}') - - - def test_add_address_single(self): - """ derived method to check if member interfaces are enslaved properly """ - super().test_add_address_single() - - for interface in self._interfaces: - slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() - self.assertListEqual(slaves, self._members) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_bridge.py b/scripts/cli/test_interfaces_bridge.py deleted file mode 100755 index bc0bb69c6..000000000 --- a/scripts/cli/test_interfaces_bridge.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.ifconfig import Section - -class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._test_ipv6 = True - - self._base_path = ['interfaces', 'bridge'] - self._interfaces = ['br0'] - - self._members = [] - # we need to filter out VLAN interfaces identified by a dot (.) - # in their name - just in case! - if 'TEST_ETH' in os.environ: - self._members = os.environ['TEST_ETH'].split() - else: - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - self._members.append(tmp) - - self._options['br0'] = [] - for member in self._members: - self._options['br0'].append(f'member interface {member}') - - def test_add_remove_member(self): - for interface in self._interfaces: - base = self._base_path + [interface] - self.session.set(base + ['stp']) - self.session.set(base + ['address', '192.0.2.1/24']) - - cost = 1000 - priority = 10 - # assign members to bridge interface - for member in self._members: - base_member = base + ['member', 'interface', member] - self.session.set(base_member + ['cost', str(cost)]) - self.session.set(base_member + ['priority', str(priority)]) - cost += 1 - priority += 1 - - self.session.commit() - - for interface in self._interfaces: - self.session.delete(self._base_path + [interface, 'member']) - - self.session.commit() - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_dummy.py b/scripts/cli/test_interfaces_dummy.py deleted file mode 100755 index 01942fc89..000000000 --- a/scripts/cli/test_interfaces_dummy.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest - -class DummyInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - self._base_path = ['interfaces', 'dummy'] - self._interfaces = ['dum0', 'dum1', 'dum2'] - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_ethernet.py b/scripts/cli/test_interfaces_ethernet.py deleted file mode 100755 index 761ec7506..000000000 --- a/scripts/cli/test_interfaces_ethernet.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.ifconfig import Section - -class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._base_path = ['interfaces', 'ethernet'] - self._test_ip = True - self._test_mtu = True - self._test_vlan = True - self._test_qinq = True - self._test_ipv6 = True - self._interfaces = [] - - # we need to filter out VLAN interfaces identified by a dot (.) - # in their name - just in case! - if 'TEST_ETH' in os.environ: - tmp = os.environ['TEST_ETH'].split() - self._interfaces = tmp - else: - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - self._interfaces.append(tmp) - - def test_dhcp_disable(self): - """ - When interface is configured as admin down, it must be admin down even - """ - for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'disable']) - for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) - - # Also enable DHCP (ISC DHCP always places interface in admin up - # state so we check that we do not start DHCP client. - # https://phabricator.vyos.net/T2767 - self.session.set(self._base_path + [interface, 'address', 'dhcp']) - - self.session.commit() - - # Validate interface state - for interface in self._interfaces: - with open(f'/sys/class/net/{interface}/flags', 'r') as f: - flags = f.read() - self.assertEqual(int(flags, 16) & 1, 0) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_geneve.py b/scripts/cli/test_interfaces_geneve.py deleted file mode 100755 index f84a55f86..000000000 --- a/scripts/cli/test_interfaces_geneve.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - - -class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._base_path = ['interfaces', 'geneve'] - self._options = { - 'gnv0': ['vni 10', 'remote 127.0.1.1'], - 'gnv1': ['vni 20', 'remote 127.0.1.2'], - } - self._interfaces = list(self._options) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_l2tpv3.py b/scripts/cli/test_interfaces_l2tpv3.py deleted file mode 100755 index d8655d157..000000000 --- a/scripts/cli/test_interfaces_l2tpv3.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import json -import jmespath -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.util import cmd - -class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._base_path = ['interfaces', 'l2tpv3'] - self._options = { - 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', - 'tunnel-id 100', 'peer-tunnel-id 10', - 'session-id 100', 'peer-session-id 10', - 'source-port 1010', 'destination-port 10101'], - 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20', - 'peer-tunnel-id 200', 'remote-ip 127.20.20.20', - 'session-id 20', 'tunnel-id 200', - 'source-port 2020', 'destination-port 20202'], - } - self._interfaces = list(self._options) - - def test_add_address_single(self): - super().test_add_address_single() - - command = 'sudo ip -j l2tp show session' - json_out = json.loads(cmd(command)) - for interface in self._options: - for config in json_out: - if config['interface'] == interface: - # convert list with configuration items into a dict - dict = {} - for opt in self._options[interface]: - dict.update({opt.split()[0].replace('-','_'): opt.split()[1]}) - - for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']: - self.assertEqual(str(config[key]), dict[key]) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_loopback.py b/scripts/cli/test_interfaces_loopback.py deleted file mode 100755 index ba428b5d3..000000000 --- a/scripts/cli/test_interfaces_loopback.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.validate import is_intf_addr_assigned - -class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - # these addresses are never allowed to be removed from the system - self._loopback_addresses = ['127.0.0.1', '::1'] - self._base_path = ['interfaces', 'loopback'] - self._interfaces = ['lo'] - - def test_add_address_single(self): - super().test_add_address_single() - for addr in self._loopback_addresses: - self.assertTrue(is_intf_addr_assigned('lo', addr)) - - def test_add_address_multi(self): - super().test_add_address_multi() - for addr in self._loopback_addresses: - self.assertTrue(is_intf_addr_assigned('lo', addr)) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_macsec.py b/scripts/cli/test_interfaces_macsec.py deleted file mode 100755 index 0f1b6486d..000000000 --- a/scripts/cli/test_interfaces_macsec.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import unittest -from psutil import process_iter - -from vyos.ifconfig import Section -from base_interfaces_test import BasicInterfaceTest -from vyos.configsession import ConfigSessionError -from vyos.util import read_file - -def get_config_value(intf, key): - tmp = read_file(f'/run/wpa_supplicant/{intf}.conf') - tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) - return tmp[0] - -class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - self._base_path = ['interfaces', 'macsec'] - self._options = { - 'macsec0': ['source-interface eth0', - 'security cipher gcm-aes-128'] - } - - # if we have a physical eth1 interface, add a second macsec instance - if 'eth1' in Section.interfaces("ethernet"): - macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] } - self._options.update(macsec) - - self._interfaces = list(self._options) - - def test_encryption(self): - """ MACsec can be operating in authentication and encryption - mode - both using different mandatory settings, lets test - encryption as the basic authentication test has been performed - using the base class tests """ - intf = 'macsec0' - src_intf = 'eth0' - mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4' - mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836' - replay_window = '64' - self.session.set(self._base_path + [intf, 'security', 'encrypt']) - - # check validate() - Cipher suite must be set for MACsec - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128']) - - # check validate() - Physical source interface must be set for MACsec - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [intf, 'source-interface', src_intf]) - - # check validate() - MACsec security keys mandartory when encryption is enabled - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak]) - - # check validate() - MACsec security keys mandartory when encryption is enabled - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn]) - - self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window]) - self.session.commit() - - tmp = get_config_value(src_intf, 'macsec_integ_only') - self.assertTrue("0" in tmp) - - tmp = get_config_value(src_intf, 'mka_cak') - self.assertTrue(mak_cak in tmp) - - tmp = get_config_value(src_intf, 'mka_ckn') - self.assertTrue(mak_ckn in tmp) - - # check that the default priority of 255 is programmed - tmp = get_config_value(src_intf, 'mka_priority') - self.assertTrue("255" in tmp) - - tmp = get_config_value(src_intf, 'macsec_replay_window') - self.assertTrue(replay_window in tmp) - - # Check for running process - self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_pppoe.py b/scripts/cli/test_interfaces_pppoe.py deleted file mode 100755 index 822f05de6..000000000 --- a/scripts/cli/test_interfaces_pppoe.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -config_file = '/etc/ppp/peers/{}' -dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' -base_path = ['interfaces', 'pppoe'] - -def get_config_value(interface, key): - with open(config_file.format(interface), 'r') as f: - for line in f: - if line.startswith(key): - return list(line.split()) - return [] - -def get_dhcp6c_config_value(interface, key): - tmp = read_file(dhcp6c_config_file.format(interface)) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - - out = [] - for item in tmp: - out.append(item.replace(';','')) - return out - -class PPPoEInterfaceTest(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self._interfaces = ['pppoe0', 'pppoe50'] - self._source_interface = 'eth0' - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_pppoe(self): - """ Check if PPPoE dialer can be configured and runs """ - for interface in self._interfaces: - user = 'VyOS-user-' + interface - passwd = 'VyOS-passwd-' + interface - mtu = '1400' - - self.session.set(base_path + [interface, 'authentication', 'user', user]) - self.session.set(base_path + [interface, 'authentication', 'password', passwd]) - self.session.set(base_path + [interface, 'default-route', 'auto']) - self.session.set(base_path + [interface, 'mtu', mtu]) - self.session.set(base_path + [interface, 'no-peer-dns']) - - # check validate() - a source-interface is required - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + [interface, 'source-interface', self._source_interface]) - - # commit changes - self.session.commit() - - # verify configuration file(s) - for interface in self._interfaces: - user = 'VyOS-user-' + interface - password = 'VyOS-passwd-' + interface - - tmp = get_config_value(interface, 'mtu')[1] - self.assertEqual(tmp, mtu) - tmp = get_config_value(interface, 'user')[1].replace('"', '') - self.assertEqual(tmp, user) - tmp = get_config_value(interface, 'password')[1].replace('"', '') - self.assertEqual(tmp, password) - tmp = get_config_value(interface, 'ifname')[1] - self.assertEqual(tmp, interface) - - # Check if ppp process is running in the interface in question - running = False - for p in process_iter(): - if "pppd" in p.name(): - if interface in p.cmdline(): - running = True - - self.assertTrue(running) - - def test_pppoe_dhcpv6pd(self): - """ Check if PPPoE dialer can be configured with DHCPv6-PD """ - address = '1' - sla_id = '0' - sla_len = '8' - for interface in self._interfaces: - self.session.set(base_path + [interface, 'authentication', 'user', 'vyos']) - self.session.set(base_path + [interface, 'authentication', 'password', 'vyos']) - self.session.set(base_path + [interface, 'default-route', 'none']) - self.session.set(base_path + [interface, 'no-peer-dns']) - self.session.set(base_path + [interface, 'source-interface', self._source_interface]) - self.session.set(base_path + [interface, 'ipv6', 'enable']) - - # prefix delegation stuff - dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] - self.session.set(dhcpv6_pd_base + ['length', '56']) - self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) - self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) - - # commit changes - self.session.commit() - - # verify "normal" PPPoE value - 1492 is default MTU - tmp = get_config_value(interface, 'mtu')[1] - self.assertEqual(tmp, '1492') - tmp = get_config_value(interface, 'user')[1].replace('"', '') - self.assertEqual(tmp, 'vyos') - tmp = get_config_value(interface, 'password')[1].replace('"', '') - self.assertEqual(tmp, 'vyos') - - for param in ['+ipv6', 'ipv6cp-use-ipaddr']: - tmp = get_config_value(interface, param)[0] - self.assertEqual(tmp, param) - - # verify DHCPv6 prefix delegation - # will return: ['delegation', '::/56 infinity;'] - tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace - self.assertEqual(tmp, '::/56') - tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] - self.assertEqual(tmp, self._source_interface) - tmp = get_dhcp6c_config_value(interface, 'ifid')[0] - self.assertEqual(tmp, address) - tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] - self.assertEqual(tmp, sla_id) - tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] - self.assertEqual(tmp, sla_len) - - # Check if ppp process is running in the interface in question - running = False - for p in process_iter(): - if "pppd" in p.name(): - running = True - self.assertTrue(running) - - # We can not check if wide-dhcpv6 process is running as it is started - # after the PPP interface gets a link to the ISP - but we can see if - # it would be started by the scripts - tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}') - tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp) - self.assertTrue(tmp) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_pseudo_ethernet.py b/scripts/cli/test_interfaces_pseudo_ethernet.py deleted file mode 100755 index bc2e6e7eb..000000000 --- a/scripts/cli/test_interfaces_pseudo_ethernet.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest - -class PEthInterfaceTest(BasicInterfaceTest.BaseTest): - - def setUp(self): - super().setUp() - self._base_path = ['interfaces', 'pseudo-ethernet'] - - self._test_ip = True - self._test_mtu = True - self._test_vlan = True - self._test_qinq = True - - self._options = { - 'peth0': ['source-interface eth1'], - 'peth1': ['source-interface eth1'], - } - self._interfaces = list(self._options) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_tunnel.py b/scripts/cli/test_interfaces_tunnel.py deleted file mode 100755 index 7611ffe26..000000000 --- a/scripts/cli/test_interfaces_tunnel.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession - -from base_interfaces_test import BasicInterfaceTest - -class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): - # encoding, tunnel endpoint (v4/v6), address (v4/v6) - _valid = [ - ('gre', 4, 4), - ('gre', 4, 6), - ('ip6gre', 6, 4), - ('ip6gre', 6, 6), - ('gre-bridge', 4, 4), - ('ipip', 4, 4), - ('ipip', 4, 6), - ('ipip6', 6, 4), - ('ipip6', 6, 6), - ('ip6ip6', 6, 6), - ('sit', 4, 6), - ] - - local = { - 4: '10.100.{}.1/24', - 6: '2001:db8:{}::1/64', - } - - remote = { - 4: '192.0.{}.1', - 6: '2002::{}:1', - } - - address = { - 4: '10.100.{}.1/24', - 6: '2001:db8:{}::1/64', - } - - def setUp(self): - local = {} - remote = {} - address = {} - - self._intf_dummy = ['interfaces', 'dummy'] - self._base_path = ['interfaces', 'tunnel'] - self._interfaces = ['tun{}'.format(n) for n in range(len(self._valid))] - - self._test_mtu = True - super().setUp() - - for number in range(len(self._valid)): - dum4 = 'dum4{}'.format(number) - dum6 = 'dum6{}'.format(number) - - ipv4 = self.local[4].format(number) - ipv6 = self.local[6].format(number) - - local.setdefault(4, {})[number] = ipv4 - local.setdefault(6, {})[number] = ipv6 - - ipv4 = self.remote[4].format(number) - ipv6 = self.remote[6].format(number) - - remote.setdefault(4, {})[number] = ipv4 - remote.setdefault(6, {})[number] = ipv6 - - ipv4 = self.address[4].format(number) - ipv6 = self.address[6].format(number) - - address.setdefault(4, {})[number] = ipv4 - address.setdefault(6, {})[number] = ipv6 - - self.session.set(self._intf_dummy + [dum4, 'address', ipv4]) - self.session.set(self._intf_dummy + [dum6, 'address', ipv6]) - self.session.commit() - - for number, (encap, p2p, addr) in enumerate(self._valid): - intf = 'tun%d' % number - tunnel = {} - tunnel['encapsulation'] = encap - tunnel['local-ip'] = local[p2p][number].split('/')[0] - tunnel['remote-ip'] = remote[p2p][number].split('/')[0] - tunnel['address'] = address[addr][number] - for name in tunnel: - self.session.set(self._base_path + [intf, name, tunnel[name]]) - - def tearDown(self): - self.session.delete(self._intf_dummy) - super().tearDown() - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_vxlan.py b/scripts/cli/test_interfaces_vxlan.py deleted file mode 100755 index 2628e0285..000000000 --- a/scripts/cli/test_interfaces_vxlan.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - -class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._test_mtu = True - self._base_path = ['interfaces', 'vxlan'] - self._options = { - 'vxlan0': ['vni 10', 'remote 127.0.0.2'], - 'vxlan1': ['vni 20', 'group 239.1.1.1', 'source-interface eth0'], - } - self._interfaces = list(self._options) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_wireguard.py b/scripts/cli/test_interfaces_wireguard.py deleted file mode 100755 index 0c32a4696..000000000 --- a/scripts/cli/test_interfaces_wireguard.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - -# Generate WireGuard default keypair -if not os.path.isdir('/config/auth/wireguard/default'): - os.system('sudo /usr/libexec/vyos/op_mode/wireguard.py --genkey') - -base_path = ['interfaces', 'wireguard'] - -class WireGuardInterfaceTest(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', - '2001:db8:1::ffff/64', '2001:db8:101::1/112'] - self._interfaces = ['wg0', 'wg1'] - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_peer_setup(self): - """ - Create WireGuard interfaces with associated peers - """ - for intf in self._interfaces: - peer = 'foo-' + intf - psk = 'u2xdA70hkz0S1CG0dZlOh0aq2orwFXRIVrKo4DCvHgM=' - pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A=' - - for addr in self._test_addr: - self.session.set(base_path + [intf, 'address', addr]) - - self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) - self.session.set(base_path + [intf, 'peer', peer, 'port', '1337']) - - # Allow different prefixes to traverse the tunnel - allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] - for ip in allowed_ips: - self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) - - self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) - self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) - self.session.commit() - - self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_interfaces_wireless.py b/scripts/cli/test_interfaces_wireless.py deleted file mode 100755 index fae233244..000000000 --- a/scripts/cli/test_interfaces_wireless.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from psutil import process_iter -from vyos.util import check_kmod - -class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): - def setUp(self): - super().setUp() - - self._base_path = ['interfaces', 'wireless'] - self._options = { - 'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0', - 'type station', 'address 192.0.2.1/30'], - 'wlan1': ['physical-device phy0', 'ssid VyOS-WIFI-1', - 'type access-point', 'address 192.0.2.5/30', 'channel 0'], - 'wlan10': ['physical-device phy1', 'ssid VyOS-WIFI-2', - 'type station', 'address 192.0.2.9/30'], - 'wlan11': ['physical-device phy1', 'ssid VyOS-WIFI-3', - 'type access-point', 'address 192.0.2.13/30', 'channel 0'], - } - self._interfaces = list(self._options) - self.session.set(['system', 'wifi-regulatory-domain', 'SE']) - - def test_add_address_single(self): - """ derived method to check if member interfaces are enslaved properly """ - super().test_add_address_single() - - for option, option_value in self._options.items(): - if 'type access-point' in option_value: - # Check for running process - self.assertIn('hostapd', (p.name() for p in process_iter())) - elif 'type station' in option_value: - # Check for running process - self.assertIn('wpa_supplicant', (p.name() for p in process_iter())) - else: - self.assertTrue(False) - -if __name__ == '__main__': - check_kmod('mac80211_hwsim') - unittest.main() diff --git a/scripts/cli/test_interfaces_wirelessmodem.py b/scripts/cli/test_interfaces_wirelessmodem.py deleted file mode 100755 index 40cd03b93..000000000 --- a/scripts/cli/test_interfaces_wirelessmodem.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError - -config_file = '/etc/ppp/peers/{}' -base_path = ['interfaces', 'wirelessmodem'] - -def get_config_value(interface, key): - with open(config_file.format(interface), 'r') as f: - for line in f: - if line.startswith(key): - return list(line.split()) - return [] - -class WWANInterfaceTest(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self._interfaces = ['wlm0', 'wlm1'] - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_wlm_1(self): - for interface in self._interfaces: - self.session.set(base_path + [interface, 'no-peer-dns']) - self.session.set(base_path + [interface, 'ondemand']) - - # check validate() - APN must be configure - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + [interface, 'apn', 'vyos.net']) - - # check validate() - device must be configure - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + [interface, 'device', 'ttyS0']) - - # commit changes - self.session.commit() - - # verify configuration file(s) - for interface in self._interfaces: - tmp = get_config_value(interface, 'ifname')[1] - self.assertTrue(interface in tmp) - - tmp = get_config_value(interface, 'demand')[0] - self.assertTrue('demand' in tmp) - - tmp = os.path.isfile(f'/etc/ppp/peers/chat.{interface}') - self.assertTrue(tmp) - - # Check if ppp process is running in the interface in question - running = False - for p in process_iter(): - if "pppd" in p.name(): - if interface in p.cmdline(): - running = True - - self.assertTrue(running) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_nat.py b/scripts/cli/test_nat.py deleted file mode 100755 index 416810e40..000000000 --- a/scripts/cli/test_nat.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import jmespath -import json -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import cmd - -base_path = ['nat'] -snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' - -class TestNAT(unittest.TestCase): - def setUp(self): - # ensure we can also run this test on a live system - so lets clean - # out the current configuration :) - self.session = ConfigSession(os.getpid()) - self.session.delete(base_path) - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - - def test_source_nat(self): - """ Configure and validate source NAT rule(s) """ - - path = base_path + ['source'] - network = '192.168.0.0/16' - self.session.set(path + ['rule', '1', 'destination', 'address', network]) - self.session.set(path + ['rule', '1', 'exclude']) - - # check validate() - outbound-interface must be defined - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(path + ['rule', '1', 'outbound-interface', 'any']) - self.session.commit() - - tmp = cmd('sudo nft -j list table nat') - nftable_json = json.loads(tmp) - condensed_json = jmespath.search(snat_pattern, nftable_json)[0] - - self.assertEqual(condensed_json['comment'], 'DST-NAT-1') - self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) - self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_bcast-relay.py b/scripts/cli/test_service_bcast-relay.py deleted file mode 100755 index fe4531c3b..000000000 --- a/scripts/cli/test_service_bcast-relay.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError - -base_path = ['service', 'broadcast-relay'] - -class TestServiceBroadcastRelay(unittest.TestCase): - _address1 = '192.0.2.1/24' - _address2 = '192.0.2.1/24' - - def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) - self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) - self.session.commit() - - def tearDown(self): - self.session.delete(['interfaces', 'dummy', 'dum1001']) - self.session.delete(['interfaces', 'dummy', 'dum1002']) - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_service(self): - """ Check if broadcast relay service can be configured and runs """ - ids = range(1, 5) - for id in ids: - base = base_path + ['id', str(id)] - self.session.set(base + ['description', 'vyos']) - self.session.set(base + ['port', str(10000 + id)]) - - # check validate() - two interfaces must be present - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(base + ['interface', 'dum1001']) - self.session.set(base + ['interface', 'dum1002']) - self.session.set(base + ['address', self._address1.split('/')[0]]) - - self.session.commit() - - for id in ids: - # check if process is running - running = False - for p in process_iter(): - if "udp-broadcast-relay" in p.name(): - if p.cmdline()[3] == str(id): - running = True - break - self.assertTrue(running) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_dns_dynamic.py b/scripts/cli/test_service_dns_dynamic.py deleted file mode 100755 index be52360ed..000000000 --- a/scripts/cli/test_service_dns_dynamic.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from getpass import getuser -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -DDCLIENT_CONF = '/run/ddclient/ddclient.conf' -base_path = ['service', 'dns', 'dynamic'] - -def get_config_value(key): - tmp = read_file(DDCLIENT_CONF) - tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp) - tmp = tmp[0].rstrip(',') - return tmp - -def check_process(): - """ - Check for running process, process name changes dynamically e.g. - "ddclient - sleeping for 270 seconds", thus we need a different approach - """ - running = False - for p in process_iter(): - if "ddclient" in p.name(): - running = True - return running - -class TestServiceDDNS(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - # Delete DDNS configuration - self.session.delete(base_path) - self.session.commit() - - del self.session - - def test_service(self): - """ Check individual DDNS service providers """ - ddns = ['interface', 'eth0', 'service'] - services = ['cloudflare', 'afraid', 'dyndns', 'zoneedit'] - - for service in services: - user = 'vyos_user' - password = 'vyos_pass' - zone = 'vyos.io' - self.session.delete(base_path) - self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io']) - self.session.set(base_path + ddns + [service, 'login', user]) - self.session.set(base_path + ddns + [service, 'password', password]) - self.session.set(base_path + ddns + [service, 'zone', zone]) - - # commit changes - if service == 'cloudflare': - self.session.commit() - else: - # zone option only works on cloudflare, an exception is raised - # for all others - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io']) - # commit changes again - now it should work - self.session.commit() - - # we can only read the configuration file when we operate as 'root' - if getuser() == 'root': - protocol = get_config_value('protocol') - login = get_config_value('login') - pwd = get_config_value('password') - - # some services need special treatment - protoname = service - if service == 'cloudflare': - tmp = get_config_value('zone') - self.assertTrue(tmp == zone) - elif service == 'afraid': - protoname = 'freedns' - elif service == 'dyndns': - protoname = 'dyndns2' - elif service == 'zoneedit': - protoname = 'zoneedit1' - - self.assertTrue(protocol == protoname) - self.assertTrue(login == user) - self.assertTrue(pwd == "'" + password + "'") - - # Check for running process - self.assertTrue(check_process()) - - - def test_rfc2136(self): - """ Check if DDNS service can be configured and runs """ - ddns = ['interface', 'eth0', 'rfc2136', 'vyos'] - ddns_key_file = '/config/auth/my.key' - - self.session.set(base_path + ddns + ['key', ddns_key_file]) - self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io']) - self.session.set(base_path + ddns + ['server', 'ns1.vyos.io']) - self.session.set(base_path + ddns + ['ttl', '300']) - self.session.set(base_path + ddns + ['zone', 'vyos.io']) - - # ensure an exception will be raised as no key is present - if os.path.exists(ddns_key_file): - os.unlink(ddns_key_file) - - # check validate() - the key file does not exist yet - with self.assertRaises(ConfigSessionError): - self.session.commit() - - with open(ddns_key_file, 'w') as f: - f.write('S3cretKey') - - # commit changes - self.session.commit() - - # TODO: inspect generated configuration file - - # Check for running process - self.assertTrue(check_process()) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_mdns-repeater.py b/scripts/cli/test_service_mdns-repeater.py deleted file mode 100755 index 18900b6d2..000000000 --- a/scripts/cli/test_service_mdns-repeater.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession - -base_path = ['service', 'mdns', 'repeater'] -intf_base = ['interfaces', 'dummy'] - -class TestServiceMDNSrepeater(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - self.session.delete(base_path) - self.session.delete(intf_base + ['dum10']) - self.session.delete(intf_base + ['dum20']) - self.session.commit() - del self.session - - def test_service(self): - # Service required a configured IP address on the interface - - self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30']) - self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30']) - - self.session.set(base_path + ['interface', 'dum10']) - self.session.set(base_path + ['interface', 'dum20']) - self.session.commit() - - # Check for running process - self.assertTrue("mdns-repeater" in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_pppoe-server.py b/scripts/cli/test_service_pppoe-server.py deleted file mode 100755 index 901ca792d..000000000 --- a/scripts/cli/test_service_pppoe-server.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from configparser import ConfigParser -from psutil import process_iter -from vyos.configsession import ConfigSession -from vyos.configsession import ConfigSessionError - -base_path = ['service', 'pppoe-server'] -local_if = ['interfaces', 'dummy', 'dum667'] -pppoe_conf = '/run/accel-pppd/pppoe.conf' - -ac_name = 'ACN' -subnet = '172.18.0.0/24' -gateway = '192.0.2.1' -nameserver = '9.9.9.9' -mtu = '1492' -interface = 'eth0' - -class TestServicePPPoEServer(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - # ensure we can also run this test on a live system - so lets clean - # out the current configuration :) - self.session.delete(base_path) - - def tearDown(self): - self.session.delete(base_path) - self.session.delete(local_if) - self.session.commit() - del self.session - - def verify(self, conf): - # validate some common values in the configuration - for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool', - 'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1', - 'auth_chap_md5', 'auth_pap', 'shaper']: - # Settings without values provide None - self.assertEqual(conf['modules'][tmp], None) - - # check Access Concentrator setting - self.assertTrue(conf['pppoe']['ac-name'] == ac_name) - self.assertTrue(conf['pppoe'].getboolean('verbose')) - self.assertTrue(conf['pppoe']['interface'], interface) - - # check configured subnet - self.assertEqual(conf['ip-pool'][subnet], None) - self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) - - # check ppp - self.assertTrue(conf['ppp'].getboolean('verbose')) - self.assertTrue(conf['ppp'].getboolean('check-ip')) - self.assertEqual(conf['ppp']['min-mtu'], mtu) - self.assertEqual(conf['ppp']['mtu'], mtu) - self.assertEqual(conf['ppp']['lcp-echo-interval'], '30') - self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0') - self.assertEqual(conf['ppp']['lcp-echo-failure'], '3') - - def basic_config(self): - self.session.set(local_if + ['address', '192.0.2.1/32']) - - self.session.set(base_path + ['access-concentrator', ac_name]) - self.session.set(base_path + ['authentication', 'mode', 'local']) - self.session.set(base_path + ['client-ip-pool', 'subnet', subnet]) - self.session.set(base_path + ['name-server', nameserver]) - self.session.set(base_path + ['interface', interface]) - self.session.set(base_path + ['local-ip', gateway]) - - def test_local_auth(self): - """ Test configuration of local authentication for PPPoE server """ - self.basic_config() - # authentication - self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) - self.session.set(base_path + ['authentication', 'mode', 'local']) - # other settings - self.session.set(base_path + ['ppp-options', 'ccp']) - self.session.set(base_path + ['ppp-options', 'mppe', 'require']) - self.session.set(base_path + ['limits', 'connection-limit', '20/min']) - - # commit changes - self.session.commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True) - conf.read(pppoe_conf) - - # basic verification - self.verify(conf) - - # check auth - self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets') - self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway) - - # check pado - self.assertEqual(conf['ppp']['mppe'], 'require') - self.assertTrue(conf['ppp'].getboolean('ccp')) - - # check other settings - self.assertEqual(conf['connlimit']['limit'], '20/min') - - # Check for running process - self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) - - def test_radius_auth(self): - """ Test configuration of RADIUS authentication for PPPoE server """ - radius_server = '192.0.2.22' - radius_key = 'secretVyOS' - radius_port = '2000' - radius_port_acc = '3000' - - self.basic_config() - self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key]) - self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port]) - self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) - self.session.set(base_path + ['authentication', 'mode', 'radius']) - - # commit changes - self.session.commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True) - conf.read(pppoe_conf) - - # basic verification - self.verify(conf) - - # check auth - self.assertTrue(conf['radius'].getboolean('verbose')) - self.assertTrue(conf['radius']['acct-timeout'], '3') - self.assertTrue(conf['radius']['timeout'], '3') - self.assertTrue(conf['radius']['max-try'], '3') - self.assertTrue(conf['radius']['gw-ip-address'], gateway) - - server = conf['radius']['server'].split(',') - self.assertEqual(radius_server, server[0]) - self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port={radius_port_acc}', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) - - # check defaults - self.assertEqual(conf['ppp']['mppe'], 'prefer') - self.assertFalse(conf['ppp'].getboolean('ccp')) - - # Check for running process - self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_router-advert.py b/scripts/cli/test_service_router-advert.py deleted file mode 100755 index ec2110c8a..000000000 --- a/scripts/cli/test_service_router-advert.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession -from vyos.util import read_file - -RADVD_CONF = '/run/radvd/radvd.conf' - -interface = 'eth1' -base_path = ['service', 'router-advert', 'interface', interface] -address_base = ['interfaces', 'ethernet', interface, 'address'] - -def get_config_value(key): - tmp = read_file(RADVD_CONF) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - return tmp[0].split()[0].replace(';','') - -class TestServiceRADVD(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(address_base + ['2001:db8::1/64']) - - def tearDown(self): - self.session.delete(address_base) - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_single(self): - self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag']) - self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) - self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) - self.session.set(base_path + ['dnssl', '2001:db8::1234']) - self.session.set(base_path + ['other-config-flag']) - - # commit changes - self.session.commit() - - # verify values - tmp = get_config_value('interface') - self.assertEqual(tmp, interface) - - tmp = get_config_value('prefix') - self.assertEqual(tmp, '::/64') - - tmp = get_config_value('AdvOtherConfigFlag') - self.assertEqual(tmp, 'on') - - # this is a default value - tmp = get_config_value('AdvRetransTimer') - self.assertEqual(tmp, '0') - - # this is a default value - tmp = get_config_value('AdvCurHopLimit') - self.assertEqual(tmp, '64') - - # this is a default value - tmp = get_config_value('AdvDefaultPreference') - self.assertEqual(tmp, 'medium') - - tmp = get_config_value('AdvAutonomous') - self.assertEqual(tmp, 'off') - - # this is a default value - tmp = get_config_value('AdvValidLifetime') - self.assertEqual(tmp, 'infinity') - - # this is a default value - tmp = get_config_value('AdvPreferredLifetime') - self.assertEqual(tmp, '14400') - - tmp = get_config_value('AdvOnLink') - self.assertEqual(tmp, 'off') - - - - # Check for running process - self.assertTrue('radvd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_service_snmp.py b/scripts/cli/test_service_snmp.py deleted file mode 100755 index fb5f5393f..000000000 --- a/scripts/cli/test_service_snmp.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from vyos.validate import is_ipv4 -from psutil import process_iter - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -SNMPD_CONF = '/etc/snmp/snmpd.conf' -base_path = ['service', 'snmp'] - -def get_config_value(key): - tmp = read_file(SNMPD_CONF) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - return tmp[0] - -class TestSNMPService(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - # ensure we can also run this test on a live system - so lets clean - # out the current configuration :) - self.session.delete(base_path) - - def tearDown(self): - del self.session - - def test_snmp(self): - """ Check if SNMP can be configured and service runs """ - clients = ['192.0.2.1', '2001:db8::1'] - networks = ['192.0.2.128/25', '2001:db8:babe::/48'] - listen = ['127.0.0.1', '::1'] - - for auth in ['ro', 'rw']: - community = 'VyOS' + auth - self.session.set(base_path + ['community', community, 'authorization', auth]) - for client in clients: - self.session.set(base_path + ['community', community, 'client', client]) - for network in networks: - self.session.set(base_path + ['community', community, 'network', network]) - - for addr in listen: - self.session.set(base_path + ['listen-address', addr]) - - self.session.set(base_path + ['contact', 'maintainers@vyos.io']) - self.session.set(base_path + ['location', 'qemu']) - - self.session.commit() - - # verify listen address, it will be returned as - # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161'] - # thus we need to transfor this into a proper list - config = get_config_value('agentaddress') - expected = 'unix:/run/snmpd.socket' - for addr in listen: - if is_ipv4(addr): - expected += ',udp:{}:161'.format(addr) - else: - expected += ',udp6:[{}]:161'.format(addr) - - self.assertTrue(expected in config) - - # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) - - - def test_snmpv3_sha(self): - """ Check if SNMPv3 can be configured with SHA authentication and service runs""" - - self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) - self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) - # check validate() - a view must be created before this can be comitted - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) - self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) - - # create user - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - - self.session.commit() - - # commit will alter the CLI values - check if they have been updated: - hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] - self.assertEqual(tmp, hashed_password) - - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] - self.assertEqual(tmp, hashed_password) - - # TODO: read in config file and check values - - # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) - - def test_snmpv3_md5(self): - """ Check if SNMPv3 can be configured with MD5 authentication and service runs""" - - self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) - self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) - # check validate() - a view must be created before this can be comitted - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) - self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) - - # create user - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - - self.session.commit() - - # commit will alter the CLI values - check if they have been updated: - hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] - self.assertEqual(tmp, hashed_password) - - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] - self.assertEqual(tmp, hashed_password) - - # TODO: read in config file and check values - - # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) - - -if __name__ == '__main__': - unittest.main() - diff --git a/scripts/cli/test_service_ssh.py b/scripts/cli/test_service_ssh.py deleted file mode 100755 index 3ee498f3d..000000000 --- a/scripts/cli/test_service_ssh.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -SSHD_CONF = '/run/ssh/sshd_config' -base_path = ['service', 'ssh'] - -def get_config_value(key): - tmp = read_file(SSHD_CONF) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - return tmp - -class TestServiceSSH(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - # ensure we can also run this test on a live system - so lets clean - # out the current configuration :) - self.session.delete(base_path) - - def tearDown(self): - # delete testing SSH config - self.session.delete(base_path) - # restore "plain" SSH access - self.session.set(base_path) - - self.session.commit() - del self.session - - def test_ssh_single(self): - """ Check if SSH service can be configured and runs """ - self.session.set(base_path + ['port', '1234']) - self.session.set(base_path + ['disable-host-validation']) - self.session.set(base_path + ['disable-password-authentication']) - self.session.set(base_path + ['loglevel', 'verbose']) - self.session.set(base_path + ['client-keepalive-interval', '100']) - self.session.set(base_path + ['listen-address', '127.0.0.1']) - - # commit changes - self.session.commit() - - # Check configured port - port = get_config_value('Port')[0] - self.assertTrue("1234" in port) - - # Check DNS usage - dns = get_config_value('UseDNS')[0] - self.assertTrue("no" in dns) - - # Check PasswordAuthentication - pwd = get_config_value('PasswordAuthentication')[0] - self.assertTrue("no" in pwd) - - # Check loglevel - loglevel = get_config_value('LogLevel')[0] - self.assertTrue("VERBOSE" in loglevel) - - # Check listen address - address = get_config_value('ListenAddress')[0] - self.assertTrue("127.0.0.1" in address) - - # Check keepalive - keepalive = get_config_value('ClientAliveInterval')[0] - self.assertTrue("100" in keepalive) - - # Check for running process - self.assertTrue("sshd" in (p.name() for p in process_iter())) - - def test_ssh_multi(self): - """ Check if SSH service can be configured and runs with multiple - listen ports and listen-addresses """ - ports = ['22', '2222'] - for port in ports: - self.session.set(base_path + ['port', port]) - - addresses = ['127.0.0.1', '::1'] - for address in addresses: - self.session.set(base_path + ['listen-address', address]) - - # commit changes - self.session.commit() - - # Check configured port - tmp = get_config_value('Port') - for port in ports: - self.assertIn(port, tmp) - - # Check listen address - tmp = get_config_value('ListenAddress') - for address in addresses: - self.assertIn(address, tmp) - - # Check for running process - self.assertTrue("sshd" in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_system_lcd.py b/scripts/cli/test_system_lcd.py deleted file mode 100755 index 931a91c53..000000000 --- a/scripts/cli/test_system_lcd.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 Francois Mertz fireboxled@gmail.com -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from configparser import ConfigParser -from psutil import process_iter -from vyos.configsession import ConfigSession - -base_path = ['system', 'lcd'] - -class TestSystemLCD(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_system_display(self): - # configure some system display - self.session.set(base_path + ['device', 'ttyS1']) - self.session.set(base_path + ['model', 'cfa-533']) - - # commit changes - self.session.commit() - - # load up ini-styled LCDd.conf - conf = ConfigParser() - conf.read('/run/LCDd/LCDd.conf') - - self.assertEqual(conf['CFontzPacket']['Model'], '533') - self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1') - - # both processes running - self.assertTrue('LCDd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_system_login.py b/scripts/cli/test_system_login.py deleted file mode 100755 index 3c4b1fa28..000000000 --- a/scripts/cli/test_system_login.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from subprocess import Popen, PIPE -from vyos.configsession import ConfigSession, ConfigSessionError -import vyos.util as util - -base_path = ['system', 'login'] -users = ['vyos1', 'vyos2'] - -class TestSystemLogin(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - # Delete individual users from configuration - for user in users: - self.session.delete(base_path + ['user', user]) - - self.session.commit() - del self.session - - def test_user(self): - """ Check if user can be created and we can SSH to localhost """ - self.session.set(['service', 'ssh', 'port', '22']) - - for user in users: - name = "VyOS Roxx " + user - home_dir = "/tmp/" + user - - self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) - self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) - self.session.set(base_path + ['user', user, 'home-directory', home_dir]) - - self.session.commit() - - for user in users: - cmd = ['su','-', user] - proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) - tmp = "{}\nuname -a".format(user) - proc.stdin.write(tmp.encode()) - proc.stdin.flush() - (stdout, stderr) = proc.communicate() - - # stdout is something like this: - # b'Linux vyos 4.19.101-amd64-vyos #1 SMP Sun Feb 2 10:18:07 UTC 2020 x86_64 GNU/Linux\n' - self.assertTrue(len(stdout) > 40) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_system_nameserver.py b/scripts/cli/test_system_nameserver.py deleted file mode 100755 index 9040be072..000000000 --- a/scripts/cli/test_system_nameserver.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -import vyos.util as util - -RESOLV_CONF = '/etc/resolv.conf' - -test_servers = ['192.0.2.10', '2001:db8:1::100'] -base_path = ['system', 'name-server'] - -def get_name_servers(): - resolv_conf = util.read_file(RESOLV_CONF) - return re.findall(r'\n?nameserver\s+(.*)', resolv_conf) - -class TestSystemNameServer(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - # Delete existing name servers - self.session.delete(base_path) - self.session.commit() - - del self.session - - def test_add_server(self): - """ Check if server is added to resolv.conf """ - for s in test_servers: - self.session.set(base_path + [s]) - self.session.commit() - - servers = get_name_servers() - for s in servers: - self.assertTrue(s in servers) - - def test_delete_server(self): - """ Test if a deleted server disappears from resolv.conf """ - for s in test_servers: - self.session.delete(base_path + [s]) - self.session.commit() - - servers = get_name_servers() - for s in servers: - self.assertTrue(test_server_1 not in servers) - -if __name__ == '__main__': - unittest.main() - diff --git a/scripts/cli/test_system_ntp.py b/scripts/cli/test_system_ntp.py deleted file mode 100755 index 856a28916..000000000 --- a/scripts/cli/test_system_ntp.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr -from vyos.util import read_file - -NTP_CONF = '/etc/ntp.conf' -base_path = ['system', 'ntp'] - -def get_config_value(key): - tmp = read_file(NTP_CONF) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - # remove possible trailing whitespaces - return [item.strip() for item in tmp] - -class TestSystemNTP(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - # ensure we can also run this test on a live system - so lets clean - # out the current configuration :) - self.session.delete(base_path) - - def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session - - def test_ntp_options(self): - """ Test basic NTP support with multiple servers and their options """ - servers = ['192.0.2.1', '192.0.2.2'] - options = ['noselect', 'preempt', 'prefer'] - - for server in servers: - for option in options: - self.session.set(base_path + ['server', server, option]) - - # commit changes - self.session.commit() - - # Check generated configuration - tmp = get_config_value('server') - for server in servers: - test = f'{server} iburst ' + ' '.join(options) - self.assertTrue(test in tmp) - - # Check for running process - self.assertTrue("ntpd" in (p.name() for p in process_iter())) - - def test_ntp_clients(self): - """ Test the allowed-networks statement """ - listen_address = ['127.0.0.1', '::1'] - for listen in listen_address: - self.session.set(base_path + ['listen-address', listen]) - - networks = ['192.0.2.0/24', '2001:db8:1000::/64'] - for network in networks: - self.session.set(base_path + ['allow-clients', 'address', network]) - - # Verify "NTP server not configured" verify() statement - with self.assertRaises(ConfigSessionError): - self.session.commit() - - servers = ['192.0.2.1', '192.0.2.2'] - for server in servers: - self.session.set(base_path + ['server', server]) - - self.session.commit() - - # Check generated client address configuration - for network in networks: - network_address = vyos_address_from_cidr(network) - network_netmask = vyos_netmask_from_cidr(network) - - tmp = get_config_value(f'restrict {network_address}')[0] - test = f'mask {network_netmask} nomodify notrap nopeer' - self.assertTrue(tmp in test) - - # Check listen address - tmp = get_config_value('interface') - test = ['ignore wildcard'] - for listen in listen_address: - test.append(f'listen {listen}') - self.assertEqual(tmp, test) - - # Check for running process - self.assertTrue("ntpd" in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_vpn_anyconnect.py b/scripts/cli/test_vpn_anyconnect.py deleted file mode 100755 index dd8ab1609..000000000 --- a/scripts/cli/test_vpn_anyconnect.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -OCSERV_CONF = '/run/ocserv/ocserv.conf' -base_path = ['vpn', 'anyconnect'] -cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem' -cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key' - -class TestVpnAnyconnect(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - - def tearDown(self): - # Delete vpn anyconnect configuration - self.session.delete(base_path) - self.session.commit() - - del self.session - - def test_vpn(self): - user = 'vyos_user' - password = 'vyos_pass' - self.session.delete(base_path) - self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password]) - self.session.set(base_path + ["authentication", "mode", "local"]) - self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) - self.session.set(base_path + ["ssl", "ca-cert-file", cert]) - self.session.set(base_path + ["ssl", "cert-file", cert]) - self.session.set(base_path + ["ssl", "key-file", cert_key]) - - self.session.commit() - - # Check for running process - self.assertTrue("ocserv-main" in (p.name() for p in process_iter())) - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/cli/test_vrf.py b/scripts/cli/test_vrf.py deleted file mode 100755 index efa095b30..000000000 --- a/scripts/cli/test_vrf.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -class VRFTest(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self._vrfs = ['red', 'green', 'blue'] - - def tearDown(self): - # delete all VRFs - self.session.delete(['vrf']) - self.session.commit() - del self.session - - def test_table_id(self): - table = 1000 - for vrf in self._vrfs: - base = ['vrf', 'name', vrf] - description = "VyOS-VRF-" + vrf - self.session.set(base + ['description', description]) - - # check validate() - a table ID is mandatory - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(base + ['table', str(table)]) - table += 1 - - # commit changes - self.session.commit() - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/import-conf-mode-commands b/scripts/import-conf-mode-commands new file mode 100755 index 000000000..996b31c9c --- /dev/null +++ b/scripts/import-conf-mode-commands @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +# +# build-command-template: converts old style commands definitions to XML +# +# Copyright (C) 2019 VyOS maintainers <maintainers@vyos.net> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 +# USA + + +import os +import re +import sys + +from lxml import etree + + +# Node types +NODE = 0 +LEAF_NODE = 1 +TAG_NODE = 2 + +def parse_command_data(t): + regs = { + 'help': r'\bhelp:(.*)(?:\n|$)', + 'priority': r'\bpriority:(.*)(?:\n|$)', + 'type': r'\btype:(.*)(?:\n|$)', + 'syntax_expression_var': r'\bsyntax:expression: \$VAR\(\@\) in (.*)' + } + + data = {'multi': False, 'help': ""} + + for r in regs: + try: + data[r] = re.search(regs[r], t).group(1).strip() + except: + data[r] = None + + # val_help is special: there can be multiple instances + val_help_strings = re.findall(r'\bval_help:(.*)(?:\n|$)', t) + val_help = [] + for v in val_help_strings: + try: + fmt, msg = re.match(r'\s*(.*)\s*;\s*(.*)\s*(?:\n|$)', v).groups() + except: + fmt = "<text>" + msg = v + val_help.append((fmt, msg)) + data['val_help'] = val_help + + # multi is on/off + if re.match(r'\bmulti:', t): + data['multi'] = True + + return(data) + +def walk(tree, base_path, name): + path = os.path.join(base_path, name) + + contents = os.listdir(path) + + # Determine node type and create XML element for the node + # Tag node dirs will always have 'node.tag' subdir and 'node.def' file + # Leaf node dirs have nothing but a 'node.def' file + # Everything that doesn't match either of these patterns is a normal node + if 'node.tag' in contents: + print("Creating a tag node from {0}".format(path)) + elem = etree.Element('tagNode') + node_type = TAG_NODE + elif contents == ['node.def']: + print("Creating a leaf node from {0}".format(path)) + elem = etree.Element('leafNode') + node_type = LEAF_NODE + else: + print("Creating a node from {0}".format(path)) + elem = etree.Element('node') + node_type = NODE + + # Read and parse the command definition data (the 'node.def' file) + with open(os.path.join(path, 'node.def'), 'r') as f: + node_def = f.read() + data = parse_command_data(node_def) + + # Import the data into the properties element + props_elem = etree.Element('properties') + + if data['priority']: + # Priority values sometimes come with comments that explain the value choice + try: + prio, prio_comment = re.match(r'\s*(\d+)\s*#(.*)', data['priority']).groups() + except: + prio = data['priority'].strip() + prio_comment = None + prio_elem = etree.Element('priority') + prio_elem.text = prio + props_elem.append(prio_elem) + if prio_comment: + prio_comment_elem = etree.Comment(prio_comment) + props_elem.append(prio_comment_elem) + + if data['multi']: + multi_elem = etree.Element('multi') + props_elem.append(multi_elem) + + if data['help']: + help_elem = etree.Element('help') + help_elem.text = data['help'] + props_elem.append(help_elem) + + # For leaf nodes, absense of a type: tag means they take no values + # For any other nodes, it doesn't mean anything + if not data['type'] and (node_type == LEAF_NODE): + valueless = etree.Element('valueless') + props_elem.append(valueless) + + # There can be only one constraint element in the definition + # Create it now, we'll modify it in the next two cases, then append + constraint_elem = etree.Element('constraint') + has_constraint = False + + # Add regexp field for multiple options + if data['syntax_expression_var']: + regex = etree.Element('regex') + constraint_error=etree.Element('constraintErrorMessage') + values = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(1) + message = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(2) + values = re.findall(r'\"(.+?)\"', values) + regex.text = '|'.join(values) + constraint_error.text = re.sub('\".*?VAR.*?\"', '', message) + constraint_error.text = re.sub(r'[\"|\\]', '', message) + constraint_elem.append(regex) + props_elem.append(constraint_elem) + props_elem.append(constraint_error) + + if data['val_help']: + for vh in data['val_help']: + vh_elem = etree.Element('valueHelp') + + vh_fmt_elem = etree.Element('format') + # Many commands use special "u32:<start>-<end>" format for ranges + if re.match(r'u32:', vh[0]): + vh_fmt = re.match(r'u32:(.*)', vh[0]).group(1).strip() + + # If valid range of values is specified in val_help, we can automatically + # create a constraint for it + # Extracting it from syntax:expression: would be much more complicated + vh_validator = etree.Element('validator') + vh_validator.set("name", "numeric") + vh_validator.set("argument", "--range {0}".format(vh_fmt)) + constraint_elem.append(vh_validator) + has_constraint = True + else: + vh_fmt = vh[0] + vh_fmt_elem.text = vh_fmt + + vh_help_elem = etree.Element('description') + vh_help_elem.text = vh[1] + + vh_elem.append(vh_fmt_elem) + vh_elem.append(vh_help_elem) + props_elem.append(vh_elem) + + # Translate the "type:" to the new validator system + if data['type']: + t = data['type'] + if t == 'txt': + # Can't infer anything from the generic "txt" type + pass + else: + validator = etree.Element('validator') + if t == 'u32': + validator.set('name', 'numeric') + validator.set('argument', '--non-negative') + elif t == 'ipv4': + validator.set('name', 'ipv4-address') + elif t == 'ipv4net': + validator.set('name', 'ipv4-prefix') + elif t == 'ipv6': + validator.set('name', 'ipv6-address') + elif t == 'ipv6net': + validator.set('name', 'ipv6-prefix') + elif t == 'macaddr': + validator.set('name', 'mac-address') + else: + print("Warning: unsupported type \'{0}\'".format(t)) + validator = None + + if (validator is not None) and (not has_constraint): + # If has_constraint is true, it means a more specific validator + # was already extracted from another option + constraint_elem.append(validator) + has_constraint = True + + if has_constraint: + props_elem.append(constraint_elem) + + elem.append(props_elem) + + elem.set("name", name) + + if node_type != LEAF_NODE: + children = etree.Element('children') + + # Create the next level dir path, + # accounting for the "virtual" node.tag subdir for tag nodes + next_level = path + if node_type == TAG_NODE: + next_level = os.path.join(path, 'node.tag') + + # Walk the subdirs of the next level + for d in os.listdir(next_level): + dp = os.path.join(next_level, d) + if os.path.isdir(dp): + walk(children, next_level, d) + + elem.append(children) + + tree.append(elem) + +if __name__ == '__main__': + if len(sys.argv) < 2: + print("Usage: {0} <base path>".format(sys.argv[0])) + sys.exit(1) + else: + base_path = sys.argv[1] + + root = etree.Element('interfaceDefinition') + contents = os.listdir(base_path) + elem = etree.Element('node') + elem.set('name', os.path.basename(base_path)) + children = etree.Element('children') + + for c in contents: + path = os.path.join(base_path, c) + if os.path.isdir(path): + walk(children, base_path, c) + + elem.append(children) + root.append(elem) + + xml_data = etree.tostring(root, pretty_print=True).decode() + with open('output.xml', 'w') as f: + f.write(xml_data) diff --git a/scripts/system/test_module_load.py b/scripts/system/test_module_load.py deleted file mode 100755 index 59c3e0b6a..000000000 --- a/scripts/system/test_module_load.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -modules = { - "intel": ["e1000", "e1000e", "igb", "ixgb", "ixgbe", "ixgbevf", "i40e", "i40evf", "iavf"], - "accel_ppp": ["ipoe", "vlan_mon"], - "misc": ["wireguard"] -} - -class TestKernelModules(unittest.TestCase): - def test_load_modules(self): - success = True - for msk in modules: - ms = modules[msk] - for m in ms: - # We want to uncover all modules that fail, - # not fail at the first one - try: - os.system("modprobe {0}".format(m)) - except: - success = False - - self.assertTrue(success) - -if __name__ == '__main__': - unittest.main() |