summaryrefslogtreecommitdiff
path: root/smoketest
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py151
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bonding.py4
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bridge.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_dummy.py8
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_ethernet.py26
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_geneve.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_l2tpv3.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_macsec.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_pseudo_ethernet.py1
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_tunnel.py262
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_vxlan.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireless.py1
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wirelessmodem.py2
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py24
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py14
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py66
-rwxr-xr-xsmoketest/scripts/cli/test_system_ntp.py10
17 files changed, 301 insertions, 278 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 8ee5395d0..36b085c7f 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -12,7 +12,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
import unittest
import json
@@ -51,17 +50,6 @@ def is_mirrored_to(interface, mirror_if, qdisc):
ret_val = True
return ret_val
-
-dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf'
-def get_dhcp6c_config_value(interface, key):
- tmp = read_file(dhcp6c_config_file.format(interface))
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
-
- out = []
- for item in tmp:
- out.append(item.replace(';',''))
- return out
-
class BasicInterfaceTest:
class BaseTest(unittest.TestCase):
_test_ip = False
@@ -69,6 +57,7 @@ class BasicInterfaceTest:
_test_vlan = False
_test_qinq = False
_test_ipv6 = False
+ _test_ipv6_pd = False
_test_mirror = False
_base_path = []
@@ -106,7 +95,7 @@ class BasicInterfaceTest:
def test_span_mirror(self):
if not self._mirror_interfaces:
- return None
+ self.skipTest('not enabled')
# Check the two-way mirror rules of ingress and egress
for mirror in self._mirror_interfaces:
@@ -175,7 +164,7 @@ class BasicInterfaceTest:
def test_ipv6_link_local_address(self):
# Common function for IPv6 link-local address assignemnts
if not self._test_ipv6:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -202,7 +191,7 @@ class BasicInterfaceTest:
def test_interface_mtu(self):
if not self._test_mtu:
- return None
+ self.skipTest('not enabled')
for intf in self._interfaces:
base = self._base_path + [intf]
@@ -222,7 +211,7 @@ class BasicInterfaceTest:
# Testcase if MTU can be changed to 1200 on non IPv6
# enabled interfaces
if not self._test_mtu:
- return None
+ self.skipTest('not enabled')
old_mtu = self._mtu
self._mtu = '1200'
@@ -247,7 +236,7 @@ class BasicInterfaceTest:
def test_8021q_vlan_interfaces(self):
if not self._test_vlan:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -274,7 +263,7 @@ class BasicInterfaceTest:
def test_8021ad_qinq_vlan_interfaces(self):
if not self._test_qinq:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -305,7 +294,7 @@ class BasicInterfaceTest:
def test_interface_ip_options(self):
if not self._test_ip:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
arp_tmo = '300'
@@ -356,7 +345,7 @@ class BasicInterfaceTest:
def test_interface_ipv6_options(self):
if not self._test_ipv6:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
dad_transmits = '10'
@@ -378,39 +367,119 @@ class BasicInterfaceTest:
self.assertEqual(dad_transmits, tmp)
- def test_ipv6_dhcpv6_prefix_delegation(self):
- if not self._test_ipv6:
- return None
+ def test_dhcpv6pd_auto_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not enabled')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344']
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(path + option.split())
+
+ address = '1'
+ # prefix delegation stuff
+ pd_base = path + ['dhcpv6-options', 'pd', '0']
+ self.session.set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.session.set(['interfaces', section, delegatee])
+ self.session.set(pd_base + ['interface', delegatee, 'address', address])
+ # increment interface address
+ address = str(int(address) + 1)
+
+ self.session.commit()
+
+ for interface in self._interfaces:
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
+ # verify DHCPv6 prefix delegation
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ address = '1'
+ sla_id = '0'
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
+
+ # Check for running process
+ self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.session.delete(['interfaces', section, delegatee])
+
+ def test_dhcpv6pd_manual_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not enabled')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344']
- address = '1'
- sla_id = '0'
- sla_len = '8'
for interface in self._interfaces:
path = self._base_path + [interface]
for option in self._options.get(interface, []):
self.session.set(path + option.split())
# prefix delegation stuff
+ address = '1'
+ sla_id = '1'
pd_base = path + ['dhcpv6-options', 'pd', '0']
- self.session.set(pd_base + ['length', '56'])
- self.session.set(pd_base + ['interface', interface, 'address', address])
- self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id])
+ self.session.set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.session.set(['interfaces', section, delegatee])
+ self.session.set(pd_base + ['interface', delegatee, 'address', address])
+ self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
+
+ # increment interface address
+ address = str(int(address) + 1)
+ sla_id = str(int(sla_id) + 1)
self.session.commit()
+ # Verify dhcpc6 client configuration
for interface in self._interfaces:
+ address = '1'
+ sla_id = '1'
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
# verify DHCPv6 prefix delegation
- # will return: ['delegation', '::/56 infinity;']
- tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
- self.assertEqual(tmp, '::/56')
- tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
- self.assertEqual(tmp, interface)
- tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
- self.assertEqual(tmp, address)
- tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
- self.assertEqual(tmp, sla_id)
- tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
- self.assertEqual(tmp, sla_len)
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
# Check for running process
self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.session.delete(['interfaces', section, delegatee])
diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py
index a35682b7c..f42ec3e9b 100755
--- a/smoketest/scripts/cli/test_interfaces_bonding.py
+++ b/smoketest/scripts/cli/test_interfaces_bonding.py
@@ -26,10 +26,12 @@ from vyos.util import read_file
class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
+ self._test_ipv6 = True
+ self._test_ipv6_pd = True
self._test_mtu = True
self._test_vlan = True
self._test_qinq = True
- self._test_ipv6 = True
self._base_path = ['interfaces', 'bonding']
self._interfaces = ['bond0']
self._mirror_interfaces = ['dum21354']
diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py
index 7444701c1..03d8f6e9c 100755
--- a/smoketest/scripts/cli/test_interfaces_bridge.py
+++ b/smoketest/scripts/cli/test_interfaces_bridge.py
@@ -28,7 +28,9 @@ from vyos.util import read_file
class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
self._test_ipv6 = True
+ self._test_ipv6_pd = True
self._test_vlan = True
self._test_qinq = True
self._base_path = ['interfaces', 'bridge']
diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py
index c482a6f0b..60465a1d5 100755
--- a/smoketest/scripts/cli/test_interfaces_dummy.py
+++ b/smoketest/scripts/cli/test_interfaces_dummy.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -20,9 +20,9 @@ from base_interfaces_test import BasicInterfaceTest
class DummyInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
- self._base_path = ['interfaces', 'dummy']
- self._interfaces = ['dum0', 'dum1', 'dum2']
- super().setUp()
+ self._base_path = ['interfaces', 'dummy']
+ self._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089']
+ super().setUp()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py
index 3c4796283..42c1f15df 100755
--- a/smoketest/scripts/cli/test_interfaces_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_ethernet.py
@@ -19,6 +19,7 @@ import re
import unittest
from base_interfaces_test import BasicInterfaceTest
+from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
from vyos.util import cmd
from vyos.util import process_named_running
@@ -36,10 +37,11 @@ def get_wpa_supplicant_value(interface, key):
class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
self._test_ip = True
+ self._test_ipv6 = True
+ self._test_ipv6_pd = True
self._test_mtu = True
self._test_vlan = True
self._test_qinq = True
- self._test_ipv6 = True
self._base_path = ['interfaces', 'ethernet']
self._mirror_interfaces = ['dum21354']
@@ -123,6 +125,28 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}')
+ def test_non_existing_interface(self):
+ unknonw_interface = self._base_path + ['eth667']
+ self.session.set(unknonw_interface)
+
+ # check validate() - interface does not exist
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # we need to remove this wrong interface from the configuration
+ # manually, else tearDown() will have problem in commit()
+ self.session.delete(unknonw_interface)
+
+ def test_speed_duplex_verify(self):
+ for interface in self._interfaces:
+ self.session.set(self._base_path + [interface, 'speed', '1000'])
+
+ # check validate() - if either speed or duplex is not auto, the
+ # other one must be manually configured, too
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(self._base_path + [interface, 'speed', 'auto'])
+ self.session.commit()
def test_eapol_support(self):
for interface in self._interfaces:
diff --git a/smoketest/scripts/cli/test_interfaces_geneve.py b/smoketest/scripts/cli/test_interfaces_geneve.py
index 98f55210f..12cded400 100755
--- a/smoketest/scripts/cli/test_interfaces_geneve.py
+++ b/smoketest/scripts/cli/test_interfaces_geneve.py
@@ -21,6 +21,8 @@ from base_interfaces_test import BasicInterfaceTest
class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
+ self._test_ipv6 = True
self._base_path = ['interfaces', 'geneve']
self._options = {
'gnv0': ['vni 10', 'remote 127.0.1.1'],
diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py
index c756bfdd5..81af6d7f4 100755
--- a/smoketest/scripts/cli/test_interfaces_l2tpv3.py
+++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py
@@ -22,6 +22,8 @@ from vyos.util import cmd
class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
+ self._test_ipv6 = True
self._base_path = ['interfaces', 'l2tpv3']
self._options = {
'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10',
diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py
index d9635951f..89743e5fd 100755
--- a/smoketest/scripts/cli/test_interfaces_macsec.py
+++ b/smoketest/scripts/cli/test_interfaces_macsec.py
@@ -33,6 +33,8 @@ def get_config_value(interface, key):
class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
super().setUp()
+ self._test_ip = True
+ self._test_ipv6 = True
self._base_path = ['interfaces', 'macsec']
self._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }
diff --git a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
index 85e5e70bd..10bd7ca34 100755
--- a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
@@ -22,6 +22,7 @@ class PEthInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
self._test_ip = True
self._test_ipv6 = True
+ self._test_ipv6_pd = True
self._test_mtu = True
self._test_vlan = True
self._test_qinq = True
diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py
index ca68cb8ba..f67b813af 100755
--- a/smoketest/scripts/cli/test_interfaces_tunnel.py
+++ b/smoketest/scripts/cli/test_interfaces_tunnel.py
@@ -62,6 +62,8 @@ def tunnel_conf(interface):
class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
+ self._test_ipv6 = True
self._test_mtu = True
self._base_path = ['interfaces', 'tunnel']
self.local_v4 = '192.0.2.1'
@@ -82,85 +84,14 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.session.delete(['interfaces', 'dummy', source_if])
super().tearDown()
- def test_ipip(self):
- interface = 'tun100'
- encapsulation = 'ipip'
- local_if_addr = '10.10.10.1/24'
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface for tunnel ipip tun100
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v4])
-
- # missing required option remote for ipip
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4])
-
- # Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual(encapsulation, conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
-
- self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
-
- def test_ipip6(self):
- interface = 'tun110'
- encapsulation = 'ipip6'
- local_if_addr = '10.10.10.1/24'
-
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface for tunnel ipip tun100
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v6])
-
- # missing required option remote for ipip
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6])
-
- # Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual('tunnel6', conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
-
- self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote'])
-
- def test_tunnel_verify_ipv4_local_remote_addr(self):
+ def test_ipv4_encapsulations(self):
# When running tests ensure that for certain encapsulation types the
# local and remote IP address is actually an IPv4 address
interface = f'tun1000'
local_if_addr = f'10.10.200.1/24'
- for encapsulation in ['ipip', 'sit', 'gre']:
+ for encapsulation in ['ipip', 'sit', 'gre', 'gre-bridge']:
self.session.set(self._base_path + [interface, 'address', local_if_addr])
self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
self.session.set(self._base_path + [interface, 'local-ip', self.local_v6])
@@ -176,14 +107,35 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.session.commit()
self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4])
+ self.session.set(self._base_path + [interface, 'source-interface', source_if])
+
+ # Source interface can not be used with sit and gre-bridge
+ if encapsulation in ['sit', 'gre-bridge']:
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(self._base_path + [interface, 'source-interface'])
+
# Check if commit is ok
self.session.commit()
+ conf = tunnel_conf(interface)
+ self.assertEqual(interface, conf['ifname'])
+ self.assertEqual(mtu, conf['mtu'])
+
+ if encapsulation not in ['sit', 'gre-bridge']:
+ self.assertEqual(source_if, conf['link'])
+ self.assertEqual(encapsulation, conf['link_type'])
+ elif encapsulation in ['gre-bridge']:
+ self.assertEqual('ether', conf['link_type'])
+
+ self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
+ self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
+
# cleanup this instance
self.session.delete(self._base_path + [interface])
self.session.commit()
- def test_tunnel_verify_ipv6_local_remote_addr(self):
+ def test_ipv6_encapsulations(self):
# When running tests ensure that for certain encapsulation types the
# local and remote IP address is actually an IPv6 address
@@ -205,9 +157,28 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.session.commit()
self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6])
+ # Configure Tunnel Source interface
+ self.session.set(self._base_path + [interface, 'source-interface', source_if])
+
# Check if commit is ok
self.session.commit()
+ conf = tunnel_conf(interface)
+ self.assertEqual(interface, conf['ifname'])
+ self.assertEqual(mtu, conf['mtu'])
+ self.assertEqual(source_if, conf['link'])
+
+ # remap encapsulation protocol(s)
+ if encapsulation in ['ipip6', 'ip6ip6']:
+ encapsulation = 'tunnel6'
+ elif encapsulation in ['ip6gre']:
+ encapsulation = 'gre6'
+
+ self.assertEqual(encapsulation, conf['link_type'])
+
+ self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local'])
+ self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote'])
+
# cleanup this instance
self.session.delete(self._base_path + [interface])
self.session.commit()
@@ -232,148 +203,5 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
# Check if commit is ok
self.session.commit()
- def test_tunnel_ip6ip6(self):
- interface = 'tun120'
- encapsulation = 'ip6ip6'
- local_if_addr = '2001:db8:f00::1/24'
-
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface for tunnel ipip tun100
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v6])
-
- # missing required option remote for ipip
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6])
-
- # Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual('tunnel6', conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
-
- self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote'])
-
- def test_tunnel_gre_ipv4(self):
- interface = 'tun200'
- encapsulation = 'gre'
- local_if_addr = '172.16.1.1/24'
-
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v4])
-
- # No assertion is raised for GRE remote-ip when missing
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4])
-
- # Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual(encapsulation, conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
-
- self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
-
-
- def test_gre_ipv6(self):
- interface = 'tun210'
- encapsulation = 'ip6gre'
- local_if_addr = '2001:db8:f01::1/24'
-
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v6])
-
- # No assertion is raised for GRE remote-ip when missing
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip6])
-
- # Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual(encapsulation, conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
-
- self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote'])
-
-
- def test_tunnel_sit(self):
- interface = 'tun300'
- encapsulation = 'sit'
- local_if_addr = '172.16.2.1/24'
-
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
-
- # Must provide an "encapsulation" for tunnel tun10
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
-
- # Must configure either local-ip or dhcp-interface
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'local-ip', self.local_v4])
-
- # No assertion is raised for GRE remote-ip when missing
- self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4])
-
- # Source interface can not be used with sit
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(self._base_path + [interface, 'source-interface'])
-
- self.session.commit()
-
- conf = tunnel_conf(interface)
- self.assertEqual(interface, conf['ifname'])
- self.assertEqual(encapsulation, conf['link_type'])
- self.assertEqual(mtu, conf['mtu'])
-
- self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
- self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
-
-
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py
index a9b0fc5a1..a726aa610 100755
--- a/smoketest/scripts/cli/test_interfaces_vxlan.py
+++ b/smoketest/scripts/cli/test_interfaces_vxlan.py
@@ -21,6 +21,8 @@ from base_interfaces_test import BasicInterfaceTest
class VXLANInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
+ self._test_ipv6 = True
self._test_mtu = True
self._base_path = ['interfaces', 'vxlan']
self._options = {
diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py
index ffaa7d523..51d97f032 100755
--- a/smoketest/scripts/cli/test_interfaces_wireless.py
+++ b/smoketest/scripts/cli/test_interfaces_wireless.py
@@ -33,6 +33,7 @@ def get_config_value(interface, key):
class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
self._base_path = ['interfaces', 'wireless']
self._options = {
'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0',
diff --git a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
index 45cd069f4..696a6946b 100755
--- a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
+++ b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
@@ -40,7 +40,7 @@ class WWANInterfaceTest(unittest.TestCase):
self.session.commit()
del self.session
- def test_wlm_1(self):
+ def test_wwan(self):
for interface in self._interfaces:
self.session.set(base_path + [interface, 'no-peer-dns'])
self.session.set(base_path + [interface, 'connect-on-demand'])
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index 7ca82f86f..b5702d691 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -138,7 +138,6 @@ class TestNAT(unittest.TestCase):
else:
self.assertEqual(iface, inbound_iface_200)
-
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
rule = '5'
@@ -156,5 +155,28 @@ class TestNAT(unittest.TestCase):
self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
self.session.commit()
+ def test_dnat_negated_addresses(self):
+ # T3186: negated addresses are not accepted by nftables
+ rule = '1000'
+ self.session.set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1'])
+ self.session.set(dst_path + ['rule', rule, 'destination', 'port', '53'])
+ self.session.set(dst_path + ['rule', rule, 'inbound-interface', 'eth0'])
+ self.session.set(dst_path + ['rule', rule, 'protocol', 'tcp_udp'])
+ self.session.set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1'])
+ self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.session.set(dst_path + ['rule', rule, 'translation', 'port', '53'])
+ self.session.commit()
+
+ def test_nat_no_rules(self):
+ # T3206: deleting all rules but keep the direction 'destination' or
+ # 'source' resulteds in KeyError: 'rule'.
+ #
+ # Test that both 'nat destination' and 'nat source' nodes can exist
+ # without any rule
+ self.session.set(src_path)
+ self.session.set(dst_path)
+ self.session.commit()
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 0bb907c3a..eede042de 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -25,7 +25,7 @@ from vyos.util import process_named_running
from vyos.util import read_file
PROCESS_NAME = 'sshd'
-SSHD_CONF = '/run/ssh/sshd_config'
+SSHD_CONF = '/run/sshd/sshd_config'
base_path = ['service', 'ssh']
vrf = 'ssh-test'
@@ -44,11 +44,6 @@ class TestServiceSSH(unittest.TestCase):
def tearDown(self):
# delete testing SSH config
self.session.delete(base_path)
- # restore "plain" SSH access
- self.session.set(base_path)
- # delete VRF
- self.session.delete(['vrf', 'name', vrf])
-
self.session.commit()
del self.session
@@ -109,7 +104,7 @@ class TestServiceSSH(unittest.TestCase):
def test_ssh_multiple_listen_addresses(self):
# Check if SSH service can be configured and runs with multiple
# listen ports and listen-addresses
- ports = ['22', '2222']
+ ports = ['22', '2222', '2223', '2224']
for port in ports:
self.session.set(base_path + ['port', port])
@@ -143,7 +138,7 @@ class TestServiceSSH(unittest.TestCase):
with self.assertRaises(ConfigSessionError):
self.session.commit()
- self.session.set(['vrf', 'name', vrf, 'table', '1001'])
+ self.session.set(['vrf', 'name', vrf, 'table', '1338'])
# commit changes
self.session.commit()
@@ -159,5 +154,8 @@ class TestServiceSSH(unittest.TestCase):
tmp = cmd(f'ip vrf pids {vrf}')
self.assertIn(PROCESS_NAME, tmp)
+ # delete VRF
+ self.session.delete(['vrf', 'name', vrf])
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index 6188cf38b..bb6f57fc2 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -24,8 +24,10 @@ from platform import release as kernel_version
from subprocess import Popen, PIPE
from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import read_file
+from vyos.template import inc_ip
base_path = ['system', 'login']
users = ['vyos1', 'vyos2']
@@ -42,7 +44,7 @@ class TestSystemLogin(unittest.TestCase):
self.session.commit()
del self.session
- def test_local_user(self):
+ def test_system_login_user(self):
# Check if user can be created and we can SSH to localhost
self.session.set(['service', 'ssh', 'port', '22'])
@@ -82,7 +84,7 @@ class TestSystemLogin(unittest.TestCase):
for option in options:
self.assertIn(f'{option}=y', kernel_config)
- def test_radius_config(self):
+ def test_system_login_radius_ipv4(self):
# Verify generated RADIUS configuration files
radius_key = 'VyOSsecretVyOS'
@@ -95,6 +97,12 @@ class TestSystemLogin(unittest.TestCase):
self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
self.session.set(base_path + ['radius', 'source-address', radius_source])
+ self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ # check validate() - Only one IPv4 source-address supported
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
self.session.commit()
@@ -130,5 +138,59 @@ class TestSystemLogin(unittest.TestCase):
tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
self.assertTrue(tmp)
+ def test_system_login_radius_ipv6(self):
+ # Verify generated RADIUS configuration files
+
+ radius_key = 'VyOS-VyOS'
+ radius_server = '2001:db8::1'
+ radius_source = '::1'
+ radius_port = '4000'
+ radius_timeout = '4'
+
+ self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
+ self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
+ self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
+ self.session.set(base_path + ['radius', 'source-address', radius_source])
+ self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ # check validate() - Only one IPv4 source-address supported
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ self.session.commit()
+
+ # this file must be read with higher permissions
+ pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
+ tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server,
+ radius_port, radius_key, radius_timeout,
+ radius_source), pam_radius_auth_conf)
+ self.assertTrue(tmp)
+
+ # required, static options
+ self.assertIn('priv-lvl 15', pam_radius_auth_conf)
+ self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf)
+
+ # PAM
+ pam_common_account = read_file('/etc/pam.d/common-account')
+ self.assertIn('pam_radius_auth.so', pam_common_account)
+
+ pam_common_auth = read_file('/etc/pam.d/common-auth')
+ self.assertIn('pam_radius_auth.so', pam_common_auth)
+
+ pam_common_session = read_file('/etc/pam.d/common-session')
+ self.assertIn('pam_radius_auth.so', pam_common_session)
+
+ pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive')
+ self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive)
+
+ # NSS
+ nsswitch_conf = read_file('/etc/nsswitch.conf')
+ tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf)
+ self.assertTrue(tmp)
+
+ tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
+ self.assertTrue(tmp)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py
index 7d1bc144f..986c8dfb2 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_system_ntp.py
@@ -76,7 +76,11 @@ class TestSystemNTP(unittest.TestCase):
self.assertTrue(process_named_running(PROCESS_NAME))
def test_ntp_clients(self):
- # Test the allowed-networks statement
+ """ Test the allowed-networks statement """
+ listen_address = ['127.0.0.1', '::1']
+ for listen in listen_address:
+ self.session.set(base_path + ['listen-address', listen])
+
networks = ['192.0.2.0/24', '2001:db8:1000::/64']
for network in networks:
self.session.set(base_path + ['allow-clients', 'address', network])
@@ -102,7 +106,9 @@ class TestSystemNTP(unittest.TestCase):
# Check listen address
tmp = get_config_value('interface')
- test = ['ignore wildcard', 'listen 127.0.0.1', 'listen ::1']
+ test = ['ignore wildcard']
+ for listen in listen_address:
+ test.append(f'listen {listen}')
self.assertEqual(tmp, test)
# Check for running process