diff options
Diffstat (limited to 'scripts')
35 files changed, 833 insertions, 2598 deletions
| diff --git a/scripts/build-command-op-templates b/scripts/build-command-op-templates new file mode 100755 index 000000000..c60b32a1e --- /dev/null +++ b/scripts/build-command-op-templates @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +# +#    build-command-template: converts new style command definitions in XML +#      to the old style (bunch of dirs and node.def's) command templates +# +#    Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net> +# +#    This library is free software; you can redistribute it and/or +#    modify it under the terms of the GNU Lesser General Public +#    License as published by the Free Software Foundation; either +#    version 2.1 of the License, or (at your option) any later version. +# +#    This library is distributed in the hope that it will be useful, +#    but WITHOUT ANY WARRANTY; without even the implied warranty of +#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +#    Lesser General Public License for more details. +# +#    You should have received a copy of the GNU Lesser General Public +#    License along with this library; if not, write to the Free Software +#    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 +#    USA + +import sys +import os +import argparse +import copy +import functools + +from lxml import etree as ET + +# Defaults + +validator_dir = "/opt/vyatta/libexec/validators" +default_constraint_err_msg = "Invalid value" + + +## Get arguments + +parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates') +parser.add_argument('--debug', help='Enable debug information output', action='store_true') +parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file") +parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file") +parser.add_argument('OUTPUT_DIR', type=str, help="Output directory") + +args = parser.parse_args() + +input_file = args.INPUT_FILE +schema_file = args.SCHEMA_FILE +output_dir = args.OUTPUT_DIR +debug = args.debug + +## Load and validate the inputs + +try: +    xml = ET.parse(input_file) +except Exception as e: +    print("Failed to load interface definition file {0}".format(input_file)) +    print(e) +    sys.exit(1) + +try: +    relaxng_xml = ET.parse(schema_file) +    validator = ET.RelaxNG(relaxng_xml) + +    if not validator.validate(xml): +        print(validator.error_log) +        print("Interface definition file {0} does not match the schema!".format(input_file)) +        sys.exit(1) +except Exception as e: +    print("Failed to load the XML schema {0}".format(schema_file)) +    print(e) +    sys.exit(1) + +if not os.access(output_dir, os.W_OK): +    print("The output directory {0} is not writeable".format(output_dir)) +    sys.exit(1) + +## If we got this far, everything must be ok and we can convert the file + +def make_path(l): +    path = functools.reduce(os.path.join, l) +    if debug: +        print(path) +    return path + +def get_properties(p): +    props = {} + +    if p is None: +        return props + +    # Get the help string +    try: +        props["help"] = p.find("help").text +    except: +        props["help"] = "No help available" + + +    # Get the completion help strings +    try: +        che = p.findall("completionHelp") +        ch = "" +        for c in che: +            scripts = c.findall("script") +            paths = c.findall("path") +            lists = c.findall("list") + +            # Current backend doesn't support multiple allowed: tags +            # so we get to emulate it +            comp_exprs = [] +            for i in lists: +                comp_exprs.append("echo \"{0}\"".format(i.text)) +            for i in paths: +                comp_exprs.append("/bin/cli-shell-api listActiveNodes {0} | sed -e \"s/'//g\" && echo".format(i.text)) +            for i in scripts: +                comp_exprs.append("{0}".format(i.text)) +            comp_help = " && ".join(comp_exprs) +            props["comp_help"] = comp_help +    except: +        props["comp_help"] = [] + +    return props + + +def make_node_def(props, command): +    # XXX: replace with a template processor if it grows +    #      out of control + +    node_def = "" + +    if "help" in props: +        node_def += "help: {0}\n".format(props["help"]) + + +    if "comp_help" in props: +        node_def += "allowed: {0}\n".format(props["comp_help"]) + + +    if command is not None: +        node_def += "run: {0}\n".format(command.text) + + +    if debug: +        print("The contents of the node.def file:\n", node_def) + +    return node_def + +def process_node(n, tmpl_dir): +    # Avoid mangling the path from the outer call +    my_tmpl_dir = copy.copy(tmpl_dir) + +    props_elem = n.find("properties") +    children = n.find("children") +    command = n.find("command") + +    name = n.get("name") + +    node_type = n.tag + +    my_tmpl_dir.append(name) + +    if debug: +        print("Name of the node: {};\n Created directory: ".format(name), end="") +    os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + +    props = get_properties(props_elem) + +    if node_type == "node": +        if debug: +          print("Processing node {}".format(name)) + +        nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") +        if not os.path.exists(nodedef_path): +            with open(nodedef_path, "w") as f: +                f.write(make_node_def(props, command)) +        else: +           # Something has already generated this file +           pass + +        if children is not None: +            inner_nodes = children.iterfind("*") +            for inner_n in inner_nodes: +                process_node(inner_n, my_tmpl_dir) +    if node_type == "tagNode": +        if debug: +          print("Processing tag node {}".format(name)) + +        os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + +        nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") +        if not os.path.exists(nodedef_path): +            with open(nodedef_path, "w") as f: +                f.write('help: {0}\n'.format(props['help'])) +        else: +            # Something has already generated this file +            pass + +        # Create the inner node.tag part +        my_tmpl_dir.append("node.tag") +        os.makedirs(make_path(my_tmpl_dir), exist_ok=True) +        if debug: +            print("Created path for the tagNode: {}".format(make_path(my_tmpl_dir)), end="") + +        # Not sure if we want partially defined tag nodes, write the file unconditionally +        with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f: +            f.write(make_node_def(props, command)) + +        if children is not None: +            inner_nodes = children.iterfind("*") +            for inner_n in inner_nodes: +                process_node(inner_n, my_tmpl_dir) +    else: +        # This is a leaf node +        if debug: +            print("Processing leaf node {}".format(name)) + +        with open(os.path.join(make_path(my_tmpl_dir), "node.def"), "w") as f: +            f.write(make_node_def(props, command)) + + +root = xml.getroot() + +nodes = root.iterfind("*") +for n in nodes: +    process_node(n, [output_dir]) diff --git a/scripts/build-command-templates b/scripts/build-command-templates new file mode 100755 index 000000000..457adbec2 --- /dev/null +++ b/scripts/build-command-templates @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +# +#    build-command-template: converts new style command definitions in XML +#      to the old style (bunch of dirs and node.def's) command templates +# +#    Copyright (C) 2017 VyOS maintainers <maintainers@vyos.net> +# +#    This library is free software; you can redistribute it and/or +#    modify it under the terms of the GNU Lesser General Public +#    License as published by the Free Software Foundation; either +#    version 2.1 of the License, or (at your option) any later version. +# +#    This library is distributed in the hope that it will be useful, +#    but WITHOUT ANY WARRANTY; without even the implied warranty of +#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +#    Lesser General Public License for more details. +# +#    You should have received a copy of the GNU Lesser General Public +#    License along with this library; if not, write to the Free Software +#    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 +#    USA + +import sys +import os +import argparse +import copy +import functools + +from lxml import etree as ET + +# Defaults + +#validator_dir = "/usr/libexec/vyos/validators" +validator_dir = "${vyos_validators_dir}" +default_constraint_err_msg = "Invalid value" + + +## Get arguments + +parser = argparse.ArgumentParser(description='Converts new-style XML interface definitions to old-style command templates') +parser.add_argument('--debug', help='Enable debug information output', action='store_true') +parser.add_argument('INPUT_FILE', type=str, help="XML interface definition file") +parser.add_argument('SCHEMA_FILE', type=str, help="RelaxNG schema file") +parser.add_argument('OUTPUT_DIR', type=str, help="Output directory") + +args = parser.parse_args() + +input_file = args.INPUT_FILE +schema_file = args.SCHEMA_FILE +output_dir = args.OUTPUT_DIR +debug = args.debug + +#debug = True + +## Load and validate the inputs + +try: +    xml = ET.parse(input_file) +except Exception as e: +    print("Failed to load interface definition file {0}".format(input_file)) +    print(e) +    sys.exit(1) + +try: +    relaxng_xml = ET.parse(schema_file) +    validator = ET.RelaxNG(relaxng_xml) + +    if not validator.validate(xml): +        print(validator.error_log) +        print("Interface definition file {0} does not match the schema!".format(input_file)) +        sys.exit(1) +except Exception as e: +    print("Failed to load the XML schema {0}".format(schema_file)) +    print(e) +    sys.exit(1) + +if not os.access(output_dir, os.W_OK): +    print("The output directory {0} is not writeable".format(output_dir)) +    sys.exit(1) + +## If we got this far, everything must be ok and we can convert the file + +def make_path(l): +    path = functools.reduce(os.path.join, l) +    if debug: +        print(path) +    return path + +def get_properties(p): +    props = {} + +    if p is None: +        return props + +    # Get the help string +    try: +        props["help"] = p.find("help").text +    except: +        pass + +    # Get value help strings +    try: +        vhe = p.findall("valueHelp") +        vh = [] +        for v in vhe: +            vh.append( (v.find("format").text, v.find("description").text) ) +        props["val_help"] = vh +    except: +        props["val_help"] = [] + +    # Get the constraint statements +    error_msg = default_constraint_err_msg +    # Get the error message if it's there +    try: +        error_msg = p.find("constraintErrorMessage").text +    except: +        pass + +    vce = p.find("constraint") +    vc = [] +    if vce is not None: +        # The old backend doesn't support multiple validators in OR mode +        # so we emulate it + +        regexes = [] +        regex_elements = vce.findall("regex") +        if regex_elements is not None: +            regexes = list(map(lambda e: e.text.strip().replace('\\','\\\\'), regex_elements)) +        if "" in regexes: +            print("Warning: empty regex, node will be accepting any value") + +        validator_elements = vce.findall("validator") +        validators = [] +        if validator_elements is not None: +            for v in validator_elements: +                v_name = os.path.join(validator_dir, v.get("name")) + +                # XXX: lxml returns None for empty arguments +                v_argument = None +                try: +                    v_argument = v.get("argument") +                except: +                    pass +                if v_argument is None: +                    v_argument = "" + +                validators.append("{0} {1}".format(v_name, v_argument)) + + +        regex_args = " ".join(map(lambda s: "--regex \\\'{0}\\\'".format(s), regexes)) +        validator_args = " ".join(map(lambda s: "--exec \\\"{0}\\\"".format(s), validators)) +        validator_script = '${vyos_libexec_dir}/validate-value' +        validator_string = "exec \"{0} {1} {2} --value \\\'$VAR(@)\\\'\"; \"{3}\"".format(validator_script, regex_args, validator_args, error_msg) + +        props["constraint"] = validator_string + +    # Get the completion help strings +    try: +        che = p.findall("completionHelp") +        ch = "" +        for c in che: +            scripts = c.findall("script") +            paths = c.findall("path") +            lists = c.findall("list") + +            # Current backend doesn't support multiple allowed: tags +            # so we get to emulate it +            comp_exprs = [] +            for i in lists: +                comp_exprs.append("echo \"{0}\"".format(i.text)) +            for i in paths: +                comp_exprs.append("/bin/cli-shell-api listNodes {0}".format(i.text)) +            for i in scripts: +                comp_exprs.append("sh -c \"{0}\"".format(i.text)) +            comp_help = " && ".join(comp_exprs) +            props["comp_help"] = comp_help +    except: +        props["comp_help"] = [] + +    # Get priority +    try: +        props["priority"] = p.find("priority").text +    except: +        pass + +    # Get "multi" +    if p.find("multi") is not None: +        props["multi"] = True + +    # Get "valueless" +    if p.find("valueless") is not None: +        props["valueless"] = True + +    return props + +def make_node_def(props): +    # XXX: replace with a template processor if it grows +    #      out of control + +    node_def = "" + +    if "tag" in props: +        node_def += "tag:\n" + +    if "multi" in props: +        node_def += "multi:\n" + +    if "type" in props: +        # Will always be txt in practice if it's set +        node_def += "type: {0}\n".format(props["type"]) + +    if "priority" in props: +        node_def += "priority: {0}\n".format(props["priority"]) + +    if "help" in props: +        node_def += "help: {0}\n".format(props["help"]) + +    if "val_help" in props: +        for v in props["val_help"]: +            node_def += "val_help: {0}; {1}\n".format(v[0], v[1]) + +    if "comp_help" in props: +        node_def += "allowed: {0}\n".format(props["comp_help"]) + +    if "constraint" in props: +        node_def += "syntax:expression: {0}\n".format(props["constraint"]) + +    if "owner" in props: +        if "tag" in props: +            node_def += "end: sudo sh -c \"VYOS_TAGNODE_VALUE='$VAR(@)' {0}\"\n".format(props["owner"]) +        else: +            node_def += "end: sudo sh -c \"{0}\"\n".format(props["owner"]) + +    if debug: +        print("The contents of the node.def file:\n", node_def) + +    return node_def + +def process_node(n, tmpl_dir): +    # Avoid mangling the path from the outer call +    my_tmpl_dir = copy.copy(tmpl_dir) + +    props_elem = n.find("properties") +    children = n.find("children") + +    name = n.get("name") +    owner = n.get("owner") +    node_type = n.tag + +    my_tmpl_dir.append(name) + +    if debug: +        print("Name of the node: {0}. Created directory: {1}\n".format(name, "/".join(my_tmpl_dir)), end="") +    os.makedirs(make_path(my_tmpl_dir), exist_ok=True) + +    props = get_properties(props_elem) +    if owner: +        props["owner"] = owner +    # Type should not be set for non-tag, non-leaf nodes +    # For non-valueless leaf nodes, set the type to txt: to make them have some type, +    # actual value validation is handled by constraints translated to syntax:expression: +    if node_type != "node": +        if "valueless" not in props.keys(): +            props["type"] = "txt" +    if node_type == "tagNode": +        props["tag"] = "True" + +    if node_type != "leafNode": +        if "multi" in props: +            raise ValueError("<multi/> tag is only allowed in <leafNode>") +        if "valueless" in props: +            raise ValueError("<valueless/> is only allowed in <leafNode>") + +    nodedef_path = os.path.join(make_path(my_tmpl_dir), "node.def") +    if not os.path.exists(nodedef_path): +        with open(nodedef_path, "w") as f: +            f.write(make_node_def(props)) +    else: +        # Something has already generated that file +        pass + + +    if node_type == "node": +        inner_nodes = children.iterfind("*") +        for inner_n in inner_nodes: +            process_node(inner_n, my_tmpl_dir) +    if node_type == "tagNode": +        my_tmpl_dir.append("node.tag") +        if debug: +            print("Created path for the tagNode:", end="") +        os.makedirs(make_path(my_tmpl_dir), exist_ok=True) +        inner_nodes = children.iterfind("*") +        for inner_n in inner_nodes: +            process_node(inner_n, my_tmpl_dir) +    else: +        # This is a leaf node +        pass + + +root = xml.getroot() + +nodes = root.iterfind("*") +for n in nodes: +    if n.tag == "syntaxVersion": +        continue +    process_node(n, [output_dir]) diff --git a/scripts/build-component-versions b/scripts/build-component-versions new file mode 100755 index 000000000..5362dbdd4 --- /dev/null +++ b/scripts/build-component-versions @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +import sys +import os +import argparse +import json + +from lxml import etree as ET + +parser = argparse.ArgumentParser() +parser.add_argument('INPUT_DIR', type=str, +                    help="Directory containing XML interface definition files") +parser.add_argument('OUTPUT_DIR', type=str, +                    help="Output directory for JSON file") + +args = parser.parse_args() + +input_dir = args.INPUT_DIR +output_dir = args.OUTPUT_DIR + +version_dict = {} + +for filename in os.listdir(input_dir): +    filepath = os.path.join(input_dir, filename) +    print(filepath) +    try: +        xml = ET.parse(filepath) +    except Exception as e: +        print("Failed to load interface definition file {0}".format(filename)) +        print(e) +        sys.exit(1) + +    root = xml.getroot() +    version_data = root.iterfind("syntaxVersion") +    for ver in version_data: +        component = ver.get("component") +        version = int(ver.get("version")) + +        v = version_dict.get(component) +        if v is None: +            version_dict[component] = version +        elif version > v: +            version_dict[component] = version + +out_file = os.path.join(output_dir, 'component-versions.json') +with open(out_file, 'w') as f: +    json.dump(version_dict, f, indent=4, sort_keys=True) diff --git a/scripts/cli/base_interfaces_test.py b/scripts/cli/base_interfaces_test.py deleted file mode 100644 index 14ec7e137..000000000 --- a/scripts/cli/base_interfaces_test.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from netifaces import ifaddresses, AF_INET, AF_INET6 - -from vyos.configsession import ConfigSession -from vyos.ifconfig import Interface -from vyos.util import read_file -from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local - -class BasicInterfaceTest: -    class BaseTest(unittest.TestCase): -        _test_ip = False -        _test_mtu = False -        _test_vlan = False -        _test_qinq = False -        _test_ipv6 = False -        _base_path = [] - -        _options = {} -        _interfaces = [] -        _qinq_range = ['10', '20', '30'] -        _vlan_range = ['100', '200', '300', '2000'] -        # choose IPv6 minimum MTU value for tests - this must always work -        _mtu = '1280' - -        def setUp(self): -            self.session = ConfigSession(os.getpid()) - -            self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', -                                '2001:db8:1::ffff/64', '2001:db8:101::1/112'] -            self._test_mtu = False -            self._options = {} - -        def tearDown(self): -            # we should not remove ethernet from the overall CLI -            if 'ethernet' in self._base_path: -                for interface in self._interfaces: -                    # when using a dedicated interface to test via TEST_ETH environment -                    # variable only this one will be cleared in the end - usable to test -                    # ethernet interfaces via SSH -                    self.session.delete(self._base_path + [interface]) -                    self.session.set(self._base_path + [interface, 'duplex', 'auto']) -                    self.session.set(self._base_path + [interface, 'speed', 'auto']) -                    self.session.set(self._base_path + [interface, 'smp-affinity', 'auto']) -            else: -                self.session.delete(self._base_path) - -            self.session.commit() -            del self.session - -        def test_add_description(self): -            """ -            Check if description can be added to interface -            """ -            for intf in self._interfaces: -                test_string='Description-Test-{}'.format(intf) -                self.session.set(self._base_path + [intf, 'description', test_string]) -                for option in self._options.get(intf, []): -                    self.session.set(self._base_path + [intf] + option.split()) - -            self.session.commit() - -            # Validate interface description -            for intf in self._interfaces: -                test_string='Description-Test-{}'.format(intf) -                with open('/sys/class/net/{}/ifalias'.format(intf), 'r') as f: -                    tmp = f.read().rstrip() -                    self.assertTrue(tmp, test_string) - -        def test_add_address_single(self): -            """ -            Check if a single address can be added to interface. -            """ -            addr = '192.0.2.0/31' -            for intf in self._interfaces: -                self.session.set(self._base_path + [intf, 'address', addr]) -                for option in self._options.get(intf, []): -                    self.session.set(self._base_path + [intf] + option.split()) - -            self.session.commit() - -            for intf in self._interfaces: -                self.assertTrue(is_intf_addr_assigned(intf, addr)) - -        def test_add_address_multi(self): -            """ -            Check if IPv4/IPv6 addresses can be added to interface. -            """ - -            # Add address -            for intf in self._interfaces: -                for addr in self._test_addr: -                    self.session.set(self._base_path + [intf, 'address', addr]) -                    for option in self._options.get(intf, []): -                        self.session.set(self._base_path + [intf] + option.split()) - -            self.session.commit() - -            # Validate address -            for intf in self._interfaces: -                for af in AF_INET, AF_INET6: -                    for addr in ifaddresses(intf)[af]: -                        # checking link local addresses makes no sense -                        if is_ipv6_link_local(addr['addr']): -                            continue - -                        self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) - -        def test_ipv6_link_local(self): -            """ Common function for IPv6 link-local address assignemnts """ -            if not self._test_ipv6: -                return None - -            for interface in self._interfaces: -                base = self._base_path + [interface] -                for option in self._options.get(interface, []): -                    self.session.set(base + option.split()) - -            # after commit we must have an IPv6 link-local address -            self.session.commit() - -            for interface in self._interfaces: -                for addr in ifaddresses(interface)[AF_INET6]: -                    self.assertTrue(is_ipv6_link_local(addr['addr'])) - -            # disable IPv6 link-local address assignment -            for interface in self._interfaces: -                base = self._base_path + [interface] -                self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) - -            # after commit we must have no IPv6 link-local address -            self.session.commit() - -            for interface in self._interfaces: -                self.assertTrue(AF_INET6 not in ifaddresses(interface)) - -        def _mtu_test(self, intf): -            """ helper function to verify MTU size """ -            with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f: -                tmp = f.read().rstrip() -                self.assertEqual(tmp, self._mtu) - -        def test_change_mtu(self): -            """ Testcase if MTU can be changed on interface """ -            if not self._test_mtu: -                return None -            for intf in self._interfaces: -                base = self._base_path + [intf] -                self.session.set(base + ['mtu', self._mtu]) -                for option in self._options.get(intf, []): -                    self.session.set(base + option.split()) - -            self.session.commit() -            for intf in self._interfaces: -                self._mtu_test(intf) - -        def test_8021q_vlan(self): -            """ Testcase for 802.1q VLAN interfaces """ -            if not self._test_vlan: -                return None - -            for interface in self._interfaces: -                base = self._base_path + [interface] -                for option in self._options.get(interface, []): -                    self.session.set(base + option.split()) - -                for vlan in self._vlan_range: -                    base = self._base_path + [interface, 'vif', vlan] -                    self.session.set(base + ['mtu', self._mtu]) -                    for address in self._test_addr: -                        self.session.set(base + ['address', address]) - -            self.session.commit() -            for intf in self._interfaces: -                for vlan in self._vlan_range: -                    vif = f'{intf}.{vlan}' -                    for address in self._test_addr: -                        self.assertTrue(is_intf_addr_assigned(vif, address)) -                    self._mtu_test(vif) - - -        def test_8021ad_qinq_vlan(self): -            """ Testcase for 802.1ad Q-in-Q VLAN interfaces """ -            if not self._test_qinq: -                return None - -            for interface in self._interfaces: -                base = self._base_path + [interface] -                for option in self._options.get(interface, []): -                    self.session.set(base + option.split()) - -                for vif_s in self._qinq_range: -                    for vif_c in self._vlan_range: -                        base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] -                        self.session.set(base + ['mtu', self._mtu]) -                        for address in self._test_addr: -                            self.session.set(base + ['address', address]) - -            self.session.commit() -            for interface in self._interfaces: -                for vif_s in self._qinq_range: -                    for vif_c in self._vlan_range: -                        vif = f'{interface}.{vif_s}.{vif_c}' -                        for address in self._test_addr: -                            self.assertTrue(is_intf_addr_assigned(vif, address)) -                        self._mtu_test(vif) - -        def test_ip_options(self): -            """ test IP options like arp """ -            if not self._test_ip: -                return None - -            for interface in self._interfaces: -                arp_tmo = '300' -                path = self._base_path + [interface] -                for option in self._options.get(interface, []): -                    self.session.set(path + option.split()) - -                # Options -                self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) -                self.session.set(path + ['ip', 'disable-arp-filter']) -                self.session.set(path + ['ip', 'enable-arp-accept']) -                self.session.set(path + ['ip', 'enable-arp-announce']) -                self.session.set(path + ['ip', 'enable-arp-ignore']) -                self.session.set(path + ['ip', 'enable-proxy-arp']) -                self.session.set(path + ['ip', 'proxy-arp-pvlan']) -                self.session.set(path + ['ip', 'source-validation', 'loose']) - -            self.session.commit() - -            for interface in self._interfaces: -                tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') -                self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter') -                self.assertEqual('0', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept') -                self.assertEqual('1', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce') -                self.assertEqual('1', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore') -                self.assertEqual('1', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp') -                self.assertEqual('1', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan') -                self.assertEqual('1', tmp) - -                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') -                self.assertEqual('2', tmp) diff --git a/scripts/cli/test_interfaces_bonding.py b/scripts/cli/test_interfaces_bonding.py deleted file mode 100755 index e3d3b25ee..000000000 --- a/scripts/cli/test_interfaces_bonding.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest - -from vyos.ifconfig import Section -from vyos.configsession import ConfigSessionError -from vyos.util import read_file - -class BondingInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._base_path = ['interfaces', 'bonding'] -        self._interfaces = ['bond0'] -        self._test_mtu = True -        self._test_vlan = True -        self._test_qinq = True -        self._test_ipv6 = True - -        self._members = [] -        # we need to filter out VLAN interfaces identified by a dot (.) -        # in their name - just in case! -        if 'TEST_ETH' in os.environ: -            self._members = os.environ['TEST_ETH'].split() -        else: -            for tmp in Section.interfaces("ethernet"): -                if not '.' in tmp: -                    self._members.append(tmp) - -        self._options['bond0'] = [] -        for member in self._members: -            self._options['bond0'].append(f'member interface {member}') - - -    def test_add_address_single(self): -        """ derived method to check if member interfaces are enslaved properly """ -        super().test_add_address_single() - -        for interface in self._interfaces: -            slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() -            self.assertListEqual(slaves, self._members) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_bridge.py b/scripts/cli/test_interfaces_bridge.py deleted file mode 100755 index bc0bb69c6..000000000 --- a/scripts/cli/test_interfaces_bridge.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.ifconfig import Section - -class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._test_ipv6 = True - -        self._base_path = ['interfaces', 'bridge'] -        self._interfaces = ['br0'] - -        self._members = [] -        # we need to filter out VLAN interfaces identified by a dot (.) -        # in their name - just in case! -        if 'TEST_ETH' in os.environ: -            self._members = os.environ['TEST_ETH'].split() -        else: -            for tmp in Section.interfaces("ethernet"): -                if not '.' in tmp: -                    self._members.append(tmp) - -        self._options['br0'] = [] -        for member in self._members: -            self._options['br0'].append(f'member interface {member}') - -    def test_add_remove_member(self): -        for interface in self._interfaces: -            base = self._base_path + [interface] -            self.session.set(base + ['stp']) -            self.session.set(base + ['address', '192.0.2.1/24']) - -            cost = 1000 -            priority = 10 -            # assign members to bridge interface -            for member in self._members: -                base_member = base + ['member', 'interface', member] -                self.session.set(base_member + ['cost', str(cost)]) -                self.session.set(base_member + ['priority', str(priority)]) -                cost += 1 -                priority += 1 - -        self.session.commit() - -        for interface in self._interfaces: -            self.session.delete(self._base_path + [interface, 'member']) - -        self.session.commit() - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_dummy.py b/scripts/cli/test_interfaces_dummy.py deleted file mode 100755 index 01942fc89..000000000 --- a/scripts/cli/test_interfaces_dummy.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest - -class DummyInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -         super().setUp() -         self._base_path = ['interfaces', 'dummy'] -         self._interfaces = ['dum0', 'dum1', 'dum2'] - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_ethernet.py b/scripts/cli/test_interfaces_ethernet.py deleted file mode 100755 index 761ec7506..000000000 --- a/scripts/cli/test_interfaces_ethernet.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.ifconfig import Section - -class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._base_path = ['interfaces', 'ethernet'] -        self._test_ip = True -        self._test_mtu = True -        self._test_vlan = True -        self._test_qinq = True -        self._test_ipv6 = True -        self._interfaces = [] - -        # we need to filter out VLAN interfaces identified by a dot (.) -        # in their name - just in case! -        if 'TEST_ETH' in os.environ: -            tmp = os.environ['TEST_ETH'].split() -            self._interfaces = tmp -        else: -            for tmp in Section.interfaces("ethernet"): -                if not '.' in tmp: -                    self._interfaces.append(tmp) - -        def test_dhcp_disable(self): -            """ -            When interface is configured as admin down, it must be admin down even -            """ -            for interface in self._interfaces: -                self.session.set(self._base_path + [interface, 'disable']) -                for option in self._options.get(interface, []): -                    self.session.set(self._base_path + [interface] + option.split()) - -                # Also enable DHCP (ISC DHCP always places interface in admin up -                # state so we check that we do not start DHCP client. -                # https://phabricator.vyos.net/T2767 -                self.session.set(self._base_path + [interface, 'address', 'dhcp']) - -            self.session.commit() - -            # Validate interface state -            for interface in self._interfaces: -                with open(f'/sys/class/net/{interface}/flags', 'r') as f: -                    flags = f.read() -                self.assertEqual(int(flags, 16) & 1, 0) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_geneve.py b/scripts/cli/test_interfaces_geneve.py deleted file mode 100755 index f84a55f86..000000000 --- a/scripts/cli/test_interfaces_geneve.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - - -class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._base_path = ['interfaces', 'geneve'] -        self._options = { -            'gnv0': ['vni 10', 'remote 127.0.1.1'], -            'gnv1': ['vni 20', 'remote 127.0.1.2'], -        } -        self._interfaces = list(self._options) - - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_l2tpv3.py b/scripts/cli/test_interfaces_l2tpv3.py deleted file mode 100755 index d8655d157..000000000 --- a/scripts/cli/test_interfaces_l2tpv3.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import json -import jmespath -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.util import cmd - -class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._base_path = ['interfaces', 'l2tpv3'] -        self._options = { -            'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', -                          'tunnel-id 100', 'peer-tunnel-id 10', -                          'session-id 100', 'peer-session-id 10', -                          'source-port 1010', 'destination-port 10101'], -            'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20', -                          'peer-tunnel-id 200', 'remote-ip 127.20.20.20', -                          'session-id 20', 'tunnel-id 200', -                          'source-port 2020', 'destination-port 20202'], -        } -        self._interfaces = list(self._options) - -    def test_add_address_single(self): -        super().test_add_address_single() - -        command = 'sudo ip -j l2tp show session' -        json_out = json.loads(cmd(command)) -        for interface in self._options: -            for config in json_out: -                if config['interface'] == interface: -                    # convert list with configuration items into a dict -                    dict = {} -                    for opt in self._options[interface]: -                        dict.update({opt.split()[0].replace('-','_'): opt.split()[1]}) - -                    for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']: -                        self.assertEqual(str(config[key]), dict[key]) - - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_loopback.py b/scripts/cli/test_interfaces_loopback.py deleted file mode 100755 index ba428b5d3..000000000 --- a/scripts/cli/test_interfaces_loopback.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest -from vyos.validate import is_intf_addr_assigned - -class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -         super().setUp() -         # these addresses are never allowed to be removed from the system -         self._loopback_addresses = ['127.0.0.1', '::1'] -         self._base_path = ['interfaces', 'loopback'] -         self._interfaces = ['lo'] - -    def test_add_address_single(self): -        super().test_add_address_single() -        for addr in self._loopback_addresses: -            self.assertTrue(is_intf_addr_assigned('lo', addr)) - -    def test_add_address_multi(self): -        super().test_add_address_multi() -        for addr in self._loopback_addresses: -            self.assertTrue(is_intf_addr_assigned('lo', addr)) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_macsec.py b/scripts/cli/test_interfaces_macsec.py deleted file mode 100755 index 0f1b6486d..000000000 --- a/scripts/cli/test_interfaces_macsec.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import unittest -from psutil import process_iter - -from vyos.ifconfig import Section -from base_interfaces_test import BasicInterfaceTest -from vyos.configsession import ConfigSessionError -from vyos.util import read_file - -def get_config_value(intf, key): -    tmp = read_file(f'/run/wpa_supplicant/{intf}.conf') -    tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) -    return tmp[0] - -class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -         super().setUp() -         self._base_path = ['interfaces', 'macsec'] -         self._options = { -             'macsec0': ['source-interface eth0', -                         'security cipher gcm-aes-128'] -         } - -         # if we have a physical eth1 interface, add a second macsec instance -         if 'eth1' in Section.interfaces("ethernet"): -             macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] } -             self._options.update(macsec) - -         self._interfaces = list(self._options) - -    def test_encryption(self): -        """ MACsec can be operating in authentication and encryption -        mode - both using different mandatory settings, lets test -        encryption as the basic authentication test has been performed -        using the base class tests """ -        intf = 'macsec0' -        src_intf = 'eth0' -        mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4' -        mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836' -        replay_window = '64' -        self.session.set(self._base_path + [intf, 'security', 'encrypt']) - -        # check validate() - Cipher suite must be set for MACsec -        with self.assertRaises(ConfigSessionError): -            self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128']) - -        # check validate() - Physical source interface must be set for MACsec -        with self.assertRaises(ConfigSessionError): -            self.session.commit() -        self.session.set(self._base_path + [intf, 'source-interface', src_intf]) - -        # check validate() - MACsec security keys mandartory when encryption is enabled -        with self.assertRaises(ConfigSessionError): -            self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak]) - -        # check validate() - MACsec security keys mandartory when encryption is enabled -        with self.assertRaises(ConfigSessionError): -            self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn]) - -        self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window]) -        self.session.commit() - -        tmp = get_config_value(src_intf, 'macsec_integ_only') -        self.assertTrue("0" in tmp) - -        tmp = get_config_value(src_intf, 'mka_cak') -        self.assertTrue(mak_cak in tmp) - -        tmp = get_config_value(src_intf, 'mka_ckn') -        self.assertTrue(mak_ckn in tmp) - -        # check that the default priority of 255 is programmed -        tmp = get_config_value(src_intf, 'mka_priority') -        self.assertTrue("255" in tmp) - -        tmp = get_config_value(src_intf, 'macsec_replay_window') -        self.assertTrue(replay_window in tmp) - -        # Check for running process -        self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_pppoe.py b/scripts/cli/test_interfaces_pppoe.py deleted file mode 100755 index 822f05de6..000000000 --- a/scripts/cli/test_interfaces_pppoe.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -config_file = '/etc/ppp/peers/{}' -dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' -base_path = ['interfaces', 'pppoe'] - -def get_config_value(interface, key): -    with open(config_file.format(interface), 'r') as f: -        for line in f: -            if line.startswith(key): -                return list(line.split()) -    return [] - -def get_dhcp6c_config_value(interface, key): -    tmp = read_file(dhcp6c_config_file.format(interface)) -    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - -    out = [] -    for item in tmp: -        out.append(item.replace(';','')) -    return out - -class PPPoEInterfaceTest(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self._interfaces = ['pppoe0', 'pppoe50'] -        self._source_interface = 'eth0' - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_pppoe(self): -        """ Check if PPPoE dialer can be configured and runs """ -        for interface in self._interfaces: -            user = 'VyOS-user-' + interface -            passwd = 'VyOS-passwd-' + interface -            mtu = '1400' - -            self.session.set(base_path + [interface, 'authentication', 'user', user]) -            self.session.set(base_path + [interface, 'authentication', 'password', passwd]) -            self.session.set(base_path + [interface, 'default-route', 'auto']) -            self.session.set(base_path + [interface, 'mtu', mtu]) -            self.session.set(base_path + [interface, 'no-peer-dns']) - -            # check validate() - a source-interface is required -            with self.assertRaises(ConfigSessionError): -                self.session.commit() -            self.session.set(base_path + [interface, 'source-interface', self._source_interface]) - -            # commit changes -            self.session.commit() - -        # verify configuration file(s) -        for interface in self._interfaces: -            user = 'VyOS-user-' + interface -            password = 'VyOS-passwd-' + interface - -            tmp = get_config_value(interface, 'mtu')[1] -            self.assertEqual(tmp, mtu) -            tmp = get_config_value(interface, 'user')[1].replace('"', '') -            self.assertEqual(tmp, user) -            tmp = get_config_value(interface, 'password')[1].replace('"', '') -            self.assertEqual(tmp, password) -            tmp = get_config_value(interface, 'ifname')[1] -            self.assertEqual(tmp, interface) - -            # Check if ppp process is running in the interface in question -            running = False -            for p in process_iter(): -                if "pppd" in p.name(): -                    if interface in p.cmdline(): -                        running = True - -            self.assertTrue(running) - -    def test_pppoe_dhcpv6pd(self): -        """ Check if PPPoE dialer can be configured with DHCPv6-PD """ -        address = '1' -        sla_id = '0' -        sla_len = '8' -        for interface in self._interfaces: -            self.session.set(base_path + [interface, 'authentication', 'user', 'vyos']) -            self.session.set(base_path + [interface, 'authentication', 'password', 'vyos']) -            self.session.set(base_path + [interface, 'default-route', 'none']) -            self.session.set(base_path + [interface, 'no-peer-dns']) -            self.session.set(base_path + [interface, 'source-interface', self._source_interface]) -            self.session.set(base_path + [interface, 'ipv6', 'enable']) - -            # prefix delegation stuff -            dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] -            self.session.set(dhcpv6_pd_base + ['length', '56']) -            self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) -            self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id',  sla_id]) - -            # commit changes -            self.session.commit() - -            # verify "normal" PPPoE value - 1492 is default MTU -            tmp = get_config_value(interface, 'mtu')[1] -            self.assertEqual(tmp, '1492') -            tmp = get_config_value(interface, 'user')[1].replace('"', '') -            self.assertEqual(tmp, 'vyos') -            tmp = get_config_value(interface, 'password')[1].replace('"', '') -            self.assertEqual(tmp, 'vyos') - -            for param in ['+ipv6', 'ipv6cp-use-ipaddr']: -                tmp = get_config_value(interface, param)[0] -                self.assertEqual(tmp, param) - -            # verify DHCPv6 prefix delegation -            # will return: ['delegation', '::/56 infinity;'] -            tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace -            self.assertEqual(tmp, '::/56') -            tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] -            self.assertEqual(tmp, self._source_interface) -            tmp = get_dhcp6c_config_value(interface, 'ifid')[0] -            self.assertEqual(tmp, address) -            tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] -            self.assertEqual(tmp, sla_id) -            tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] -            self.assertEqual(tmp, sla_len) - -            # Check if ppp process is running in the interface in question -            running = False -            for p in process_iter(): -                if "pppd" in p.name(): -                    running = True -            self.assertTrue(running) - -            # We can not check if wide-dhcpv6 process is running as it is started -            # after the PPP interface gets a link to the ISP - but we can see if -            # it would be started by the scripts -            tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}') -            tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp) -            self.assertTrue(tmp) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_pseudo_ethernet.py b/scripts/cli/test_interfaces_pseudo_ethernet.py deleted file mode 100755 index bc2e6e7eb..000000000 --- a/scripts/cli/test_interfaces_pseudo_ethernet.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_interfaces_test import BasicInterfaceTest - -class PEthInterfaceTest(BasicInterfaceTest.BaseTest): - -    def setUp(self): -        super().setUp() -        self._base_path = ['interfaces', 'pseudo-ethernet'] - -        self._test_ip = True -        self._test_mtu = True -        self._test_vlan = True -        self._test_qinq = True - -        self._options = { -            'peth0': ['source-interface eth1'], -            'peth1': ['source-interface eth1'], -        } -        self._interfaces = list(self._options) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_tunnel.py b/scripts/cli/test_interfaces_tunnel.py deleted file mode 100755 index 7611ffe26..000000000 --- a/scripts/cli/test_interfaces_tunnel.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession - -from base_interfaces_test import BasicInterfaceTest - -class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): -    # encoding, tunnel endpoint (v4/v6), address (v4/v6) -    _valid = [ -        ('gre', 4, 4), -        ('gre', 4, 6), -        ('ip6gre', 6, 4), -        ('ip6gre', 6, 6), -        ('gre-bridge', 4, 4), -        ('ipip', 4, 4), -        ('ipip', 4, 6), -        ('ipip6', 6, 4), -        ('ipip6', 6, 6), -        ('ip6ip6', 6, 6), -        ('sit', 4, 6), -    ] - -    local = { -        4: '10.100.{}.1/24', -        6:  '2001:db8:{}::1/64', -    } - -    remote = { -        4: '192.0.{}.1', -        6: '2002::{}:1', -    } - -    address = { -        4: '10.100.{}.1/24', -        6:  '2001:db8:{}::1/64', -    } - -    def setUp(self): -        local = {} -        remote = {} -        address = {} - -        self._intf_dummy = ['interfaces', 'dummy'] -        self._base_path = ['interfaces', 'tunnel'] -        self._interfaces = ['tun{}'.format(n) for n in range(len(self._valid))] - -        self._test_mtu = True -        super().setUp() - -        for number in range(len(self._valid)): -            dum4 = 'dum4{}'.format(number) -            dum6 = 'dum6{}'.format(number) - -            ipv4 = self.local[4].format(number) -            ipv6 = self.local[6].format(number) - -            local.setdefault(4, {})[number] = ipv4 -            local.setdefault(6, {})[number] = ipv6 - -            ipv4 = self.remote[4].format(number) -            ipv6 = self.remote[6].format(number) - -            remote.setdefault(4, {})[number] = ipv4 -            remote.setdefault(6, {})[number] = ipv6 - -            ipv4 = self.address[4].format(number) -            ipv6 = self.address[6].format(number) - -            address.setdefault(4, {})[number] = ipv4 -            address.setdefault(6, {})[number] = ipv6 - -            self.session.set(self._intf_dummy + [dum4, 'address', ipv4]) -            self.session.set(self._intf_dummy + [dum6, 'address', ipv6]) -        self.session.commit() - -        for number, (encap, p2p, addr) in enumerate(self._valid): -            intf = 'tun%d' % number -            tunnel = {} -            tunnel['encapsulation'] = encap -            tunnel['local-ip'] = local[p2p][number].split('/')[0] -            tunnel['remote-ip'] = remote[p2p][number].split('/')[0] -            tunnel['address'] = address[addr][number] -            for name in tunnel: -                self.session.set(self._base_path + [intf, name, tunnel[name]]) - -    def tearDown(self): -        self.session.delete(self._intf_dummy) -        super().tearDown() - - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_vxlan.py b/scripts/cli/test_interfaces_vxlan.py deleted file mode 100755 index 2628e0285..000000000 --- a/scripts/cli/test_interfaces_vxlan.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - -class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._test_mtu = True -        self._base_path = ['interfaces', 'vxlan'] -        self._options = { -            'vxlan0': ['vni 10', 'remote 127.0.0.2'], -            'vxlan1': ['vni 20', 'group 239.1.1.1', 'source-interface eth0'], -        } -        self._interfaces = list(self._options) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_wireguard.py b/scripts/cli/test_interfaces_wireguard.py deleted file mode 100755 index 0c32a4696..000000000 --- a/scripts/cli/test_interfaces_wireguard.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest - -# Generate WireGuard default keypair -if not os.path.isdir('/config/auth/wireguard/default'): -    os.system('sudo /usr/libexec/vyos/op_mode/wireguard.py --genkey') - -base_path = ['interfaces', 'wireguard'] - -class WireGuardInterfaceTest(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', -                          '2001:db8:1::ffff/64', '2001:db8:101::1/112'] -        self._interfaces = ['wg0', 'wg1'] - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_peer_setup(self): -        """ -        Create WireGuard interfaces with associated peers -        """ -        for intf in self._interfaces: -            peer = 'foo-' + intf -            psk = 'u2xdA70hkz0S1CG0dZlOh0aq2orwFXRIVrKo4DCvHgM=' -            pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A=' - -            for addr in self._test_addr: -                self.session.set(base_path + [intf, 'address', addr]) - -            self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) -            self.session.set(base_path + [intf, 'peer', peer, 'port', '1337']) - -            # Allow different prefixes to traverse the tunnel -            allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] -            for ip in allowed_ips: -                self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) - -            self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) -            self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) -            self.session.commit() - -            self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_interfaces_wireless.py b/scripts/cli/test_interfaces_wireless.py deleted file mode 100755 index fae233244..000000000 --- a/scripts/cli/test_interfaces_wireless.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from base_interfaces_test import BasicInterfaceTest -from psutil import process_iter -from vyos.util import check_kmod - -class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): -    def setUp(self): -        super().setUp() - -        self._base_path = ['interfaces', 'wireless'] -        self._options = { -            'wlan0':  ['physical-device phy0', 'ssid VyOS-WIFI-0', -                       'type station', 'address 192.0.2.1/30'], -            'wlan1':  ['physical-device phy0', 'ssid VyOS-WIFI-1', -                       'type access-point', 'address 192.0.2.5/30', 'channel 0'], -            'wlan10': ['physical-device phy1', 'ssid VyOS-WIFI-2', -                       'type station', 'address 192.0.2.9/30'], -            'wlan11': ['physical-device phy1', 'ssid VyOS-WIFI-3', -                       'type access-point', 'address 192.0.2.13/30', 'channel 0'], -        } -        self._interfaces = list(self._options) -        self.session.set(['system', 'wifi-regulatory-domain', 'SE']) - -    def test_add_address_single(self): -        """ derived method to check if member interfaces are enslaved properly """ -        super().test_add_address_single() - -        for option, option_value in self._options.items(): -            if 'type access-point' in option_value: -                # Check for running process -                self.assertIn('hostapd', (p.name() for p in process_iter())) -            elif 'type station' in option_value: -                # Check for running process -                self.assertIn('wpa_supplicant', (p.name() for p in process_iter())) -            else: -                self.assertTrue(False) - -if __name__ == '__main__': -    check_kmod('mac80211_hwsim') -    unittest.main() diff --git a/scripts/cli/test_interfaces_wirelessmodem.py b/scripts/cli/test_interfaces_wirelessmodem.py deleted file mode 100755 index 40cd03b93..000000000 --- a/scripts/cli/test_interfaces_wirelessmodem.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError - -config_file = '/etc/ppp/peers/{}' -base_path = ['interfaces', 'wirelessmodem'] - -def get_config_value(interface, key): -    with open(config_file.format(interface), 'r') as f: -        for line in f: -            if line.startswith(key): -                return list(line.split()) -    return [] - -class WWANInterfaceTest(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self._interfaces = ['wlm0', 'wlm1'] - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_wlm_1(self): -        for interface in self._interfaces: -            self.session.set(base_path + [interface, 'no-peer-dns']) -            self.session.set(base_path + [interface, 'ondemand']) - -            # check validate() - APN must be configure -            with self.assertRaises(ConfigSessionError): -                self.session.commit() -            self.session.set(base_path + [interface, 'apn', 'vyos.net']) - -            # check validate() - device must be configure -            with self.assertRaises(ConfigSessionError): -                self.session.commit() -            self.session.set(base_path + [interface, 'device', 'ttyS0']) - -            # commit changes -            self.session.commit() - -        # verify configuration file(s) -        for interface in self._interfaces: -            tmp = get_config_value(interface, 'ifname')[1] -            self.assertTrue(interface in tmp) - -            tmp = get_config_value(interface, 'demand')[0] -            self.assertTrue('demand' in tmp) - -            tmp = os.path.isfile(f'/etc/ppp/peers/chat.{interface}') -            self.assertTrue(tmp) - -            # Check if ppp process is running in the interface in question -            running = False -            for p in process_iter(): -                if "pppd" in p.name(): -                    if interface in p.cmdline(): -                        running = True - -            self.assertTrue(running) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_nat.py b/scripts/cli/test_nat.py deleted file mode 100755 index 416810e40..000000000 --- a/scripts/cli/test_nat.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import jmespath -import json -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import cmd - -base_path = ['nat'] -snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' - -class TestNAT(unittest.TestCase): -    def setUp(self): -        # ensure we can also run this test on a live system - so lets clean -        # out the current configuration :) -        self.session = ConfigSession(os.getpid()) -        self.session.delete(base_path) - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() - -    def test_source_nat(self): -        """ Configure and validate source NAT rule(s) """ - -        path = base_path + ['source'] -        network = '192.168.0.0/16' -        self.session.set(path + ['rule', '1', 'destination', 'address', network]) -        self.session.set(path + ['rule', '1', 'exclude']) - -        # check validate() - outbound-interface must be defined -        with self.assertRaises(ConfigSessionError): -            self.session.commit() - -        self.session.set(path + ['rule', '1', 'outbound-interface', 'any']) -        self.session.commit() - -        tmp = cmd('sudo nft -j list table nat') -        nftable_json = json.loads(tmp) -        condensed_json = jmespath.search(snat_pattern, nftable_json)[0] - -        self.assertEqual(condensed_json['comment'], 'DST-NAT-1') -        self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) -        self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_bcast-relay.py b/scripts/cli/test_service_bcast-relay.py deleted file mode 100755 index fe4531c3b..000000000 --- a/scripts/cli/test_service_bcast-relay.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError - -base_path = ['service', 'broadcast-relay'] - -class TestServiceBroadcastRelay(unittest.TestCase): -    _address1 = '192.0.2.1/24' -    _address2 = '192.0.2.1/24' - -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) -        self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) -        self.session.commit() - -    def tearDown(self): -        self.session.delete(['interfaces', 'dummy', 'dum1001']) -        self.session.delete(['interfaces', 'dummy', 'dum1002']) -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_service(self): -        """ Check if broadcast relay service can be configured and runs """ -        ids = range(1, 5) -        for id in ids: -            base = base_path + ['id', str(id)] -            self.session.set(base + ['description', 'vyos']) -            self.session.set(base + ['port', str(10000 + id)]) - -            # check validate() - two interfaces must be present -            with self.assertRaises(ConfigSessionError): -                self.session.commit() - -            self.session.set(base + ['interface', 'dum1001']) -            self.session.set(base + ['interface', 'dum1002']) -            self.session.set(base + ['address', self._address1.split('/')[0]]) - -        self.session.commit() - -        for id in ids: -            # check if process is running -            running = False -            for p in process_iter(): -                if "udp-broadcast-relay" in p.name(): -                    if p.cmdline()[3] == str(id): -                        running = True -                        break -            self.assertTrue(running) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_dns_dynamic.py b/scripts/cli/test_service_dns_dynamic.py deleted file mode 100755 index be52360ed..000000000 --- a/scripts/cli/test_service_dns_dynamic.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from getpass import getuser -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -DDCLIENT_CONF = '/run/ddclient/ddclient.conf' -base_path = ['service', 'dns', 'dynamic'] - -def get_config_value(key): -    tmp = read_file(DDCLIENT_CONF) -    tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp) -    tmp = tmp[0].rstrip(',') -    return tmp - -def check_process(): -    """ -    Check for running process, process name changes dynamically e.g. -    "ddclient - sleeping for 270 seconds", thus we need a different approach -    """ -    running = False -    for p in process_iter(): -        if "ddclient" in p.name(): -            running = True -    return running - -class TestServiceDDNS(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        # Delete DDNS configuration -        self.session.delete(base_path) -        self.session.commit() - -        del self.session - -    def test_service(self): -        """ Check individual DDNS service providers """ -        ddns = ['interface', 'eth0', 'service'] -        services = ['cloudflare', 'afraid', 'dyndns', 'zoneedit'] - -        for service in services: -            user = 'vyos_user' -            password = 'vyos_pass' -            zone = 'vyos.io' -            self.session.delete(base_path) -            self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io']) -            self.session.set(base_path + ddns + [service, 'login', user]) -            self.session.set(base_path + ddns + [service, 'password', password]) -            self.session.set(base_path + ddns + [service, 'zone', zone]) - -            # commit changes -            if service == 'cloudflare': -                self.session.commit() -            else: -                # zone option only works on cloudflare, an exception is raised -                # for all others -                with self.assertRaises(ConfigSessionError): -                    self.session.commit() -                self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io']) -                # commit changes again - now it should work -                self.session.commit() - -            # we can only read the configuration file when we operate as 'root' -            if getuser() == 'root': -                protocol = get_config_value('protocol') -                login = get_config_value('login') -                pwd = get_config_value('password') - -                # some services need special treatment -                protoname = service -                if service == 'cloudflare': -                    tmp = get_config_value('zone') -                    self.assertTrue(tmp == zone) -                elif service == 'afraid': -                    protoname = 'freedns' -                elif service == 'dyndns': -                    protoname = 'dyndns2' -                elif service == 'zoneedit': -                    protoname = 'zoneedit1' - -                self.assertTrue(protocol == protoname) -                self.assertTrue(login == user) -                self.assertTrue(pwd == "'" + password + "'") - -            # Check for running process -            self.assertTrue(check_process()) - - -    def test_rfc2136(self): -        """ Check if DDNS service can be configured and runs """ -        ddns = ['interface', 'eth0', 'rfc2136', 'vyos'] -        ddns_key_file = '/config/auth/my.key' - -        self.session.set(base_path + ddns + ['key', ddns_key_file]) -        self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io']) -        self.session.set(base_path + ddns + ['server', 'ns1.vyos.io']) -        self.session.set(base_path + ddns + ['ttl', '300']) -        self.session.set(base_path + ddns + ['zone', 'vyos.io']) - -        # ensure an exception will be raised as no key is present -        if os.path.exists(ddns_key_file): -            os.unlink(ddns_key_file) - -        # check validate() - the key file does not exist yet -        with self.assertRaises(ConfigSessionError): -            self.session.commit() - -        with open(ddns_key_file, 'w') as f: -            f.write('S3cretKey') - -        # commit changes -        self.session.commit() - -        # TODO: inspect generated configuration file - -        # Check for running process -        self.assertTrue(check_process()) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_mdns-repeater.py b/scripts/cli/test_service_mdns-repeater.py deleted file mode 100755 index 18900b6d2..000000000 --- a/scripts/cli/test_service_mdns-repeater.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession - -base_path = ['service', 'mdns', 'repeater'] -intf_base = ['interfaces', 'dummy'] - -class TestServiceMDNSrepeater(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.delete(intf_base + ['dum10']) -        self.session.delete(intf_base + ['dum20']) -        self.session.commit() -        del self.session - -    def test_service(self): -        # Service required a configured IP address on the interface - -        self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30']) -        self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30']) - -        self.session.set(base_path + ['interface', 'dum10']) -        self.session.set(base_path + ['interface', 'dum20']) -        self.session.commit() - -        # Check for running process -        self.assertTrue("mdns-repeater" in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_pppoe-server.py b/scripts/cli/test_service_pppoe-server.py deleted file mode 100755 index 901ca792d..000000000 --- a/scripts/cli/test_service_pppoe-server.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from configparser import ConfigParser -from psutil import process_iter -from vyos.configsession import ConfigSession -from vyos.configsession import ConfigSessionError - -base_path = ['service', 'pppoe-server'] -local_if = ['interfaces', 'dummy', 'dum667'] -pppoe_conf = '/run/accel-pppd/pppoe.conf' - -ac_name = 'ACN' -subnet = '172.18.0.0/24' -gateway = '192.0.2.1' -nameserver = '9.9.9.9' -mtu = '1492' -interface = 'eth0' - -class TestServicePPPoEServer(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        # ensure we can also run this test on a live system - so lets clean -        # out the current configuration :) -        self.session.delete(base_path) - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.delete(local_if) -        self.session.commit() -        del self.session - -    def verify(self, conf): -        # validate some common values in the configuration -        for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool', -                    'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1', -                    'auth_chap_md5', 'auth_pap', 'shaper']: -            # Settings without values provide None -            self.assertEqual(conf['modules'][tmp], None) - -        # check Access Concentrator setting -        self.assertTrue(conf['pppoe']['ac-name'] == ac_name) -        self.assertTrue(conf['pppoe'].getboolean('verbose')) -        self.assertTrue(conf['pppoe']['interface'], interface) - -        # check configured subnet -        self.assertEqual(conf['ip-pool'][subnet], None) -        self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) - -        # check ppp -        self.assertTrue(conf['ppp'].getboolean('verbose')) -        self.assertTrue(conf['ppp'].getboolean('check-ip')) -        self.assertEqual(conf['ppp']['min-mtu'], mtu) -        self.assertEqual(conf['ppp']['mtu'], mtu) -        self.assertEqual(conf['ppp']['lcp-echo-interval'], '30') -        self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0') -        self.assertEqual(conf['ppp']['lcp-echo-failure'], '3') - -    def basic_config(self): -        self.session.set(local_if + ['address', '192.0.2.1/32']) - -        self.session.set(base_path + ['access-concentrator', ac_name]) -        self.session.set(base_path + ['authentication', 'mode', 'local']) -        self.session.set(base_path + ['client-ip-pool', 'subnet', subnet]) -        self.session.set(base_path + ['name-server', nameserver]) -        self.session.set(base_path + ['interface', interface]) -        self.session.set(base_path + ['local-ip', gateway]) - -    def test_local_auth(self): -        """ Test configuration of local authentication for PPPoE server """ -        self.basic_config() -        # authentication -        self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) -        self.session.set(base_path + ['authentication', 'mode', 'local']) -        # other settings -        self.session.set(base_path + ['ppp-options', 'ccp']) -        self.session.set(base_path + ['ppp-options', 'mppe', 'require']) -        self.session.set(base_path + ['limits', 'connection-limit', '20/min']) - -        # commit changes -        self.session.commit() - -        # Validate configuration values -        conf = ConfigParser(allow_no_value=True) -        conf.read(pppoe_conf) - -        # basic verification -        self.verify(conf) - -        # check auth -        self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets') -        self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway) - -        # check pado -        self.assertEqual(conf['ppp']['mppe'], 'require') -        self.assertTrue(conf['ppp'].getboolean('ccp')) - -        # check other settings -        self.assertEqual(conf['connlimit']['limit'], '20/min') - -        # Check for running process -        self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) - -    def test_radius_auth(self): -        """ Test configuration of RADIUS authentication for PPPoE server """ -        radius_server = '192.0.2.22' -        radius_key = 'secretVyOS' -        radius_port = '2000' -        radius_port_acc = '3000' - -        self.basic_config() -        self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key]) -        self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port]) -        self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) -        self.session.set(base_path + ['authentication', 'mode', 'radius']) - -        # commit changes -        self.session.commit() - -        # Validate configuration values -        conf = ConfigParser(allow_no_value=True) -        conf.read(pppoe_conf) - -        # basic verification -        self.verify(conf) - -        # check auth -        self.assertTrue(conf['radius'].getboolean('verbose')) -        self.assertTrue(conf['radius']['acct-timeout'], '3') -        self.assertTrue(conf['radius']['timeout'], '3') -        self.assertTrue(conf['radius']['max-try'], '3') -        self.assertTrue(conf['radius']['gw-ip-address'], gateway) - -        server = conf['radius']['server'].split(',') -        self.assertEqual(radius_server, server[0]) -        self.assertEqual(radius_key, server[1]) -        self.assertEqual(f'auth-port={radius_port}', server[2]) -        self.assertEqual(f'acct-port={radius_port_acc}', server[3]) -        self.assertEqual(f'req-limit=0', server[4]) -        self.assertEqual(f'fail-time=0', server[5]) - -        # check defaults -        self.assertEqual(conf['ppp']['mppe'], 'prefer') -        self.assertFalse(conf['ppp'].getboolean('ccp')) - -        # Check for running process -        self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_router-advert.py b/scripts/cli/test_service_router-advert.py deleted file mode 100755 index ec2110c8a..000000000 --- a/scripts/cli/test_service_router-advert.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession -from vyos.util import read_file - -RADVD_CONF = '/run/radvd/radvd.conf' - -interface = 'eth1' -base_path = ['service', 'router-advert', 'interface', interface] -address_base = ['interfaces', 'ethernet', interface, 'address'] - -def get_config_value(key): -    tmp = read_file(RADVD_CONF) -    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) -    return tmp[0].split()[0].replace(';','') - -class TestServiceRADVD(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self.session.set(address_base + ['2001:db8::1/64']) - -    def tearDown(self): -        self.session.delete(address_base) -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_single(self): -        self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag']) -        self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) -        self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) -        self.session.set(base_path + ['dnssl', '2001:db8::1234']) -        self.session.set(base_path + ['other-config-flag']) - -        # commit changes -        self.session.commit() - -        # verify values -        tmp = get_config_value('interface') -        self.assertEqual(tmp, interface) - -        tmp = get_config_value('prefix') -        self.assertEqual(tmp, '::/64') - -        tmp = get_config_value('AdvOtherConfigFlag') -        self.assertEqual(tmp, 'on') - -        # this is a default value -        tmp = get_config_value('AdvRetransTimer') -        self.assertEqual(tmp, '0') - -        # this is a default value -        tmp = get_config_value('AdvCurHopLimit') -        self.assertEqual(tmp, '64') - -        # this is a default value -        tmp = get_config_value('AdvDefaultPreference') -        self.assertEqual(tmp, 'medium') - -        tmp = get_config_value('AdvAutonomous') -        self.assertEqual(tmp, 'off') - -        # this is a default value -        tmp = get_config_value('AdvValidLifetime') -        self.assertEqual(tmp, 'infinity') - -        # this is a default value -        tmp = get_config_value('AdvPreferredLifetime') -        self.assertEqual(tmp, '14400') - -        tmp = get_config_value('AdvOnLink') -        self.assertEqual(tmp, 'off') - - - -        # Check for running process -        self.assertTrue('radvd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_service_snmp.py b/scripts/cli/test_service_snmp.py deleted file mode 100755 index fb5f5393f..000000000 --- a/scripts/cli/test_service_snmp.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from vyos.validate import is_ipv4 -from psutil import process_iter - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -SNMPD_CONF = '/etc/snmp/snmpd.conf' -base_path = ['service', 'snmp'] - -def get_config_value(key): -    tmp = read_file(SNMPD_CONF) -    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) -    return tmp[0] - -class TestSNMPService(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        # ensure we can also run this test on a live system - so lets clean -        # out the current configuration :) -        self.session.delete(base_path) - -    def tearDown(self): -        del self.session - -    def test_snmp(self): -        """ Check if SNMP can be configured and service runs """ -        clients = ['192.0.2.1', '2001:db8::1'] -        networks = ['192.0.2.128/25', '2001:db8:babe::/48'] -        listen = ['127.0.0.1', '::1'] - -        for auth in ['ro', 'rw']: -            community = 'VyOS' + auth -            self.session.set(base_path + ['community', community, 'authorization', auth]) -            for client in clients: -                self.session.set(base_path + ['community', community, 'client', client]) -            for network in networks: -                self.session.set(base_path + ['community', community, 'network', network]) - -        for addr in listen: -            self.session.set(base_path + ['listen-address', addr]) - -        self.session.set(base_path + ['contact', 'maintainers@vyos.io']) -        self.session.set(base_path + ['location', 'qemu']) - -        self.session.commit() - -        # verify listen address, it will be returned as -        # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161'] -        # thus we need to transfor this into a proper list -        config = get_config_value('agentaddress') -        expected = 'unix:/run/snmpd.socket' -        for addr in listen: -            if is_ipv4(addr): -                expected += ',udp:{}:161'.format(addr) -            else: -                expected += ',udp6:[{}]:161'.format(addr) - -        self.assertTrue(expected in config) - -        # Check for running process -        self.assertTrue("snmpd" in (p.name() for p in process_iter())) - - -    def test_snmpv3_sha(self): -        """ Check if SNMPv3 can be configured with SHA authentication and service runs""" - -        self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) -        self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) -        # check validate() - a view must be created before this can be comitted -        with self.assertRaises(ConfigSessionError): -            self.session.commit() - -        self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) -        self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) - -        # create user -        self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - -        self.session.commit() - -        # commit will alter the CLI values - check if they have been updated: -        hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' -        tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] -        self.assertEqual(tmp, hashed_password) - -        tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] -        self.assertEqual(tmp, hashed_password) - -        # TODO: read in config file and check values - -        # Check for running process -        self.assertTrue("snmpd" in (p.name() for p in process_iter())) - -    def test_snmpv3_md5(self): -        """ Check if SNMPv3 can be configured with MD5 authentication and service runs""" - -        self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) -        self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) -        # check validate() - a view must be created before this can be comitted -        with self.assertRaises(ConfigSessionError): -            self.session.commit() - -        self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) -        self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) - -        # create user -        self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) -        self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - -        self.session.commit() - -        # commit will alter the CLI values - check if they have been updated: -        hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' -        tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] -        self.assertEqual(tmp, hashed_password) - -        tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] -        self.assertEqual(tmp, hashed_password) - -        # TODO: read in config file and check values - -        # Check for running process -        self.assertTrue("snmpd" in (p.name() for p in process_iter())) - - -if __name__ == '__main__': -    unittest.main() - diff --git a/scripts/cli/test_service_ssh.py b/scripts/cli/test_service_ssh.py deleted file mode 100755 index 3ee498f3d..000000000 --- a/scripts/cli/test_service_ssh.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -SSHD_CONF = '/run/ssh/sshd_config' -base_path = ['service', 'ssh'] - -def get_config_value(key): -    tmp = read_file(SSHD_CONF) -    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) -    return tmp - -class TestServiceSSH(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        # ensure we can also run this test on a live system - so lets clean -        # out the current configuration :) -        self.session.delete(base_path) - -    def tearDown(self): -        # delete testing SSH config -        self.session.delete(base_path) -        # restore "plain" SSH access -        self.session.set(base_path) - -        self.session.commit() -        del self.session - -    def test_ssh_single(self): -        """ Check if SSH service can be configured and runs """ -        self.session.set(base_path + ['port', '1234']) -        self.session.set(base_path + ['disable-host-validation']) -        self.session.set(base_path + ['disable-password-authentication']) -        self.session.set(base_path + ['loglevel', 'verbose']) -        self.session.set(base_path + ['client-keepalive-interval', '100']) -        self.session.set(base_path + ['listen-address', '127.0.0.1']) - -        # commit changes -        self.session.commit() - -        # Check configured port -        port = get_config_value('Port')[0] -        self.assertTrue("1234" in port) - -        # Check DNS usage -        dns = get_config_value('UseDNS')[0] -        self.assertTrue("no" in dns) - -        # Check PasswordAuthentication -        pwd = get_config_value('PasswordAuthentication')[0] -        self.assertTrue("no" in pwd) - -        # Check loglevel -        loglevel = get_config_value('LogLevel')[0] -        self.assertTrue("VERBOSE" in loglevel) - -        # Check listen address -        address = get_config_value('ListenAddress')[0] -        self.assertTrue("127.0.0.1" in address) - -        # Check keepalive -        keepalive = get_config_value('ClientAliveInterval')[0] -        self.assertTrue("100" in keepalive) - -        # Check for running process -        self.assertTrue("sshd" in (p.name() for p in process_iter())) - -    def test_ssh_multi(self): -        """ Check if SSH service can be configured and runs with multiple -            listen ports and listen-addresses """ -        ports = ['22', '2222'] -        for port in ports: -            self.session.set(base_path + ['port', port]) - -        addresses = ['127.0.0.1', '::1'] -        for address in addresses: -            self.session.set(base_path + ['listen-address', address]) - -        # commit changes -        self.session.commit() - -        # Check configured port -        tmp = get_config_value('Port') -        for port in ports: -            self.assertIn(port, tmp) - -        # Check listen address -        tmp = get_config_value('ListenAddress') -        for address in addresses: -            self.assertIn(address, tmp) - -        # Check for running process -        self.assertTrue("sshd" in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_system_lcd.py b/scripts/cli/test_system_lcd.py deleted file mode 100755 index 931a91c53..000000000 --- a/scripts/cli/test_system_lcd.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 Francois Mertz fireboxled@gmail.com -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from configparser import ConfigParser -from psutil import process_iter -from vyos.configsession import ConfigSession - -base_path = ['system', 'lcd'] - -class TestSystemLCD(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_system_display(self): -        # configure some system display -        self.session.set(base_path + ['device', 'ttyS1']) -        self.session.set(base_path + ['model', 'cfa-533']) - -        # commit changes -        self.session.commit() - -        # load up ini-styled LCDd.conf -        conf = ConfigParser() -        conf.read('/run/LCDd/LCDd.conf') - -        self.assertEqual(conf['CFontzPacket']['Model'], '533') -        self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1') - -        # both processes running -        self.assertTrue('LCDd' in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_system_login.py b/scripts/cli/test_system_login.py deleted file mode 100755 index 3c4b1fa28..000000000 --- a/scripts/cli/test_system_login.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from subprocess import Popen, PIPE -from vyos.configsession import ConfigSession, ConfigSessionError -import vyos.util as util - -base_path = ['system', 'login'] -users = ['vyos1', 'vyos2'] - -class TestSystemLogin(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        # Delete individual users from configuration -        for user in users: -            self.session.delete(base_path + ['user', user]) - -        self.session.commit() -        del self.session - -    def test_user(self): -        """ Check if user can be created and we can SSH to localhost """ -        self.session.set(['service', 'ssh', 'port', '22']) - -        for user in users: -            name = "VyOS Roxx " + user -            home_dir = "/tmp/" + user - -            self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) -            self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) -            self.session.set(base_path + ['user', user, 'home-directory', home_dir]) - -        self.session.commit() - -        for user in users: -            cmd = ['su','-', user] -            proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) -            tmp = "{}\nuname -a".format(user) -            proc.stdin.write(tmp.encode()) -            proc.stdin.flush() -            (stdout, stderr) = proc.communicate() - -            # stdout is something like this: -            # b'Linux vyos 4.19.101-amd64-vyos #1 SMP Sun Feb 2 10:18:07 UTC 2020 x86_64 GNU/Linux\n' -            self.assertTrue(len(stdout) > 40) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_system_nameserver.py b/scripts/cli/test_system_nameserver.py deleted file mode 100755 index 9040be072..000000000 --- a/scripts/cli/test_system_nameserver.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import re -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -import vyos.util as util - -RESOLV_CONF = '/etc/resolv.conf' - -test_servers = ['192.0.2.10', '2001:db8:1::100'] -base_path = ['system', 'name-server'] - -def get_name_servers(): -    resolv_conf = util.read_file(RESOLV_CONF) -    return re.findall(r'\n?nameserver\s+(.*)', resolv_conf) - -class TestSystemNameServer(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        # Delete existing name servers -        self.session.delete(base_path) -        self.session.commit() - -        del self.session - -    def test_add_server(self): -        """ Check if server is added to resolv.conf """ -        for s in test_servers: -            self.session.set(base_path + [s]) -        self.session.commit() - -        servers = get_name_servers() -        for s in servers: -            self.assertTrue(s in servers) - -    def test_delete_server(self): -        """ Test if a deleted server disappears from resolv.conf """ -        for s in test_servers: -          self.session.delete(base_path + [s]) -        self.session.commit() - -        servers = get_name_servers() -        for s in servers: -            self.assertTrue(test_server_1 not in servers) - -if __name__ == '__main__': -    unittest.main() - diff --git a/scripts/cli/test_system_ntp.py b/scripts/cli/test_system_ntp.py deleted file mode 100755 index 856a28916..000000000 --- a/scripts/cli/test_system_ntp.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr -from vyos.util import read_file - -NTP_CONF = '/etc/ntp.conf' -base_path = ['system', 'ntp'] - -def get_config_value(key): -    tmp = read_file(NTP_CONF) -    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) -    # remove possible trailing whitespaces -    return [item.strip() for item in tmp] - -class TestSystemNTP(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        # ensure we can also run this test on a live system - so lets clean -        # out the current configuration :) -        self.session.delete(base_path) - -    def tearDown(self): -        self.session.delete(base_path) -        self.session.commit() -        del self.session - -    def test_ntp_options(self): -        """ Test basic NTP support with multiple servers and their options """ -        servers = ['192.0.2.1', '192.0.2.2'] -        options = ['noselect', 'preempt', 'prefer'] - -        for server in servers: -            for option in options: -                self.session.set(base_path + ['server', server, option]) - -        # commit changes -        self.session.commit() - -        # Check generated configuration -        tmp = get_config_value('server') -        for server in servers: -            test = f'{server} iburst ' + ' '.join(options) -            self.assertTrue(test in tmp) - -        # Check for running process -        self.assertTrue("ntpd" in (p.name() for p in process_iter())) - -    def test_ntp_clients(self): -        """ Test the allowed-networks statement """ -        listen_address = ['127.0.0.1', '::1'] -        for listen in listen_address: -            self.session.set(base_path + ['listen-address', listen]) - -        networks = ['192.0.2.0/24', '2001:db8:1000::/64'] -        for network in networks: -            self.session.set(base_path + ['allow-clients', 'address', network]) - -        # Verify "NTP server not configured" verify() statement -        with self.assertRaises(ConfigSessionError): -            self.session.commit() - -        servers = ['192.0.2.1', '192.0.2.2'] -        for server in servers: -            self.session.set(base_path + ['server', server]) - -        self.session.commit() - -        # Check generated client address configuration -        for network in networks: -            network_address = vyos_address_from_cidr(network) -            network_netmask = vyos_netmask_from_cidr(network) - -            tmp = get_config_value(f'restrict {network_address}')[0] -            test = f'mask {network_netmask} nomodify notrap nopeer' -            self.assertTrue(tmp in test) - -        # Check listen address -        tmp = get_config_value('interface') -        test = ['ignore wildcard'] -        for listen in listen_address: -            test.append(f'listen {listen}') -        self.assertEqual(tmp, test) - -        # Check for running process -        self.assertTrue("ntpd" in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_vpn_anyconnect.py b/scripts/cli/test_vpn_anyconnect.py deleted file mode 100755 index dd8ab1609..000000000 --- a/scripts/cli/test_vpn_anyconnect.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import re -import os -import unittest - -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -OCSERV_CONF = '/run/ocserv/ocserv.conf' -base_path = ['vpn', 'anyconnect'] -cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem' -cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key' - -class TestVpnAnyconnect(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) - -    def tearDown(self): -        # Delete vpn anyconnect configuration -        self.session.delete(base_path) -        self.session.commit() - -        del self.session - -    def test_vpn(self): -        user = 'vyos_user' -        password = 'vyos_pass' -        self.session.delete(base_path) -        self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password]) -        self.session.set(base_path + ["authentication", "mode", "local"]) -        self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) -        self.session.set(base_path + ["ssl", "ca-cert-file", cert]) -        self.session.set(base_path + ["ssl", "cert-file", cert]) -        self.session.set(base_path + ["ssl", "key-file", cert_key]) - -        self.session.commit() - -        # Check for running process -        self.assertTrue("ocserv-main" in (p.name() for p in process_iter())) - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/cli/test_vrf.py b/scripts/cli/test_vrf.py deleted file mode 100755 index efa095b30..000000000 --- a/scripts/cli/test_vrf.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file - -class VRFTest(unittest.TestCase): -    def setUp(self): -        self.session = ConfigSession(os.getpid()) -        self._vrfs = ['red', 'green', 'blue'] - -    def tearDown(self): -        # delete all VRFs -        self.session.delete(['vrf']) -        self.session.commit() -        del self.session - -    def test_table_id(self): -        table = 1000 -        for vrf in self._vrfs: -            base = ['vrf', 'name', vrf] -            description = "VyOS-VRF-" + vrf -            self.session.set(base + ['description', description]) - -            # check validate() - a table ID is mandatory -            with self.assertRaises(ConfigSessionError): -                self.session.commit() - -            self.session.set(base + ['table', str(table)]) -            table += 1 - -        # commit changes -        self.session.commit() - -if __name__ == '__main__': -    unittest.main() diff --git a/scripts/import-conf-mode-commands b/scripts/import-conf-mode-commands new file mode 100755 index 000000000..996b31c9c --- /dev/null +++ b/scripts/import-conf-mode-commands @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +# +#    build-command-template: converts old style commands definitions to XML +# +#    Copyright (C) 2019 VyOS maintainers <maintainers@vyos.net> +# +#    This library is free software; you can redistribute it and/or +#    modify it under the terms of the GNU Lesser General Public +#    License as published by the Free Software Foundation; either +#    version 2.1 of the License, or (at your option) any later version. +# +#    This library is distributed in the hope that it will be useful, +#    but WITHOUT ANY WARRANTY; without even the implied warranty of +#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +#    Lesser General Public License for more details. +# +#    You should have received a copy of the GNU Lesser General Public +#    License along with this library; if not, write to the Free Software +#    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 +#    USA + + +import os +import re +import sys + +from lxml import etree + + +# Node types +NODE = 0 +LEAF_NODE = 1 +TAG_NODE = 2 + +def parse_command_data(t): +    regs = { +        'help': r'\bhelp:(.*)(?:\n|$)', +        'priority': r'\bpriority:(.*)(?:\n|$)', +        'type': r'\btype:(.*)(?:\n|$)', +        'syntax_expression_var': r'\bsyntax:expression: \$VAR\(\@\) in (.*)' +    } + +    data = {'multi': False, 'help': ""} + +    for r in regs: +        try: +            data[r] = re.search(regs[r], t).group(1).strip() +        except: +            data[r] = None + +    # val_help is special: there can be multiple instances +    val_help_strings = re.findall(r'\bval_help:(.*)(?:\n|$)', t) +    val_help = [] +    for v in val_help_strings: +        try: +            fmt, msg = re.match(r'\s*(.*)\s*;\s*(.*)\s*(?:\n|$)', v).groups() +        except: +            fmt = "<text>" +            msg = v +        val_help.append((fmt, msg)) +    data['val_help'] = val_help + +    # multi is on/off +    if re.match(r'\bmulti:', t): +        data['multi'] = True + +    return(data) + +def walk(tree, base_path, name): +    path = os.path.join(base_path, name) + +    contents = os.listdir(path) + +    # Determine node type and create XML element for the node +    # Tag node dirs will always have 'node.tag' subdir and 'node.def' file +    # Leaf node dirs have nothing but a 'node.def' file +    # Everything that doesn't match either of these patterns is a normal node +    if 'node.tag' in contents: +        print("Creating a tag node from {0}".format(path)) +        elem = etree.Element('tagNode') +        node_type = TAG_NODE +    elif contents == ['node.def']: +        print("Creating a leaf node from {0}".format(path)) +        elem = etree.Element('leafNode') +        node_type = LEAF_NODE +    else: +        print("Creating a node from {0}".format(path)) +        elem = etree.Element('node') +        node_type = NODE + +    # Read and parse the command definition data (the 'node.def' file) +    with open(os.path.join(path, 'node.def'), 'r') as f: +        node_def = f.read() +    data = parse_command_data(node_def) + +    # Import the data into the properties element +    props_elem = etree.Element('properties') + +    if data['priority']: +        # Priority values sometimes come with comments that explain the value choice +        try: +            prio, prio_comment = re.match(r'\s*(\d+)\s*#(.*)', data['priority']).groups() +        except: +            prio = data['priority'].strip() +            prio_comment = None +        prio_elem = etree.Element('priority') +        prio_elem.text = prio +        props_elem.append(prio_elem) +        if prio_comment: +            prio_comment_elem = etree.Comment(prio_comment) +            props_elem.append(prio_comment_elem) + +    if data['multi']: +        multi_elem = etree.Element('multi') +        props_elem.append(multi_elem) + +    if data['help']: +        help_elem = etree.Element('help') +        help_elem.text = data['help'] +        props_elem.append(help_elem) + +    # For leaf nodes, absense of a type: tag means they take no values +    # For any other nodes, it doesn't mean anything +    if not data['type'] and (node_type == LEAF_NODE): +        valueless = etree.Element('valueless') +        props_elem.append(valueless) + +    # There can be only one constraint element in the definition +    # Create it now, we'll modify it in the next two cases, then append +    constraint_elem = etree.Element('constraint') +    has_constraint = False + +    # Add regexp field for multiple options +    if data['syntax_expression_var']: +        regex = etree.Element('regex') +        constraint_error=etree.Element('constraintErrorMessage') +        values = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(1) +        message = re.search(r'(.+) ; (.+)', data['syntax_expression_var']).group(2) +        values = re.findall(r'\"(.+?)\"', values) +        regex.text = '|'.join(values) +        constraint_error.text = re.sub('\".*?VAR.*?\"', '', message) +        constraint_error.text = re.sub(r'[\"|\\]', '', message) +        constraint_elem.append(regex) +        props_elem.append(constraint_elem) +        props_elem.append(constraint_error) + +    if data['val_help']: +        for vh in data['val_help']: +            vh_elem = etree.Element('valueHelp') + +            vh_fmt_elem = etree.Element('format') +            # Many commands use special "u32:<start>-<end>" format for ranges +            if re.match(r'u32:', vh[0]): +                vh_fmt = re.match(r'u32:(.*)', vh[0]).group(1).strip() + +                # If valid range of values is specified in val_help, we can automatically +                # create a constraint for it +                # Extracting it from syntax:expression: would be much more complicated +                vh_validator = etree.Element('validator') +                vh_validator.set("name", "numeric") +                vh_validator.set("argument", "--range {0}".format(vh_fmt)) +                constraint_elem.append(vh_validator) +                has_constraint = True +            else: +                vh_fmt = vh[0] +            vh_fmt_elem.text = vh_fmt + +            vh_help_elem = etree.Element('description') +            vh_help_elem.text = vh[1] + +            vh_elem.append(vh_fmt_elem) +            vh_elem.append(vh_help_elem) +            props_elem.append(vh_elem) + +    # Translate the "type:" to the new validator system +    if data['type']: +        t = data['type'] +        if t == 'txt': +            # Can't infer anything from the generic "txt" type +            pass +        else: +            validator = etree.Element('validator') +            if t == 'u32': +                validator.set('name', 'numeric') +                validator.set('argument', '--non-negative') +            elif t == 'ipv4': +                validator.set('name', 'ipv4-address') +            elif t == 'ipv4net': +                validator.set('name', 'ipv4-prefix') +            elif t == 'ipv6': +                validator.set('name', 'ipv6-address') +            elif t == 'ipv6net': +                validator.set('name', 'ipv6-prefix') +            elif t == 'macaddr': +                validator.set('name', 'mac-address') +            else: +                print("Warning: unsupported type \'{0}\'".format(t)) +                validator = None + +            if (validator is not None) and (not has_constraint): +                # If has_constraint is true, it means a more specific validator +                # was already extracted from another option +                constraint_elem.append(validator) +                has_constraint = True + +    if has_constraint: +        props_elem.append(constraint_elem) + +    elem.append(props_elem) + +    elem.set("name", name) + +    if node_type != LEAF_NODE: +        children = etree.Element('children') + +        # Create the next level dir path, +        # accounting for the "virtual" node.tag subdir for tag nodes +        next_level = path +        if node_type == TAG_NODE: +            next_level = os.path.join(path, 'node.tag') + +        # Walk the subdirs of the next level +        for d in os.listdir(next_level): +            dp = os.path.join(next_level, d) +            if os.path.isdir(dp): +                walk(children, next_level, d) + +        elem.append(children) + +    tree.append(elem) + +if __name__ == '__main__': +    if len(sys.argv) < 2: +        print("Usage: {0} <base path>".format(sys.argv[0])) +        sys.exit(1) +    else: +        base_path = sys.argv[1] + +    root = etree.Element('interfaceDefinition') +    contents = os.listdir(base_path) +    elem = etree.Element('node') +    elem.set('name', os.path.basename(base_path)) +    children = etree.Element('children') + +    for c in contents: +        path = os.path.join(base_path, c) +        if os.path.isdir(path): +            walk(children, base_path, c) + +    elem.append(children) +    root.append(elem) + +    xml_data = etree.tostring(root, pretty_print=True).decode() +    with open('output.xml', 'w') as f: +        f.write(xml_data) diff --git a/scripts/system/test_module_load.py b/scripts/system/test_module_load.py deleted file mode 100755 index 59c3e0b6a..000000000 --- a/scripts/system/test_module_load.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import os -import unittest - -modules = { -    "intel": ["e1000", "e1000e", "igb", "ixgb", "ixgbe", "ixgbevf", "i40e", "i40evf", "iavf"], -    "accel_ppp": ["ipoe", "vlan_mon"], -    "misc": ["wireguard"] -} - -class TestKernelModules(unittest.TestCase): -    def test_load_modules(self): -        success = True -        for msk in modules: -            ms = modules[msk] -            for m in ms: -                # We want to uncover all modules that fail, -                # not fail at the first one -                try: -                    os.system("modprobe {0}".format(m)) -                except: -                    success = False - -            self.assertTrue(success) - -if __name__ == '__main__': -    unittest.main() | 
