From 29e32d39ddd9cc1942f1826016d44897930b4248 Mon Sep 17 00:00:00 2001 From: Christian Poessinger Date: Tue, 29 Dec 2020 20:39:25 +0100 Subject: smoketest: T1466: add eapol tests --- smoketest/scripts/cli/base_interfaces_test.py | 85 ++++++--------- smoketest/scripts/cli/test_interfaces_ethernet.py | 127 ++++++++++++++++++---- 2 files changed, 140 insertions(+), 72 deletions(-) diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 0ae55a22a..a784140f3 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -68,25 +68,16 @@ class BasicInterfaceTest: self._options = {} def tearDown(self): - # we should not remove ethernet from the overall CLI - if 'ethernet' in self._base_path: - for interface in self._interfaces: - # when using a dedicated interface to test via TEST_ETH environment - # variable only this one will be cleared in the end - usable to test - # ethernet interfaces via SSH - self.session.delete(self._base_path + [interface]) - self.session.set(self._base_path + [interface, 'duplex', 'auto']) - self.session.set(self._base_path + [interface, 'speed', 'auto']) - else: + # Ethernet is handled in its derived class + if 'ethernet' not in self._base_path: self.session.delete(self._base_path) self.session.commit() del self.session - def test_add_description(self): - """ - Check if description can be added to interface - """ + def test_interface_description(self): + # Check if description can be added to interface and + # can be read back for intf in self._interfaces: test_string=f'Description-Test-{intf}' self.session.set(self._base_path + [intf, 'description', test_string]) @@ -98,14 +89,10 @@ class BasicInterfaceTest: # Validate interface description for intf in self._interfaces: test_string=f'Description-Test-{intf}' - with open(f'/sys/class/net/{intf}/ifalias', 'r') as f: - tmp = f.read().rstrip() - self.assertTrue(tmp, test_string) - - def test_add_address_single(self): - """ - Check if a single address can be added to interface. - """ + tmp = read_file(f'/sys/class/net/{intf}/ifalias') + self.assertTrue(tmp, test_string) + + def test_add_single_ip_address(self): addr = '192.0.2.0/31' for intf in self._interfaces: self.session.set(self._base_path + [intf, 'address', addr]) @@ -117,11 +104,7 @@ class BasicInterfaceTest: for intf in self._interfaces: self.assertTrue(is_intf_addr_assigned(intf, addr)) - def test_add_address_multi(self): - """ - Check if IPv4/IPv6 addresses can be added to interface. - """ - + def test_add_multiple_ip_addresses(self): # Add address for intf in self._interfaces: for addr in self._test_addr: @@ -141,8 +124,8 @@ class BasicInterfaceTest: self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) - def test_ipv6_link_local(self): - """ Common function for IPv6 link-local address assignemnts """ + def test_ipv6_link_local_address(self): + # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: return None @@ -169,14 +152,7 @@ class BasicInterfaceTest: for interface in self._interfaces: self.assertTrue(AF_INET6 not in ifaddresses(interface)) - def _mtu_test(self, intf): - """ helper function to verify MTU size """ - with open(f'/sys/class/net/{intf}/mtu', 'r') as f: - tmp = f.read().rstrip() - self.assertEqual(tmp, self._mtu) - - def test_change_mtu(self): - """ Testcase if MTU can be changed on interface """ + def test_interface_mtu(self): if not self._test_mtu: return None @@ -191,10 +167,12 @@ class BasicInterfaceTest: # verify changed MTU for intf in self._interfaces: - self._mtu_test(intf) + tmp = read_file(f'/sys/class/net/{intf}/mtu') + self.assertEqual(tmp, self._mtu) - def test_change_mtu_1200(self): - """ Testcase if MTU can be changed to 1200 on non IPv6 enabled interfaces """ + def test_mtu_1200_no_ipv6_interface(self): + # Testcase if MTU can be changed to 1200 on non IPv6 + # enabled interfaces if not self._test_mtu: return None @@ -214,12 +192,12 @@ class BasicInterfaceTest: # verify changed MTU for intf in self._interfaces: - self._mtu_test(intf) + tmp = read_file(f'/sys/class/net/{intf}/mtu') + self.assertEqual(tmp, self._mtu) self._mtu = old_mtu - def test_8021q_vlan(self): - """ Testcase for 802.1q VLAN interfaces """ + def test_8021q_vlan_interfaces(self): if not self._test_vlan: return None @@ -235,16 +213,18 @@ class BasicInterfaceTest: self.session.set(base + ['address', address]) self.session.commit() + for intf in self._interfaces: for vlan in self._vlan_range: vif = f'{intf}.{vlan}' for address in self._test_addr: self.assertTrue(is_intf_addr_assigned(vif, address)) - self._mtu_test(vif) + tmp = read_file(f'/sys/class/net/{vif}/mtu') + self.assertEqual(tmp, self._mtu) - def test_8021ad_qinq_vlan(self): - """ Testcase for 802.1ad Q-in-Q VLAN interfaces """ + + def test_8021ad_qinq_vlan_interfaces(self): if not self._test_qinq: return None @@ -271,10 +251,11 @@ class BasicInterfaceTest: vif = f'{interface}.{vif_s}.{vif_c}' for address in self._test_addr: self.assertTrue(is_intf_addr_assigned(vif, address)) - self._mtu_test(vif) - def test_ip_options(self): - """ Test interface base IPv4 options """ + tmp = read_file(f'/sys/class/net/{vif}/mtu') + self.assertEqual(tmp, self._mtu) + + def test_interface_ip_options(self): if not self._test_ip: return None @@ -325,8 +306,7 @@ class BasicInterfaceTest: tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') self.assertEqual('2', tmp) - def test_ipv6_options(self): - """ Test interface base IPv6 options """ + def test_interface_ipv6_options(self): if not self._test_ipv6: return None @@ -350,8 +330,7 @@ class BasicInterfaceTest: self.assertEqual(dad_transmits, tmp) - def test_ipv6_dhcpv6_pd(self): - """ Test interface base IPv6 options """ + def test_ipv6_dhcpv6_prefix_delegation(self): if not self._test_ipv6: return None diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index bdb20a5c7..d5dcdc536 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -15,10 +15,23 @@ # along with this program. If not, see . import os +import re import unittest from base_interfaces_test import BasicInterfaceTest from vyos.ifconfig import Section +from vyos.util import cmd +from vyos.util import process_named_running +from vyos.util import read_file + +ca_cert = '/config/auth/eapol_test_ca.pem' +ssl_cert = '/config/auth/eapol_test_server.pem' +ssl_key = '/config/auth/eapol_test_server.key' + +def get_wpa_supplicant_value(interface, key): + tmp = read_file(f'/run/wpa_supplicant/{interface}.conf') + tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) + return tmp[0] class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): @@ -43,27 +56,103 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): if not '.' in tmp: self._interfaces.append(tmp) - def test_dhcp_disable(self): - """ - When interface is configured as admin down, it must be admin down even - """ - for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'disable']) - for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) + self._macs = {} + for interface in self._interfaces: + try: + mac = self.session.show_config(self._base_path + + [interface, 'hw-id']).split()[1] + except: + # during initial system startup there is no hw-id node + mac = read_file(f'/sys/class/net/{interface}/address') + self._macs[interface] = mac + + + def tearDown(self): + for interface in self._interfaces: + # when using a dedicated interface to test via TEST_ETH environment + # variable only this one will be cleared in the end - usable to test + # ethernet interfaces via SSH + self.session.delete(self._base_path + [interface]) + self.session.set(self._base_path + [interface, 'duplex', 'auto']) + self.session.set(self._base_path + [interface, 'speed', 'auto']) + self.session.set(self._base_path + [interface, 'hw-id', self._macs[interface]]) + + super().tearDown() + + + def test_dhcp_disable_interface(self): + # When interface is configured as admin down, it must be admin down + # even when dhcpc starts on the given interface + for interface in self._interfaces: + self.session.set(self._base_path + [interface, 'disable']) + + # Also enable DHCP (ISC DHCP always places interface in admin up + # state so we check that we do not start DHCP client. + # https://phabricator.vyos.net/T2767 + self.session.set(self._base_path + [interface, 'address', 'dhcp']) + + self.session.commit() + + # Validate interface state + for interface in self._interfaces: + with open(f'/sys/class/net/{interface}/flags', 'r') as f: + flags = f.read() + self.assertEqual(int(flags, 16) & 1, 0) + - # Also enable DHCP (ISC DHCP always places interface in admin up - # state so we check that we do not start DHCP client. - # https://phabricator.vyos.net/T2767 - self.session.set(self._base_path + [interface, 'address', 'dhcp']) + def test_eapol_support(self): + for interface in self._interfaces: + # Enable EAPoL + self.session.set(self._base_path + [interface, 'eapol', 'ca-cert-file', ca_cert]) + self.session.set(self._base_path + [interface, 'eapol', 'cert-file', ssl_cert]) + self.session.set(self._base_path + [interface, 'eapol', 'key-file', ssl_key]) - self.session.commit() + self.session.commit() - # Validate interface state - for interface in self._interfaces: - with open(f'/sys/class/net/{interface}/flags', 'r') as f: - flags = f.read() - self.assertEqual(int(flags, 16) & 1, 0) + # Check for running process + self.assertTrue(process_named_running('wpa_supplicant')) + + # Validate interface config + for interface in self._interfaces: + tmp = get_wpa_supplicant_value(interface, 'key_mgmt') + self.assertEqual('IEEE8021X', tmp) + + tmp = get_wpa_supplicant_value(interface, 'eap') + self.assertEqual('TLS', tmp) + + tmp = get_wpa_supplicant_value(interface, 'eapol_flags') + self.assertEqual('0', tmp) + + tmp = get_wpa_supplicant_value(interface, 'ca_cert') + self.assertEqual(f'"{ca_cert}"', tmp) + + tmp = get_wpa_supplicant_value(interface, 'client_cert') + self.assertEqual(f'"{ssl_cert}"', tmp) + + tmp = get_wpa_supplicant_value(interface, 'private_key') + self.assertEqual(f'"{ssl_key}"', tmp) + + mac = read_file(f'/sys/class/net/{interface}/address') + tmp = get_wpa_supplicant_value(interface, 'identity') + self.assertEqual(f'"{mac}"', tmp) if __name__ == '__main__': - unittest.main() + # Our SSL certificates need a subject ... + subject = '/C=DE/ST=BY/O=VyOS/localityName=Cloud/commonName=vyos/' \ + 'organizationalUnitName=VyOS/emailAddress=maintainers@vyos.io/' + + if not (os.path.isfile(ssl_key) and os.path.isfile(ssl_cert)): + # Generate mandatory SSL certificate + tmp = f'openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 '\ + f'-keyout {ssl_key} -out {ssl_cert} -subj {subject}' + print(cmd(tmp)) + + if not os.path.isfile(ca_cert): + # Generate "CA" + tmp = f'openssl req -new -x509 -key {ssl_key} -out {ca_cert} -subj {subject}' + print(cmd(tmp)) + + for file in [ca_cert, ssl_cert, ssl_key]: + cmd(f'sudo chown radius_priv_user:vyattacfg {file}') + + unittest.main(verbosity=2) -- cgit v1.2.3