diff options
Diffstat (limited to 'smoketest/scripts/cli/base_interfaces_test.py')
| -rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 390 |
1 files changed, 283 insertions, 107 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 8ee5395d0..f897088ef 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -12,16 +12,17 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re import os import unittest -import json from binascii import hexlify -from netifaces import ifaddresses from netifaces import AF_INET from netifaces import AF_INET6 +from netifaces import ifaddresses +from netifaces import interfaces + +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.ifconfig import Interface @@ -30,6 +31,7 @@ from vyos.util import read_file from vyos.util import cmd from vyos.util import dict_search from vyos.util import process_named_running +from vyos.util import get_interface_config from vyos.validate import is_intf_addr_assigned from vyos.validate import is_ipv6_link_local @@ -51,24 +53,15 @@ def is_mirrored_to(interface, mirror_if, qdisc): ret_val = True return ret_val - -dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' -def get_dhcp6c_config_value(interface, key): - tmp = read_file(dhcp6c_config_file.format(interface)) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - - out = [] - for item in tmp: - out.append(item.replace(';','')) - return out - class BasicInterfaceTest: - class BaseTest(unittest.TestCase): + class TestCase(VyOSUnitTestSHIM.TestCase): _test_ip = False _test_mtu = False _test_vlan = False _test_qinq = False _test_ipv6 = False + _test_ipv6_pd = False + _test_ipv6_dhcpc6 = False _test_mirror = False _base_path = [] @@ -84,37 +77,35 @@ class BasicInterfaceTest: _mtu = '1280' def setUp(self): - self.session = ConfigSession(os.getpid()) - # Setup mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) - self.session.set(['interfaces', section, span]) + self.cli_set(['interfaces', section, span]) def tearDown(self): - # Ethernet is handled in its derived class - if 'ethernet' not in self._base_path: - self.session.delete(self._base_path) - # Tear down mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) - self.session.delete(['interfaces', section, span]) + self.cli_delete(['interfaces', section, span]) - self.session.commit() - del self.session + self.cli_delete(self._base_path) + self.cli_commit() + + # Verify that no previously interface remained on the system + for intf in self._interfaces: + self.assertNotIn(intf, interfaces()) def test_span_mirror(self): if not self._mirror_interfaces: - return None + self.skipTest('not supported') # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'mirror', 'ingress', mirror]) - self.session.set(self._base_path + [interface, 'mirror', 'egress', mirror]) + self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror]) + self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror]) - self.session.commit() + self.cli_commit() # Verify config for mirror in self._mirror_interfaces: @@ -122,45 +113,69 @@ class BasicInterfaceTest: self.assertTrue(is_mirrored_to(interface, mirror, 'ffff')) self.assertTrue(is_mirrored_to(interface, mirror, '1')) + def test_interface_disable(self): + # Check if description can be added to interface and + # can be read back + for intf in self._interfaces: + self.cli_set(self._base_path + [intf, 'disable']) + for option in self._options.get(intf, []): + self.cli_set(self._base_path + [intf] + option.split()) + + self.cli_commit() + + # Validate interface description + for intf in self._interfaces: + self.assertEqual(Interface(intf).get_admin_state(), 'down') def test_interface_description(self): # Check if description can be added to interface and # can be read back for intf in self._interfaces: test_string=f'Description-Test-{intf}' - self.session.set(self._base_path + [intf, 'description', test_string]) + self.cli_set(self._base_path + [intf, 'description', test_string]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() # Validate interface description for intf in self._interfaces: test_string=f'Description-Test-{intf}' tmp = read_file(f'/sys/class/net/{intf}/ifalias') - self.assertTrue(tmp, test_string) + self.assertEqual(tmp, test_string) + self.assertEqual(Interface(intf).get_alias(), test_string) + self.cli_delete(self._base_path + [intf, 'description']) + + self.cli_commit() + + # Validate remove interface description "empty" + for intf in self._interfaces: + tmp = read_file(f'/sys/class/net/{intf}/ifalias') + self.assertEqual(tmp, str()) + self.assertEqual(Interface(intf).get_alias(), str()) def test_add_single_ip_address(self): addr = '192.0.2.0/31' for intf in self._interfaces: - self.session.set(self._base_path + [intf, 'address', addr]) + self.cli_set(self._base_path + [intf, 'address', addr]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() for intf in self._interfaces: self.assertTrue(is_intf_addr_assigned(intf, addr)) + self.assertEqual(Interface(intf).get_admin_state(), 'up') def test_add_multiple_ip_addresses(self): # Add address for intf in self._interfaces: for addr in self._test_addr: - self.session.set(self._base_path + [intf, 'address', addr]) + self.cli_set(self._base_path + [intf, 'address', addr]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() # Validate address for intf in self._interfaces: @@ -175,15 +190,15 @@ class BasicInterfaceTest: def test_ipv6_link_local_address(self): # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: - return None + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # after commit we must have an IPv6 link-local address - self.session.commit() + self.cli_commit() for interface in self._interfaces: for addr in ifaddresses(interface)[AF_INET6]: @@ -192,26 +207,26 @@ class BasicInterfaceTest: # disable IPv6 link-local address assignment for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) # after commit we must have no IPv6 link-local address - self.session.commit() + self.cli_commit() for interface in self._interfaces: self.assertTrue(AF_INET6 not in ifaddresses(interface)) def test_interface_mtu(self): if not self._test_mtu: - return None + self.skipTest('not supported') for intf in self._interfaces: base = self._base_path + [intf] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for option in self._options.get(intf, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # commit interface changes - self.session.commit() + self.cli_commit() # verify changed MTU for intf in self._interfaces: @@ -222,21 +237,21 @@ class BasicInterfaceTest: # Testcase if MTU can be changed to 1200 on non IPv6 # enabled interfaces if not self._test_mtu: - return None + self.skipTest('not supported') old_mtu = self._mtu self._mtu = '1200' for intf in self._interfaces: base = self._base_path + [intf] - self.session.set(base + ['mtu', self._mtu]) - self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + self.cli_set(base + ['mtu', self._mtu]) + self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) for option in self._options.get(intf, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # commit interface changes - self.session.commit() + self.cli_commit() # verify changed MTU for intf in self._interfaces: @@ -245,22 +260,26 @@ class BasicInterfaceTest: self._mtu = old_mtu - def test_8021q_vlan_interfaces(self): + def test_vif_8021q_interfaces(self): + # XXX: This testcase is not allowed to run as first testcase, reason + # is the Wireless test will first load the wifi kernel hwsim module + # which creates a wlan0 and wlan1 interface which will fail the + # tearDown() test in the end that no interface is allowed to survive! if not self._test_vlan: - return None + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for address in self._test_addr: - self.session.set(base + ['address', address]) + self.cli_set(base + ['address', address]) - self.session.commit() + self.cli_commit() for intf in self._interfaces: for vlan in self._vlan_range: @@ -270,29 +289,70 @@ class BasicInterfaceTest: tmp = read_file(f'/sys/class/net/{vif}/mtu') self.assertEqual(tmp, self._mtu) + self.assertEqual(Interface(vif).get_admin_state(), 'up') + + def test_vif_8021q_lower_up_down(self): + # Testcase for https://phabricator.vyos.net/T3349 + if not self._test_vlan: + self.skipTest('not supported') + + for interface in self._interfaces: + base = self._base_path + [interface] + for option in self._options.get(interface, []): + self.cli_set(base + option.split()) + + # disable the lower interface + self.cli_set(base + ['disable']) + + for vlan in self._vlan_range: + vlan_base = self._base_path + [interface, 'vif', vlan] + # disable the vlan interface + self.cli_set(vlan_base + ['disable']) + self.cli_commit() - def test_8021ad_qinq_vlan_interfaces(self): + # re-enable all lower interfaces + for interface in self._interfaces: + base = self._base_path + [interface] + self.cli_delete(base + ['disable']) + + self.cli_commit() + + # verify that the lower interfaces are admin up and the vlan + # interfaces are all admin down + for interface in self._interfaces: + self.assertEqual(Interface(interface).get_admin_state(), 'up') + + for vlan in self._vlan_range: + ifname = f'{interface}.{vlan}' + self.assertEqual(Interface(ifname).get_admin_state(), 'down') + + + def test_vif_s_8021ad_vlan_interfaces(self): + # XXX: This testcase is not allowed to run as first testcase, reason + # is the Wireless test will first load the wifi kernel hwsim module + # which creates a wlan0 and wlan1 interface which will fail the + # tearDown() test in the end that no interface is allowed to survive! if not self._test_qinq: - return None + self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) for vif_s in self._qinq_range: for vif_c in self._vlan_range: base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for address in self._test_addr: - self.session.set(base + ['address', address]) + self.cli_set(base + ['address', address]) - self.session.commit() + self.cli_commit() for interface in self._interfaces: for vif_s in self._qinq_range: - tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0] + tmp = get_interface_config(f'{interface}.{vif_s}') self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad') for vif_c in self._vlan_range: @@ -305,26 +365,26 @@ class BasicInterfaceTest: def test_interface_ip_options(self): if not self._test_ip: - return None + self.skipTest('not supported') for interface in self._interfaces: arp_tmo = '300' path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # Options - self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) - self.session.set(path + ['ip', 'disable-arp-filter']) - self.session.set(path + ['ip', 'disable-forwarding']) - self.session.set(path + ['ip', 'enable-arp-accept']) - self.session.set(path + ['ip', 'enable-arp-announce']) - self.session.set(path + ['ip', 'enable-arp-ignore']) - self.session.set(path + ['ip', 'enable-proxy-arp']) - self.session.set(path + ['ip', 'proxy-arp-pvlan']) - self.session.set(path + ['ip', 'source-validation', 'loose']) - - self.session.commit() + self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo]) + self.cli_set(path + ['ip', 'disable-arp-filter']) + self.cli_set(path + ['ip', 'disable-forwarding']) + self.cli_set(path + ['ip', 'enable-arp-accept']) + self.cli_set(path + ['ip', 'enable-arp-announce']) + self.cli_set(path + ['ip', 'enable-arp-ignore']) + self.cli_set(path + ['ip', 'enable-proxy-arp']) + self.cli_set(path + ['ip', 'proxy-arp-pvlan']) + self.cli_set(path + ['ip', 'source-validation', 'loose']) + + self.cli_commit() for interface in self._interfaces: tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') @@ -356,19 +416,19 @@ class BasicInterfaceTest: def test_interface_ipv6_options(self): if not self._test_ipv6: - return None + self.skipTest('not supported') for interface in self._interfaces: dad_transmits = '10' path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # Options - self.session.set(path + ['ipv6', 'disable-forwarding']) - self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) + self.cli_set(path + ['ipv6', 'disable-forwarding']) + self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) - self.session.commit() + self.cli_commit() for interface in self._interfaces: tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding') @@ -377,40 +437,156 @@ class BasicInterfaceTest: tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/dad_transmits') self.assertEqual(dad_transmits, tmp) + def test_dhcpv6_clinet_options(self): + if not self._test_ipv6_dhcpc6: + self.skipTest('not supported') - def test_ipv6_dhcpv6_prefix_delegation(self): - if not self._test_ipv6: - return None + duid_base = 10 + for interface in self._interfaces: + duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base) + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.cli_set(path + option.split()) + + # Enable DHCPv6 client + self.cli_set(path + ['address', 'dhcpv6']) + self.cli_set(path + ['dhcpv6-options', 'rapid-commit']) + self.cli_set(path + ['dhcpv6-options', 'parameters-only']) + self.cli_set(path + ['dhcpv6-options', 'duid', duid]) + duid_base += 1 + + self.cli_commit() + + duid_base = 10 + for interface in self._interfaces: + duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base) + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + self.assertIn(f'interface {interface} ' + '{', dhcpc6_config) + self.assertIn(f' request domain-name-servers;', dhcpc6_config) + self.assertIn(f' request domain-name;', dhcpc6_config) + self.assertIn(f' information-only;', dhcpc6_config) + self.assertIn(f' send ia-na 0;', dhcpc6_config) + self.assertIn(f' send rapid-commit;', dhcpc6_config) + self.assertIn(f' send client-id {duid};', dhcpc6_config) + self.assertIn('};', dhcpc6_config) + duid_base += 1 + + # Check for running process + self.assertTrue(process_named_running('dhcp6c')) + + def test_dhcpv6pd_auto_sla_id(self): + if not self._test_ipv6_pd: + self.skipTest('not supported') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344'] - address = '1' - sla_id = '0' - sla_len = '8' for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) + address = '1' # prefix delegation stuff pd_base = path + ['dhcpv6-options', 'pd', '0'] - self.session.set(pd_base + ['length', '56']) - self.session.set(pd_base + ['interface', interface, 'address', address]) - self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id]) + self.cli_set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.cli_set(['interfaces', section, delegatee]) + self.cli_set(pd_base + ['interface', delegatee, 'address', address]) + # increment interface address + address = str(int(address) + 1) - self.session.commit() + self.cli_commit() for interface in self._interfaces: + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + # verify DHCPv6 prefix delegation - # will return: ['delegation', '::/56 infinity;'] - tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace - self.assertEqual(tmp, '::/56') - tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] - self.assertEqual(tmp, interface) - tmp = get_dhcp6c_config_value(interface, 'ifid')[0] - self.assertEqual(tmp, address) - tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] - self.assertEqual(tmp, sla_id) - tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] - self.assertEqual(tmp, sla_len) + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + address = '1' + sla_id = '0' + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) # Check for running process self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.cli_delete(['interfaces', section, delegatee]) + + def test_dhcpv6pd_manual_sla_id(self): + if not self._test_ipv6_pd: + self.skipTest('not supported') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344'] + + for interface in self._interfaces: + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.cli_set(path + option.split()) + + # prefix delegation stuff + address = '1' + sla_id = '1' + pd_base = path + ['dhcpv6-options', 'pd', '0'] + self.cli_set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.cli_set(['interfaces', section, delegatee]) + self.cli_set(pd_base + ['interface', delegatee, 'address', address]) + self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) + + # increment interface address + address = str(int(address) + 1) + sla_id = str(int(sla_id) + 1) + + self.cli_commit() + + # Verify dhcpc6 client configuration + for interface in self._interfaces: + address = '1' + sla_id = '1' + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + + # verify DHCPv6 prefix delegation + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) + + # Check for running process + self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.cli_delete(['interfaces', section, delegatee]) |
