summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/base_interfaces_test.py
diff options
context:
space:
mode:
authorkumvijaya <kuvmijaya@gmail.com>2024-09-26 11:31:07 +0530
committerkumvijaya <kuvmijaya@gmail.com>2024-09-26 11:31:07 +0530
commita950059053f7394acfb453cc0d8194aa3dc721fa (patch)
treeeb0acf278f649b5d1417e18e34d728efcd16e745 /smoketest/scripts/cli/base_interfaces_test.py
parentf0815f3e9b212f424f5adb0c572a71119ad4a8a0 (diff)
downloadvyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.tar.gz
vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.zip
T6732: added same as vyos 1x
Diffstat (limited to 'smoketest/scripts/cli/base_interfaces_test.py')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py1320
1 files changed, 1320 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
new file mode 100644
index 0000000..593b4b4
--- /dev/null
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -0,0 +1,1320 @@
+# Copyright (C) 2019-2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+
+from netifaces import AF_INET
+from netifaces import AF_INET6
+from netifaces import ifaddresses
+
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.defaults import directories
+from vyos.ifconfig import Interface
+from vyos.ifconfig import Section
+from vyos.pki import CERT_BEGIN
+from vyos.utils.file import read_file
+from vyos.utils.dict import dict_search
+from vyos.utils.process import cmd
+from vyos.utils.process import process_named_running
+from vyos.utils.network import get_interface_config
+from vyos.utils.network import get_interface_vrf
+from vyos.utils.network import get_vrf_tableid
+from vyos.utils.network import interface_exists
+from vyos.utils.network import is_intf_addr_assigned
+from vyos.utils.network import is_ipv6_link_local
+from vyos.utils.network import get_nft_vrf_zone_mapping
+from vyos.xml_ref import cli_defined
+
+dhclient_base_dir = directories['isc_dhclient_dir']
+dhclient_process_name = 'dhclient'
+dhcp6c_base_dir = directories['dhcp6_client_dir']
+dhcp6c_process_name = 'dhcp6c'
+
+server_ca_root_cert_data = """
+MIIBcTCCARagAwIBAgIUDcAf1oIQV+6WRaW7NPcSnECQ/lUwCgYIKoZIzj0EAwIw
+HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjBa
+Fw0zMjAyMTUxOTQxMjBaMB4xHDAaBgNVBAMME1Z5T1Mgc2VydmVyIHJvb3QgQ0Ew
+WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQ0y24GzKQf4aM2Ir12tI9yITOIzAUj
+ZXyJeCmYI6uAnyAMqc4Q4NKyfq3nBi4XP87cs1jlC1P2BZ8MsjL5MdGWozIwMDAP
+BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRwC/YaieMEnjhYa7K3Flw/o0SFuzAK
+BggqhkjOPQQDAgNJADBGAiEAh3qEj8vScsjAdBy5shXzXDVVOKWCPTdGrPKnu8UW
+a2cCIQDlDgkzWmn5ujc5ATKz1fj+Se/aeqwh4QyoWCVTFLIxhQ==
+"""
+
+server_ca_intermediate_cert_data = """
+MIIBmTCCAT+gAwIBAgIUNzrtHzLmi3QpPK57tUgCnJZhXXQwCgYIKoZIzj0EAwIw
+HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjFa
+Fw0zMjAyMTUxOTQxMjFaMCYxJDAiBgNVBAMMG1Z5T1Mgc2VydmVyIGludGVybWVk
+aWF0ZSBDQTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABEl2nJ1CzoqPV6hWII2m
+eGN/uieU6wDMECTk/LgG8CCCSYb488dibUiFN/1UFsmoLIdIhkx/6MUCYh62m8U2
+WNujUzBRMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFMV3YwH88I5gFsFUibbQ
+kMR0ECPsMB8GA1UdIwQYMBaAFHAL9hqJ4wSeOFhrsrcWXD+jRIW7MAoGCCqGSM49
+BAMCA0gAMEUCIQC/ahujD9dp5pMMCd3SZddqGC9cXtOwMN0JR3e5CxP13AIgIMQm
+jMYrinFoInxmX64HfshYqnUY8608nK9D2BNPOHo=
+"""
+
+client_ca_root_cert_data = """
+MIIBcDCCARagAwIBAgIUZmoW2xVdwkZSvglnkCq0AHKa6zIwCgYIKoZIzj0EAwIw
+HjEcMBoGA1UEAwwTVnlPUyBjbGllbnQgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjFa
+Fw0zMjAyMTUxOTQxMjFaMB4xHDAaBgNVBAMME1Z5T1MgY2xpZW50IHJvb3QgQ0Ew
+WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATUpKXzQk2NOVKDN4VULk2yw4mOKPvn
+mg947+VY7lbpfOfAUD0QRg95qZWCw899eKnXp/U4TkAVrmEKhUb6OJTFozIwMDAP
+BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBTXu6xGWUl25X3sBtrhm3BJSICIATAK
+BggqhkjOPQQDAgNIADBFAiEAnTzEwuTI9bz2Oae3LZbjP6f/f50KFJtjLZFDbQz7
+DpYCIDNRHV8zBUibC+zg5PqMpQBKd/oPfNU76nEv6xkp/ijO
+"""
+
+client_ca_intermediate_cert_data = """
+MIIBmDCCAT+gAwIBAgIUJEMdotgqA7wU4XXJvEzDulUAGqgwCgYIKoZIzj0EAwIw
+HjEcMBoGA1UEAwwTVnlPUyBjbGllbnQgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjJa
+Fw0zMjAyMTUxOTQxMjJaMCYxJDAiBgNVBAMMG1Z5T1MgY2xpZW50IGludGVybWVk
+aWF0ZSBDQTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGyIVIi217s9j3O+WQ2b
+6R65/Z0ZjQpELxPjBRc0CA0GFCo+pI5EvwI+jNFArvTAJ5+ZdEWUJ1DQhBKDDQdI
+avCjUzBRMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFOUS8oNJjChB1Rb9Blcl
+ETvziHJ9MB8GA1UdIwQYMBaAFNe7rEZZSXblfewG2uGbcElIgIgBMAoGCCqGSM49
+BAMCA0cAMEQCIArhaxWgRsAUbEeNHD/ULtstLHxw/P97qPUSROLQld53AiBjgiiz
+9pDfISmpekZYz6bIDWRIR0cXUToZEMFNzNMrQg==
+"""
+
+client_cert_data = """
+MIIBmTCCAUCgAwIBAgIUV5T77XdE/tV82Tk4Vzhp5BIFFm0wCgYIKoZIzj0EAwIw
+JjEkMCIGA1UEAwwbVnlPUyBjbGllbnQgaW50ZXJtZWRpYXRlIENBMB4XDTIyMDIx
+NzE5NDEyMloXDTMyMDIxNTE5NDEyMlowIjEgMB4GA1UEAwwXVnlPUyBjbGllbnQg
+Y2VydGlmaWNhdGUwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARuyynqfc/qJj5e
+KJ03oOH8X4Z8spDeAPO9WYckMM0ldPj+9kU607szFzPwjaPWzPdgyIWz3hcN8yAh
+CIhytmJao1AwTjAMBgNVHRMBAf8EAjAAMB0GA1UdDgQWBBTIFKrxZ+PqOhYSUqnl
+TGCUmM7wTjAfBgNVHSMEGDAWgBTlEvKDSYwoQdUW/QZXJRE784hyfTAKBggqhkjO
+PQQDAgNHADBEAiAvO8/jvz05xqmP3OXD53XhfxDLMIxzN4KPoCkFqvjlhQIgIHq2
+/geVx3rAOtSps56q/jiDouN/aw01TdpmGKVAa9U=
+"""
+
+client_key_data = """
+MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgxaxAQsJwjoOCByQE
++qSYKtKtJzbdbOnTsKNSrfgkFH6hRANCAARuyynqfc/qJj5eKJ03oOH8X4Z8spDe
+APO9WYckMM0ldPj+9kU607szFzPwjaPWzPdgyIWz3hcN8yAhCIhytmJa
+"""
+
+def get_wpa_supplicant_value(interface, key):
+ tmp = read_file(f'/run/wpa_supplicant/{interface}.conf')
+ tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
+ return tmp[0]
+
+def get_certificate_count(interface, cert_type):
+ tmp = read_file(f'/run/wpa_supplicant/{interface}_{cert_type}.pem')
+ return tmp.count(CERT_BEGIN)
+
+def is_mirrored_to(interface, mirror_if, qdisc):
+ """
+ Ask TC if we are mirroring traffic to a discrete interface.
+
+ interface: source interface
+ mirror_if: destination where we mirror our data to
+ qdisc: must be ffff or 1 for ingress/egress
+ """
+ if qdisc not in ['ffff', '1']:
+ raise ValueError()
+
+ ret_val = False
+ tmp = cmd(f'tc -s -p filter ls dev {interface} parent {qdisc}: | grep mirred')
+ tmp = tmp.lower()
+ if mirror_if in tmp:
+ ret_val = True
+ return ret_val
+class BasicInterfaceTest:
+ class TestCase(VyOSUnitTestSHIM.TestCase):
+ _test_dhcp = False
+ _test_eapol = False
+ _test_ip = False
+ _test_mtu = False
+ _test_vlan = False
+ _test_qinq = False
+ _test_ipv6 = False
+ _test_ipv6_pd = False
+ _test_ipv6_dhcpc6 = False
+ _test_mirror = False
+ _test_vrf = False
+ _base_path = []
+
+ _options = {}
+ _interfaces = []
+ _qinq_range = ['10', '20', '30']
+ _vlan_range = ['100', '200', '300', '2000']
+ _test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
+ '2001:db8:1::ffff/64', '2001:db8:101::1/112']
+
+ _mirror_interfaces = []
+ # choose IPv6 minimum MTU value for tests - this must always work
+ _mtu = '1280'
+
+ @classmethod
+ def setUpClass(cls):
+ super(BasicInterfaceTest.TestCase, cls).setUpClass()
+
+ # XXX the case of test_vif_8021q_mtu_limits, below, shows that
+ # we should extend cli_defined to support more complex queries
+ cls._test_vlan = cli_defined(cls._base_path, 'vif')
+ cls._test_qinq = cli_defined(cls._base_path, 'vif-s')
+ cls._test_dhcp = cli_defined(cls._base_path, 'dhcp-options')
+ cls._test_eapol = cli_defined(cls._base_path, 'eapol')
+ cls._test_ip = cli_defined(cls._base_path, 'ip')
+ cls._test_ipv6 = cli_defined(cls._base_path, 'ipv6')
+ cls._test_ipv6_dhcpc6 = cli_defined(cls._base_path, 'dhcpv6-options')
+ cls._test_ipv6_pd = cli_defined(cls._base_path + ['dhcpv6-options'], 'pd')
+ cls._test_mtu = cli_defined(cls._base_path, 'mtu')
+ cls._test_vrf = cli_defined(cls._base_path, 'vrf')
+
+ # Setup mirror interfaces for SPAN (Switch Port Analyzer)
+ for span in cls._mirror_interfaces:
+ section = Section.section(span)
+ cls.cli_set(cls, ['interfaces', section, span])
+
+ @classmethod
+ def tearDownClass(cls):
+ # Tear down mirror interfaces for SPAN (Switch Port Analyzer)
+ for span in cls._mirror_interfaces:
+ section = Section.section(span)
+ cls.cli_delete(cls, ['interfaces', section, span])
+
+ super(BasicInterfaceTest.TestCase, cls).tearDownClass()
+
+ def tearDown(self):
+ self.cli_delete(self._base_path)
+ self.cli_commit()
+
+ # Verify that no previously interface remained on the system
+ ct_map = get_nft_vrf_zone_mapping()
+ for intf in self._interfaces:
+ self.assertFalse(interface_exists(intf))
+ for map_entry in ct_map:
+ self.assertNotEqual(intf, map_entry['interface'])
+
+ # No daemon started during tests should remain running
+ for daemon in ['dhcp6c', 'dhclient']:
+ # if _interface list is populated do a more fine grained search
+ # by also checking the cmd arguments passed to the daemon
+ if self._interfaces:
+ for tmp in self._interfaces:
+ self.assertFalse(process_named_running(daemon, tmp))
+ else:
+ self.assertFalse(process_named_running(daemon))
+
+ def test_dhcp_disable_interface(self):
+ if not self._test_dhcp:
+ self.skipTest('not supported')
+
+ # When interface is configured as admin down, it must be admin down
+ # even when dhcpc starts on the given interface
+ for interface in self._interfaces:
+ self.cli_set(self._base_path + [interface, 'disable'])
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+
+ # Also enable DHCP (ISC DHCP always places interface in admin up
+ # state so we check that we do not start DHCP client.
+ # https://vyos.dev/T2767
+ self.cli_set(self._base_path + [interface, 'address', 'dhcp'])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ flags = read_file(f'/sys/class/net/{interface}/flags')
+ self.assertEqual(int(flags, 16) & 1, 0)
+
+ def test_dhcp_client_options(self):
+ if not self._test_dhcp or not self._test_vrf:
+ self.skipTest('not supported')
+
+ client_id = 'VyOS-router'
+ distance = '100'
+ hostname = 'vyos'
+ vendor_class_id = 'vyos-vendor'
+ user_class = 'vyos'
+
+ for interface in self._interfaces:
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+
+ self.cli_set(self._base_path + [interface, 'address', 'dhcp'])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'client-id', client_id])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'default-route-distance', distance])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'host-name', hostname])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'vendor-class-id', vendor_class_id])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'user-class', user_class])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ # Check if dhclient process runs
+ dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(dhclient_pid)
+
+ dhclient_config = read_file(f'{dhclient_base_dir}/dhclient_{interface}.conf')
+ self.assertIn(f'request subnet-mask, broadcast-address, routers, domain-name-servers', dhclient_config)
+ self.assertIn(f'require subnet-mask;', dhclient_config)
+ self.assertIn(f'send host-name "{hostname}";', dhclient_config)
+ self.assertIn(f'send dhcp-client-identifier "{client_id}";', dhclient_config)
+ self.assertIn(f'send vendor-class-identifier "{vendor_class_id}";', dhclient_config)
+ self.assertIn(f'send user-class "{user_class}";', dhclient_config)
+
+ # and the commandline has the appropriate options
+ cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
+ self.assertIn(f'-e\x00IF_METRIC={distance}', cmdline)
+
+ def test_dhcp_vrf(self):
+ if not self._test_dhcp or not self._test_vrf:
+ self.skipTest('not supported')
+
+ vrf_name = 'purple4'
+ self.cli_set(['vrf', 'name', vrf_name, 'table', '65000'])
+
+ for interface in self._interfaces:
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+
+ self.cli_set(self._base_path + [interface, 'address', 'dhcp'])
+ self.cli_set(self._base_path + [interface, 'vrf', vrf_name])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, vrf_name)
+
+ # Check if dhclient process runs
+ dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(dhclient_pid)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertIn(str(dhclient_pid), vrf_pids)
+ # and the commandline has the appropriate options
+ cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
+ self.assertIn('-e\x00IF_METRIC=210', cmdline) # 210 is the default value
+
+ self.cli_delete(['vrf', 'name', vrf_name])
+
+ def test_dhcpv6_vrf(self):
+ if not self._test_ipv6_dhcpc6 or not self._test_vrf:
+ self.skipTest('not supported')
+
+ vrf_name = 'purple6'
+ self.cli_set(['vrf', 'name', vrf_name, 'table', '65001'])
+
+ # When interface is configured as admin down, it must be admin down
+ # even when dhcpc starts on the given interface
+ for interface in self._interfaces:
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+
+ self.cli_set(self._base_path + [interface, 'address', 'dhcpv6'])
+ self.cli_set(self._base_path + [interface, 'vrf', vrf_name])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, vrf_name)
+
+ # Check if dhclient process runs
+ tmp = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(tmp)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertIn(str(tmp), vrf_pids)
+
+ self.cli_delete(['vrf', 'name', vrf_name])
+
+ def test_move_interface_between_vrf_instances(self):
+ if not self._test_vrf:
+ self.skipTest('not supported')
+
+ vrf1_name = 'smoketest_mgmt1'
+ vrf1_table = '5424'
+ vrf2_name = 'smoketest_mgmt2'
+ vrf2_table = '7412'
+
+ self.cli_set(['vrf', 'name', vrf1_name, 'table', vrf1_table])
+ self.cli_set(['vrf', 'name', vrf2_name, 'table', vrf2_table])
+
+ # move interface into first VRF
+ for interface in self._interfaces:
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface, 'vrf', vrf1_name])
+
+ self.cli_commit()
+
+ # check that interface belongs to proper VRF
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, vrf1_name)
+
+ tmp = get_interface_config(vrf1_name)
+ self.assertEqual(int(vrf1_table), get_vrf_tableid(interface))
+
+ # move interface into second VRF
+ for interface in self._interfaces:
+ self.cli_set(self._base_path + [interface, 'vrf', vrf2_name])
+
+ self.cli_commit()
+
+ # check that interface belongs to proper VRF
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, vrf2_name)
+
+ tmp = get_interface_config(vrf2_name)
+ self.assertEqual(int(vrf2_table), get_vrf_tableid(interface))
+
+ self.cli_delete(['vrf', 'name', vrf1_name])
+ self.cli_delete(['vrf', 'name', vrf2_name])
+
+ def test_add_to_invalid_vrf(self):
+ if not self._test_vrf:
+ self.skipTest('not supported')
+
+ # move interface into first VRF
+ for interface in self._interfaces:
+ for option in self._options.get(interface, []):
+ self.cli_set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface, 'vrf', 'invalid'])
+
+ # check validate() - can not use a non-existing VRF
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'vrf', 'invalid'])
+ self.cli_set(self._base_path + [interface, 'description', 'test_add_to_invalid_vrf'])
+
+ def test_span_mirror(self):
+ if not self._mirror_interfaces:
+ self.skipTest('not supported')
+
+ # Check the two-way mirror rules of ingress and egress
+ for mirror in self._mirror_interfaces:
+ for interface in self._interfaces:
+ self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror])
+ self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror])
+
+ self.cli_commit()
+
+ # Verify config
+ for mirror in self._mirror_interfaces:
+ for interface in self._interfaces:
+ self.assertTrue(is_mirrored_to(interface, mirror, 'ffff'))
+ self.assertTrue(is_mirrored_to(interface, mirror, '1'))
+
+ def test_interface_disable(self):
+ # Check if description can be added to interface and
+ # can be read back
+ for intf in self._interfaces:
+ self.cli_set(self._base_path + [intf, 'disable'])
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+
+ self.cli_commit()
+
+ # Validate interface description
+ for intf in self._interfaces:
+ self.assertEqual(Interface(intf).get_admin_state(), 'down')
+
+ def test_interface_description(self):
+ # Check if description can be added to interface and
+ # can be read back
+ for intf in self._interfaces:
+ test_string=f'Description-Test-{intf}'
+ self.cli_set(self._base_path + [intf, 'description', test_string])
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+
+ self.cli_commit()
+
+ # Validate interface description
+ for intf in self._interfaces:
+ test_string=f'Description-Test-{intf}'
+ tmp = read_file(f'/sys/class/net/{intf}/ifalias')
+ self.assertEqual(tmp, test_string)
+ self.assertEqual(Interface(intf).get_alias(), test_string)
+ self.cli_delete(self._base_path + [intf, 'description'])
+
+ self.cli_commit()
+
+ # Validate remove interface description "empty"
+ for intf in self._interfaces:
+ tmp = read_file(f'/sys/class/net/{intf}/ifalias')
+ self.assertEqual(tmp, str())
+ self.assertEqual(Interface(intf).get_alias(), str())
+
+ # Test maximum interface description lengt (255 characters)
+ test_string='abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789___'
+ for intf in self._interfaces:
+
+ self.cli_set(self._base_path + [intf, 'description', test_string])
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+
+ self.cli_commit()
+
+ # Validate interface description
+ for intf in self._interfaces:
+ tmp = read_file(f'/sys/class/net/{intf}/ifalias')
+ self.assertEqual(tmp, test_string)
+ self.assertEqual(Interface(intf).get_alias(), test_string)
+
+ def test_add_single_ip_address(self):
+ addr = '192.0.2.0/31'
+ for intf in self._interfaces:
+ self.cli_set(self._base_path + [intf, 'address', addr])
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+
+ self.cli_commit()
+
+ for intf in self._interfaces:
+ self.assertTrue(is_intf_addr_assigned(intf, addr))
+ self.assertEqual(Interface(intf).get_admin_state(), 'up')
+
+ def test_add_multiple_ip_addresses(self):
+ # Add address
+ for intf in self._interfaces:
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+ for addr in self._test_addr:
+ self.cli_set(self._base_path + [intf, 'address', addr])
+
+ self.cli_commit()
+
+ # Validate address
+ for intf in self._interfaces:
+ for af in AF_INET, AF_INET6:
+ for addr in ifaddresses(intf)[af]:
+ # checking link local addresses makes no sense
+ if is_ipv6_link_local(addr['addr']):
+ continue
+
+ self.assertTrue(is_intf_addr_assigned(intf, addr['addr']))
+
+ def test_ipv6_link_local_address(self):
+ # Common function for IPv6 link-local address assignemnts
+ if not self._test_ipv6:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ # just set the interface base without any option - some interfaces
+ # (VTI) do not require any option to be brought up
+ self.cli_set(base)
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ # after commit we must have an IPv6 link-local address
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ self.assertIn(AF_INET6, ifaddresses(interface))
+ for addr in ifaddresses(interface)[AF_INET6]:
+ self.assertTrue(is_ipv6_link_local(addr['addr']))
+
+ # disable IPv6 link-local address assignment
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
+
+ # after commit we must have no IPv6 link-local address
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ self.assertNotIn(AF_INET6, ifaddresses(interface))
+
+ def test_interface_mtu(self):
+ if not self._test_mtu:
+ self.skipTest('not supported')
+
+ for intf in self._interfaces:
+ base = self._base_path + [intf]
+ self.cli_set(base + ['mtu', self._mtu])
+ for option in self._options.get(intf, []):
+ self.cli_set(base + option.split())
+
+ # commit interface changes
+ self.cli_commit()
+
+ # verify changed MTU
+ for intf in self._interfaces:
+ tmp = get_interface_config(intf)
+ self.assertEqual(tmp['mtu'], int(self._mtu))
+
+ def test_mtu_1200_no_ipv6_interface(self):
+ # Testcase if MTU can be changed to 1200 on non IPv6
+ # enabled interfaces
+ if not self._test_mtu:
+ self.skipTest('not supported')
+
+ old_mtu = self._mtu
+ self._mtu = '1200'
+
+ for intf in self._interfaces:
+ base = self._base_path + [intf]
+ for option in self._options.get(intf, []):
+ self.cli_set(base + option.split())
+ self.cli_set(base + ['mtu', self._mtu])
+
+ # check validate() - can not set low MTU if 'no-default-link-local'
+ # is not set on CLI
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ for intf in self._interfaces:
+ base = self._base_path + [intf]
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
+
+ # commit interface changes
+ self.cli_commit()
+
+ # verify changed MTU
+ for intf in self._interfaces:
+ tmp = get_interface_config(intf)
+ self.assertEqual(tmp['mtu'], int(self._mtu))
+
+ self._mtu = old_mtu
+
+ def test_vif_8021q_interfaces(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
+ if not self._test_vlan:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ for address in self._test_addr:
+ self.cli_set(base + ['address', address])
+
+ self.cli_commit()
+
+ for intf in self._interfaces:
+ for vlan in self._vlan_range:
+ vif = f'{intf}.{vlan}'
+ for address in self._test_addr:
+ self.assertTrue(is_intf_addr_assigned(vif, address))
+
+ self.assertEqual(Interface(vif).get_admin_state(), 'up')
+
+ # T4064: Delete interface addresses, keep VLAN interface
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ self.cli_delete(base + ['address'])
+
+ self.cli_commit()
+
+ # Verify no IP address is assigned
+ for interface in self._interfaces:
+ for vlan in self._vlan_range:
+ vif = f'{intf}.{vlan}'
+ for address in self._test_addr:
+ self.assertFalse(is_intf_addr_assigned(vif, address))
+
+
+ def test_vif_8021q_mtu_limits(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
+ if not self._test_vlan or not self._test_mtu:
+ self.skipTest('not supported')
+
+ mtu_1500 = '1500'
+ mtu_9000 = '9000'
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.cli_set(base + ['mtu', mtu_1500])
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+ if 'source-interface' in option:
+ iface = option.split()[-1]
+ iface_type = Section.section(iface)
+ self.cli_set(['interfaces', iface_type, iface, 'mtu', mtu_9000])
+
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ self.cli_set(base + ['mtu', mtu_9000])
+
+ # check validate() - Interface MTU "9000" too high, parent interface MTU is "1500"!
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ # Change MTU on base interface to be the same as on the VIF interface
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.cli_set(base + ['mtu', mtu_9000])
+
+ self.cli_commit()
+
+ # Verify MTU on base and VIF interfaces
+ for interface in self._interfaces:
+ tmp = get_interface_config(interface)
+ self.assertEqual(tmp['mtu'], int(mtu_9000))
+
+ for vlan in self._vlan_range:
+ tmp = get_interface_config(f'{interface}.{vlan}')
+ self.assertEqual(tmp['mtu'], int(mtu_9000))
+
+
+ def test_vif_8021q_qos_change(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
+ if not self._test_vlan:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ self.cli_set(base + ['ingress-qos', '0:1'])
+ self.cli_set(base + ['egress-qos', '1:6'])
+
+ self.cli_commit()
+
+ for intf in self._interfaces:
+ for vlan in self._vlan_range:
+ vif = f'{intf}.{vlan}'
+ tmp = get_interface_config(f'{vif}')
+
+ tmp2 = dict_search('linkinfo.info_data.ingress_qos', tmp)
+ for item in tmp2 if tmp2 else []:
+ from_key = item['from']
+ to_key = item['to']
+ self.assertEqual(from_key, 0)
+ self.assertEqual(to_key, 1)
+
+ tmp2 = dict_search('linkinfo.info_data.egress_qos', tmp)
+ for item in tmp2 if tmp2 else []:
+ from_key = item['from']
+ to_key = item['to']
+ self.assertEqual(from_key, 1)
+ self.assertEqual(to_key, 6)
+
+ self.assertEqual(Interface(vif).get_admin_state(), 'up')
+
+ new_ingress_qos_from = 1
+ new_ingress_qos_to = 6
+ new_egress_qos_from = 2
+ new_egress_qos_to = 7
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ self.cli_set(base + ['ingress-qos', f'{new_ingress_qos_from}:{new_ingress_qos_to}'])
+ self.cli_set(base + ['egress-qos', f'{new_egress_qos_from}:{new_egress_qos_to}'])
+
+ self.cli_commit()
+
+ for intf in self._interfaces:
+ for vlan in self._vlan_range:
+ vif = f'{intf}.{vlan}'
+ tmp = get_interface_config(f'{vif}')
+
+ tmp2 = dict_search('linkinfo.info_data.ingress_qos', tmp)
+ if tmp2:
+ from_key = tmp2[0]['from']
+ to_key = tmp2[0]['to']
+ self.assertEqual(from_key, new_ingress_qos_from)
+ self.assertEqual(to_key, new_ingress_qos_to)
+
+ tmp2 = dict_search('linkinfo.info_data.egress_qos', tmp)
+ if tmp2:
+ from_key = tmp2[0]['from']
+ to_key = tmp2[0]['to']
+ self.assertEqual(from_key, new_egress_qos_from)
+ self.assertEqual(to_key, new_egress_qos_to)
+
+ def test_vif_8021q_lower_up_down(self):
+ # Testcase for https://vyos.dev/T3349
+ if not self._test_vlan:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ # disable the lower interface
+ self.cli_set(base + ['disable'])
+
+ for vlan in self._vlan_range:
+ vlan_base = self._base_path + [interface, 'vif', vlan]
+ # disable the vlan interface
+ self.cli_set(vlan_base + ['disable'])
+
+ self.cli_commit()
+
+ # re-enable all lower interfaces
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.cli_delete(base + ['disable'])
+
+ self.cli_commit()
+
+ # verify that the lower interfaces are admin up and the vlan
+ # interfaces are all admin down
+ for interface in self._interfaces:
+ self.assertEqual(Interface(interface).get_admin_state(), 'up')
+
+ for vlan in self._vlan_range:
+ ifname = f'{interface}.{vlan}'
+ self.assertEqual(Interface(ifname).get_admin_state(), 'down')
+
+
+ def test_vif_s_8021ad_vlan_interfaces(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
+ if not self._test_qinq:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
+ self.cli_set(base + ['mtu', self._mtu])
+ for address in self._test_addr:
+ self.cli_set(base + ['address', address])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ for vif_s in self._qinq_range:
+ tmp = get_interface_config(f'{interface}.{vif_s}')
+ self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')
+
+ for vif_c in self._vlan_range:
+ vif = f'{interface}.{vif_s}.{vif_c}'
+ # For an unknown reason this regularely fails on the QEMU builds,
+ # thus the test for reading back IP addresses is temporary
+ # disabled. There is no big deal here, as this uses the same
+ # methods on 802.1q and here it works and is verified.
+# for address in self._test_addr:
+# self.assertTrue(is_intf_addr_assigned(vif, address))
+
+ tmp = get_interface_config(vif)
+ self.assertEqual(tmp['mtu'], int(self._mtu))
+
+
+ # T4064: Delete interface addresses, keep VLAN interface
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ self.cli_delete(self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c, 'address'])
+
+ self.cli_commit()
+ # Verify no IP address is assigned
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ vif = f'{interface}.{vif_s}.{vif_c}'
+ for address in self._test_addr:
+ self.assertFalse(is_intf_addr_assigned(vif, address))
+
+ # T3972: remove vif-c interfaces from vif-s
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for vif_s in self._qinq_range:
+ base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c']
+ self.cli_delete(base)
+
+ self.cli_commit()
+
+
+ def test_vif_s_protocol_change(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
+ if not self._test_qinq:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
+ for address in self._test_addr:
+ self.cli_set(base + ['address', address])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ for vif_s in self._qinq_range:
+ tmp = get_interface_config(f'{interface}.{vif_s}')
+ # check for the default value
+ self.assertEqual(tmp['linkinfo']['info_data']['protocol'], '802.1ad')
+
+ # T3532: now change ethertype
+ new_protocol = '802.1q'
+ for interface in self._interfaces:
+ for vif_s in self._qinq_range:
+ base = self._base_path + [interface, 'vif-s', vif_s]
+ self.cli_set(base + ['protocol', new_protocol])
+
+ self.cli_commit()
+
+ # Verify new ethertype configuration
+ for interface in self._interfaces:
+ for vif_s in self._qinq_range:
+ tmp = get_interface_config(f'{interface}.{vif_s}')
+ self.assertEqual(tmp['linkinfo']['info_data']['protocol'], new_protocol.upper())
+
+ def test_interface_ip_options(self):
+ if not self._test_ip:
+ self.skipTest('not supported')
+
+ arp_tmo = '300'
+ mss = '1420'
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # Options
+ if cli_defined(self._base_path + ['ip'], 'adjust-mss'):
+ self.cli_set(path + ['ip', 'adjust-mss', mss])
+
+ if cli_defined(self._base_path + ['ip'], 'arp-cache-timeout'):
+ self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo])
+
+ if cli_defined(self._base_path + ['ip'], 'disable-arp-filter'):
+ self.cli_set(path + ['ip', 'disable-arp-filter'])
+
+ if cli_defined(self._base_path + ['ip'], 'disable-forwarding'):
+ self.cli_set(path + ['ip', 'disable-forwarding'])
+
+ if cli_defined(self._base_path + ['ip'], 'enable-directed-broadcast'):
+ self.cli_set(path + ['ip', 'enable-directed-broadcast'])
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-accept'):
+ self.cli_set(path + ['ip', 'enable-arp-accept'])
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-announce'):
+ self.cli_set(path + ['ip', 'enable-arp-announce'])
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-ignore'):
+ self.cli_set(path + ['ip', 'enable-arp-ignore'])
+
+ if cli_defined(self._base_path + ['ip'], 'enable-proxy-arp'):
+ self.cli_set(path + ['ip', 'enable-proxy-arp'])
+
+ if cli_defined(self._base_path + ['ip'], 'proxy-arp-pvlan'):
+ self.cli_set(path + ['ip', 'proxy-arp-pvlan'])
+
+ if cli_defined(self._base_path + ['ip'], 'source-validation'):
+ self.cli_set(path + ['ip', 'source-validation', 'loose'])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ if cli_defined(self._base_path + ['ip'], 'adjust-mss'):
+ base_options = f'oifname "{interface}"'
+ out = cmd('sudo nft list chain raw VYOS_TCP_MSS')
+ for line in out.splitlines():
+ if line.startswith(base_options):
+ self.assertIn(f'tcp option maxseg size set {mss}', line)
+
+ if cli_defined(self._base_path + ['ip'], 'arp-cache-timeout'):
+ tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
+ self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds
+
+ proc_base = f'/proc/sys/net/ipv4/conf/{interface}'
+
+ if cli_defined(self._base_path + ['ip'], 'disable-arp-filter'):
+ tmp = read_file(f'{proc_base}/arp_filter')
+ self.assertEqual('0', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-accept'):
+ tmp = read_file(f'{proc_base}/arp_accept')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-announce'):
+ tmp = read_file(f'{proc_base}/arp_announce')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'enable-arp-ignore'):
+ tmp = read_file(f'{proc_base}/arp_ignore')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'disable-forwarding'):
+ tmp = read_file(f'{proc_base}/forwarding')
+ self.assertEqual('0', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'enable-directed-broadcast'):
+ tmp = read_file(f'{proc_base}/bc_forwarding')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'enable-proxy-arp'):
+ tmp = read_file(f'{proc_base}/proxy_arp')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'proxy-arp-pvlan'):
+ tmp = read_file(f'{proc_base}/proxy_arp_pvlan')
+ self.assertEqual('1', tmp)
+
+ if cli_defined(self._base_path + ['ip'], 'source-validation'):
+ base_options = f'iifname "{interface}"'
+ out = cmd('sudo nft list chain ip raw vyos_rpfilter')
+ for line in out.splitlines():
+ if line.startswith(base_options):
+ self.assertIn('fib saddr oif 0', line)
+ self.assertIn('drop', line)
+
+ def test_interface_ipv6_options(self):
+ if not self._test_ipv6:
+ self.skipTest('not supported')
+
+ mss = '1400'
+ dad_transmits = '10'
+ accept_dad = '0'
+ source_validation = 'strict'
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # Options
+ if cli_defined(self._base_path + ['ipv6'], 'adjust-mss'):
+ self.cli_set(path + ['ipv6', 'adjust-mss', mss])
+
+ if cli_defined(self._base_path + ['ipv6'], 'accept-dad'):
+ self.cli_set(path + ['ipv6', 'accept-dad', accept_dad])
+
+ if cli_defined(self._base_path + ['ipv6'], 'dup-addr-detect-transmits'):
+ self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
+
+ if cli_defined(self._base_path + ['ipv6'], 'disable-forwarding'):
+ self.cli_set(path + ['ipv6', 'disable-forwarding'])
+
+ if cli_defined(self._base_path + ['ipv6'], 'source-validation'):
+ self.cli_set(path + ['ipv6', 'source-validation', source_validation])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ proc_base = f'/proc/sys/net/ipv6/conf/{interface}'
+ if cli_defined(self._base_path + ['ipv6'], 'adjust-mss'):
+ base_options = f'oifname "{interface}"'
+ out = cmd('sudo nft list chain ip6 raw VYOS_TCP_MSS')
+ for line in out.splitlines():
+ if line.startswith(base_options):
+ self.assertIn(f'tcp option maxseg size set {mss}', line)
+
+ if cli_defined(self._base_path + ['ipv6'], 'accept-dad'):
+ tmp = read_file(f'{proc_base}/accept_dad')
+ self.assertEqual(accept_dad, tmp)
+
+ if cli_defined(self._base_path + ['ipv6'], 'dup-addr-detect-transmits'):
+ tmp = read_file(f'{proc_base}/dad_transmits')
+ self.assertEqual(dad_transmits, tmp)
+
+ if cli_defined(self._base_path + ['ipv6'], 'disable-forwarding'):
+ tmp = read_file(f'{proc_base}/forwarding')
+ self.assertEqual('0', tmp)
+
+ if cli_defined(self._base_path + ['ipv6'], 'source-validation'):
+ base_options = f'iifname "{interface}"'
+ out = cmd('sudo nft list chain ip6 raw vyos_rpfilter')
+ for line in out.splitlines():
+ if line.startswith(base_options):
+ self.assertIn('fib saddr . iif oif 0', line)
+ self.assertIn('drop', line)
+
+ def test_dhcpv6_client_options(self):
+ if not self._test_ipv6_dhcpc6:
+ self.skipTest('not supported')
+
+ duid_base = 10
+ for interface in self._interfaces:
+ duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # Enable DHCPv6 client
+ self.cli_set(path + ['address', 'dhcpv6'])
+ self.cli_set(path + ['dhcpv6-options', 'no-release'])
+ self.cli_set(path + ['dhcpv6-options', 'rapid-commit'])
+ self.cli_set(path + ['dhcpv6-options', 'parameters-only'])
+ self.cli_set(path + ['dhcpv6-options', 'duid', duid])
+ duid_base += 1
+
+ self.cli_commit()
+
+ duid_base = 10
+ for interface in self._interfaces:
+ duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf')
+ self.assertIn(f'interface {interface} ' + '{', dhcpc6_config)
+ self.assertIn(f' request domain-name-servers;', dhcpc6_config)
+ self.assertIn(f' request domain-name;', dhcpc6_config)
+ self.assertIn(f' information-only;', dhcpc6_config)
+ self.assertIn(f' send ia-na 0;', dhcpc6_config)
+ self.assertIn(f' send rapid-commit;', dhcpc6_config)
+ self.assertIn(f' send client-id {duid};', dhcpc6_config)
+ self.assertIn('};', dhcpc6_config)
+ duid_base += 1
+
+ # Better ask the process about it's commandline in the future
+ pid = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(pid)
+
+ dhcp6c_options = read_file(f'/proc/{pid}/cmdline')
+ self.assertIn('-n', dhcp6c_options)
+
+ def test_dhcpv6pd_auto_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not supported')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ # Create delegatee interfaces first to avoid any confusion by dhcpc6
+ # this is mainly an "issue" with virtual-ethernet interfaces
+ delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344']
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.cli_set(['interfaces', section, delegatee])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ address = '1'
+ # prefix delegation stuff
+ pd_base = path + ['dhcpv6-options', 'pd', '0']
+ self.cli_set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
+ # increment interface address
+ address = str(int(address) + 1)
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf')
+
+ # verify DHCPv6 prefix delegation
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ address = '1'
+ sla_id = '0'
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
+
+ # Check for running process
+ self.assertTrue(process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.cli_delete(['interfaces', section, delegatee])
+
+ def test_dhcpv6pd_manual_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not supported')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ # Create delegatee interfaces first to avoid any confusion by dhcpc6
+ # this is mainly an "issue" with virtual-ethernet interfaces
+ delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344']
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.cli_set(['interfaces', section, delegatee])
+
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # prefix delegation stuff
+ address = '1'
+ sla_id = '1'
+ pd_base = path + ['dhcpv6-options', 'pd', '0']
+ self.cli_set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
+ self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
+
+ # increment interface address
+ address = str(int(address) + 1)
+ sla_id = str(int(sla_id) + 1)
+
+ self.cli_commit()
+
+ # Verify dhcpc6 client configuration
+ for interface in self._interfaces:
+ address = '1'
+ sla_id = '1'
+ dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf')
+
+ # verify DHCPv6 prefix delegation
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
+
+ # Check for running process
+ self.assertTrue(process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.cli_delete(['interfaces', section, delegatee])
+
+ def test_eapol(self):
+ if not self._test_eapol:
+ self.skipTest('not supported')
+
+ cfg_dir = '/run/wpa_supplicant'
+
+ ca_certs = {
+ 'eapol-server-ca-root': server_ca_root_cert_data,
+ 'eapol-server-ca-intermediate': server_ca_intermediate_cert_data,
+ 'eapol-client-ca-root': client_ca_root_cert_data,
+ 'eapol-client-ca-intermediate': client_ca_intermediate_cert_data,
+ }
+ cert_name = 'eapol-client'
+
+ for name, data in ca_certs.items():
+ self.cli_set(['pki', 'ca', name, 'certificate', data.replace('\n','')])
+
+ self.cli_set(['pki', 'certificate', cert_name, 'certificate', client_cert_data.replace('\n','')])
+ self.cli_set(['pki', 'certificate', cert_name, 'private', 'key', client_key_data.replace('\n','')])
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # Enable EAPoL
+ self.cli_set(self._base_path + [interface, 'eapol', 'ca-certificate', 'eapol-server-ca-intermediate'])
+ self.cli_set(self._base_path + [interface, 'eapol', 'ca-certificate', 'eapol-client-ca-intermediate'])
+ self.cli_set(self._base_path + [interface, 'eapol', 'certificate', cert_name])
+
+ self.cli_commit()
+
+ # Test multiple CA chains
+ self.assertEqual(get_certificate_count(interface, 'ca'), 4)
+
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'eapol', 'ca-certificate', 'eapol-client-ca-intermediate'])
+
+ self.cli_commit()
+
+ # Validate interface config
+ for interface in self._interfaces:
+ tmp = get_wpa_supplicant_value(interface, 'key_mgmt')
+ self.assertEqual('IEEE8021X', tmp)
+
+ tmp = get_wpa_supplicant_value(interface, 'eap')
+ self.assertEqual('TLS', tmp)
+
+ tmp = get_wpa_supplicant_value(interface, 'eapol_flags')
+ self.assertEqual('0', tmp)
+
+ tmp = get_wpa_supplicant_value(interface, 'ca_cert')
+ self.assertEqual(f'"{cfg_dir}/{interface}_ca.pem"', tmp)
+
+ tmp = get_wpa_supplicant_value(interface, 'client_cert')
+ self.assertEqual(f'"{cfg_dir}/{interface}_cert.pem"', tmp)
+
+ tmp = get_wpa_supplicant_value(interface, 'private_key')
+ self.assertEqual(f'"{cfg_dir}/{interface}_cert.key"', tmp)
+
+ mac = read_file(f'/sys/class/net/{interface}/address')
+ tmp = get_wpa_supplicant_value(interface, 'identity')
+ self.assertEqual(f'"{mac}"', tmp)
+
+ # Check certificate files have the full chain
+ self.assertEqual(get_certificate_count(interface, 'ca'), 2)
+ self.assertEqual(get_certificate_count(interface, 'cert'), 3)
+
+ # Check for running process
+ self.assertTrue(process_named_running('wpa_supplicant', cmdline=f'-i{interface}'))
+
+ # Remove EAPoL configuration
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'eapol'])
+
+ # Commit and check that process is no longer running
+ self.cli_commit()
+ self.assertFalse(process_named_running('wpa_supplicant'))
+
+ for name in ca_certs:
+ self.cli_delete(['pki', 'ca', name])
+ self.cli_delete(['pki', 'certificate', cert_name])