diff options
Diffstat (limited to 'smoketest/scripts')
31 files changed, 2609 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py new file mode 100644 index 000000000..14ec7e137 --- /dev/null +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -0,0 +1,269 @@ +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from netifaces import ifaddresses, AF_INET, AF_INET6 + +from vyos.configsession import ConfigSession +from vyos.ifconfig import Interface +from vyos.util import read_file +from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local + +class BasicInterfaceTest: + class BaseTest(unittest.TestCase): + _test_ip = False + _test_mtu = False + _test_vlan = False + _test_qinq = False + _test_ipv6 = False + _base_path = [] + + _options = {} + _interfaces = [] + _qinq_range = ['10', '20', '30'] + _vlan_range = ['100', '200', '300', '2000'] + # choose IPv6 minimum MTU value for tests - this must always work + _mtu = '1280' + + def setUp(self): + self.session = ConfigSession(os.getpid()) + + self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', + '2001:db8:1::ffff/64', '2001:db8:101::1/112'] + self._test_mtu = False + self._options = {} + + def tearDown(self): + # we should not remove ethernet from the overall CLI + if 'ethernet' in self._base_path: + for interface in self._interfaces: + # when using a dedicated interface to test via TEST_ETH environment + # variable only this one will be cleared in the end - usable to test + # ethernet interfaces via SSH + self.session.delete(self._base_path + [interface]) + self.session.set(self._base_path + [interface, 'duplex', 'auto']) + self.session.set(self._base_path + [interface, 'speed', 'auto']) + self.session.set(self._base_path + [interface, 'smp-affinity', 'auto']) + else: + self.session.delete(self._base_path) + + self.session.commit() + del self.session + + def test_add_description(self): + """ + Check if description can be added to interface + """ + for intf in self._interfaces: + test_string='Description-Test-{}'.format(intf) + self.session.set(self._base_path + [intf, 'description', test_string]) + for option in self._options.get(intf, []): + self.session.set(self._base_path + [intf] + option.split()) + + self.session.commit() + + # Validate interface description + for intf in self._interfaces: + test_string='Description-Test-{}'.format(intf) + with open('/sys/class/net/{}/ifalias'.format(intf), 'r') as f: + tmp = f.read().rstrip() + self.assertTrue(tmp, test_string) + + def test_add_address_single(self): + """ + Check if a single address can be added to interface. + """ + addr = '192.0.2.0/31' + for intf in self._interfaces: + self.session.set(self._base_path + [intf, 'address', addr]) + for option in self._options.get(intf, []): + self.session.set(self._base_path + [intf] + option.split()) + + self.session.commit() + + for intf in self._interfaces: + self.assertTrue(is_intf_addr_assigned(intf, addr)) + + def test_add_address_multi(self): + """ + Check if IPv4/IPv6 addresses can be added to interface. + """ + + # Add address + for intf in self._interfaces: + for addr in self._test_addr: + self.session.set(self._base_path + [intf, 'address', addr]) + for option in self._options.get(intf, []): + self.session.set(self._base_path + [intf] + option.split()) + + self.session.commit() + + # Validate address + for intf in self._interfaces: + for af in AF_INET, AF_INET6: + for addr in ifaddresses(intf)[af]: + # checking link local addresses makes no sense + if is_ipv6_link_local(addr['addr']): + continue + + self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) + + def test_ipv6_link_local(self): + """ Common function for IPv6 link-local address assignemnts """ + if not self._test_ipv6: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + # after commit we must have an IPv6 link-local address + self.session.commit() + + for interface in self._interfaces: + for addr in ifaddresses(interface)[AF_INET6]: + self.assertTrue(is_ipv6_link_local(addr['addr'])) + + # disable IPv6 link-local address assignment + for interface in self._interfaces: + base = self._base_path + [interface] + self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + + # after commit we must have no IPv6 link-local address + self.session.commit() + + for interface in self._interfaces: + self.assertTrue(AF_INET6 not in ifaddresses(interface)) + + def _mtu_test(self, intf): + """ helper function to verify MTU size """ + with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f: + tmp = f.read().rstrip() + self.assertEqual(tmp, self._mtu) + + def test_change_mtu(self): + """ Testcase if MTU can be changed on interface """ + if not self._test_mtu: + return None + for intf in self._interfaces: + base = self._base_path + [intf] + self.session.set(base + ['mtu', self._mtu]) + for option in self._options.get(intf, []): + self.session.set(base + option.split()) + + self.session.commit() + for intf in self._interfaces: + self._mtu_test(intf) + + def test_8021q_vlan(self): + """ Testcase for 802.1q VLAN interfaces """ + if not self._test_vlan: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + for vlan in self._vlan_range: + base = self._base_path + [interface, 'vif', vlan] + self.session.set(base + ['mtu', self._mtu]) + for address in self._test_addr: + self.session.set(base + ['address', address]) + + self.session.commit() + for intf in self._interfaces: + for vlan in self._vlan_range: + vif = f'{intf}.{vlan}' + for address in self._test_addr: + self.assertTrue(is_intf_addr_assigned(vif, address)) + self._mtu_test(vif) + + + def test_8021ad_qinq_vlan(self): + """ Testcase for 802.1ad Q-in-Q VLAN interfaces """ + if not self._test_qinq: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + for vif_s in self._qinq_range: + for vif_c in self._vlan_range: + base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] + self.session.set(base + ['mtu', self._mtu]) + for address in self._test_addr: + self.session.set(base + ['address', address]) + + self.session.commit() + for interface in self._interfaces: + for vif_s in self._qinq_range: + for vif_c in self._vlan_range: + vif = f'{interface}.{vif_s}.{vif_c}' + for address in self._test_addr: + self.assertTrue(is_intf_addr_assigned(vif, address)) + self._mtu_test(vif) + + def test_ip_options(self): + """ test IP options like arp """ + if not self._test_ip: + return None + + for interface in self._interfaces: + arp_tmo = '300' + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(path + option.split()) + + # Options + self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) + self.session.set(path + ['ip', 'disable-arp-filter']) + self.session.set(path + ['ip', 'enable-arp-accept']) + self.session.set(path + ['ip', 'enable-arp-announce']) + self.session.set(path + ['ip', 'enable-arp-ignore']) + self.session.set(path + ['ip', 'enable-proxy-arp']) + self.session.set(path + ['ip', 'proxy-arp-pvlan']) + self.session.set(path + ['ip', 'source-validation', 'loose']) + + self.session.commit() + + for interface in self._interfaces: + tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') + self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter') + self.assertEqual('0', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') + self.assertEqual('2', tmp) diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py new file mode 100755 index 000000000..e3d3b25ee --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from base_interfaces_test import BasicInterfaceTest + +from vyos.ifconfig import Section +from vyos.configsession import ConfigSessionError +from vyos.util import read_file + +class BondingInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'bonding'] + self._interfaces = ['bond0'] + self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + self._test_ipv6 = True + + self._members = [] + # we need to filter out VLAN interfaces identified by a dot (.) + # in their name - just in case! + if 'TEST_ETH' in os.environ: + self._members = os.environ['TEST_ETH'].split() + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._members.append(tmp) + + self._options['bond0'] = [] + for member in self._members: + self._options['bond0'].append(f'member interface {member}') + + + def test_add_address_single(self): + """ derived method to check if member interfaces are enslaved properly """ + super().test_add_address_single() + + for interface in self._interfaces: + slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() + self.assertListEqual(slaves, self._members) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py new file mode 100755 index 000000000..bc0bb69c6 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from base_interfaces_test import BasicInterfaceTest +from vyos.ifconfig import Section + +class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._test_ipv6 = True + + self._base_path = ['interfaces', 'bridge'] + self._interfaces = ['br0'] + + self._members = [] + # we need to filter out VLAN interfaces identified by a dot (.) + # in their name - just in case! + if 'TEST_ETH' in os.environ: + self._members = os.environ['TEST_ETH'].split() + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._members.append(tmp) + + self._options['br0'] = [] + for member in self._members: + self._options['br0'].append(f'member interface {member}') + + def test_add_remove_member(self): + for interface in self._interfaces: + base = self._base_path + [interface] + self.session.set(base + ['stp']) + self.session.set(base + ['address', '192.0.2.1/24']) + + cost = 1000 + priority = 10 + # assign members to bridge interface + for member in self._members: + base_member = base + ['member', 'interface', member] + self.session.set(base_member + ['cost', str(cost)]) + self.session.set(base_member + ['priority', str(priority)]) + cost += 1 + priority += 1 + + self.session.commit() + + for interface in self._interfaces: + self.session.delete(self._base_path + [interface, 'member']) + + self.session.commit() + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py new file mode 100755 index 000000000..01942fc89 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_dummy.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_interfaces_test import BasicInterfaceTest + +class DummyInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + self._base_path = ['interfaces', 'dummy'] + self._interfaces = ['dum0', 'dum1', 'dum2'] + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py new file mode 100755 index 000000000..761ec7506 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from base_interfaces_test import BasicInterfaceTest +from vyos.ifconfig import Section + +class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'ethernet'] + self._test_ip = True + self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + self._test_ipv6 = True + self._interfaces = [] + + # we need to filter out VLAN interfaces identified by a dot (.) + # in their name - just in case! + if 'TEST_ETH' in os.environ: + tmp = os.environ['TEST_ETH'].split() + self._interfaces = tmp + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._interfaces.append(tmp) + + def test_dhcp_disable(self): + """ + When interface is configured as admin down, it must be admin down even + """ + for interface in self._interfaces: + self.session.set(self._base_path + [interface, 'disable']) + for option in self._options.get(interface, []): + self.session.set(self._base_path + [interface] + option.split()) + + # Also enable DHCP (ISC DHCP always places interface in admin up + # state so we check that we do not start DHCP client. + # https://phabricator.vyos.net/T2767 + self.session.set(self._base_path + [interface, 'address', 'dhcp']) + + self.session.commit() + + # Validate interface state + for interface in self._interfaces: + with open(f'/sys/class/net/{interface}/flags', 'r') as f: + flags = f.read() + self.assertEqual(int(flags, 16) & 1, 0) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_geneve.py b/smoketest/scripts/cli/test_interfaces_geneve.py new file mode 100755 index 000000000..f84a55f86 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_geneve.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from base_interfaces_test import BasicInterfaceTest + + +class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'geneve'] + self._options = { + 'gnv0': ['vni 10', 'remote 127.0.1.1'], + 'gnv1': ['vni 20', 'remote 127.0.1.2'], + } + self._interfaces = list(self._options) + + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py new file mode 100755 index 000000000..d8655d157 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import jmespath +import unittest + +from base_interfaces_test import BasicInterfaceTest +from vyos.util import cmd + +class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'l2tpv3'] + self._options = { + 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', + 'tunnel-id 100', 'peer-tunnel-id 10', + 'session-id 100', 'peer-session-id 10', + 'source-port 1010', 'destination-port 10101'], + 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20', + 'peer-tunnel-id 200', 'remote-ip 127.20.20.20', + 'session-id 20', 'tunnel-id 200', + 'source-port 2020', 'destination-port 20202'], + } + self._interfaces = list(self._options) + + def test_add_address_single(self): + super().test_add_address_single() + + command = 'sudo ip -j l2tp show session' + json_out = json.loads(cmd(command)) + for interface in self._options: + for config in json_out: + if config['interface'] == interface: + # convert list with configuration items into a dict + dict = {} + for opt in self._options[interface]: + dict.update({opt.split()[0].replace('-','_'): opt.split()[1]}) + + for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']: + self.assertEqual(str(config[key]), dict[key]) + + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_loopback.py b/smoketest/scripts/cli/test_interfaces_loopback.py new file mode 100755 index 000000000..ba428b5d3 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_loopback.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_interfaces_test import BasicInterfaceTest +from vyos.validate import is_intf_addr_assigned + +class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + # these addresses are never allowed to be removed from the system + self._loopback_addresses = ['127.0.0.1', '::1'] + self._base_path = ['interfaces', 'loopback'] + self._interfaces = ['lo'] + + def test_add_address_single(self): + super().test_add_address_single() + for addr in self._loopback_addresses: + self.assertTrue(is_intf_addr_assigned('lo', addr)) + + def test_add_address_multi(self): + super().test_add_address_multi() + for addr in self._loopback_addresses: + self.assertTrue(is_intf_addr_assigned('lo', addr)) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py new file mode 100755 index 000000000..0f1b6486d --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import unittest +from psutil import process_iter + +from vyos.ifconfig import Section +from base_interfaces_test import BasicInterfaceTest +from vyos.configsession import ConfigSessionError +from vyos.util import read_file + +def get_config_value(intf, key): + tmp = read_file(f'/run/wpa_supplicant/{intf}.conf') + tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) + return tmp[0] + +class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + self._base_path = ['interfaces', 'macsec'] + self._options = { + 'macsec0': ['source-interface eth0', + 'security cipher gcm-aes-128'] + } + + # if we have a physical eth1 interface, add a second macsec instance + if 'eth1' in Section.interfaces("ethernet"): + macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] } + self._options.update(macsec) + + self._interfaces = list(self._options) + + def test_encryption(self): + """ MACsec can be operating in authentication and encryption + mode - both using different mandatory settings, lets test + encryption as the basic authentication test has been performed + using the base class tests """ + intf = 'macsec0' + src_intf = 'eth0' + mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4' + mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836' + replay_window = '64' + self.session.set(self._base_path + [intf, 'security', 'encrypt']) + + # check validate() - Cipher suite must be set for MACsec + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128']) + + # check validate() - Physical source interface must be set for MACsec + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [intf, 'source-interface', src_intf]) + + # check validate() - MACsec security keys mandartory when encryption is enabled + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak]) + + # check validate() - MACsec security keys mandartory when encryption is enabled + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn]) + + self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window]) + self.session.commit() + + tmp = get_config_value(src_intf, 'macsec_integ_only') + self.assertTrue("0" in tmp) + + tmp = get_config_value(src_intf, 'mka_cak') + self.assertTrue(mak_cak in tmp) + + tmp = get_config_value(src_intf, 'mka_ckn') + self.assertTrue(mak_ckn in tmp) + + # check that the default priority of 255 is programmed + tmp = get_config_value(src_intf, 'mka_priority') + self.assertTrue("255" in tmp) + + tmp = get_config_value(src_intf, 'macsec_replay_window') + self.assertTrue(replay_window in tmp) + + # Check for running process + self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_pppoe.py b/smoketest/scripts/cli/test_interfaces_pppoe.py new file mode 100755 index 000000000..822f05de6 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_pppoe.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +config_file = '/etc/ppp/peers/{}' +dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' +base_path = ['interfaces', 'pppoe'] + +def get_config_value(interface, key): + with open(config_file.format(interface), 'r') as f: + for line in f: + if line.startswith(key): + return list(line.split()) + return [] + +def get_dhcp6c_config_value(interface, key): + tmp = read_file(dhcp6c_config_file.format(interface)) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + + out = [] + for item in tmp: + out.append(item.replace(';','')) + return out + +class PPPoEInterfaceTest(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self._interfaces = ['pppoe0', 'pppoe50'] + self._source_interface = 'eth0' + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_pppoe(self): + """ Check if PPPoE dialer can be configured and runs """ + for interface in self._interfaces: + user = 'VyOS-user-' + interface + passwd = 'VyOS-passwd-' + interface + mtu = '1400' + + self.session.set(base_path + [interface, 'authentication', 'user', user]) + self.session.set(base_path + [interface, 'authentication', 'password', passwd]) + self.session.set(base_path + [interface, 'default-route', 'auto']) + self.session.set(base_path + [interface, 'mtu', mtu]) + self.session.set(base_path + [interface, 'no-peer-dns']) + + # check validate() - a source-interface is required + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(base_path + [interface, 'source-interface', self._source_interface]) + + # commit changes + self.session.commit() + + # verify configuration file(s) + for interface in self._interfaces: + user = 'VyOS-user-' + interface + password = 'VyOS-passwd-' + interface + + tmp = get_config_value(interface, 'mtu')[1] + self.assertEqual(tmp, mtu) + tmp = get_config_value(interface, 'user')[1].replace('"', '') + self.assertEqual(tmp, user) + tmp = get_config_value(interface, 'password')[1].replace('"', '') + self.assertEqual(tmp, password) + tmp = get_config_value(interface, 'ifname')[1] + self.assertEqual(tmp, interface) + + # Check if ppp process is running in the interface in question + running = False + for p in process_iter(): + if "pppd" in p.name(): + if interface in p.cmdline(): + running = True + + self.assertTrue(running) + + def test_pppoe_dhcpv6pd(self): + """ Check if PPPoE dialer can be configured with DHCPv6-PD """ + address = '1' + sla_id = '0' + sla_len = '8' + for interface in self._interfaces: + self.session.set(base_path + [interface, 'authentication', 'user', 'vyos']) + self.session.set(base_path + [interface, 'authentication', 'password', 'vyos']) + self.session.set(base_path + [interface, 'default-route', 'none']) + self.session.set(base_path + [interface, 'no-peer-dns']) + self.session.set(base_path + [interface, 'source-interface', self._source_interface]) + self.session.set(base_path + [interface, 'ipv6', 'enable']) + + # prefix delegation stuff + dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] + self.session.set(dhcpv6_pd_base + ['length', '56']) + self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) + self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) + + # commit changes + self.session.commit() + + # verify "normal" PPPoE value - 1492 is default MTU + tmp = get_config_value(interface, 'mtu')[1] + self.assertEqual(tmp, '1492') + tmp = get_config_value(interface, 'user')[1].replace('"', '') + self.assertEqual(tmp, 'vyos') + tmp = get_config_value(interface, 'password')[1].replace('"', '') + self.assertEqual(tmp, 'vyos') + + for param in ['+ipv6', 'ipv6cp-use-ipaddr']: + tmp = get_config_value(interface, param)[0] + self.assertEqual(tmp, param) + + # verify DHCPv6 prefix delegation + # will return: ['delegation', '::/56 infinity;'] + tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace + self.assertEqual(tmp, '::/56') + tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] + self.assertEqual(tmp, self._source_interface) + tmp = get_dhcp6c_config_value(interface, 'ifid')[0] + self.assertEqual(tmp, address) + tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] + self.assertEqual(tmp, sla_id) + tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] + self.assertEqual(tmp, sla_len) + + # Check if ppp process is running in the interface in question + running = False + for p in process_iter(): + if "pppd" in p.name(): + running = True + self.assertTrue(running) + + # We can not check if wide-dhcpv6 process is running as it is started + # after the PPP interface gets a link to the ISP - but we can see if + # it would be started by the scripts + tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}') + tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp) + self.assertTrue(tmp) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py new file mode 100755 index 000000000..bc2e6e7eb --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_interfaces_test import BasicInterfaceTest + +class PEthInterfaceTest(BasicInterfaceTest.BaseTest): + + def setUp(self): + super().setUp() + self._base_path = ['interfaces', 'pseudo-ethernet'] + + self._test_ip = True + self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + + self._options = { + 'peth0': ['source-interface eth1'], + 'peth1': ['source-interface eth1'], + } + self._interfaces = list(self._options) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py new file mode 100755 index 000000000..7611ffe26 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_tunnel.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession + +from base_interfaces_test import BasicInterfaceTest + +class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): + # encoding, tunnel endpoint (v4/v6), address (v4/v6) + _valid = [ + ('gre', 4, 4), + ('gre', 4, 6), + ('ip6gre', 6, 4), + ('ip6gre', 6, 6), + ('gre-bridge', 4, 4), + ('ipip', 4, 4), + ('ipip', 4, 6), + ('ipip6', 6, 4), + ('ipip6', 6, 6), + ('ip6ip6', 6, 6), + ('sit', 4, 6), + ] + + local = { + 4: '10.100.{}.1/24', + 6: '2001:db8:{}::1/64', + } + + remote = { + 4: '192.0.{}.1', + 6: '2002::{}:1', + } + + address = { + 4: '10.100.{}.1/24', + 6: '2001:db8:{}::1/64', + } + + def setUp(self): + local = {} + remote = {} + address = {} + + self._intf_dummy = ['interfaces', 'dummy'] + self._base_path = ['interfaces', 'tunnel'] + self._interfaces = ['tun{}'.format(n) for n in range(len(self._valid))] + + self._test_mtu = True + super().setUp() + + for number in range(len(self._valid)): + dum4 = 'dum4{}'.format(number) + dum6 = 'dum6{}'.format(number) + + ipv4 = self.local[4].format(number) + ipv6 = self.local[6].format(number) + + local.setdefault(4, {})[number] = ipv4 + local.setdefault(6, {})[number] = ipv6 + + ipv4 = self.remote[4].format(number) + ipv6 = self.remote[6].format(number) + + remote.setdefault(4, {})[number] = ipv4 + remote.setdefault(6, {})[number] = ipv6 + + ipv4 = self.address[4].format(number) + ipv6 = self.address[6].format(number) + + address.setdefault(4, {})[number] = ipv4 + address.setdefault(6, {})[number] = ipv6 + + self.session.set(self._intf_dummy + [dum4, 'address', ipv4]) + self.session.set(self._intf_dummy + [dum6, 'address', ipv6]) + self.session.commit() + + for number, (encap, p2p, addr) in enumerate(self._valid): + intf = 'tun%d' % number + tunnel = {} + tunnel['encapsulation'] = encap + tunnel['local-ip'] = local[p2p][number].split('/')[0] + tunnel['remote-ip'] = remote[p2p][number].split('/')[0] + tunnel['address'] = address[addr][number] + for name in tunnel: + self.session.set(self._base_path + [intf, name, tunnel[name]]) + + def tearDown(self): + self.session.delete(self._intf_dummy) + super().tearDown() + + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py new file mode 100755 index 000000000..2628e0285 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_vxlan.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from base_interfaces_test import BasicInterfaceTest + +class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._test_mtu = True + self._base_path = ['interfaces', 'vxlan'] + self._options = { + 'vxlan0': ['vni 10', 'remote 127.0.0.2'], + 'vxlan1': ['vni 20', 'group 239.1.1.1', 'source-interface eth0'], + } + self._interfaces = list(self._options) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_wireguard.py b/smoketest/scripts/cli/test_interfaces_wireguard.py new file mode 100755 index 000000000..0c32a4696 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_wireguard.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from base_interfaces_test import BasicInterfaceTest + +# Generate WireGuard default keypair +if not os.path.isdir('/config/auth/wireguard/default'): + os.system('sudo /usr/libexec/vyos/op_mode/wireguard.py --genkey') + +base_path = ['interfaces', 'wireguard'] + +class WireGuardInterfaceTest(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', + '2001:db8:1::ffff/64', '2001:db8:101::1/112'] + self._interfaces = ['wg0', 'wg1'] + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_peer_setup(self): + """ + Create WireGuard interfaces with associated peers + """ + for intf in self._interfaces: + peer = 'foo-' + intf + psk = 'u2xdA70hkz0S1CG0dZlOh0aq2orwFXRIVrKo4DCvHgM=' + pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A=' + + for addr in self._test_addr: + self.session.set(base_path + [intf, 'address', addr]) + + self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) + self.session.set(base_path + [intf, 'peer', peer, 'port', '1337']) + + # Allow different prefixes to traverse the tunnel + allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] + for ip in allowed_ips: + self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) + + self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) + self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) + self.session.commit() + + self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py new file mode 100755 index 000000000..fae233244 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from base_interfaces_test import BasicInterfaceTest +from psutil import process_iter +from vyos.util import check_kmod + +class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'wireless'] + self._options = { + 'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0', + 'type station', 'address 192.0.2.1/30'], + 'wlan1': ['physical-device phy0', 'ssid VyOS-WIFI-1', + 'type access-point', 'address 192.0.2.5/30', 'channel 0'], + 'wlan10': ['physical-device phy1', 'ssid VyOS-WIFI-2', + 'type station', 'address 192.0.2.9/30'], + 'wlan11': ['physical-device phy1', 'ssid VyOS-WIFI-3', + 'type access-point', 'address 192.0.2.13/30', 'channel 0'], + } + self._interfaces = list(self._options) + self.session.set(['system', 'wifi-regulatory-domain', 'SE']) + + def test_add_address_single(self): + """ derived method to check if member interfaces are enslaved properly """ + super().test_add_address_single() + + for option, option_value in self._options.items(): + if 'type access-point' in option_value: + # Check for running process + self.assertIn('hostapd', (p.name() for p in process_iter())) + elif 'type station' in option_value: + # Check for running process + self.assertIn('wpa_supplicant', (p.name() for p in process_iter())) + else: + self.assertTrue(False) + +if __name__ == '__main__': + check_kmod('mac80211_hwsim') + unittest.main() diff --git a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py new file mode 100755 index 000000000..40cd03b93 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError + +config_file = '/etc/ppp/peers/{}' +base_path = ['interfaces', 'wirelessmodem'] + +def get_config_value(interface, key): + with open(config_file.format(interface), 'r') as f: + for line in f: + if line.startswith(key): + return list(line.split()) + return [] + +class WWANInterfaceTest(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self._interfaces = ['wlm0', 'wlm1'] + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_wlm_1(self): + for interface in self._interfaces: + self.session.set(base_path + [interface, 'no-peer-dns']) + self.session.set(base_path + [interface, 'ondemand']) + + # check validate() - APN must be configure + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(base_path + [interface, 'apn', 'vyos.net']) + + # check validate() - device must be configure + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(base_path + [interface, 'device', 'ttyS0']) + + # commit changes + self.session.commit() + + # verify configuration file(s) + for interface in self._interfaces: + tmp = get_config_value(interface, 'ifname')[1] + self.assertTrue(interface in tmp) + + tmp = get_config_value(interface, 'demand')[0] + self.assertTrue('demand' in tmp) + + tmp = os.path.isfile(f'/etc/ppp/peers/chat.{interface}') + self.assertTrue(tmp) + + # Check if ppp process is running in the interface in question + running = False + for p in process_iter(): + if "pppd" in p.name(): + if interface in p.cmdline(): + running = True + + self.assertTrue(running) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py new file mode 100755 index 000000000..b06fa239d --- /dev/null +++ b/smoketest/scripts/cli/test_nat.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import jmespath +import json +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import cmd + +base_path = ['nat'] +source_path = base_path + ['source'] + +snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' + +class TestNAT(unittest.TestCase): + def setUp(self): + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session = ConfigSession(os.getpid()) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + + def test_source_nat(self): + """ Configure and validate source NAT rule(s) """ + + network = '192.168.0.0/16' + self.session.set(source_path + ['rule', '1', 'destination', 'address', network]) + self.session.set(source_path + ['rule', '1', 'exclude']) + + # check validate() - outbound-interface must be defined + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(source_path + ['rule', '1', 'outbound-interface', 'any']) + self.session.commit() + + tmp = cmd('sudo nft -j list table nat') + nftable_json = json.loads(tmp) + condensed_json = jmespath.search(snat_pattern, nftable_json)[0] + + self.assertEqual(condensed_json['comment'], 'DST-NAT-1') + self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) + self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) + + + def test_validation(self): + """ T2813: Ensure translation address is specified """ + self.session.set(source_path + ['rule', '100', 'outbound-interface', 'eth0']) + + # check validate() - translation address not specified + with self.assertRaises(ConfigSessionError): + self.session.commit() + + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_bcast-relay.py b/smoketest/scripts/cli/test_service_bcast-relay.py new file mode 100755 index 000000000..fe4531c3b --- /dev/null +++ b/smoketest/scripts/cli/test_service_bcast-relay.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError + +base_path = ['service', 'broadcast-relay'] + +class TestServiceBroadcastRelay(unittest.TestCase): + _address1 = '192.0.2.1/24' + _address2 = '192.0.2.1/24' + + def setUp(self): + self.session = ConfigSession(os.getpid()) + self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) + self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) + self.session.commit() + + def tearDown(self): + self.session.delete(['interfaces', 'dummy', 'dum1001']) + self.session.delete(['interfaces', 'dummy', 'dum1002']) + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_service(self): + """ Check if broadcast relay service can be configured and runs """ + ids = range(1, 5) + for id in ids: + base = base_path + ['id', str(id)] + self.session.set(base + ['description', 'vyos']) + self.session.set(base + ['port', str(10000 + id)]) + + # check validate() - two interfaces must be present + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base + ['interface', 'dum1001']) + self.session.set(base + ['interface', 'dum1002']) + self.session.set(base + ['address', self._address1.split('/')[0]]) + + self.session.commit() + + for id in ids: + # check if process is running + running = False + for p in process_iter(): + if "udp-broadcast-relay" in p.name(): + if p.cmdline()[3] == str(id): + running = True + break + self.assertTrue(running) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py new file mode 100755 index 000000000..be52360ed --- /dev/null +++ b/smoketest/scripts/cli/test_service_dns_dynamic.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from getpass import getuser +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +DDCLIENT_CONF = '/run/ddclient/ddclient.conf' +base_path = ['service', 'dns', 'dynamic'] + +def get_config_value(key): + tmp = read_file(DDCLIENT_CONF) + tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp) + tmp = tmp[0].rstrip(',') + return tmp + +def check_process(): + """ + Check for running process, process name changes dynamically e.g. + "ddclient - sleeping for 270 seconds", thus we need a different approach + """ + running = False + for p in process_iter(): + if "ddclient" in p.name(): + running = True + return running + +class TestServiceDDNS(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete DDNS configuration + self.session.delete(base_path) + self.session.commit() + + del self.session + + def test_service(self): + """ Check individual DDNS service providers """ + ddns = ['interface', 'eth0', 'service'] + services = ['cloudflare', 'afraid', 'dyndns', 'zoneedit'] + + for service in services: + user = 'vyos_user' + password = 'vyos_pass' + zone = 'vyos.io' + self.session.delete(base_path) + self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io']) + self.session.set(base_path + ddns + [service, 'login', user]) + self.session.set(base_path + ddns + [service, 'password', password]) + self.session.set(base_path + ddns + [service, 'zone', zone]) + + # commit changes + if service == 'cloudflare': + self.session.commit() + else: + # zone option only works on cloudflare, an exception is raised + # for all others + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io']) + # commit changes again - now it should work + self.session.commit() + + # we can only read the configuration file when we operate as 'root' + if getuser() == 'root': + protocol = get_config_value('protocol') + login = get_config_value('login') + pwd = get_config_value('password') + + # some services need special treatment + protoname = service + if service == 'cloudflare': + tmp = get_config_value('zone') + self.assertTrue(tmp == zone) + elif service == 'afraid': + protoname = 'freedns' + elif service == 'dyndns': + protoname = 'dyndns2' + elif service == 'zoneedit': + protoname = 'zoneedit1' + + self.assertTrue(protocol == protoname) + self.assertTrue(login == user) + self.assertTrue(pwd == "'" + password + "'") + + # Check for running process + self.assertTrue(check_process()) + + + def test_rfc2136(self): + """ Check if DDNS service can be configured and runs """ + ddns = ['interface', 'eth0', 'rfc2136', 'vyos'] + ddns_key_file = '/config/auth/my.key' + + self.session.set(base_path + ddns + ['key', ddns_key_file]) + self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io']) + self.session.set(base_path + ddns + ['server', 'ns1.vyos.io']) + self.session.set(base_path + ddns + ['ttl', '300']) + self.session.set(base_path + ddns + ['zone', 'vyos.io']) + + # ensure an exception will be raised as no key is present + if os.path.exists(ddns_key_file): + os.unlink(ddns_key_file) + + # check validate() - the key file does not exist yet + with self.assertRaises(ConfigSessionError): + self.session.commit() + + with open(ddns_key_file, 'w') as f: + f.write('S3cretKey') + + # commit changes + self.session.commit() + + # TODO: inspect generated configuration file + + # Check for running process + self.assertTrue(check_process()) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_mdns-repeater.py b/smoketest/scripts/cli/test_service_mdns-repeater.py new file mode 100755 index 000000000..18900b6d2 --- /dev/null +++ b/smoketest/scripts/cli/test_service_mdns-repeater.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession + +base_path = ['service', 'mdns', 'repeater'] +intf_base = ['interfaces', 'dummy'] + +class TestServiceMDNSrepeater(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.delete(intf_base + ['dum10']) + self.session.delete(intf_base + ['dum20']) + self.session.commit() + del self.session + + def test_service(self): + # Service required a configured IP address on the interface + + self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30']) + self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30']) + + self.session.set(base_path + ['interface', 'dum10']) + self.session.set(base_path + ['interface', 'dum20']) + self.session.commit() + + # Check for running process + self.assertTrue("mdns-repeater" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py new file mode 100755 index 000000000..901ca792d --- /dev/null +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from configparser import ConfigParser +from psutil import process_iter +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError + +base_path = ['service', 'pppoe-server'] +local_if = ['interfaces', 'dummy', 'dum667'] +pppoe_conf = '/run/accel-pppd/pppoe.conf' + +ac_name = 'ACN' +subnet = '172.18.0.0/24' +gateway = '192.0.2.1' +nameserver = '9.9.9.9' +mtu = '1492' +interface = 'eth0' + +class TestServicePPPoEServer(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.delete(local_if) + self.session.commit() + del self.session + + def verify(self, conf): + # validate some common values in the configuration + for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool', + 'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1', + 'auth_chap_md5', 'auth_pap', 'shaper']: + # Settings without values provide None + self.assertEqual(conf['modules'][tmp], None) + + # check Access Concentrator setting + self.assertTrue(conf['pppoe']['ac-name'] == ac_name) + self.assertTrue(conf['pppoe'].getboolean('verbose')) + self.assertTrue(conf['pppoe']['interface'], interface) + + # check configured subnet + self.assertEqual(conf['ip-pool'][subnet], None) + self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) + + # check ppp + self.assertTrue(conf['ppp'].getboolean('verbose')) + self.assertTrue(conf['ppp'].getboolean('check-ip')) + self.assertEqual(conf['ppp']['min-mtu'], mtu) + self.assertEqual(conf['ppp']['mtu'], mtu) + self.assertEqual(conf['ppp']['lcp-echo-interval'], '30') + self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0') + self.assertEqual(conf['ppp']['lcp-echo-failure'], '3') + + def basic_config(self): + self.session.set(local_if + ['address', '192.0.2.1/32']) + + self.session.set(base_path + ['access-concentrator', ac_name]) + self.session.set(base_path + ['authentication', 'mode', 'local']) + self.session.set(base_path + ['client-ip-pool', 'subnet', subnet]) + self.session.set(base_path + ['name-server', nameserver]) + self.session.set(base_path + ['interface', interface]) + self.session.set(base_path + ['local-ip', gateway]) + + def test_local_auth(self): + """ Test configuration of local authentication for PPPoE server """ + self.basic_config() + # authentication + self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) + self.session.set(base_path + ['authentication', 'mode', 'local']) + # other settings + self.session.set(base_path + ['ppp-options', 'ccp']) + self.session.set(base_path + ['ppp-options', 'mppe', 'require']) + self.session.set(base_path + ['limits', 'connection-limit', '20/min']) + + # commit changes + self.session.commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True) + conf.read(pppoe_conf) + + # basic verification + self.verify(conf) + + # check auth + self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets') + self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway) + + # check pado + self.assertEqual(conf['ppp']['mppe'], 'require') + self.assertTrue(conf['ppp'].getboolean('ccp')) + + # check other settings + self.assertEqual(conf['connlimit']['limit'], '20/min') + + # Check for running process + self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + + def test_radius_auth(self): + """ Test configuration of RADIUS authentication for PPPoE server """ + radius_server = '192.0.2.22' + radius_key = 'secretVyOS' + radius_port = '2000' + radius_port_acc = '3000' + + self.basic_config() + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key]) + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port]) + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) + self.session.set(base_path + ['authentication', 'mode', 'radius']) + + # commit changes + self.session.commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True) + conf.read(pppoe_conf) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf['radius'].getboolean('verbose')) + self.assertTrue(conf['radius']['acct-timeout'], '3') + self.assertTrue(conf['radius']['timeout'], '3') + self.assertTrue(conf['radius']['max-try'], '3') + self.assertTrue(conf['radius']['gw-ip-address'], gateway) + + server = conf['radius']['server'].split(',') + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f'auth-port={radius_port}', server[2]) + self.assertEqual(f'acct-port={radius_port_acc}', server[3]) + self.assertEqual(f'req-limit=0', server[4]) + self.assertEqual(f'fail-time=0', server[5]) + + # check defaults + self.assertEqual(conf['ppp']['mppe'], 'prefer') + self.assertFalse(conf['ppp'].getboolean('ccp')) + + # Check for running process + self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py new file mode 100755 index 000000000..ec2110c8a --- /dev/null +++ b/smoketest/scripts/cli/test_service_router-advert.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession +from vyos.util import read_file + +RADVD_CONF = '/run/radvd/radvd.conf' + +interface = 'eth1' +base_path = ['service', 'router-advert', 'interface', interface] +address_base = ['interfaces', 'ethernet', interface, 'address'] + +def get_config_value(key): + tmp = read_file(RADVD_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + return tmp[0].split()[0].replace(';','') + +class TestServiceRADVD(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self.session.set(address_base + ['2001:db8::1/64']) + + def tearDown(self): + self.session.delete(address_base) + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_single(self): + self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag']) + self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) + self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) + self.session.set(base_path + ['dnssl', '2001:db8::1234']) + self.session.set(base_path + ['other-config-flag']) + + # commit changes + self.session.commit() + + # verify values + tmp = get_config_value('interface') + self.assertEqual(tmp, interface) + + tmp = get_config_value('prefix') + self.assertEqual(tmp, '::/64') + + tmp = get_config_value('AdvOtherConfigFlag') + self.assertEqual(tmp, 'on') + + # this is a default value + tmp = get_config_value('AdvRetransTimer') + self.assertEqual(tmp, '0') + + # this is a default value + tmp = get_config_value('AdvCurHopLimit') + self.assertEqual(tmp, '64') + + # this is a default value + tmp = get_config_value('AdvDefaultPreference') + self.assertEqual(tmp, 'medium') + + tmp = get_config_value('AdvAutonomous') + self.assertEqual(tmp, 'off') + + # this is a default value + tmp = get_config_value('AdvValidLifetime') + self.assertEqual(tmp, 'infinity') + + # this is a default value + tmp = get_config_value('AdvPreferredLifetime') + self.assertEqual(tmp, '14400') + + tmp = get_config_value('AdvOnLink') + self.assertEqual(tmp, 'off') + + + + # Check for running process + self.assertTrue('radvd' in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py new file mode 100755 index 000000000..fb5f5393f --- /dev/null +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import re +import unittest + +from vyos.validate import is_ipv4 +from psutil import process_iter + +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +SNMPD_CONF = '/etc/snmp/snmpd.conf' +base_path = ['service', 'snmp'] + +def get_config_value(key): + tmp = read_file(SNMPD_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + return tmp[0] + +class TestSNMPService(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + del self.session + + def test_snmp(self): + """ Check if SNMP can be configured and service runs """ + clients = ['192.0.2.1', '2001:db8::1'] + networks = ['192.0.2.128/25', '2001:db8:babe::/48'] + listen = ['127.0.0.1', '::1'] + + for auth in ['ro', 'rw']: + community = 'VyOS' + auth + self.session.set(base_path + ['community', community, 'authorization', auth]) + for client in clients: + self.session.set(base_path + ['community', community, 'client', client]) + for network in networks: + self.session.set(base_path + ['community', community, 'network', network]) + + for addr in listen: + self.session.set(base_path + ['listen-address', addr]) + + self.session.set(base_path + ['contact', 'maintainers@vyos.io']) + self.session.set(base_path + ['location', 'qemu']) + + self.session.commit() + + # verify listen address, it will be returned as + # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161'] + # thus we need to transfor this into a proper list + config = get_config_value('agentaddress') + expected = 'unix:/run/snmpd.socket' + for addr in listen: + if is_ipv4(addr): + expected += ',udp:{}:161'.format(addr) + else: + expected += ',udp6:[{}]:161'.format(addr) + + self.assertTrue(expected in config) + + # Check for running process + self.assertTrue("snmpd" in (p.name() for p in process_iter())) + + + def test_snmpv3_sha(self): + """ Check if SNMPv3 can be configured with SHA authentication and service runs""" + + self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) + # check validate() - a view must be created before this can be comitted + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) + self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) + + # create user + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + + self.session.commit() + + # commit will alter the CLI values - check if they have been updated: + hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + # TODO: read in config file and check values + + # Check for running process + self.assertTrue("snmpd" in (p.name() for p in process_iter())) + + def test_snmpv3_md5(self): + """ Check if SNMPv3 can be configured with MD5 authentication and service runs""" + + self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) + # check validate() - a view must be created before this can be comitted + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) + self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) + + # create user + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + + self.session.commit() + + # commit will alter the CLI values - check if they have been updated: + hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + # TODO: read in config file and check values + + # Check for running process + self.assertTrue("snmpd" in (p.name() for p in process_iter())) + + +if __name__ == '__main__': + unittest.main() + diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py new file mode 100755 index 000000000..3ee498f3d --- /dev/null +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +SSHD_CONF = '/run/ssh/sshd_config' +base_path = ['service', 'ssh'] + +def get_config_value(key): + tmp = read_file(SSHD_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + return tmp + +class TestServiceSSH(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + # delete testing SSH config + self.session.delete(base_path) + # restore "plain" SSH access + self.session.set(base_path) + + self.session.commit() + del self.session + + def test_ssh_single(self): + """ Check if SSH service can be configured and runs """ + self.session.set(base_path + ['port', '1234']) + self.session.set(base_path + ['disable-host-validation']) + self.session.set(base_path + ['disable-password-authentication']) + self.session.set(base_path + ['loglevel', 'verbose']) + self.session.set(base_path + ['client-keepalive-interval', '100']) + self.session.set(base_path + ['listen-address', '127.0.0.1']) + + # commit changes + self.session.commit() + + # Check configured port + port = get_config_value('Port')[0] + self.assertTrue("1234" in port) + + # Check DNS usage + dns = get_config_value('UseDNS')[0] + self.assertTrue("no" in dns) + + # Check PasswordAuthentication + pwd = get_config_value('PasswordAuthentication')[0] + self.assertTrue("no" in pwd) + + # Check loglevel + loglevel = get_config_value('LogLevel')[0] + self.assertTrue("VERBOSE" in loglevel) + + # Check listen address + address = get_config_value('ListenAddress')[0] + self.assertTrue("127.0.0.1" in address) + + # Check keepalive + keepalive = get_config_value('ClientAliveInterval')[0] + self.assertTrue("100" in keepalive) + + # Check for running process + self.assertTrue("sshd" in (p.name() for p in process_iter())) + + def test_ssh_multi(self): + """ Check if SSH service can be configured and runs with multiple + listen ports and listen-addresses """ + ports = ['22', '2222'] + for port in ports: + self.session.set(base_path + ['port', port]) + + addresses = ['127.0.0.1', '::1'] + for address in addresses: + self.session.set(base_path + ['listen-address', address]) + + # commit changes + self.session.commit() + + # Check configured port + tmp = get_config_value('Port') + for port in ports: + self.assertIn(port, tmp) + + # Check listen address + tmp = get_config_value('ListenAddress') + for address in addresses: + self.assertIn(address, tmp) + + # Check for running process + self.assertTrue("sshd" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_system_lcd.py b/smoketest/scripts/cli/test_system_lcd.py new file mode 100755 index 000000000..931a91c53 --- /dev/null +++ b/smoketest/scripts/cli/test_system_lcd.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 Francois Mertz fireboxled@gmail.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from configparser import ConfigParser +from psutil import process_iter +from vyos.configsession import ConfigSession + +base_path = ['system', 'lcd'] + +class TestSystemLCD(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_system_display(self): + # configure some system display + self.session.set(base_path + ['device', 'ttyS1']) + self.session.set(base_path + ['model', 'cfa-533']) + + # commit changes + self.session.commit() + + # load up ini-styled LCDd.conf + conf = ConfigParser() + conf.read('/run/LCDd/LCDd.conf') + + self.assertEqual(conf['CFontzPacket']['Model'], '533') + self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1') + + # both processes running + self.assertTrue('LCDd' in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py new file mode 100755 index 000000000..3c4b1fa28 --- /dev/null +++ b/smoketest/scripts/cli/test_system_login.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import re +import unittest + +from subprocess import Popen, PIPE +from vyos.configsession import ConfigSession, ConfigSessionError +import vyos.util as util + +base_path = ['system', 'login'] +users = ['vyos1', 'vyos2'] + +class TestSystemLogin(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete individual users from configuration + for user in users: + self.session.delete(base_path + ['user', user]) + + self.session.commit() + del self.session + + def test_user(self): + """ Check if user can be created and we can SSH to localhost """ + self.session.set(['service', 'ssh', 'port', '22']) + + for user in users: + name = "VyOS Roxx " + user + home_dir = "/tmp/" + user + + self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) + self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) + self.session.set(base_path + ['user', user, 'home-directory', home_dir]) + + self.session.commit() + + for user in users: + cmd = ['su','-', user] + proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + tmp = "{}\nuname -a".format(user) + proc.stdin.write(tmp.encode()) + proc.stdin.flush() + (stdout, stderr) = proc.communicate() + + # stdout is something like this: + # b'Linux vyos 4.19.101-amd64-vyos #1 SMP Sun Feb 2 10:18:07 UTC 2020 x86_64 GNU/Linux\n' + self.assertTrue(len(stdout) > 40) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_system_nameserver.py b/smoketest/scripts/cli/test_system_nameserver.py new file mode 100755 index 000000000..9040be072 --- /dev/null +++ b/smoketest/scripts/cli/test_system_nameserver.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import re +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +import vyos.util as util + +RESOLV_CONF = '/etc/resolv.conf' + +test_servers = ['192.0.2.10', '2001:db8:1::100'] +base_path = ['system', 'name-server'] + +def get_name_servers(): + resolv_conf = util.read_file(RESOLV_CONF) + return re.findall(r'\n?nameserver\s+(.*)', resolv_conf) + +class TestSystemNameServer(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete existing name servers + self.session.delete(base_path) + self.session.commit() + + del self.session + + def test_add_server(self): + """ Check if server is added to resolv.conf """ + for s in test_servers: + self.session.set(base_path + [s]) + self.session.commit() + + servers = get_name_servers() + for s in servers: + self.assertTrue(s in servers) + + def test_delete_server(self): + """ Test if a deleted server disappears from resolv.conf """ + for s in test_servers: + self.session.delete(base_path + [s]) + self.session.commit() + + servers = get_name_servers() + for s in servers: + self.assertTrue(test_server_1 not in servers) + +if __name__ == '__main__': + unittest.main() + diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py new file mode 100755 index 000000000..856a28916 --- /dev/null +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr +from vyos.util import read_file + +NTP_CONF = '/etc/ntp.conf' +base_path = ['system', 'ntp'] + +def get_config_value(key): + tmp = read_file(NTP_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + # remove possible trailing whitespaces + return [item.strip() for item in tmp] + +class TestSystemNTP(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_ntp_options(self): + """ Test basic NTP support with multiple servers and their options """ + servers = ['192.0.2.1', '192.0.2.2'] + options = ['noselect', 'preempt', 'prefer'] + + for server in servers: + for option in options: + self.session.set(base_path + ['server', server, option]) + + # commit changes + self.session.commit() + + # Check generated configuration + tmp = get_config_value('server') + for server in servers: + test = f'{server} iburst ' + ' '.join(options) + self.assertTrue(test in tmp) + + # Check for running process + self.assertTrue("ntpd" in (p.name() for p in process_iter())) + + def test_ntp_clients(self): + """ Test the allowed-networks statement """ + listen_address = ['127.0.0.1', '::1'] + for listen in listen_address: + self.session.set(base_path + ['listen-address', listen]) + + networks = ['192.0.2.0/24', '2001:db8:1000::/64'] + for network in networks: + self.session.set(base_path + ['allow-clients', 'address', network]) + + # Verify "NTP server not configured" verify() statement + with self.assertRaises(ConfigSessionError): + self.session.commit() + + servers = ['192.0.2.1', '192.0.2.2'] + for server in servers: + self.session.set(base_path + ['server', server]) + + self.session.commit() + + # Check generated client address configuration + for network in networks: + network_address = vyos_address_from_cidr(network) + network_netmask = vyos_netmask_from_cidr(network) + + tmp = get_config_value(f'restrict {network_address}')[0] + test = f'mask {network_netmask} nomodify notrap nopeer' + self.assertTrue(tmp in test) + + # Check listen address + tmp = get_config_value('interface') + test = ['ignore wildcard'] + for listen in listen_address: + test.append(f'listen {listen}') + self.assertEqual(tmp, test) + + # Check for running process + self.assertTrue("ntpd" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_vpn_anyconnect.py b/smoketest/scripts/cli/test_vpn_anyconnect.py new file mode 100755 index 000000000..dd8ab1609 --- /dev/null +++ b/smoketest/scripts/cli/test_vpn_anyconnect.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +OCSERV_CONF = '/run/ocserv/ocserv.conf' +base_path = ['vpn', 'anyconnect'] +cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem' +cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key' + +class TestVpnAnyconnect(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete vpn anyconnect configuration + self.session.delete(base_path) + self.session.commit() + + del self.session + + def test_vpn(self): + user = 'vyos_user' + password = 'vyos_pass' + self.session.delete(base_path) + self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password]) + self.session.set(base_path + ["authentication", "mode", "local"]) + self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) + self.session.set(base_path + ["ssl", "ca-cert-file", cert]) + self.session.set(base_path + ["ssl", "cert-file", cert]) + self.session.set(base_path + ["ssl", "key-file", cert_key]) + + self.session.commit() + + # Check for running process + self.assertTrue("ocserv-main" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py new file mode 100755 index 000000000..efa095b30 --- /dev/null +++ b/smoketest/scripts/cli/test_vrf.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +class VRFTest(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self._vrfs = ['red', 'green', 'blue'] + + def tearDown(self): + # delete all VRFs + self.session.delete(['vrf']) + self.session.commit() + del self.session + + def test_table_id(self): + table = 1000 + for vrf in self._vrfs: + base = ['vrf', 'name', vrf] + description = "VyOS-VRF-" + vrf + self.session.set(base + ['description', description]) + + # check validate() - a table ID is mandatory + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base + ['table', str(table)]) + table += 1 + + # commit changes + self.session.commit() + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/system/test_module_load.py b/smoketest/scripts/system/test_module_load.py new file mode 100755 index 000000000..59c3e0b6a --- /dev/null +++ b/smoketest/scripts/system/test_module_load.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +modules = { + "intel": ["e1000", "e1000e", "igb", "ixgb", "ixgbe", "ixgbevf", "i40e", "i40evf", "iavf"], + "accel_ppp": ["ipoe", "vlan_mon"], + "misc": ["wireguard"] +} + +class TestKernelModules(unittest.TestCase): + def test_load_modules(self): + success = True + for msk in modules: + ms = modules[msk] + for m in ms: + # We want to uncover all modules that fail, + # not fail at the first one + try: + os.system("modprobe {0}".format(m)) + except: + success = False + + self.assertTrue(success) + +if __name__ == '__main__': + unittest.main() |