diff options
Diffstat (limited to 'smoketest/scripts')
-rwxr-xr-x | smoketest/scripts/cli/test_firewall.py | 8 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bridge.py | 137 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_nat66.py | 48 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_bgp.py | 11 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_dns_forwarding.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_monitoring_telegraf.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vpn_openconnect.py | 79 |
7 files changed, 201 insertions, 86 deletions
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index 4de90e1ec..684a07681 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -177,6 +177,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.verify_nftables(nftables_search, 'ip filter') def test_basic_rules(self): + mss_range = '501-1460' self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) self.cli_set(['firewall', 'name', 'smoketest', 'enable-default-log']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) @@ -203,6 +204,10 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'destination', 'port', '22']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'recent', 'count', '10']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'recent', 'time', 'minute']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '5', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '5', 'protocol', 'tcp']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '5', 'tcp', 'flags', 'syn']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '5', 'tcp', 'mss', mss_range]) self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) @@ -214,7 +219,8 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'log prefix "[smoketest-2-R]" level err', 'ip ttl > 102', 'reject'], ['tcp dport { 22 }', 'limit rate 5/minute', 'return'], ['log prefix "[smoketest-default-D]"','smoketest default-action', 'drop'], - ['tcp dport { 22 }', 'add @RECENT_smoketest_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'drop'] + ['tcp dport { 22 }', 'add @RECENT_smoketest_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'drop'], + [f'tcp flags & syn == syn tcp option maxseg size {mss_range}'] ] self.verify_nftables(nftables_search, 'ip filter') diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 8f711af20..6d7af78eb 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -19,6 +19,7 @@ import json import unittest from base_interfaces_test import BasicInterfaceTest +from copy import deepcopy from glob import glob from netifaces import interfaces @@ -224,85 +225,78 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): super().test_vif_8021q_mtu_limits() def test_bridge_vlan_filter(self): - def _verify_members() -> None: - # check member interfaces are added on the bridge - for interface in self._interfaces: - bridge_members = [] - for tmp in glob(f'/sys/class/net/{interface}/lower_*'): - bridge_members.append(os.path.basename(tmp).replace('lower_', '')) - - # We can not use assertListEqual() b/c the position of the interface - # names within the list is not fixed - self.assertEqual(len(self._members), len(bridge_members)) - for member in self._members: - self.assertIn(member, bridge_members) - - def _check_vlan_filter() -> None: - for interface in self._interfaces: - tmp = cmd(f'bridge -j vlan show dev {interface}') - tmp = json.loads(tmp) - self.assertIsNotNone(tmp) - - for interface_status in tmp: - ifname = interface_status['ifname'] - for interface in self._members: - vlan_success = 0; - if interface == ifname: - vlans_status = interface_status['vlans'] - for vlan_status in vlans_status: - vlan_id = vlan_status['vlan'] - flag_num = 0 - if 'flags' in vlan_status: - flags = vlan_status['flags'] - for flag in flags: - flag_num = flag_num +1 - if vlan_id == 2: - if flag_num == 0: - vlan_success = vlan_success + 1 - else: - for id in range(4,10): - if vlan_id == id: - if flag_num == 0: - vlan_success = vlan_success + 1 - if vlan_id >= 101: - if flag_num == 2: - vlan_success = vlan_success + 1 - self.assertGreaterEqual(vlan_success, 7) - - vif_vlan = 2 + vifs = ['10', '20', '30', '40'] + native_vlan = '20' + # Add member interface to bridge and set VLAN filter for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['enable-vlan']) self.cli_set(base + ['address', '192.0.2.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'address', '192.0.3.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'mtu', self._mtu]) - vlan_id = 101 - allowed_vlan = 2 - allowed_vlan_range = '4-9' - # assign members to bridge interface + for vif in vifs: + self.cli_set(base + ['vif', vif, 'address', f'192.0.{vif}.1/24']) + self.cli_set(base + ['vif', vif, 'mtu', self._mtu]) + for member in self._members: base_member = base + ['member', 'interface', member] - self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)]) - self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range]) - self.cli_set(base_member + ['native-vlan', str(vlan_id)]) - vlan_id += 1 + self.cli_set(base_member + ['native-vlan', native_vlan]) + for vif in vifs: + self.cli_set(base_member + ['allowed-vlan', vif]) # commit config self.cli_commit() + def _verify_members(interface, members) -> None: + # check member interfaces are added on the bridge + bridge_members = [] + for tmp in glob(f'/sys/class/net/{interface}/lower_*'): + bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + + self.assertListEqual(sorted(members), sorted(bridge_members)) + + def _check_vlan_filter(interface, vifs) -> None: + configured_vlan_ids = [] + + bridge_json = cmd(f'bridge -j vlan show dev {interface}') + bridge_json = json.loads(bridge_json) + self.assertIsNotNone(bridge_json) + + for tmp in bridge_json: + self.assertIn('vlans', tmp) + + for vlan in tmp['vlans']: + self.assertIn('vlan', vlan) + configured_vlan_ids.append(str(vlan['vlan'])) + + # Verify native VLAN ID has 'PVID' flag set on individual member ports + if not interface.startswith('br') and str(vlan['vlan']) == native_vlan: + self.assertIn('flags', vlan) + self.assertIn('PVID', vlan['flags']) + + self.assertListEqual(sorted(configured_vlan_ids), sorted(vifs)) + # Verify correct setting of VLAN filter function for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bridge/vlan_filtering') self.assertEqual(tmp, '1') - # Execute the program to obtain status information and verify proper - # VLAN filter setup - _check_vlan_filter() + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) - # check member interfaces are added on the bridge - _verify_members() + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) + + for member in self._members: + _check_vlan_filter(member, vifs) # change member interface description to trigger config update, # VLANs must still exist (T4565) @@ -313,12 +307,22 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): # commit config self.cli_commit() - # check member interfaces are added on the bridge - _verify_members() + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) + + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) - # Execute the program to obtain status information and verify proper - # VLAN filter setup - _check_vlan_filter() + for member in self._members: + _check_vlan_filter(member, vifs) # delete all members for interface in self._interfaces: @@ -337,7 +341,6 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): for member in self._members: self.assertNotIn(member, bridge_members) - def test_bridge_vif_members(self): # T2945: ensure that VIFs are not dropped from bridge vifs = ['300', '400'] diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index 4b5625569..c5db066db 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -131,6 +131,30 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.verify_nftables(nftables_search, 'ip6 nat') + def test_destination_nat66_protocol(self): + translation_address = '2001:db8:1111::1' + source_prefix = '2001:db8:2222::/64' + dport = '4545' + sport = '8080' + tport = '5555' + proto = 'tcp' + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dport]) + self.cli_set(dst_path + ['rule', '1', 'source', 'address', source_prefix]) + self.cli_set(dst_path + ['rule', '1', 'source', 'port', sport]) + self.cli_set(dst_path + ['rule', '1', 'protocol', proto]) + self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', tport]) + + # check validate() - outbound-interface must be defined + self.cli_commit() + + nftables_search = [ + ['iifname "eth1"', 'tcp dport { 4545 } ip6 saddr 2001:db8:2222::/64 tcp sport { 8080 } dnat to 2001:db8:1111::1:5555'] + ] + + self.verify_nftables(nftables_search, 'ip6 nat') + def test_destination_nat66_prefix(self): destination_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' @@ -176,6 +200,30 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() + def test_source_nat66_protocol(self): + translation_address = '2001:db8:1111::1' + source_prefix = '2001:db8:2222::/64' + dport = '9999' + sport = '8080' + tport = '80' + proto = 'tcp' + self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'eth1']) + self.cli_set(src_path + ['rule', '1', 'destination', 'port', dport]) + self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) + self.cli_set(src_path + ['rule', '1', 'source', 'port', sport]) + self.cli_set(src_path + ['rule', '1', 'protocol', proto]) + self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) + self.cli_set(src_path + ['rule', '1', 'translation', 'port', tport]) + + # check validate() - outbound-interface must be defined + self.cli_commit() + + nftables_search = [ + ['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64 tcp dport { 9999 } tcp sport { 8080 } snat to 2001:db8:1111::1:80'] + ] + + self.verify_nftables(nftables_search, 'ip6 nat') + def test_nat66_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index cefaad64a..6196ffe60 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -105,7 +105,8 @@ neighbor_config = { 'pfx_list_out' : prefix_list_out6, 'no_send_comm_ext' : '', 'peer_group' : 'foo-bar_baz', - 'graceful_rst_hlp' : '' + 'graceful_rst_hlp' : '', + 'disable_conn_chk' : '', }, } @@ -120,6 +121,7 @@ peer_group_config = { 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', + 'disable_conn_chk' : '', }, 'bar' : { 'remote_as' : '111', @@ -251,6 +253,9 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.assertIn(f' neighbor {peer} graceful-restart-disable', frrconfig) if 'graceful_rst_hlp' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-helper', frrconfig) + if 'disable_conn_chk' in peer_config: + self.assertIn(f' neighbor {peer} disable-connected-check', frrconfig) + def test_bgp_01_simple(self): router_id = '127.0.0.1' @@ -400,6 +405,8 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'restart-helper']) + if 'disable_conn_chk' in peer_config: + self.cli_set(base_path + ['neighbor', peer, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in peer_config: @@ -488,6 +495,8 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'restart-helper']) + if 'disable_conn_chk' in config: + self.cli_set(base_path + ['peer-group', peer_group, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in config: diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py index 65b676451..fe2682d50 100755 --- a/smoketest/scripts/cli/test_service_dns_forwarding.py +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -26,7 +26,7 @@ from vyos.util import process_named_running CONFIG_FILE = '/run/powerdns/recursor.conf' FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf' HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua' -PROCESS_NAME= 'pdns-r/worker' +PROCESS_NAME= 'pdns_recursor' base_path = ['service', 'dns', 'forwarding'] diff --git a/smoketest/scripts/cli/test_service_monitoring_telegraf.py b/smoketest/scripts/cli/test_service_monitoring_telegraf.py index 1c8cc9759..c1c4044e6 100755 --- a/smoketest/scripts/cli/test_service_monitoring_telegraf.py +++ b/smoketest/scripts/cli/test_service_monitoring_telegraf.py @@ -24,7 +24,7 @@ from vyos.util import process_named_running from vyos.util import read_file PROCESS_NAME = 'telegraf' -TELEGRAF_CONF = '/run/telegraf/vyos-telegraf.conf' +TELEGRAF_CONF = '/run/telegraf/telegraf.conf' base_path = ['service', 'monitoring', 'telegraf'] org = 'log@in.local' token = 'GuRJc12tIzfjnYdKRAIYbxdWd2aTpOT9PVYNddzDnFV4HkAcD7u7-kndTFXjGuXzJN6TTxmrvPODB4mnFcseDV==' diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py index bda279342..8572d6d66 100755 --- a/smoketest/scripts/cli/test_vpn_openconnect.py +++ b/smoketest/scripts/cli/test_vpn_openconnect.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,6 +19,7 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.util import process_named_running +from vyos.util import read_file OCSERV_CONF = '/run/ocserv/ocserv.conf' base_path = ['vpn', 'openconnect'] @@ -46,36 +47,84 @@ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww """ -class TestVpnOpenconnect(VyOSUnitTestSHIM.TestCase): +PROCESS_NAME = 'ocserv-main' +config_file = '/run/ocserv/ocserv.conf' +auth_file = '/run/ocserv/ocpasswd' +otp_file = '/run/ocserv/users.oath' + +class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestVPNOpenConnect, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + cls.cli_set(cls, pki_path + ['ca', 'openconnect', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'private', 'key', key_data.replace('\n','')]) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, pki_path) + super(TestVPNOpenConnect, cls).tearDownClass() + def tearDown(self): - # Delete vpn openconnect configuration - self.cli_delete(pki_path) + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(base_path) self.cli_commit() - def test_vpn(self): + self.assertFalse(process_named_running(PROCESS_NAME)) + + def test_ocserv(self): user = 'vyos_user' password = 'vyos_pass' otp = '37500000026900000000200000000000' - - self.cli_delete(pki_path) - self.cli_delete(base_path) - - self.cli_set(pki_path + ['ca', 'openconnect', 'certificate', cert_data.replace('\n','')]) - self.cli_set(pki_path + ['certificate', 'openconnect', 'certificate', cert_data.replace('\n','')]) - self.cli_set(pki_path + ['certificate', 'openconnect', 'private', 'key', key_data.replace('\n','')]) + v4_subnet = '192.0.2.0/24' + v6_prefix = '2001:db8:1000::/64' + v6_len = '126' + name_server = ['1.2.3.4', '1.2.3.5', '2001:db8::1'] + split_dns = ['vyos.net', 'vyos.io'] self.cli_set(base_path + ['authentication', 'local-users', 'username', user, 'password', password]) self.cli_set(base_path + ['authentication', 'local-users', 'username', user, 'otp', 'key', otp]) self.cli_set(base_path + ['authentication', 'mode', 'local', 'password-otp']) - self.cli_set(base_path + ['network-settings', 'client-ip-settings', 'subnet', '192.0.2.0/24']) + + self.cli_set(base_path + ['network-settings', 'client-ip-settings', 'subnet', v4_subnet]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'prefix', v6_prefix]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'mask', v6_len]) + + for ns in name_server: + self.cli_set(base_path + ['network-settings', 'name-server', ns]) + for domain in split_dns: + self.cli_set(base_path + ['network-settings', 'split-dns', domain]) + self.cli_set(base_path + ['ssl', 'ca-certificate', 'openconnect']) self.cli_set(base_path + ['ssl', 'certificate', 'openconnect']) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running('ocserv-main')) + # Verify configuration + daemon_config = read_file(config_file) + + # authentication mode local password-otp + self.assertIn(f'auth = "plain[passwd=/run/ocserv/ocpasswd,otp=/run/ocserv/users.oath]"', daemon_config) + self.assertIn(f'ipv4-network = {v4_subnet}', daemon_config) + self.assertIn(f'ipv6-network = {v6_prefix}', daemon_config) + self.assertIn(f'ipv6-subnet-prefix = {v6_len}', daemon_config) + + for ns in name_server: + self.assertIn(f'dns = {ns}', daemon_config) + for domain in split_dns: + self.assertIn(f'split-dns = {domain}', daemon_config) + + auth_config = read_file(auth_file) + self.assertIn(f'{user}:*:$', auth_config) + + otp_config = read_file(otp_file) + self.assertIn(f'HOTP/T30/6 {user} - {otp}', otp_config) if __name__ == '__main__': unittest.main(verbosity=2) |