diff options
Diffstat (limited to 'smoketest/scripts')
-rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 59 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bonding.py | 4 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bridge.py | 33 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_ethernet.py | 11 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_loopback.py | 14 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_tunnel.py | 31 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 10 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ip.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ipv6.py | 102 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ntp.py | 6 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vrf.py | 90 |
11 files changed, 310 insertions, 52 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 8a09dd96f..6a68bcc26 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -18,9 +18,10 @@ import json from binascii import hexlify -from netifaces import ifaddresses from netifaces import AF_INET from netifaces import AF_INET6 +from netifaces import ifaddresses +from netifaces import interfaces from vyos.configsession import ConfigSession from vyos.ifconfig import Interface @@ -81,21 +82,22 @@ class BasicInterfaceTest: self.session.set(['interfaces', section, span]) def tearDown(self): - # Ethernet is handled in its derived class - if 'ethernet' not in self._base_path: - self.session.delete(self._base_path) - # Tear down mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) self.session.delete(['interfaces', section, span]) + self.session.delete(self._base_path) self.session.commit() del self.session + # Verify that no previously interface remained on the system + for intf in self._interfaces: + self.assertNotIn(intf, interfaces()) + def test_span_mirror(self): if not self._mirror_interfaces: - self.skipTest('not enabled') + self.skipTest('not supported') # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: @@ -111,6 +113,19 @@ class BasicInterfaceTest: self.assertTrue(is_mirrored_to(interface, mirror, 'ffff')) self.assertTrue(is_mirrored_to(interface, mirror, '1')) + def test_interface_disable(self): + # Check if description can be added to interface and + # can be read back + for intf in self._interfaces: + self.session.set(self._base_path + [intf, 'disable']) + for option in self._options.get(intf, []): + self.session.set(self._base_path + [intf] + option.split()) + + self.session.commit() + + # Validate interface description + for intf in self._interfaces: + self.assertEqual(Interface(intf).get_admin_state(), 'down') def test_interface_description(self): # Check if description can be added to interface and @@ -150,6 +165,7 @@ class BasicInterfaceTest: for intf in self._interfaces: self.assertTrue(is_intf_addr_assigned(intf, addr)) + self.assertEqual(Interface(intf).get_admin_state(), 'up') def test_add_multiple_ip_addresses(self): # Add address @@ -174,7 +190,7 @@ class BasicInterfaceTest: def test_ipv6_link_local_address(self): # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: - self.skipTest('not enabled') + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] @@ -201,7 +217,7 @@ class BasicInterfaceTest: def test_interface_mtu(self): if not self._test_mtu: - self.skipTest('not enabled') + self.skipTest('not supported') for intf in self._interfaces: base = self._base_path + [intf] @@ -221,7 +237,7 @@ class BasicInterfaceTest: # Testcase if MTU can be changed to 1200 on non IPv6 # enabled interfaces if not self._test_mtu: - self.skipTest('not enabled') + self.skipTest('not supported') old_mtu = self._mtu self._mtu = '1200' @@ -244,9 +260,13 @@ class BasicInterfaceTest: self._mtu = old_mtu - def test_8021q_vlan_interfaces(self): + def test_vif_8021q_interfaces(self): + # XXX: This testcase is not allowed to run as first testcase, reason + # is the Wireless test will first load the wifi kernel hwsim module + # which creates a wlan0 and wlan1 interface which will fail the + # tearDown() test in the end that no interface is allowed to survive! if not self._test_vlan: - self.skipTest('not enabled') + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] @@ -269,11 +289,16 @@ class BasicInterfaceTest: tmp = read_file(f'/sys/class/net/{vif}/mtu') self.assertEqual(tmp, self._mtu) + self.assertEqual(Interface(vif).get_admin_state(), 'up') - def test_8021ad_qinq_vlan_interfaces(self): + def test_vif_s_8021ad_vlan_interfaces(self): + # XXX: This testcase is not allowed to run as first testcase, reason + # is the Wireless test will first load the wifi kernel hwsim module + # which creates a wlan0 and wlan1 interface which will fail the + # tearDown() test in the end that no interface is allowed to survive! if not self._test_qinq: - self.skipTest('not enabled') + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] @@ -304,7 +329,7 @@ class BasicInterfaceTest: def test_interface_ip_options(self): if not self._test_ip: - self.skipTest('not enabled') + self.skipTest('not supported') for interface in self._interfaces: arp_tmo = '300' @@ -355,7 +380,7 @@ class BasicInterfaceTest: def test_interface_ipv6_options(self): if not self._test_ipv6: - self.skipTest('not enabled') + self.skipTest('not supported') for interface in self._interfaces: dad_transmits = '10' @@ -379,7 +404,7 @@ class BasicInterfaceTest: def test_dhcpv6pd_auto_sla_id(self): if not self._test_ipv6_pd: - self.skipTest('not enabled') + self.skipTest('not supported') prefix_len = '56' sla_len = str(64 - int(prefix_len)) @@ -435,7 +460,7 @@ class BasicInterfaceTest: def test_dhcpv6pd_manual_sla_id(self): if not self._test_ipv6_pd: - self.skipTest('not enabled') + self.skipTest('not supported') prefix_len = '56' sla_len = str(64 - int(prefix_len)) diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index f42ec3e9b..560bfb92b 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -60,8 +60,8 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() self.assertListEqual(slaves, self._members) - def test_8021q_vlan_interfaces(self): - super().test_8021q_vlan_interfaces() + def test_vif_8021q_interfaces(self): + super().test_vif_8021q_interfaces() for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 464226b6f..8f03290be 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -25,6 +25,7 @@ from netifaces import interfaces from vyos.ifconfig import Section from vyos.util import cmd from vyos.util import read_file +from vyos.validate import is_intf_addr_assigned class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): @@ -32,7 +33,6 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): self._test_ipv6 = True self._test_ipv6_pd = True self._test_vlan = True - self._test_qinq = True self._base_path = ['interfaces', 'bridge'] self._mirror_interfaces = ['dum21354'] self._members = [] @@ -53,6 +53,12 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): super().setUp() + def tearDown(self): + for intf in self._interfaces: + self.session.delete(self._base_path + [intf]) + + super().tearDown() + def test_add_remove_bridge_member(self): # Add member interfaces to bridge and set STP cost/priority for interface in self._interfaces: @@ -87,12 +93,22 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): self.session.commit() + def test_vif_8021q_interfaces(self): + for interface in self._interfaces: + base = self._base_path + [interface] + self.session.set(base + ['enable-vlan']) + super().test_vif_8021q_interfaces() + def test_bridge_vlan_filter(self): + + vif_vlan = 2 # Add member interface to bridge and set VLAN filter for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['vif', '1', 'address', '192.0.2.1/24']) - self.session.set(base + ['vif', '2', 'address', '192.0.3.1/24']) + self.session.set(base + ['enable-vlan']) + self.session.set(base + ['address', '192.0.2.1/24']) + self.session.set(base + ['vif', str(vif_vlan), 'address', '192.0.3.1/24']) + self.session.set(base + ['vif', str(vif_vlan), 'mtu', self._mtu]) vlan_id = 101 allowed_vlan = 2 @@ -182,10 +198,13 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): for vif in vifs: # member interface must be assigned to the bridge self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif}')) - - # remove VLAN interfaces - for vif in vifs: - self.session.delete(['interfaces', 'ethernet', member, 'vif', vif]) + + # delete all members + for interface in self._interfaces: + for member in self._members: + for vif in vifs: + self.session.delete(['interfaces', 'ethernet', member, 'vif', vif]) + self.session.delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index 42c1f15df..9d896f690 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -78,8 +78,13 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): self.session.set(self._base_path + [interface, 'speed', 'auto']) self.session.set(self._base_path + [interface, 'hw-id', self._macs[interface]]) - super().tearDown() + # Tear down mirror interfaces for SPAN (Switch Port Analyzer) + for span in self._mirror_interfaces: + section = Section.section(span) + self.session.delete(['interfaces', section, span]) + self.session.commit() + del self.session def test_dhcp_disable_interface(self): # When interface is configured as admin down, it must be admin down @@ -193,12 +198,12 @@ if __name__ == '__main__': # Generate mandatory SSL certificate tmp = f'openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 '\ f'-keyout {ssl_key} -out {ssl_cert} -subj {subject}' - print(cmd(tmp)) + cmd(tmp) if not os.path.isfile(ca_cert): # Generate "CA" tmp = f'openssl req -new -x509 -key {ssl_key} -out {ca_cert} -subj {subject}' - print(cmd(tmp)) + cmd(tmp) for file in [ca_cert, ssl_cert, ssl_key]: cmd(f'sudo chown radius_priv_user:vyattacfg {file}') diff --git a/smoketest/scripts/cli/test_interfaces_loopback.py b/smoketest/scripts/cli/test_interfaces_loopback.py index 79225a1bd..36000c3ff 100755 --- a/smoketest/scripts/cli/test_interfaces_loopback.py +++ b/smoketest/scripts/cli/test_interfaces_loopback.py @@ -17,6 +17,8 @@ import unittest from base_interfaces_test import BasicInterfaceTest +from netifaces import interfaces + from vyos.validate import is_intf_addr_assigned class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): @@ -27,6 +29,15 @@ class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): self._base_path = ['interfaces', 'loopback'] self._interfaces = ['lo'] + def tearDown(self): + self.session.delete(self._base_path) + self.session.commit() + del self.session + + # loopback interface must persist! + for intf in self._interfaces: + self.assertIn(intf, interfaces()) + def test_add_single_ip_address(self): super().test_add_single_ip_address() for addr in self._loopback_addresses: @@ -37,5 +48,8 @@ class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): for addr in self._loopback_addresses: self.assertTrue(is_intf_addr_assigned('lo', addr)) + def test_interface_disable(self): + self.skipTest('not supported') + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py index f67b813af..8405fc7d0 100755 --- a/smoketest/scripts/cli/test_interfaces_tunnel.py +++ b/smoketest/scripts/cli/test_interfaces_tunnel.py @@ -84,7 +84,6 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.session.delete(['interfaces', 'dummy', source_if]) super().tearDown() - def test_ipv4_encapsulations(self): # When running tests ensure that for certain encapsulation types the # local and remote IP address is actually an IPv4 address @@ -106,7 +105,6 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): with self.assertRaises(ConfigSessionError): self.session.commit() self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) - self.session.set(self._base_path + [interface, 'source-interface', source_if]) # Source interface can not be used with sit and gre-bridge @@ -130,6 +128,7 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) + self.assertTrue(conf['linkinfo']['info_data']['pmtudisc']) # cleanup this instance self.session.delete(self._base_path + [interface]) @@ -177,7 +176,7 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(encapsulation, conf['link_type']) self.assertEqual(self.local_v6, conf['linkinfo']['info_data']['local']) - self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) + self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) # cleanup this instance self.session.delete(self._base_path + [interface]) @@ -203,5 +202,31 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): # Check if commit is ok self.session.commit() + def test_tunnel_parameters_gre(self): + interface = f'tun1030' + gre_key = '10' + encapsulation = 'gre' + tos = '20' + + self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) + self.session.set(self._base_path + [interface, 'local-ip', self.local_v4]) + self.session.set(self._base_path + [interface, 'remote-ip', remote_ip4]) + + self.session.set(self._base_path + [interface, 'parameters', 'ip', 'no-pmtu-discovery']) + self.session.set(self._base_path + [interface, 'parameters', 'ip', 'key', gre_key]) + self.session.set(self._base_path + [interface, 'parameters', 'ip', 'tos', tos]) + + # Check if commit is ok + self.session.commit() + + conf = tunnel_conf(interface) + self.assertEqual(mtu, conf['mtu']) + self.assertEqual(interface, conf['ifname']) + self.assertEqual(encapsulation, conf['link_type']) + self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) + self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) + self.assertEqual(0, conf['linkinfo']['info_data']['ttl']) + self.assertFalse( conf['linkinfo']['info_data']['pmtudisc']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index eede042de..68081e56f 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -27,7 +27,11 @@ from vyos.util import read_file PROCESS_NAME = 'sshd' SSHD_CONF = '/run/sshd/sshd_config' base_path = ['service', 'ssh'] -vrf = 'ssh-test' +vrf = 'mgmt' + +key_rsa = '/etc/ssh/ssh_host_rsa_key' +key_dsa = '/etc/ssh/ssh_host_dsa_key' +key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' def get_config_value(key): tmp = read_file(SSHD_CONF) @@ -47,6 +51,10 @@ class TestServiceSSH(unittest.TestCase): self.session.commit() del self.session + self.assertTrue(os.path.isfile(key_rsa)) + self.assertTrue(os.path.isfile(key_dsa)) + self.assertTrue(os.path.isfile(key_ed25519)) + def test_ssh_default(self): # Check if SSH service runs with default settings - used for checking # behavior of <defaultValue> in XML definition diff --git a/smoketest/scripts/cli/test_system_ip.py b/smoketest/scripts/cli/test_system_ip.py index 8fc18ba88..4ad8d537d 100755 --- a/smoketest/scripts/cli/test_system_ip.py +++ b/smoketest/scripts/cli/test_system_ip.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 Francois Mertz fireboxled@gmail.com +# Copyright (C) 2020 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as diff --git a/smoketest/scripts/cli/test_system_ipv6.py b/smoketest/scripts/cli/test_system_ipv6.py new file mode 100755 index 000000000..df69739eb --- /dev/null +++ b/smoketest/scripts/cli/test_system_ipv6.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession +from vyos.util import read_file + +base_path = ['system', 'ipv6'] + +file_forwarding = '/proc/sys/net/ipv6/conf/all/forwarding' +file_disable = '/etc/modprobe.d/vyos_disable_ipv6.conf' +file_dad = '/proc/sys/net/ipv6/conf/all/accept_dad' +file_multipath = '/proc/sys/net/ipv6/fib_multipath_hash_policy' + +class TestSystemIPv6(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_system_ipv6_forwarding(self): + # Test if IPv6 forwarding can be disabled globally, default is '1' + # which means forwearding enabled + self.assertEqual(read_file(file_forwarding), '1') + + self.session.set(base_path + ['disable-forwarding']) + self.session.commit() + + self.assertEqual(read_file(file_forwarding), '0') + + def test_system_ipv6_disable(self): + # Do not assign any IPv6 address on interfaces, this requires a reboot + # which can not be tested, but we can read the config file :) + self.session.set(base_path + ['disable']) + self.session.commit() + + # Verify configuration file + self.assertEqual(read_file(file_disable), 'options ipv6 disable_ipv6=1') + + def test_system_ipv6_strict_dad(self): + # This defaults to 1 + self.assertEqual(read_file(file_dad), '1') + + # Do not assign any IPv6 address on interfaces, this requires a reboot + # which can not be tested, but we can read the config file :) + self.session.set(base_path + ['strict-dad']) + self.session.commit() + + # Verify configuration file + self.assertEqual(read_file(file_dad), '2') + + def test_system_ipv6_multipath(self): + # This defaults to 0 + self.assertEqual(read_file(file_multipath), '0') + + # Do not assign any IPv6 address on interfaces, this requires a reboot + # which can not be tested, but we can read the config file :) + self.session.set(base_path + ['multipath', 'layer4-hashing']) + self.session.commit() + + # Verify configuration file + self.assertEqual(read_file(file_multipath), '1') + + def test_system_ipv6_neighbor_table_size(self): + # Maximum number of entries to keep in the ARP cache, the + # default is 8192 + + gc_thresh3 = '/proc/sys/net/ipv6/neigh/default/gc_thresh3' + gc_thresh2 = '/proc/sys/net/ipv6/neigh/default/gc_thresh2' + gc_thresh1 = '/proc/sys/net/ipv6/neigh/default/gc_thresh1' + self.assertEqual(read_file(gc_thresh3), '8192') + self.assertEqual(read_file(gc_thresh2), '4096') + self.assertEqual(read_file(gc_thresh1), '1024') + + for size in [1024, 2048, 4096, 8192, 16384, 32768]: + self.session.set(base_path + ['neighbor', 'table-size', str(size)]) + self.session.commit() + + self.assertEqual(read_file(gc_thresh3), str(size)) + self.assertEqual(read_file(gc_thresh2), str(size // 2)) + self.assertEqual(read_file(gc_thresh1), str(size // 8)) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index 986c8dfb2..edb6ad94d 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -26,7 +26,7 @@ from vyos.util import read_file from vyos.util import process_named_running PROCESS_NAME = 'ntpd' -NTP_CONF = '/etc/ntp.conf' +NTP_CONF = '/run/ntpd/ntpd.conf' base_path = ['system', 'ntp'] def get_config_value(key): @@ -47,6 +47,8 @@ class TestSystemNTP(unittest.TestCase): self.session.commit() del self.session + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_ntp_options(self): # Test basic NTP support with multiple servers and their options servers = ['192.0.2.1', '192.0.2.2'] @@ -76,7 +78,7 @@ class TestSystemNTP(unittest.TestCase): self.assertTrue(process_named_running(PROCESS_NAME)) def test_ntp_clients(self): - """ Test the allowed-networks statement """ + # Test the allowed-networks statement listen_address = ['127.0.0.1', '::1'] for listen in listen_address: self.session.set(base_path + ['listen-address', listen]) diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 7bcfea861..5270e758a 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -14,53 +14,111 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import re import os import unittest +from netifaces import interfaces -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file from vyos.validate import is_intf_addr_assigned +from vyos.ifconfig import Interface + +base_path = ['vrf'] +vrfs = ['red', 'green', 'blue', 'foo-bar', 'baz_foo'] class VRFTest(unittest.TestCase): def setUp(self): self.session = ConfigSession(os.getpid()) - self._vrfs = ['red', 'green', 'blue'] def tearDown(self): # delete all VRFs - self.session.delete(['vrf']) + self.session.delete(base_path) self.session.commit() - del self.session + for vrf in vrfs: + self.assertNotIn(vrf, interfaces()) def test_vrf_table_id(self): - table = 1000 - for vrf in self._vrfs: - base = ['vrf', 'name', vrf] - description = "VyOS-VRF-" + vrf + table = '1000' + for vrf in vrfs: + base = base_path + ['name', vrf] + description = f'VyOS-VRF-{vrf}' self.session.set(base + ['description', description]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(base + ['table', str(table)]) - table += 1 + self.session.set(base + ['table', table]) + if vrf == 'green': + self.session.set(base + ['disable']) + + table = str(int(table) + 1) # commit changes self.session.commit() + # Verify VRF configuration + table = '1000' + iproute2_config = read_file('/etc/iproute2/rt_tables.d/vyos-vrf.conf') + for vrf in vrfs: + description = f'VyOS-VRF-{vrf}' + self.assertTrue(vrf in interfaces()) + vrf_if = Interface(vrf) + # validate proper interface description + self.assertEqual(vrf_if.get_alias(), description) + # validate admin up/down state of VRF + state = 'up' + if vrf == 'green': + state = 'down' + self.assertEqual(vrf_if.get_admin_state(), state) + + # Test the iproute2 lookup file, syntax is as follows: + # + # # id vrf name comment + # 1000 red # VyOS-VRF-red + # 1001 green # VyOS-VRF-green + # ... + regex = f'{table}\s+{vrf}\s+#\s+{description}' + self.assertTrue(re.findall(regex, iproute2_config)) + table = str(int(table) + 1) + def test_vrf_loopback_ips(self): - table = 1000 - for vrf in self._vrfs: - base = ['vrf', 'name', vrf] + table = '2000' + for vrf in vrfs: + base = base_path + ['name', vrf] self.session.set(base + ['table', str(table)]) - table += 1 + table = str(int(table) + 1) # commit changes self.session.commit() - for vrf in self._vrfs: + + # Verify VRF configuration + for vrf in vrfs: + self.assertTrue(vrf in interfaces()) self.assertTrue(is_intf_addr_assigned(vrf, '127.0.0.1')) self.assertTrue(is_intf_addr_assigned(vrf, '::1')) + def test_vrf_table_id_is_unalterable(self): + # Linux Kernel prohibits the change of a VRF table on the fly. + # VRF must be deleted and recreated! + table = '1000' + vrf = vrfs[0] + base = base_path + ['name', vrf] + self.session.set(base + ['table', table]) + + # commit changes + self.session.commit() + + # Check if VRF has been created + self.assertTrue(vrf in interfaces()) + + table = str(int(table) + 1) + self.session.set(base + ['table', table]) + # check validate() - table ID can not be altered! + with self.assertRaises(ConfigSessionError): + self.session.commit() + if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2, failfast=True) |