diff options
Diffstat (limited to 'smoketest')
-rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 151 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bonding.py | 4 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bridge.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_dummy.py | 8 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_ethernet.py | 26 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_geneve.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_l2tpv3.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_macsec.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py | 1 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_tunnel.py | 262 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_vxlan.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_wireless.py | 1 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_wirelessmodem.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_nat.py | 24 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 14 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 66 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ntp.py | 10 |
17 files changed, 301 insertions, 278 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 8ee5395d0..36b085c7f 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -12,7 +12,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re import os import unittest import json @@ -51,17 +50,6 @@ def is_mirrored_to(interface, mirror_if, qdisc): ret_val = True return ret_val - -dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' -def get_dhcp6c_config_value(interface, key): - tmp = read_file(dhcp6c_config_file.format(interface)) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - - out = [] - for item in tmp: - out.append(item.replace(';','')) - return out - class BasicInterfaceTest: class BaseTest(unittest.TestCase): _test_ip = False @@ -69,6 +57,7 @@ class BasicInterfaceTest: _test_vlan = False _test_qinq = False _test_ipv6 = False + _test_ipv6_pd = False _test_mirror = False _base_path = [] @@ -106,7 +95,7 @@ class BasicInterfaceTest: def test_span_mirror(self): if not self._mirror_interfaces: - return None + self.skipTest('not enabled') # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: @@ -175,7 +164,7 @@ class BasicInterfaceTest: def test_ipv6_link_local_address(self): # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -202,7 +191,7 @@ class BasicInterfaceTest: def test_interface_mtu(self): if not self._test_mtu: - return None + self.skipTest('not enabled') for intf in self._interfaces: base = self._base_path + [intf] @@ -222,7 +211,7 @@ class BasicInterfaceTest: # Testcase if MTU can be changed to 1200 on non IPv6 # enabled interfaces if not self._test_mtu: - return None + self.skipTest('not enabled') old_mtu = self._mtu self._mtu = '1200' @@ -247,7 +236,7 @@ class BasicInterfaceTest: def test_8021q_vlan_interfaces(self): if not self._test_vlan: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -274,7 +263,7 @@ class BasicInterfaceTest: def test_8021ad_qinq_vlan_interfaces(self): if not self._test_qinq: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -305,7 +294,7 @@ class BasicInterfaceTest: def test_interface_ip_options(self): if not self._test_ip: - return None + self.skipTest('not enabled') for interface in self._interfaces: arp_tmo = '300' @@ -356,7 +345,7 @@ class BasicInterfaceTest: def test_interface_ipv6_options(self): if not self._test_ipv6: - return None + self.skipTest('not enabled') for interface in self._interfaces: dad_transmits = '10' @@ -378,39 +367,119 @@ class BasicInterfaceTest: self.assertEqual(dad_transmits, tmp) - def test_ipv6_dhcpv6_prefix_delegation(self): - if not self._test_ipv6: - return None + def test_dhcpv6pd_auto_sla_id(self): + if not self._test_ipv6_pd: + self.skipTest('not enabled') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344'] + + for interface in self._interfaces: + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(path + option.split()) + + address = '1' + # prefix delegation stuff + pd_base = path + ['dhcpv6-options', 'pd', '0'] + self.session.set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.session.set(['interfaces', section, delegatee]) + self.session.set(pd_base + ['interface', delegatee, 'address', address]) + # increment interface address + address = str(int(address) + 1) + + self.session.commit() + + for interface in self._interfaces: + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + + # verify DHCPv6 prefix delegation + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + address = '1' + sla_id = '0' + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) + + # Check for running process + self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.session.delete(['interfaces', section, delegatee]) + + def test_dhcpv6pd_manual_sla_id(self): + if not self._test_ipv6_pd: + self.skipTest('not enabled') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344'] - address = '1' - sla_id = '0' - sla_len = '8' for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.session.set(path + option.split()) # prefix delegation stuff + address = '1' + sla_id = '1' pd_base = path + ['dhcpv6-options', 'pd', '0'] - self.session.set(pd_base + ['length', '56']) - self.session.set(pd_base + ['interface', interface, 'address', address]) - self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id]) + self.session.set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.session.set(['interfaces', section, delegatee]) + self.session.set(pd_base + ['interface', delegatee, 'address', address]) + self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) + + # increment interface address + address = str(int(address) + 1) + sla_id = str(int(sla_id) + 1) self.session.commit() + # Verify dhcpc6 client configuration for interface in self._interfaces: + address = '1' + sla_id = '1' + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + # verify DHCPv6 prefix delegation - # will return: ['delegation', '::/56 infinity;'] - tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace - self.assertEqual(tmp, '::/56') - tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] - self.assertEqual(tmp, interface) - tmp = get_dhcp6c_config_value(interface, 'ifid')[0] - self.assertEqual(tmp, address) - tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] - self.assertEqual(tmp, sla_id) - tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] - self.assertEqual(tmp, sla_len) + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) # Check for running process self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.session.delete(['interfaces', section, delegatee]) diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index a35682b7c..f42ec3e9b 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -26,10 +26,12 @@ from vyos.util import read_file class BondingInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True + self._test_ipv6 = True + self._test_ipv6_pd = True self._test_mtu = True self._test_vlan = True self._test_qinq = True - self._test_ipv6 = True self._base_path = ['interfaces', 'bonding'] self._interfaces = ['bond0'] self._mirror_interfaces = ['dum21354'] diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 7444701c1..03d8f6e9c 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -28,7 +28,9 @@ from vyos.util import read_file class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True self._test_ipv6 = True + self._test_ipv6_pd = True self._test_vlan = True self._test_qinq = True self._base_path = ['interfaces', 'bridge'] diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py index c482a6f0b..60465a1d5 100755 --- a/smoketest/scripts/cli/test_interfaces_dummy.py +++ b/smoketest/scripts/cli/test_interfaces_dummy.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,9 +20,9 @@ from base_interfaces_test import BasicInterfaceTest class DummyInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): - self._base_path = ['interfaces', 'dummy'] - self._interfaces = ['dum0', 'dum1', 'dum2'] - super().setUp() + self._base_path = ['interfaces', 'dummy'] + self._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089'] + super().setUp() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index 3c4796283..42c1f15df 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -19,6 +19,7 @@ import re import unittest from base_interfaces_test import BasicInterfaceTest +from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.util import cmd from vyos.util import process_named_running @@ -36,10 +37,11 @@ def get_wpa_supplicant_value(interface, key): class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): self._test_ip = True + self._test_ipv6 = True + self._test_ipv6_pd = True self._test_mtu = True self._test_vlan = True self._test_qinq = True - self._test_ipv6 = True self._base_path = ['interfaces', 'ethernet'] self._mirror_interfaces = ['dum21354'] @@ -123,6 +125,28 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}') + def test_non_existing_interface(self): + unknonw_interface = self._base_path + ['eth667'] + self.session.set(unknonw_interface) + + # check validate() - interface does not exist + with self.assertRaises(ConfigSessionError): + self.session.commit() + + # we need to remove this wrong interface from the configuration + # manually, else tearDown() will have problem in commit() + self.session.delete(unknonw_interface) + + def test_speed_duplex_verify(self): + for interface in self._interfaces: + self.session.set(self._base_path + [interface, 'speed', '1000']) + + # check validate() - if either speed or duplex is not auto, the + # other one must be manually configured, too + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [interface, 'speed', 'auto']) + self.session.commit() def test_eapol_support(self): for interface in self._interfaces: diff --git a/smoketest/scripts/cli/test_interfaces_geneve.py b/smoketest/scripts/cli/test_interfaces_geneve.py index 98f55210f..12cded400 100755 --- a/smoketest/scripts/cli/test_interfaces_geneve.py +++ b/smoketest/scripts/cli/test_interfaces_geneve.py @@ -21,6 +21,8 @@ from base_interfaces_test import BasicInterfaceTest class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True + self._test_ipv6 = True self._base_path = ['interfaces', 'geneve'] self._options = { 'gnv0': ['vni 10', 'remote 127.0.1.1'], diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py index c756bfdd5..81af6d7f4 100755 --- a/smoketest/scripts/cli/test_interfaces_l2tpv3.py +++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py @@ -22,6 +22,8 @@ from vyos.util import cmd class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True + self._test_ipv6 = True self._base_path = ['interfaces', 'l2tpv3'] self._options = { 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index d9635951f..89743e5fd 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -33,6 +33,8 @@ def get_config_value(interface, key): class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): super().setUp() + self._test_ip = True + self._test_ipv6 = True self._base_path = ['interfaces', 'macsec'] self._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] } diff --git a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py index 85e5e70bd..10bd7ca34 100755 --- a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py @@ -22,6 +22,7 @@ class PEthInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): self._test_ip = True self._test_ipv6 = True + self._test_ipv6_pd = True self._test_mtu = True self._test_vlan = True self._test_qinq = True diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py index ca68cb8ba..f67b813af 100755 --- a/smoketest/scripts/cli/test_interfaces_tunnel.py +++ b/smoketest/scripts/cli/test_interfaces_tunnel.py @@ -62,6 +62,8 @@ def tunnel_conf(interface): class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True + self._test_ipv6 = True self._test_mtu = True self._base_path = ['interfaces', 'tunnel'] self.local_v4 = '192.0.2.1' @@ -82,85 +84,14 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.session.delete(['interfaces', 'dummy', source_if]) super().tearDown() - def test_ipip(self): - interface = 'tun100' - encapsulation = 'ipip' - local_if_addr = '10.10.10.1/24' - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface for tunnel ipip tun100 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v4]) - - # missing required option remote for ipip - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) - - # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual(encapsulation, conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) - - self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) - - def test_ipip6(self): - interface = 'tun110' - encapsulation = 'ipip6' - local_if_addr = '10.10.10.1/24' - - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface for tunnel ipip tun100 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v6]) - - # missing required option remote for ipip - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6]) - - # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual('tunnel6', conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) - - self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) - - def test_tunnel_verify_ipv4_local_remote_addr(self): + def test_ipv4_encapsulations(self): # When running tests ensure that for certain encapsulation types the # local and remote IP address is actually an IPv4 address interface = f'tun1000' local_if_addr = f'10.10.200.1/24' - for encapsulation in ['ipip', 'sit', 'gre']: + for encapsulation in ['ipip', 'sit', 'gre', 'gre-bridge']: self.session.set(self._base_path + [interface, 'address', local_if_addr]) self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) self.session.set(self._base_path + [interface, 'local-ip', self.local_v6]) @@ -176,14 +107,35 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.session.commit() self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) + self.session.set(self._base_path + [interface, 'source-interface', source_if]) + + # Source interface can not be used with sit and gre-bridge + if encapsulation in ['sit', 'gre-bridge']: + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(self._base_path + [interface, 'source-interface']) + # Check if commit is ok self.session.commit() + conf = tunnel_conf(interface) + self.assertEqual(interface, conf['ifname']) + self.assertEqual(mtu, conf['mtu']) + + if encapsulation not in ['sit', 'gre-bridge']: + self.assertEqual(source_if, conf['link']) + self.assertEqual(encapsulation, conf['link_type']) + elif encapsulation in ['gre-bridge']: + self.assertEqual('ether', conf['link_type']) + + self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) + self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) + # cleanup this instance self.session.delete(self._base_path + [interface]) self.session.commit() - def test_tunnel_verify_ipv6_local_remote_addr(self): + def test_ipv6_encapsulations(self): # When running tests ensure that for certain encapsulation types the # local and remote IP address is actually an IPv6 address @@ -205,9 +157,28 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.session.commit() self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6]) + # Configure Tunnel Source interface + self.session.set(self._base_path + [interface, 'source-interface', source_if]) + # Check if commit is ok self.session.commit() + conf = tunnel_conf(interface) + self.assertEqual(interface, conf['ifname']) + self.assertEqual(mtu, conf['mtu']) + self.assertEqual(source_if, conf['link']) + + # remap encapsulation protocol(s) + if encapsulation in ['ipip6', 'ip6ip6']: + encapsulation = 'tunnel6' + elif encapsulation in ['ip6gre']: + encapsulation = 'gre6' + + self.assertEqual(encapsulation, conf['link_type']) + + self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local']) + self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) + # cleanup this instance self.session.delete(self._base_path + [interface]) self.session.commit() @@ -232,148 +203,5 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): # Check if commit is ok self.session.commit() - def test_tunnel_ip6ip6(self): - interface = 'tun120' - encapsulation = 'ip6ip6' - local_if_addr = '2001:db8:f00::1/24' - - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface for tunnel ipip tun100 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v6]) - - # missing required option remote for ipip - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6]) - - # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual('tunnel6', conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) - - self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) - - def test_tunnel_gre_ipv4(self): - interface = 'tun200' - encapsulation = 'gre' - local_if_addr = '172.16.1.1/24' - - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v4]) - - # No assertion is raised for GRE remote-ip when missing - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) - - # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual(encapsulation, conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) - - self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) - - - def test_gre_ipv6(self): - interface = 'tun210' - encapsulation = 'ip6gre' - local_if_addr = '2001:db8:f01::1/24' - - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v6]) - - # No assertion is raised for GRE remote-ip when missing - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6]) - - # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual(encapsulation, conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) - - self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) - - - def test_tunnel_sit(self): - interface = 'tun300' - encapsulation = 'sit' - local_if_addr = '172.16.2.1/24' - - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - - # Must provide an "encapsulation" for tunnel tun10 - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - - # Must configure either local-ip or dhcp-interface - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'local-ip', self.local_v4]) - - # No assertion is raised for GRE remote-ip when missing - self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) - - # Source interface can not be used with sit - self.session.set(self._base_path + [interface, 'source-interface', source_if]) - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(self._base_path + [interface, 'source-interface']) - - self.session.commit() - - conf = tunnel_conf(interface) - self.assertEqual(interface, conf['ifname']) - self.assertEqual(encapsulation, conf['link_type']) - self.assertEqual(mtu, conf['mtu']) - - self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) - - if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py index a9b0fc5a1..a726aa610 100755 --- a/smoketest/scripts/cli/test_interfaces_vxlan.py +++ b/smoketest/scripts/cli/test_interfaces_vxlan.py @@ -21,6 +21,8 @@ from base_interfaces_test import BasicInterfaceTest class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True + self._test_ipv6 = True self._test_mtu = True self._base_path = ['interfaces', 'vxlan'] self._options = { diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py index ffaa7d523..51d97f032 100755 --- a/smoketest/scripts/cli/test_interfaces_wireless.py +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -33,6 +33,7 @@ def get_config_value(interface, key): class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True self._base_path = ['interfaces', 'wireless'] self._options = { 'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0', diff --git a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py index 45cd069f4..696a6946b 100755 --- a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py +++ b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py @@ -40,7 +40,7 @@ class WWANInterfaceTest(unittest.TestCase): self.session.commit() del self.session - def test_wlm_1(self): + def test_wwan(self): for interface in self._interfaces: self.session.set(base_path + [interface, 'no-peer-dns']) self.session.set(base_path + [interface, 'connect-on-demand']) diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 7ca82f86f..b5702d691 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -138,7 +138,6 @@ class TestNAT(unittest.TestCase): else: self.assertEqual(iface, inbound_iface_200) - def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified rule = '5' @@ -156,5 +155,28 @@ class TestNAT(unittest.TestCase): self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.session.commit() + def test_dnat_negated_addresses(self): + # T3186: negated addresses are not accepted by nftables + rule = '1000' + self.session.set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) + self.session.set(dst_path + ['rule', rule, 'destination', 'port', '53']) + self.session.set(dst_path + ['rule', rule, 'inbound-interface', 'eth0']) + self.session.set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) + self.session.set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) + self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) + self.session.set(dst_path + ['rule', rule, 'translation', 'port', '53']) + self.session.commit() + + def test_nat_no_rules(self): + # T3206: deleting all rules but keep the direction 'destination' or + # 'source' resulteds in KeyError: 'rule'. + # + # Test that both 'nat destination' and 'nat source' nodes can exist + # without any rule + self.session.set(src_path) + self.session.set(dst_path) + self.session.commit() + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 0bb907c3a..eede042de 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -25,7 +25,7 @@ from vyos.util import process_named_running from vyos.util import read_file PROCESS_NAME = 'sshd' -SSHD_CONF = '/run/ssh/sshd_config' +SSHD_CONF = '/run/sshd/sshd_config' base_path = ['service', 'ssh'] vrf = 'ssh-test' @@ -44,11 +44,6 @@ class TestServiceSSH(unittest.TestCase): def tearDown(self): # delete testing SSH config self.session.delete(base_path) - # restore "plain" SSH access - self.session.set(base_path) - # delete VRF - self.session.delete(['vrf', 'name', vrf]) - self.session.commit() del self.session @@ -109,7 +104,7 @@ class TestServiceSSH(unittest.TestCase): def test_ssh_multiple_listen_addresses(self): # Check if SSH service can be configured and runs with multiple # listen ports and listen-addresses - ports = ['22', '2222'] + ports = ['22', '2222', '2223', '2224'] for port in ports: self.session.set(base_path + ['port', port]) @@ -143,7 +138,7 @@ class TestServiceSSH(unittest.TestCase): with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(['vrf', 'name', vrf, 'table', '1001']) + self.session.set(['vrf', 'name', vrf, 'table', '1338']) # commit changes self.session.commit() @@ -159,5 +154,8 @@ class TestServiceSSH(unittest.TestCase): tmp = cmd(f'ip vrf pids {vrf}') self.assertIn(PROCESS_NAME, tmp) + # delete VRF + self.session.delete(['vrf', 'name', vrf]) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 6188cf38b..bb6f57fc2 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -24,8 +24,10 @@ from platform import release as kernel_version from subprocess import Popen, PIPE from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import cmd from vyos.util import read_file +from vyos.template import inc_ip base_path = ['system', 'login'] users = ['vyos1', 'vyos2'] @@ -42,7 +44,7 @@ class TestSystemLogin(unittest.TestCase): self.session.commit() del self.session - def test_local_user(self): + def test_system_login_user(self): # Check if user can be created and we can SSH to localhost self.session.set(['service', 'ssh', 'port', '22']) @@ -82,7 +84,7 @@ class TestSystemLogin(unittest.TestCase): for option in options: self.assertIn(f'{option}=y', kernel_config) - def test_radius_config(self): + def test_system_login_radius_ipv4(self): # Verify generated RADIUS configuration files radius_key = 'VyOSsecretVyOS' @@ -95,6 +97,12 @@ class TestSystemLogin(unittest.TestCase): self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.session.set(base_path + ['radius', 'source-address', radius_source]) + self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + # check validate() - Only one IPv4 source-address supported + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.session.commit() @@ -130,5 +138,59 @@ class TestSystemLogin(unittest.TestCase): tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) + def test_system_login_radius_ipv6(self): + # Verify generated RADIUS configuration files + + radius_key = 'VyOS-VyOS' + radius_server = '2001:db8::1' + radius_source = '::1' + radius_port = '4000' + radius_timeout = '4' + + self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) + self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) + self.session.set(base_path + ['radius', 'source-address', radius_source]) + self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + # check validate() - Only one IPv4 source-address supported + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + self.session.commit() + + # this file must be read with higher permissions + pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') + tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, + radius_port, radius_key, radius_timeout, + radius_source), pam_radius_auth_conf) + self.assertTrue(tmp) + + # required, static options + self.assertIn('priv-lvl 15', pam_radius_auth_conf) + self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) + + # PAM + pam_common_account = read_file('/etc/pam.d/common-account') + self.assertIn('pam_radius_auth.so', pam_common_account) + + pam_common_auth = read_file('/etc/pam.d/common-auth') + self.assertIn('pam_radius_auth.so', pam_common_auth) + + pam_common_session = read_file('/etc/pam.d/common-session') + self.assertIn('pam_radius_auth.so', pam_common_session) + + pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') + self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) + + # NSS + nsswitch_conf = read_file('/etc/nsswitch.conf') + tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) + self.assertTrue(tmp) + + tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) + self.assertTrue(tmp) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index 7d1bc144f..986c8dfb2 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -76,7 +76,11 @@ class TestSystemNTP(unittest.TestCase): self.assertTrue(process_named_running(PROCESS_NAME)) def test_ntp_clients(self): - # Test the allowed-networks statement + """ Test the allowed-networks statement """ + listen_address = ['127.0.0.1', '::1'] + for listen in listen_address: + self.session.set(base_path + ['listen-address', listen]) + networks = ['192.0.2.0/24', '2001:db8:1000::/64'] for network in networks: self.session.set(base_path + ['allow-clients', 'address', network]) @@ -102,7 +106,9 @@ class TestSystemNTP(unittest.TestCase): # Check listen address tmp = get_config_value('interface') - test = ['ignore wildcard', 'listen 127.0.0.1', 'listen ::1'] + test = ['ignore wildcard'] + for listen in listen_address: + test.append(f'listen {listen}') self.assertEqual(tmp, test) # Check for running process |