diff options
-rw-r--r-- | scripts/cli/base_interfaces_test.py | 177 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_bonding.py | 56 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_bridge.py | 41 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_ethernet.py | 36 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_l2tpv3.py | 59 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_pppoe.py | 39 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_pseudo_ethernet.py | 23 | ||||
-rwxr-xr-x | scripts/cli/test_interfaces_wireless.py | 26 | ||||
-rwxr-xr-x | scripts/cli/test_nat.py | 63 | ||||
-rwxr-xr-x | scripts/cli/test_service_bcast-relay.py | 71 | ||||
-rwxr-xr-x | scripts/cli/test_service_mdns-repeater.py | 51 | ||||
-rwxr-xr-x | scripts/cli/test_service_pppoe-server.py | 154 | ||||
-rwxr-xr-x | scripts/cli/test_service_router-advert.py | 99 | ||||
-rwxr-xr-x | scripts/cli/test_service_snmp.py | 57 | ||||
-rwxr-xr-x | scripts/cli/test_service_ssh.py | 4 | ||||
-rwxr-xr-x | scripts/cli/test_system_ntp.py | 108 | ||||
-rwxr-xr-x | scripts/cli/test_vpn_anyconnect.py | 58 |
17 files changed, 1010 insertions, 112 deletions
diff --git a/scripts/cli/base_interfaces_test.py b/scripts/cli/base_interfaces_test.py index 53fe553bc..14ec7e137 100644 --- a/scripts/cli/base_interfaces_test.py +++ b/scripts/cli/base_interfaces_test.py @@ -15,17 +15,28 @@ import os import unittest -from vyos.configsession import ConfigSession from netifaces import ifaddresses, AF_INET, AF_INET6 -from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local + +from vyos.configsession import ConfigSession from vyos.ifconfig import Interface +from vyos.util import read_file +from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local class BasicInterfaceTest: class BaseTest(unittest.TestCase): + _test_ip = False _test_mtu = False + _test_vlan = False + _test_qinq = False + _test_ipv6 = False _base_path = [] + _options = {} _interfaces = [] + _qinq_range = ['10', '20', '30'] + _vlan_range = ['100', '200', '300', '2000'] + # choose IPv6 minimum MTU value for tests - this must always work + _mtu = '1280' def setUp(self): self.session = ConfigSession(os.getpid()) @@ -38,9 +49,14 @@ class BasicInterfaceTest: def tearDown(self): # we should not remove ethernet from the overall CLI if 'ethernet' in self._base_path: - self.session.delete(self._base_path) - for intf in self._interfaces: - self.session.set(self._base_path + [intf]) + for interface in self._interfaces: + # when using a dedicated interface to test via TEST_ETH environment + # variable only this one will be cleared in the end - usable to test + # ethernet interfaces via SSH + self.session.delete(self._base_path + [interface]) + self.session.set(self._base_path + [interface, 'duplex', 'auto']) + self.session.set(self._base_path + [interface, 'speed', 'auto']) + self.session.set(self._base_path + [interface, 'smp-affinity', 'auto']) else: self.session.delete(self._base_path) @@ -105,26 +121,149 @@ class BasicInterfaceTest: self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) + def test_ipv6_link_local(self): + """ Common function for IPv6 link-local address assignemnts """ + if not self._test_ipv6: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + # after commit we must have an IPv6 link-local address + self.session.commit() + + for interface in self._interfaces: + for addr in ifaddresses(interface)[AF_INET6]: + self.assertTrue(is_ipv6_link_local(addr['addr'])) + + # disable IPv6 link-local address assignment + for interface in self._interfaces: + base = self._base_path + [interface] + self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + + # after commit we must have no IPv6 link-local address + self.session.commit() + + for interface in self._interfaces: + self.assertTrue(AF_INET6 not in ifaddresses(interface)) + + def _mtu_test(self, intf): + """ helper function to verify MTU size """ + with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f: + tmp = f.read().rstrip() + self.assertEqual(tmp, self._mtu) def test_change_mtu(self): - """ - Check if MTU can be changed on interface. - Test MTU size will be 1400 bytes. - """ - if self._test_mtu is False: + """ Testcase if MTU can be changed on interface """ + if not self._test_mtu: return None - - # choose a MTU which works on every interface - mtu = '1400' for intf in self._interfaces: - self.session.set(self._base_path + [intf, 'mtu', mtu]) + base = self._base_path + [intf] + self.session.set(base + ['mtu', self._mtu]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.session.set(base + option.split()) self.session.commit() + for intf in self._interfaces: + self._mtu_test(intf) - # Validate interface description + def test_8021q_vlan(self): + """ Testcase for 802.1q VLAN interfaces """ + if not self._test_vlan: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + for vlan in self._vlan_range: + base = self._base_path + [interface, 'vif', vlan] + self.session.set(base + ['mtu', self._mtu]) + for address in self._test_addr: + self.session.set(base + ['address', address]) + + self.session.commit() for intf in self._interfaces: - with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f: - tmp = f.read().rstrip() - self.assertTrue(tmp == mtu) + for vlan in self._vlan_range: + vif = f'{intf}.{vlan}' + for address in self._test_addr: + self.assertTrue(is_intf_addr_assigned(vif, address)) + self._mtu_test(vif) + + + def test_8021ad_qinq_vlan(self): + """ Testcase for 802.1ad Q-in-Q VLAN interfaces """ + if not self._test_qinq: + return None + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(base + option.split()) + + for vif_s in self._qinq_range: + for vif_c in self._vlan_range: + base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] + self.session.set(base + ['mtu', self._mtu]) + for address in self._test_addr: + self.session.set(base + ['address', address]) + + self.session.commit() + for interface in self._interfaces: + for vif_s in self._qinq_range: + for vif_c in self._vlan_range: + vif = f'{interface}.{vif_s}.{vif_c}' + for address in self._test_addr: + self.assertTrue(is_intf_addr_assigned(vif, address)) + self._mtu_test(vif) + + def test_ip_options(self): + """ test IP options like arp """ + if not self._test_ip: + return None + + for interface in self._interfaces: + arp_tmo = '300' + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(path + option.split()) + + # Options + self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) + self.session.set(path + ['ip', 'disable-arp-filter']) + self.session.set(path + ['ip', 'enable-arp-accept']) + self.session.set(path + ['ip', 'enable-arp-announce']) + self.session.set(path + ['ip', 'enable-arp-ignore']) + self.session.set(path + ['ip', 'enable-proxy-arp']) + self.session.set(path + ['ip', 'proxy-arp-pvlan']) + self.session.set(path + ['ip', 'source-validation', 'loose']) + + self.session.commit() + + for interface in self._interfaces: + tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') + self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter') + self.assertEqual('0', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan') + self.assertEqual('1', tmp) + + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') + self.assertEqual('2', tmp) diff --git a/scripts/cli/test_interfaces_bonding.py b/scripts/cli/test_interfaces_bonding.py index bfadc4a9d..e3d3b25ee 100755 --- a/scripts/cli/test_interfaces_bonding.py +++ b/scripts/cli/test_interfaces_bonding.py @@ -14,46 +14,48 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os import unittest from base_interfaces_test import BasicInterfaceTest from vyos.ifconfig import Section from vyos.configsession import ConfigSessionError +from vyos.util import read_file class BondingInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): - super().setUp() + super().setUp() - self._base_path = ['interfaces', 'bonding'] - self._test_mtu = True - self._interfaces = ['bond0'] + self._base_path = ['interfaces', 'bonding'] + self._interfaces = ['bond0'] + self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + self._test_ipv6 = True - def test_add_remove_member(self): - members = [] + self._members = [] # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - members.append(tmp) - - for intf in self._interfaces: - for member in members: - # We can not enslave an interface when there is an address - # assigned - take care here - or find them dynamically if a user - # runs vyos-smoketest on his production device? - self.session.set(self._base_path + [intf, 'member', 'interface', member]) - - self.session.commit() - - # check validate() - we can only add existing interfaces - self.session.set(self._base_path + [intf, 'member', 'interface', 'eth99']) - with self.assertRaises(ConfigSessionError): - self.session.commit() - - # check if member deletion works as expected - self.session.delete(self._base_path + [intf, 'member']) - self.session.commit() + if 'TEST_ETH' in os.environ: + self._members = os.environ['TEST_ETH'].split() + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._members.append(tmp) + + self._options['bond0'] = [] + for member in self._members: + self._options['bond0'].append(f'member interface {member}') + + + def test_add_address_single(self): + """ derived method to check if member interfaces are enslaved properly """ + super().test_add_address_single() + + for interface in self._interfaces: + slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() + self.assertListEqual(slaves, self._members) if __name__ == '__main__': unittest.main() diff --git a/scripts/cli/test_interfaces_bridge.py b/scripts/cli/test_interfaces_bridge.py index 03c78c210..bc0bb69c6 100755 --- a/scripts/cli/test_interfaces_bridge.py +++ b/scripts/cli/test_interfaces_bridge.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os import unittest from base_interfaces_test import BasicInterfaceTest @@ -23,35 +24,45 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): super().setUp() + self._test_ipv6 = True + self._base_path = ['interfaces', 'bridge'] self._interfaces = ['br0'] - def test_add_remove_member(self): - members = [] + self._members = [] # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - members.append(tmp) + if 'TEST_ETH' in os.environ: + self._members = os.environ['TEST_ETH'].split() + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._members.append(tmp) - for intf in self._interfaces: - cost = 1000 - priority = 10 + self._options['br0'] = [] + for member in self._members: + self._options['br0'].append(f'member interface {member}') - self.session.set(self._base_path + [intf, 'stp']) + def test_add_remove_member(self): + for interface in self._interfaces: + base = self._base_path + [interface] + self.session.set(base + ['stp']) + self.session.set(base + ['address', '192.0.2.1/24']) + cost = 1000 + priority = 10 # assign members to bridge interface - for member in members: - self.session.set(self._base_path + [intf, 'member', 'interface', member]) - self.session.set(self._base_path + [intf, 'member', 'interface', member, 'cost', str(cost)]) - self.session.set(self._base_path + [intf, 'member', 'interface', member, 'priority', str(priority)]) + for member in self._members: + base_member = base + ['member', 'interface', member] + self.session.set(base_member + ['cost', str(cost)]) + self.session.set(base_member + ['priority', str(priority)]) cost += 1 priority += 1 self.session.commit() - for intf in self._interfaces: - self.session.delete(self._base_path + [intf, 'member']) + for interface in self._interfaces: + self.session.delete(self._base_path + [interface, 'member']) self.session.commit() diff --git a/scripts/cli/test_interfaces_ethernet.py b/scripts/cli/test_interfaces_ethernet.py index 373c81680..761ec7506 100755 --- a/scripts/cli/test_interfaces_ethernet.py +++ b/scripts/cli/test_interfaces_ethernet.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os import unittest from base_interfaces_test import BasicInterfaceTest @@ -24,15 +25,44 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): super().setUp() self._base_path = ['interfaces', 'ethernet'] + self._test_ip = True self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + self._test_ipv6 = True self._interfaces = [] # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! - for tmp in Section.interfaces("ethernet"): - if not '.' in tmp: - self._interfaces.append(tmp) + if 'TEST_ETH' in os.environ: + tmp = os.environ['TEST_ETH'].split() + self._interfaces = tmp + else: + for tmp in Section.interfaces("ethernet"): + if not '.' in tmp: + self._interfaces.append(tmp) + def test_dhcp_disable(self): + """ + When interface is configured as admin down, it must be admin down even + """ + for interface in self._interfaces: + self.session.set(self._base_path + [interface, 'disable']) + for option in self._options.get(interface, []): + self.session.set(self._base_path + [interface] + option.split()) + + # Also enable DHCP (ISC DHCP always places interface in admin up + # state so we check that we do not start DHCP client. + # https://phabricator.vyos.net/T2767 + self.session.set(self._base_path + [interface, 'address', 'dhcp']) + + self.session.commit() + + # Validate interface state + for interface in self._interfaces: + with open(f'/sys/class/net/{interface}/flags', 'r') as f: + flags = f.read() + self.assertEqual(int(flags, 16) & 1, 0) if __name__ == '__main__': unittest.main() diff --git a/scripts/cli/test_interfaces_l2tpv3.py b/scripts/cli/test_interfaces_l2tpv3.py new file mode 100755 index 000000000..d8655d157 --- /dev/null +++ b/scripts/cli/test_interfaces_l2tpv3.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import jmespath +import unittest + +from base_interfaces_test import BasicInterfaceTest +from vyos.util import cmd + +class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): + def setUp(self): + super().setUp() + + self._base_path = ['interfaces', 'l2tpv3'] + self._options = { + 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', + 'tunnel-id 100', 'peer-tunnel-id 10', + 'session-id 100', 'peer-session-id 10', + 'source-port 1010', 'destination-port 10101'], + 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20', + 'peer-tunnel-id 200', 'remote-ip 127.20.20.20', + 'session-id 20', 'tunnel-id 200', + 'source-port 2020', 'destination-port 20202'], + } + self._interfaces = list(self._options) + + def test_add_address_single(self): + super().test_add_address_single() + + command = 'sudo ip -j l2tp show session' + json_out = json.loads(cmd(command)) + for interface in self._options: + for config in json_out: + if config['interface'] == interface: + # convert list with configuration items into a dict + dict = {} + for opt in self._options[interface]: + dict.update({opt.split()[0].replace('-','_'): opt.split()[1]}) + + for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']: + self.assertEqual(str(config[key]), dict[key]) + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_interfaces_pppoe.py b/scripts/cli/test_interfaces_pppoe.py index 3c11da795..822f05de6 100755 --- a/scripts/cli/test_interfaces_pppoe.py +++ b/scripts/cli/test_interfaces_pppoe.py @@ -53,7 +53,7 @@ class PPPoEInterfaceTest(unittest.TestCase): self.session.commit() del self.session - def test_pppoe_1(self): + def test_pppoe(self): """ Check if PPPoE dialer can be configured and runs """ for interface in self._interfaces: user = 'VyOS-user-' + interface @@ -80,13 +80,13 @@ class PPPoEInterfaceTest(unittest.TestCase): password = 'VyOS-passwd-' + interface tmp = get_config_value(interface, 'mtu')[1] - self.assertTrue(tmp in mtu) + self.assertEqual(tmp, mtu) tmp = get_config_value(interface, 'user')[1].replace('"', '') - self.assertTrue(tmp in user) + self.assertEqual(tmp, user) tmp = get_config_value(interface, 'password')[1].replace('"', '') - self.assertTrue(tmp in password) + self.assertEqual(tmp, password) tmp = get_config_value(interface, 'ifname')[1] - self.assertTrue(tmp in interface) + self.assertEqual(tmp, interface) # Check if ppp process is running in the interface in question running = False @@ -98,7 +98,7 @@ class PPPoEInterfaceTest(unittest.TestCase): self.assertTrue(running) def test_pppoe_dhcpv6pd(self): - """ Check if PPPoE dialer can be configured and runs """ + """ Check if PPPoE dialer can be configured with DHCPv6-PD """ address = '1' sla_id = '0' sla_len = '8' @@ -111,39 +111,38 @@ class PPPoEInterfaceTest(unittest.TestCase): self.session.set(base_path + [interface, 'ipv6', 'enable']) # prefix delegation stuff - dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'prefix-delegation'] + dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] self.session.set(dhcpv6_pd_base + ['length', '56']) self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) - self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-len', sla_len]) # commit changes self.session.commit() # verify "normal" PPPoE value - 1492 is default MTU tmp = get_config_value(interface, 'mtu')[1] - self.assertTrue(tmp in '1492') + self.assertEqual(tmp, '1492') tmp = get_config_value(interface, 'user')[1].replace('"', '') - self.assertTrue(tmp in 'vyos') + self.assertEqual(tmp, 'vyos') tmp = get_config_value(interface, 'password')[1].replace('"', '') - self.assertTrue(tmp in 'vyos') + self.assertEqual(tmp, 'vyos') for param in ['+ipv6', 'ipv6cp-use-ipaddr']: tmp = get_config_value(interface, param)[0] - self.assertTrue(tmp in param) + self.assertEqual(tmp, param) # verify DHCPv6 prefix delegation # will return: ['delegation', '::/56 infinity;'] tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace - self.assertTrue(tmp in '::/56') + self.assertEqual(tmp, '::/56') tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] - self.assertTrue(tmp in self._source_interface) + self.assertEqual(tmp, self._source_interface) tmp = get_dhcp6c_config_value(interface, 'ifid')[0] - self.assertTrue(tmp in address) + self.assertEqual(tmp, address) tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] - self.assertTrue(tmp in sla_id) + self.assertEqual(tmp, sla_id) tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] - self.assertTrue(tmp in sla_len) + self.assertEqual(tmp, sla_len) # Check if ppp process is running in the interface in question running = False @@ -153,7 +152,11 @@ class PPPoEInterfaceTest(unittest.TestCase): self.assertTrue(running) # We can not check if wide-dhcpv6 process is running as it is started - # after the PPP interface gets a link to the ISP + # after the PPP interface gets a link to the ISP - but we can see if + # it would be started by the scripts + tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}') + tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp) + self.assertTrue(tmp) if __name__ == '__main__': unittest.main() diff --git a/scripts/cli/test_interfaces_pseudo_ethernet.py b/scripts/cli/test_interfaces_pseudo_ethernet.py index 1f5de4f61..bc2e6e7eb 100755 --- a/scripts/cli/test_interfaces_pseudo_ethernet.py +++ b/scripts/cli/test_interfaces_pseudo_ethernet.py @@ -21,18 +21,19 @@ from base_interfaces_test import BasicInterfaceTest class PEthInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): - super().setUp() - self._base_path = ['interfaces', 'pseudo-ethernet'] - options = ['source-interface eth0', 'ip arp-cache-timeout 10', - 'ip disable-arp-filter', 'ip enable-arp-accept', - 'ip enable-arp-announce', 'ip enable-arp-ignore', - 'ip enable-proxy-arp', 'ip proxy-arp-pvlan'] + super().setUp() + self._base_path = ['interfaces', 'pseudo-ethernet'] - self._options = { - 'peth0': options, - 'peth1': options, - } - self._interfaces = list(self._options) + self._test_ip = True + self._test_mtu = True + self._test_vlan = True + self._test_qinq = True + + self._options = { + 'peth0': ['source-interface eth1'], + 'peth1': ['source-interface eth1'], + } + self._interfaces = list(self._options) if __name__ == '__main__': unittest.main() diff --git a/scripts/cli/test_interfaces_wireless.py b/scripts/cli/test_interfaces_wireless.py index 5d04a1c44..fae233244 100755 --- a/scripts/cli/test_interfaces_wireless.py +++ b/scripts/cli/test_interfaces_wireless.py @@ -18,6 +18,8 @@ import os import unittest from base_interfaces_test import BasicInterfaceTest +from psutil import process_iter +from vyos.util import check_kmod class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): @@ -37,18 +39,20 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): self._interfaces = list(self._options) self.session.set(['system', 'wifi-regulatory-domain', 'SE']) - def test_wifi_client(self): - """ test creation of a wireless station """ - for intf in self._interfaces: - # prepare interfaces - for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) - - # commit changes - self.session.commit() - + def test_add_address_single(self): + """ derived method to check if member interfaces are enslaved properly """ + super().test_add_address_single() + for option, option_value in self._options.items(): + if 'type access-point' in option_value: + # Check for running process + self.assertIn('hostapd', (p.name() for p in process_iter())) + elif 'type station' in option_value: + # Check for running process + self.assertIn('wpa_supplicant', (p.name() for p in process_iter())) + else: + self.assertTrue(False) if __name__ == '__main__': - os.system('sudo modprobe mac80211_hwsim') + check_kmod('mac80211_hwsim') unittest.main() diff --git a/scripts/cli/test_nat.py b/scripts/cli/test_nat.py new file mode 100755 index 000000000..416810e40 --- /dev/null +++ b/scripts/cli/test_nat.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import jmespath +import json +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import cmd + +base_path = ['nat'] +snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' + +class TestNAT(unittest.TestCase): + def setUp(self): + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session = ConfigSession(os.getpid()) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + + def test_source_nat(self): + """ Configure and validate source NAT rule(s) """ + + path = base_path + ['source'] + network = '192.168.0.0/16' + self.session.set(path + ['rule', '1', 'destination', 'address', network]) + self.session.set(path + ['rule', '1', 'exclude']) + + # check validate() - outbound-interface must be defined + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(path + ['rule', '1', 'outbound-interface', 'any']) + self.session.commit() + + tmp = cmd('sudo nft -j list table nat') + nftable_json = json.loads(tmp) + condensed_json = jmespath.search(snat_pattern, nftable_json)[0] + + self.assertEqual(condensed_json['comment'], 'DST-NAT-1') + self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) + self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_service_bcast-relay.py b/scripts/cli/test_service_bcast-relay.py new file mode 100755 index 000000000..fe4531c3b --- /dev/null +++ b/scripts/cli/test_service_bcast-relay.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError + +base_path = ['service', 'broadcast-relay'] + +class TestServiceBroadcastRelay(unittest.TestCase): + _address1 = '192.0.2.1/24' + _address2 = '192.0.2.1/24' + + def setUp(self): + self.session = ConfigSession(os.getpid()) + self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) + self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) + self.session.commit() + + def tearDown(self): + self.session.delete(['interfaces', 'dummy', 'dum1001']) + self.session.delete(['interfaces', 'dummy', 'dum1002']) + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_service(self): + """ Check if broadcast relay service can be configured and runs """ + ids = range(1, 5) + for id in ids: + base = base_path + ['id', str(id)] + self.session.set(base + ['description', 'vyos']) + self.session.set(base + ['port', str(10000 + id)]) + + # check validate() - two interfaces must be present + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base + ['interface', 'dum1001']) + self.session.set(base + ['interface', 'dum1002']) + self.session.set(base + ['address', self._address1.split('/')[0]]) + + self.session.commit() + + for id in ids: + # check if process is running + running = False + for p in process_iter(): + if "udp-broadcast-relay" in p.name(): + if p.cmdline()[3] == str(id): + running = True + break + self.assertTrue(running) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_service_mdns-repeater.py b/scripts/cli/test_service_mdns-repeater.py new file mode 100755 index 000000000..18900b6d2 --- /dev/null +++ b/scripts/cli/test_service_mdns-repeater.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession + +base_path = ['service', 'mdns', 'repeater'] +intf_base = ['interfaces', 'dummy'] + +class TestServiceMDNSrepeater(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.delete(intf_base + ['dum10']) + self.session.delete(intf_base + ['dum20']) + self.session.commit() + del self.session + + def test_service(self): + # Service required a configured IP address on the interface + + self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30']) + self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30']) + + self.session.set(base_path + ['interface', 'dum10']) + self.session.set(base_path + ['interface', 'dum20']) + self.session.commit() + + # Check for running process + self.assertTrue("mdns-repeater" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_service_pppoe-server.py b/scripts/cli/test_service_pppoe-server.py new file mode 100755 index 000000000..318b6530a --- /dev/null +++ b/scripts/cli/test_service_pppoe-server.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from configparser import ConfigParser +from psutil import process_iter +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError + +base_path = ['service', 'pppoe-server'] +local_if = ['interfaces', 'dummy', 'dum667'] +pppoe_conf = '/run/accel-pppd/pppoe.conf' + +ac_name = 'ACN' +subnet = '172.18.0.0/24' +gateway = '192.0.2.1' +nameserver = '9.9.9.9' +mtu = '1492' +interface = 'eth0' + +class TestServicePPPoEServer(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.delete(local_if) + self.session.commit() + del self.session + + def verify(self, conf): + # validate some common values in the configuration + for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool', + 'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1', + 'auth_chap_md5', 'auth_pap', 'shaper']: + # Settings without values provide None + self.assertEqual(conf['modules'][tmp], None) + + # check Access Concentrator setting + self.assertTrue(conf['pppoe']['ac-name'] == ac_name) + self.assertTrue(conf['pppoe'].getboolean('verbose')) + self.assertTrue(conf['pppoe']['interface'], interface) + + # check configured subnet + self.assertEqual(conf['ip-pool'][subnet], None) + self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) + + # check ppp + self.assertTrue(conf['ppp'].getboolean('verbose')) + self.assertTrue(conf['ppp'].getboolean('check-ip')) + self.assertFalse(conf['ppp'].getboolean('ccp')) + self.assertEqual(conf['ppp']['min-mtu'], mtu) + self.assertEqual(conf['ppp']['mtu'], mtu) + self.assertEqual(conf['ppp']['mppe'], 'prefer') + self.assertEqual(conf['ppp']['lcp-echo-interval'], '30') + self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0') + self.assertEqual(conf['ppp']['lcp-echo-failure'], '3') + + def test_local_auth(self): + """ Test configuration of local authentication for PPPoE server """ + self.session.set(local_if + ['address', '192.0.2.1/32']) + self.session.set(base_path + ['access-concentrator', ac_name]) + self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) + self.session.set(base_path + ['authentication', 'mode', 'local']) + self.session.set(base_path + ['client-ip-pool', 'subnet', subnet]) + self.session.set(base_path + ['name-server', nameserver]) + self.session.set(base_path + ['interface', interface]) + self.session.set(base_path + ['local-ip', gateway]) + + # commit changes + self.session.commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True) + conf.read(pppoe_conf) + + # basic verification + self.verify(conf) + + # check auth + self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets') + self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway) + + # Check for running process + self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + + def test_radius_auth(self): + """ Test configuration of RADIUS authentication for PPPoE server """ + radius_server = '192.0.2.22' + radius_key = 'secretVyOS' + radius_port = '2000' + radius_port_acc = '3000' + + self.session.set(local_if + ['address', '192.0.2.1/32']) + self.session.set(base_path + ['access-concentrator', ac_name]) + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key]) + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port]) + self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) + + self.session.set(base_path + ['authentication', 'mode', 'radius']) + self.session.set(base_path + ['client-ip-pool', 'subnet', subnet]) + self.session.set(base_path + ['name-server', nameserver]) + self.session.set(base_path + ['interface', interface]) + self.session.set(base_path + ['local-ip', gateway]) + + # commit changes + self.session.commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True) + conf.read(pppoe_conf) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf['radius'].getboolean('verbose')) + self.assertTrue(conf['radius']['acct-timeout'], '3') + self.assertTrue(conf['radius']['timeout'], '3') + self.assertTrue(conf['radius']['max-try'], '3') + self.assertTrue(conf['radius']['gw-ip-address'], gateway) + + server = conf['radius']['server'].split(',') + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f'auth-port={radius_port}', server[2]) + self.assertEqual(f'acct-port={radius_port_acc}', server[3]) + self.assertEqual(f'req-limit=0', server[4]) + self.assertEqual(f'fail-time=0', server[5]) + + # Check for running process + self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_service_router-advert.py b/scripts/cli/test_service_router-advert.py new file mode 100755 index 000000000..ec2110c8a --- /dev/null +++ b/scripts/cli/test_service_router-advert.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession +from vyos.util import read_file + +RADVD_CONF = '/run/radvd/radvd.conf' + +interface = 'eth1' +base_path = ['service', 'router-advert', 'interface', interface] +address_base = ['interfaces', 'ethernet', interface, 'address'] + +def get_config_value(key): + tmp = read_file(RADVD_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + return tmp[0].split()[0].replace(';','') + +class TestServiceRADVD(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self.session.set(address_base + ['2001:db8::1/64']) + + def tearDown(self): + self.session.delete(address_base) + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_single(self): + self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag']) + self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) + self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) + self.session.set(base_path + ['dnssl', '2001:db8::1234']) + self.session.set(base_path + ['other-config-flag']) + + # commit changes + self.session.commit() + + # verify values + tmp = get_config_value('interface') + self.assertEqual(tmp, interface) + + tmp = get_config_value('prefix') + self.assertEqual(tmp, '::/64') + + tmp = get_config_value('AdvOtherConfigFlag') + self.assertEqual(tmp, 'on') + + # this is a default value + tmp = get_config_value('AdvRetransTimer') + self.assertEqual(tmp, '0') + + # this is a default value + tmp = get_config_value('AdvCurHopLimit') + self.assertEqual(tmp, '64') + + # this is a default value + tmp = get_config_value('AdvDefaultPreference') + self.assertEqual(tmp, 'medium') + + tmp = get_config_value('AdvAutonomous') + self.assertEqual(tmp, 'off') + + # this is a default value + tmp = get_config_value('AdvValidLifetime') + self.assertEqual(tmp, 'infinity') + + # this is a default value + tmp = get_config_value('AdvPreferredLifetime') + self.assertEqual(tmp, '14400') + + tmp = get_config_value('AdvOnLink') + self.assertEqual(tmp, 'off') + + + + # Check for running process + self.assertTrue('radvd' in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_service_snmp.py b/scripts/cli/test_service_snmp.py index d4bbdc0b1..fb5f5393f 100755 --- a/scripts/cli/test_service_snmp.py +++ b/scripts/cli/test_service_snmp.py @@ -81,10 +81,10 @@ class TestSNMPService(unittest.TestCase): self.assertTrue("snmpd" in (p.name() for p in process_iter())) - def test_snmpv3(self): - """ Check if SNMPv3 can be configured and service runs""" + def test_snmpv3_sha(self): + """ Check if SNMPv3 can be configured with SHA authentication and service runs""" - self.session.set(base_path + ['v3', 'engineid', '0xaffedeadbeef']) + self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) # check validate() - a view must be created before this can be comitted with self.assertRaises(ConfigSessionError): @@ -92,19 +92,64 @@ class TestSNMPService(unittest.TestCase): self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) - self.session.commit() # create user - for authpriv in ['auth', 'privacy']: - self.session.set(base_path + ['v3', 'user', 'vyos', authpriv, 'plaintext-key', 'vyos1234']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + + self.session.commit() + + # commit will alter the CLI values - check if they have been updated: + hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + # TODO: read in config file and check values + + # Check for running process + self.assertTrue("snmpd" in (p.name() for p in process_iter())) + + def test_snmpv3_md5(self): + """ Check if SNMPv3 can be configured with MD5 authentication and service runs""" + + self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) + # check validate() - a view must be created before this can be comitted + with self.assertRaises(ConfigSessionError): + self.session.commit() + + self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) + self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) + # create user + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + self.session.commit() + + # commit will alter the CLI values - check if they have been updated: + hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + + tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + self.assertEqual(tmp, hashed_password) + # TODO: read in config file and check values # Check for running process self.assertTrue("snmpd" in (p.name() for p in process_iter())) + if __name__ == '__main__': unittest.main() diff --git a/scripts/cli/test_service_ssh.py b/scripts/cli/test_service_ssh.py index 072984ca8..3ee498f3d 100755 --- a/scripts/cli/test_service_ssh.py +++ b/scripts/cli/test_service_ssh.py @@ -22,7 +22,7 @@ from psutil import process_iter from vyos.configsession import ConfigSession, ConfigSessionError from vyos.util import read_file -SSHD_CONF = '/etc/ssh/sshd_config' +SSHD_CONF = '/run/ssh/sshd_config' base_path = ['service', 'ssh'] def get_config_value(key): @@ -51,7 +51,7 @@ class TestServiceSSH(unittest.TestCase): self.session.set(base_path + ['port', '1234']) self.session.set(base_path + ['disable-host-validation']) self.session.set(base_path + ['disable-password-authentication']) - self.session.set(base_path + ['loglevel', 'VERBOSE']) + self.session.set(base_path + ['loglevel', 'verbose']) self.session.set(base_path + ['client-keepalive-interval', '100']) self.session.set(base_path + ['listen-address', '127.0.0.1']) diff --git a/scripts/cli/test_system_ntp.py b/scripts/cli/test_system_ntp.py new file mode 100755 index 000000000..856a28916 --- /dev/null +++ b/scripts/cli/test_system_ntp.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr +from vyos.util import read_file + +NTP_CONF = '/etc/ntp.conf' +base_path = ['system', 'ntp'] + +def get_config_value(key): + tmp = read_file(NTP_CONF) + tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) + # remove possible trailing whitespaces + return [item.strip() for item in tmp] + +class TestSystemNTP(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + self.session.delete(base_path) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_ntp_options(self): + """ Test basic NTP support with multiple servers and their options """ + servers = ['192.0.2.1', '192.0.2.2'] + options = ['noselect', 'preempt', 'prefer'] + + for server in servers: + for option in options: + self.session.set(base_path + ['server', server, option]) + + # commit changes + self.session.commit() + + # Check generated configuration + tmp = get_config_value('server') + for server in servers: + test = f'{server} iburst ' + ' '.join(options) + self.assertTrue(test in tmp) + + # Check for running process + self.assertTrue("ntpd" in (p.name() for p in process_iter())) + + def test_ntp_clients(self): + """ Test the allowed-networks statement """ + listen_address = ['127.0.0.1', '::1'] + for listen in listen_address: + self.session.set(base_path + ['listen-address', listen]) + + networks = ['192.0.2.0/24', '2001:db8:1000::/64'] + for network in networks: + self.session.set(base_path + ['allow-clients', 'address', network]) + + # Verify "NTP server not configured" verify() statement + with self.assertRaises(ConfigSessionError): + self.session.commit() + + servers = ['192.0.2.1', '192.0.2.2'] + for server in servers: + self.session.set(base_path + ['server', server]) + + self.session.commit() + + # Check generated client address configuration + for network in networks: + network_address = vyos_address_from_cidr(network) + network_netmask = vyos_netmask_from_cidr(network) + + tmp = get_config_value(f'restrict {network_address}')[0] + test = f'mask {network_netmask} nomodify notrap nopeer' + self.assertTrue(tmp in test) + + # Check listen address + tmp = get_config_value('interface') + test = ['ignore wildcard'] + for listen in listen_address: + test.append(f'listen {listen}') + self.assertEqual(tmp, test) + + # Check for running process + self.assertTrue("ntpd" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/cli/test_vpn_anyconnect.py b/scripts/cli/test_vpn_anyconnect.py new file mode 100755 index 000000000..dd8ab1609 --- /dev/null +++ b/scripts/cli/test_vpn_anyconnect.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file + +OCSERV_CONF = '/run/ocserv/ocserv.conf' +base_path = ['vpn', 'anyconnect'] +cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem' +cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key' + +class TestVpnAnyconnect(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete vpn anyconnect configuration + self.session.delete(base_path) + self.session.commit() + + del self.session + + def test_vpn(self): + user = 'vyos_user' + password = 'vyos_pass' + self.session.delete(base_path) + self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password]) + self.session.set(base_path + ["authentication", "mode", "local"]) + self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) + self.session.set(base_path + ["ssl", "ca-cert-file", cert]) + self.session.set(base_path + ["ssl", "cert-file", cert]) + self.session.set(base_path + ["ssl", "key-file", cert_key]) + + self.session.commit() + + # Check for running process + self.assertTrue("ocserv-main" in (p.name() for p in process_iter())) + +if __name__ == '__main__': + unittest.main() |