diff options
Diffstat (limited to 'smoketest/scripts/cli')
19 files changed, 647 insertions, 62 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 9be2c2f1a..e7e29387f 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -15,7 +15,6 @@ from netifaces import AF_INET from netifaces import AF_INET6 from netifaces import ifaddresses -from netifaces import interfaces from base_vyostest_shim import VyOSUnitTestSHIM @@ -25,12 +24,15 @@ from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.utils.file import read_file from vyos.utils.dict import dict_search +from vyos.utils.process import cmd from vyos.utils.process import process_named_running from vyos.utils.network import get_interface_config from vyos.utils.network import get_interface_vrf -from vyos.utils.process import cmd +from vyos.utils.network import get_vrf_tableid +from vyos.utils.network import interface_exists from vyos.utils.network import is_intf_addr_assigned from vyos.utils.network import is_ipv6_link_local +from vyos.utils.network import get_nft_vrf_zone_mapping from vyos.xml_ref import cli_defined dhclient_base_dir = directories['isc_dhclient_dir'] @@ -116,8 +118,11 @@ class BasicInterfaceTest: self.cli_commit() # Verify that no previously interface remained on the system + ct_map = get_nft_vrf_zone_mapping() for intf in self._interfaces: - self.assertNotIn(intf, interfaces()) + self.assertFalse(interface_exists(intf)) + for map_entry in ct_map: + self.assertNotEqual(intf, map_entry['interface']) # No daemon started during tests should remain running for daemon in ['dhcp6c', 'dhclient']: @@ -257,6 +262,69 @@ class BasicInterfaceTest: self.cli_delete(['vrf', 'name', vrf_name]) + def test_move_interface_between_vrf_instances(self): + if not self._test_vrf: + self.skipTest('not supported') + + vrf1_name = 'smoketest_mgmt1' + vrf1_table = '5424' + vrf2_name = 'smoketest_mgmt2' + vrf2_table = '7412' + + self.cli_set(['vrf', 'name', vrf1_name, 'table', vrf1_table]) + self.cli_set(['vrf', 'name', vrf2_name, 'table', vrf2_table]) + + # move interface into first VRF + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface, 'vrf', vrf1_name]) + + self.cli_commit() + + # check that interface belongs to proper VRF + for interface in self._interfaces: + tmp = get_interface_vrf(interface) + self.assertEqual(tmp, vrf1_name) + + tmp = get_interface_config(vrf1_name) + self.assertEqual(int(vrf1_table), get_vrf_tableid(interface)) + + # move interface into second VRF + for interface in self._interfaces: + self.cli_set(self._base_path + [interface, 'vrf', vrf2_name]) + + self.cli_commit() + + # check that interface belongs to proper VRF + for interface in self._interfaces: + tmp = get_interface_vrf(interface) + self.assertEqual(tmp, vrf2_name) + + tmp = get_interface_config(vrf2_name) + self.assertEqual(int(vrf2_table), get_vrf_tableid(interface)) + + self.cli_delete(['vrf', 'name', vrf1_name]) + self.cli_delete(['vrf', 'name', vrf2_name]) + + def test_add_to_invalid_vrf(self): + if not self._test_vrf: + self.skipTest('not supported') + + # move interface into first VRF + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface, 'vrf', 'invalid']) + + # check validate() - can not use a non-existing VRF + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + for interface in self._interfaces: + self.cli_delete(self._base_path + [interface, 'vrf', 'invalid']) + self.cli_set(self._base_path + [interface, 'description', 'test_add_to_invalid_vrf']) + def test_span_mirror(self): if not self._mirror_interfaces: self.skipTest('not supported') diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index 4bcc50453..940306ac3 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -15,6 +15,7 @@ import os import unittest import paramiko +import pprint from time import sleep from typing import Type @@ -80,18 +81,32 @@ class VyOSUnitTestSHIM: self._session.discard() def cli_commit(self): + if self.debug: + print('commit') self._session.commit() # during a commit there is a process opening commit_lock, and run() returns 0 while run(f'sudo lsof -nP {commit_lock}') == 0: sleep(0.250) + def op_mode(self, path : list) -> None: + """ + Execute OP-mode command and return stdout + """ + if self.debug: + print('commit') + path = ' '.join(path) + out = cmd(f'/opt/vyatta/bin/vyatta-op-cmd-wrapper {path}') + if self.debug: + print(f'\n\ncommand "{path}" returned:\n') + pprint.pprint(out) + return out + def getFRRconfig(self, string=None, end='$', endsection='^!', daemon=''): """ Retrieve current "running configuration" from FRR """ command = f'vtysh -c "show run {daemon} no-header"' if string: command += f' | sed -n "/^{string}{end}/,/{endsection}/p"' out = cmd(command) if self.debug: - import pprint print(f'\n\ncommand "{command}" returned:\n') pprint.pprint(out) return out diff --git a/smoketest/scripts/cli/test_config_dependency.py b/smoketest/scripts/cli/test_config_dependency.py new file mode 100755 index 000000000..14e88321a --- /dev/null +++ b/smoketest/scripts/cli/test_config_dependency.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + + +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSessionError + + +class TestConfigDep(VyOSUnitTestSHIM.TestCase): + def test_configdep_error(self): + address_group = 'AG' + address = '192.168.137.5' + nat_base = ['nat', 'source', 'rule', '10'] + interface = 'eth1' + + self.cli_set(['firewall', 'group', 'address-group', address_group, + 'address', address]) + self.cli_set(nat_base + ['outbound-interface', 'name', interface]) + self.cli_set(nat_base + ['source', 'group', 'address-group', address_group]) + self.cli_set(nat_base + ['translation', 'address', 'masquerade']) + self.cli_commit() + + self.cli_delete(['firewall']) + # check error in call to dependent script (nat) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # clean up remaining + self.cli_delete(['nat']) + self.cli_commit() + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index 0943d8e24..e6317050c 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -995,5 +995,81 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') + def test_ipsec_metadata_match(self): + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in4', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in4', 'rule', '1', 'ipsec', 'match-ipsec-in']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in4', 'rule', '2', 'action', 'drop']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in4', 'rule', '2', 'ipsec', 'match-none-in']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-out4', 'rule', '1', 'action', 'continue']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-out4', 'rule', '1', 'ipsec', 'match-ipsec-out']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-out4', 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-out4', 'rule', '2', 'ipsec', 'match-none-out']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-in6', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-in6', 'rule', '1', 'ipsec', 'match-ipsec-in']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-in6', 'rule', '2', 'action', 'drop']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-in6', 'rule', '2', 'ipsec', 'match-none-in']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-out6', 'rule', '1', 'action', 'continue']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-out6', 'rule', '1', 'ipsec', 'match-ipsec-out']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-out6', 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'ipv6', 'name', 'smoketest-ipsec-out6', 'rule', '2', 'ipsec', 'match-none-out']) + + self.cli_commit() + + nftables_search = [ + ['meta ipsec exists', 'accept comment'], + ['meta ipsec missing', 'drop comment'], + ['rt ipsec exists', 'continue comment'], + ['rt ipsec missing', 'reject comment'], + ] + + self.verify_nftables(nftables_search, 'ip vyos_filter') + self.verify_nftables(nftables_search, 'ip6 vyos_filter') + + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4']) + self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4']) + self.cli_set(['firewall', 'ipv4', 'prerouting', 'raw', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'prerouting', 'raw', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4']) + + self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-out4']) + self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-out4']) + + # All valid directional usage of ipsec matches + self.cli_commit() + + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in-indirect', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-ipsec-in-indirect', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4']) + + self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-in-indirect']) + + # nft does not support ANY usage of 'meta ipsec' under an output hook, it will fail to load cfg + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + def test_cyclic_jump_validation(self): + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-1', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-1', 'rule', '1', 'jump-target', 'smoketest-cycle-2']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-2', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-2', 'rule', '1', 'jump-target', 'smoketest-cycle-3']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'log']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '1', 'jump-target', 'smoketest-cycle-1']) + + # Multi-level jumps are unwise but allowed + self.cli_commit() + + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'jump-target', 'smoketest-cycle-1']) + + # nft will fail to load cyclic jumps in any form, whether the rule is reachable or not. + # It should be caught by conf validation. + with self.assertRaises(ConfigSessionError): + self.cli_commit() + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py index af3d49f75..28165736b 100755 --- a/smoketest/scripts/cli/test_interfaces_l2tpv3.py +++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2021 VyOS maintainers and contributors +# Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -14,13 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import json import unittest from base_interfaces_test import BasicInterfaceTest from vyos.utils.process import cmd - +from vyos.utils.kernel import unload_kmod class L2TPv3InterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): @@ -62,7 +61,6 @@ if __name__ == '__main__': # reloaded on demand - not needed but test more and more features for module in ['l2tp_ip6', 'l2tp_ip', 'l2tp_eth', 'l2tp_eth', 'l2tp_netlink', 'l2tp_core']: - if os.path.exists(f'/sys/module/{module}'): - cmd(f'sudo rmmod {module}') + unload_kmod(module) unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py index 9ca661e87..ca47c3218 100755 --- a/smoketest/scripts/cli/test_interfaces_openvpn.py +++ b/smoketest/scripts/cli/test_interfaces_openvpn.py @@ -123,7 +123,7 @@ class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): interface = 'vtun2000' path = base_path + [interface] self.cli_set(path + ['mode', 'client']) - self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192gcm']) # check validate() - cannot specify local-port in client mode self.cli_set(path + ['local-port', '5000']) @@ -197,7 +197,7 @@ class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): auth_hash = 'sha1' self.cli_set(path + ['device-type', 'tun']) - self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes256']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes256']) self.cli_set(path + ['hash', auth_hash]) self.cli_set(path + ['mode', 'client']) self.cli_set(path + ['persistent-tunnel']) @@ -371,7 +371,7 @@ class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): port = str(2000 + ii) self.cli_set(path + ['device-type', 'tun']) - self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192']) self.cli_set(path + ['hash', auth_hash]) self.cli_set(path + ['mode', 'server']) self.cli_set(path + ['local-port', port]) @@ -462,8 +462,8 @@ class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): self.cli_set(path + ['mode', 'site-to-site']) - # check validate() - encryption ncp-ciphers cannot be specified in site-to-site mode - self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) + # check validate() - cipher negotiation cannot be enabled in site-to-site mode + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192gcm']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['encryption']) diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py index 18676491b..b2076b43b 100755 --- a/smoketest/scripts/cli/test_interfaces_vxlan.py +++ b/smoketest/scripts/cli/test_interfaces_vxlan.py @@ -27,6 +27,13 @@ from vyos.utils.network import get_vxlan_vni_filter from vyos.template import is_ipv6 from base_interfaces_test import BasicInterfaceTest +def convert_to_list(ranges_to_convert): + result_list = [] + for r in ranges_to_convert: + ranges = r.split('-') + result_list.extend([str(i) for i in range(int(ranges[0]), int(ranges[1]) + 1)]) + return result_list + class VXLANInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): @@ -153,6 +160,11 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase): '31': '10031', } + vlan_to_vni_ranges = { + '40-43': '10040-10043', + '45-47': '10045-10047' + } + self.cli_set(self._base_path + [interface, 'parameters', 'external']) self.cli_set(self._base_path + [interface, 'source-address', source_address]) @@ -185,6 +197,26 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase): tmp = get_vxlan_vlan_tunnels('vxlan0') self.assertEqual(tmp, list(vlan_to_vni)) + # add ranged VLAN - VNI mapping + for vlan, vni in vlan_to_vni_ranges.items(): + self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni]) + self.cli_commit() + + tmp = get_vxlan_vlan_tunnels('vxlan0') + vlans_list = convert_to_list(vlan_to_vni_ranges.keys()) + self.assertEqual(tmp, list(vlan_to_vni) + vlans_list) + + # check validate() - cannot map VNI range to a single VLAN id + self.cli_set(self._base_path + [interface, 'vlan-to-vni', '100', 'vni', '100-102']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(self._base_path + [interface, 'vlan-to-vni', '100']) + + # check validate() - cannot map VLAN to VNI with different ranges + self.cli_set(self._base_path + [interface, 'vlan-to-vni', '100-102', 'vni', '100-105']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(['interfaces', 'bridge', bridge]) def test_vxlan_neighbor_suppress(self): @@ -287,6 +319,12 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase): '60': '10060', '69': '10069', } + + vlan_to_vni_ranges = { + '70-73': '10070-10073', + '75-77': '10075-10077' + } + for vlan, vni in vlan_to_vni.items(): self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni]) # we need a bridge ... @@ -313,6 +351,15 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase): tmp = get_vxlan_vni_filter(interface) self.assertListEqual(list(vlan_to_vni.values()), tmp) + # add ranged VLAN - VNI mapping + for vlan, vni in vlan_to_vni_ranges.items(): + self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni]) + self.cli_commit() + + tmp = get_vxlan_vni_filter(interface) + vnis_list = convert_to_list(vlan_to_vni_ranges.values()) + self.assertListEqual(list(vlan_to_vni.values()) + vnis_list, tmp) + self.cli_delete(['interfaces', 'bridge', bridge]) if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py index 58aef0001..7bfe0d221 100755 --- a/smoketest/scripts/cli/test_interfaces_wireless.py +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -22,9 +22,11 @@ from base_interfaces_test import BasicInterfaceTest from glob import glob from vyos.configsession import ConfigSessionError -from vyos.utils.process import process_named_running -from vyos.utils.kernel import check_kmod from vyos.utils.file import read_file +from vyos.utils.kernel import check_kmod +from vyos.utils.network import interface_exists +from vyos.utils.process import process_named_running +from vyos.utils.process import call from vyos.xml_ref import default_value def get_config_value(interface, key): @@ -33,7 +35,7 @@ def get_config_value(interface, key): return tmp[0] wifi_cc_path = ['system', 'wireless', 'country-code'] - +country = 'se' class WirelessInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): @@ -66,7 +68,8 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): cls._test_ipv6 = False cls._test_vlan = False - cls.cli_set(cls, wifi_cc_path + ['se']) + cls.cli_set(cls, wifi_cc_path + [country]) + def test_wireless_add_single_ip_address(self): # derived method to check if member interfaces are enslaved properly @@ -84,7 +87,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): def test_wireless_hostapd_config(self): # Only set the hostapd (access-point) options - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'ssid' self.cli_set(self._base_path + [interface, 'ssid', ssid]) @@ -161,7 +164,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): def test_wireless_hostapd_vht_mu_beamformer_config(self): # Multi-User-Beamformer - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'vht_mu-beamformer' antennas = '3' @@ -230,7 +233,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): def test_wireless_hostapd_vht_su_beamformer_config(self): # Single-User-Beamformer - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'vht_su-beamformer' antennas = '3' @@ -299,16 +302,14 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): def test_wireless_hostapd_he_config(self): # Only set the hostapd (access-point) options - HE mode for 802.11ax at 6GHz - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'ssid' channel = '1' sae_pw = 'VyOSVyOSVyOS' - country = 'de' bss_color = '37' channel_set_width = '134' center_channel_freq_1 = '15' - self.cli_set(wifi_cc_path + [country]) self.cli_set(self._base_path + [interface, 'ssid', ssid]) self.cli_set(self._base_path + [interface, 'type', 'access-point']) self.cli_set(self._base_path + [interface, 'channel', channel]) @@ -353,10 +354,6 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): tmp = get_config_value(interface, 'he_oper_centr_freq_seg0_idx') self.assertEqual(center_channel_freq_1, tmp) - # Country code - tmp = get_config_value(interface, 'country_code') - self.assertEqual(country.upper(), tmp) - # BSS coloring tmp = get_config_value(interface, 'he_bss_color') self.assertEqual(bss_color, tmp) @@ -386,15 +383,12 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): def test_wireless_hostapd_wpa_config(self): # Only set the hostapd (access-point) options - interface = 'wlan1' - phy = 'phy0' + interface = self._interfaces[1] # wlan1 ssid = 'VyOS-SMOKETEST' channel = '1' wpa_key = 'VyOSVyOSVyOS' mode = 'n' - country = 'de' - self.cli_set(self._base_path + [interface, 'physical-device', phy]) self.cli_set(self._base_path + [interface, 'type', 'access-point']) self.cli_set(self._base_path + [interface, 'mode', mode]) @@ -454,7 +448,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): self.assertTrue(process_named_running('hostapd')) def test_wireless_access_point_bridge(self): - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'VyOS-Test' bridge = 'br42477' @@ -491,7 +485,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): self.cli_delete(bridge_path) def test_wireless_security_station_address(self): - interface = 'wlan1' + interface = self._interfaces[1] # wlan1 ssid = 'VyOS-ACL' hostapd_accept_station_conf = f'/run/hostapd/{interface}_station_accept.conf' @@ -511,6 +505,12 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() + self.assertTrue(interface_exists(interface)) + self.assertTrue(os.path.isfile(f'/run/hostapd/{interface}_station_accept.conf')) + self.assertTrue(os.path.isfile(f'/run/hostapd/{interface}_station_deny.conf')) + + self.assertTrue(process_named_running('hostapd')) + # in accept mode all addresses are allowed unless specified in the deny list tmp = get_config_value(interface, 'macaddr_acl') self.assertEqual(tmp, '0') @@ -526,6 +526,11 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): # Switch mode accept -> deny self.cli_set(self._base_path + [interface, 'security', 'station-address', 'mode', 'deny']) self.cli_commit() + + self.assertTrue(interface_exists(interface)) + self.assertTrue(os.path.isfile(f'/run/hostapd/{interface}_station_accept.conf')) + self.assertTrue(os.path.isfile(f'/run/hostapd/{interface}_station_deny.conf')) + # In deny mode all addresses are denied unless specified in the allow list tmp = get_config_value(interface, 'macaddr_acl') self.assertEqual(tmp, '1') @@ -535,4 +540,9 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase): if __name__ == '__main__': check_kmod('mac80211_hwsim') - unittest.main(verbosity=2, failfast=True) + # loading the module created two WIFI Interfaces in the background (wlan0 and wlan1) + # remove them to have a clean test start + for interface in ['wlan0', 'wlan1']: + if interface_exists(interface): + call(f'sudo iw dev {interface} del') + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_load-balancing_reverse-proxy.py b/smoketest/scripts/cli/test_load-balancing_reverse-proxy.py index aa796f59f..34f77b95d 100755 --- a/smoketest/scripts/cli/test_load-balancing_reverse-proxy.py +++ b/smoketest/scripts/cli/test_load-balancing_reverse-proxy.py @@ -458,6 +458,45 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase): config = read_file(HAPROXY_CONF) self.assertIn(f'option smtpchk', config) + def test_09_lb_reverse_proxy_logging(self): + # Setup base + self.base_config() + self.cli_commit() + + # Ensure default logging configuration is present + config = read_file(HAPROXY_CONF) + + # Test global-parameters logging options + self.cli_set(base_path + ['global-parameters', 'logging', 'facility', 'local1', 'level', 'err']) + self.cli_set(base_path + ['global-parameters', 'logging', 'facility', 'local2', 'level', 'warning']) + self.cli_commit() + + # Test global logging parameters are generated in configuration file + config = read_file(HAPROXY_CONF) + self.assertIn('log /dev/log local1 err', config) + self.assertIn('log /dev/log local2 warning', config) + + # Test backend logging options + backend_path = base_path + ['backend', 'bk-01'] + self.cli_set(backend_path + ['logging', 'facility', 'local3', 'level', 'debug']) + self.cli_set(backend_path + ['logging', 'facility', 'local4', 'level', 'info']) + self.cli_commit() + + # Test backend logging parameters are generated in configuration file + config = read_file(HAPROXY_CONF) + self.assertIn('log /dev/log local3 debug', config) + self.assertIn('log /dev/log local4 info', config) + + # Test service logging options + service_path = base_path + ['service', 'https_front'] + self.cli_set(service_path + ['logging', 'facility', 'local5', 'level', 'notice']) + self.cli_set(service_path + ['logging', 'facility', 'local6', 'level', 'crit']) + self.cli_commit() + + # Test service logging parameters are generated in configuration file + config = read_file(HAPROXY_CONF) + self.assertIn('log /dev/log local5 notice', config) + self.assertIn('log /dev/log local6 crit', config) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_op-mode_show.py b/smoketest/scripts/cli/test_op-mode_show.py new file mode 100755 index 000000000..fba60cc01 --- /dev/null +++ b/smoketest/scripts/cli/test_op-mode_show.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.version import get_version + +base_path = ['show'] + +class TestOPModeShow(VyOSUnitTestSHIM.TestCase): + def test_op_mode_show_version(self): + # Retrieve output of "show version" OP-mode command + tmp = self.op_mode(base_path + ['version']) + # Validate + version = get_version() + self.assertIn(f'Version: VyOS {version}', tmp) + + def test_op_mode_show_vrf(self): + # Retrieve output of "show version" OP-mode command + tmp = self.op_mode(base_path + ['vrf']) + # Validate + self.assertIn('VRF is not configured', tmp) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py index 462fc24d0..797ab9770 100755 --- a/smoketest/scripts/cli/test_policy_route.py +++ b/smoketest/scripts/cli/test_policy_route.py @@ -25,6 +25,8 @@ conn_mark = '555' conn_mark_set = '111' table_mark_offset = 0x7fffffff table_id = '101' +vrf = 'PBRVRF' +vrf_table_id = '102' interface = 'eth0' interface_wc = 'ppp*' interface_ip = '172.16.10.1/24' @@ -39,11 +41,14 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): cls.cli_set(cls, ['interfaces', 'ethernet', interface, 'address', interface_ip]) cls.cli_set(cls, ['protocols', 'static', 'table', table_id, 'route', '0.0.0.0/0', 'interface', interface]) + + cls.cli_set(cls, ['vrf', 'name', vrf, 'table', vrf_table_id]) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['interfaces', 'ethernet', interface, 'address', interface_ip]) cls.cli_delete(cls, ['protocols', 'static', 'table', table_id]) + cls.cli_delete(cls, ['vrf', 'name', vrf]) super(TestPolicyRoute, cls).tearDownClass() @@ -180,6 +185,50 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): self.verify_rules(ip_rule_search) + def test_pbr_vrf(self): + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'port', '8888']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'syn']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'not', 'ack']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'vrf', vrf]) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'protocol', 'tcp_udp']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'destination', 'port', '8888']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'set', 'vrf', vrf]) + + self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) + self.cli_set(['policy', 'route6', 'smoketest6', 'interface', interface]) + + self.cli_commit() + + mark_hex = "{0:#010x}".format(table_mark_offset - int(vrf_table_id)) + + # IPv4 + + nftables_search = [ + [f'iifname "{interface}"', 'jump VYOS_PBR_UD_smoketest'], + ['tcp flags syn / syn,ack', 'tcp dport 8888', 'meta mark set ' + mark_hex] + ] + + self.verify_nftables(nftables_search, 'ip vyos_mangle') + + # IPv6 + + nftables6_search = [ + [f'iifname "{interface}"', 'jump VYOS_PBR6_UD_smoketest'], + ['meta l4proto { tcp, udp }', 'th dport 8888', 'meta mark set ' + mark_hex] + ] + + self.verify_nftables(nftables6_search, 'ip6 vyos_mangle') + + # IP rule fwmark -> table + + ip_rule_search = [ + ['fwmark ' + hex(table_mark_offset - int(vrf_table_id)), 'lookup ' + vrf] + ] + + self.verify_rules(ip_rule_search) + + def test_pbr_matching_criteria(self): self.cli_set(['policy', 'route', 'smoketest', 'default-log']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'udp']) diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py index 585c1dc89..905eaf2e9 100755 --- a/smoketest/scripts/cli/test_protocols_ospf.py +++ b/smoketest/scripts/cli/test_protocols_ospf.py @@ -16,7 +16,6 @@ import unittest -from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError @@ -27,6 +26,7 @@ PROCESS_NAME = 'ospfd' base_path = ['protocols', 'ospf'] route_map = 'foo-bar-baz10' +dummy_if = 'dum3562' class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): @classmethod @@ -38,6 +38,7 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) + cls.cli_set(cls, ['interfaces', 'dummy', dummy_if]) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) @@ -46,6 +47,7 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map', route_map]) + cls.cli_delete(cls, ['interfaces', 'dummy', dummy_if]) super(TestProtocolsOSPF, cls).tearDownClass() def tearDown(self): @@ -441,14 +443,13 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): global_block_high = "399" local_block_low = "400" local_block_high = "499" - interface = 'lo' maximum_stack_size = '5' prefix_one = '192.168.0.1/32' prefix_two = '192.168.0.2/32' prefix_one_value = '1' prefix_two_value = '2' - self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) @@ -472,17 +473,14 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): def test_ospf_15_ldp_sync(self): holddown = "500" - interface = 'lo' interfaces = Section.interfaces('ethernet') - self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['ldp-sync', 'holddown', holddown]) # Commit main OSPF changes self.cli_commit() - sleep(10) - # Verify main OSPF changes frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) self.assertIn(f'router ospf', frrconfig) @@ -514,7 +512,7 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf dead-interval 40', config) - self.assertIn(f' no ip ospf mpls ldp-sync', config) + self.assertNotIn(f' ip ospf mpls ldp-sync', config) def test_ospf_16_graceful_restart(self): period = '300' diff --git a/smoketest/scripts/cli/test_protocols_static.py b/smoketest/scripts/cli/test_protocols_static.py index c5cf2aab6..f676e2a52 100755 --- a/smoketest/scripts/cli/test_protocols_static.py +++ b/smoketest/scripts/cli/test_protocols_static.py @@ -21,6 +21,7 @@ from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.template import is_ipv6 from vyos.utils.network import get_interface_config +from vyos.utils.network import get_vrf_tableid base_path = ['protocols', 'static'] vrf_path = ['protocols', 'vrf'] @@ -421,7 +422,7 @@ class TestProtocolsStatic(VyOSUnitTestSHIM.TestCase): tmp = get_interface_config(vrf) # Compare VRF table ID - self.assertEqual(tmp['linkinfo']['info_data']['table'], int(vrf_config['table'])) + self.assertEqual(get_vrf_tableid(vrf), int(vrf_config['table'])) self.assertEqual(tmp['linkinfo']['info_kind'], 'vrf') # Verify FRR bgpd configuration @@ -478,4 +479,4 @@ class TestProtocolsStatic(VyOSUnitTestSHIM.TestCase): self.assertIn(tmp, frrconfig) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2, failfast=True) diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index b3daa90d0..7d5eaa440 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -246,5 +246,19 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): for excluded in snmpv3_view_oid_exclude: self.assertIn(f'view {snmpv3_view} excluded .{excluded}', tmp) + def test_snmp_script_extensions(self): + extensions = { + 'default': 'snmp_smoketest_extension_script.sh', + 'external': '/run/external_snmp_smoketest_extension_script.sh' + } + + for key, val in extensions.items(): + self.cli_set(base_path + ['script-extensions', 'extension-name', key, 'script', val]) + self.cli_commit() + + self.assertEqual(get_config_value('extend default'), f'/config/user-data/{extensions["default"]}') + self.assertEqual(get_config_value('extend external'), extensions["external"]) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index b09990c92..d8e325eee 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -304,6 +304,22 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): for line in ssh_lines: self.assertIn(line, tmp_sshd_conf) + def test_ssh_pubkey_accepted_algorithm(self): + algs = ['ssh-ed25519', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', + 'ecdsa-sha2-nistp521', 'ssh-dss', 'ssh-rsa', 'rsa-sha2-256', + 'rsa-sha2-512' + ] + + expected = 'PubkeyAcceptedAlgorithms ' + for alg in algs: + self.cli_set(base_path + ['pubkey-accepted-algorithm', alg]) + expected = f'{expected}{alg},' + expected = expected[:-1] + + self.cli_commit() + tmp_sshd_conf = read_file(SSHD_CONF) + self.assertIn(expected, tmp_sshd_conf) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index 3ae7b6217..c07fdce77 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -20,7 +20,7 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.firewall import find_nftables_rule -from vyos.utils.file import read_file +from vyos.utils.file import read_file, read_json base_path = ['system', 'conntrack'] @@ -28,6 +28,9 @@ def get_sysctl(parameter): tmp = parameter.replace(r'.', r'/') return read_file(f'/proc/sys/{tmp}') +def get_logger_config(): + return read_json('/run/vyos-conntrack-logger.conf') + class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): @@ -280,5 +283,35 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') self.cli_delete(['firewall']) + + def test_conntrack_log(self): + expected_config = { + 'event': { + 'destroy': {}, + 'new': {}, + 'update': {}, + }, + 'queue_size': '10000' + } + self.cli_set(base_path + ['log', 'event', 'destroy']) + self.cli_set(base_path + ['log', 'event', 'new']) + self.cli_set(base_path + ['log', 'event', 'update']) + self.cli_set(base_path + ['log', 'queue-size', '10000']) + self.cli_commit() + self.assertEqual(expected_config, get_logger_config()) + self.assertEqual('0', get_sysctl('net.netfilter.nf_conntrack_timestamp')) + + for event in ['destroy', 'new', 'update']: + for proto in ['icmp', 'other', 'tcp', 'udp']: + self.cli_set(base_path + ['log', 'event', event, proto]) + expected_config['event'][event][proto] = {} + self.cli_set(base_path + ['log', 'timestamp']) + expected_config['timestamp'] = {} + self.cli_commit() + + self.assertEqual(expected_config, get_logger_config()) + self.assertEqual('1', get_sysctl('net.netfilter.nf_conntrack_timestamp')) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_syslog.py b/smoketest/scripts/cli/test_system_syslog.py index 030ec587b..45a5b4087 100755 --- a/smoketest/scripts/cli/test_system_syslog.py +++ b/smoketest/scripts/cli/test_system_syslog.py @@ -53,8 +53,8 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase): self.assertFalse(process_named_running(PROCESS_NAME)) def test_syslog_basic(self): - host1 = '198.51.100.1' - host2 = '192.0.2.1' + host1 = '127.0.0.10' + host2 = '127.0.0.20' self.cli_set(base_path + ['host', host1, 'port', '999']) self.cli_set(base_path + ['host', host1, 'facility', 'all', 'level', 'all']) @@ -68,7 +68,7 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase): # *.* @198.51.100.1:999 # kern.err @192.0.2.1:514 config = [get_config_value('\*.\*'), get_config_value('kern.err'), get_config_value('\*.warning')] - expected = ['@198.51.100.1:999', '@192.0.2.1:514', '/dev/console'] + expected = [f'@{host1}:999', f'@{host2}:514', '/dev/console'] for i in range(0,3): self.assertIn(expected[i], config[i]) diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py index 27356d70e..2dc66485b 100755 --- a/smoketest/scripts/cli/test_vpn_ipsec.py +++ b/smoketest/scripts/cli/test_vpn_ipsec.py @@ -252,6 +252,15 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): for line in swanctl_conf_lines: self.assertIn(line, swanctl_conf) + # if dpd is not specified it should not be enabled (see T6599) + swanctl_unexpected_lines = [ + f'dpd_timeout' + f'dpd_delay' + ] + + for unexpected_line in swanctl_unexpected_lines: + self.assertNotIn(unexpected_line, swanctl_conf) + swanctl_secrets_lines = [ f'id-{regex_uuid4} = "{local_id}"', f'id-{regex_uuid4} = "{remote_id}"', @@ -639,8 +648,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'auth = eap-mschapv2', f'eap_id = %any', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', - f'rekey_time = {eap_lifetime}s', - f'rand_time = 540s', + f'life_time = {eap_lifetime}s', f'dpd_action = clear', f'replay_window = 32', f'inactivity = 28800', @@ -761,8 +769,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'auth = eap-tls', f'eap_id = %any', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', - f'rekey_time = {eap_lifetime}s', - f'rand_time = 540s', + f'life_time = {eap_lifetime}s', f'dpd_action = clear', f'inactivity = 28800', f'local_ts = 0.0.0.0/0,::/0', @@ -876,8 +883,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'certs = peer1.pem', f'cacerts = MyVyOS-CA.pem,MyVyOS-IntCA.pem', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', - f'rekey_time = {eap_lifetime}s', - f'rand_time = 540s', + f'life_time = {eap_lifetime}s', f'dpd_action = clear', f'inactivity = 28800', f'local_ts = 0.0.0.0/0,::/0', @@ -968,5 +974,117 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.tearDownPKI() + def test_remote_access_no_rekey(self): + # In some RA secnarios, disabling server-initiated rekey of IKE and CHILD SA is desired + self.setupPKI() + + ike_group = 'IKE-RW' + esp_group = 'ESP-RW' + + conn_name = 'vyos-rw' + local_address = '192.0.2.1' + ip_pool_name = 'ra-rw-ipv4' + ike_lifetime = '7200' + eap_lifetime = '3600' + local_id = 'ipsec.vyos.net' + + name_servers = ['172.16.254.100', '172.16.254.101'] + prefix = '172.16.250.0/28' + + # IKE + self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) + self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', '0']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'dh-group', '14']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'encryption', 'aes256']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'hash', 'sha512']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '14']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'encryption', 'aes256']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'hash', 'sha256']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'dh-group', '2']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'encryption', 'aes256']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'hash', 'sha256']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'dh-group', '14']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'encryption', 'aes128gcm128']) + self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'hash', 'sha256']) + + # ESP + self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', eap_lifetime]) + self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'disable']) + self.cli_set(base_path + ['esp-group', esp_group, 'disable-rekey']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes256']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha512']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'encryption', 'aes256']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'hash', 'sha384']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', 'aes256']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'sha256']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'encryption', 'aes256']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'hash', 'sha1']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'encryption', 'aes128gcm128']) + self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'hash', 'sha256']) + + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'local-id', local_id]) + # Use client-mode x509 instead of default EAP-MSCHAPv2 + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'client-mode', 'x509']) + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'server-mode', 'x509']) + + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'certificate', peer_name]) + # verify() - CA cert required for x509 auth + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'ca-certificate', ca_name]) + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'ca-certificate', int_ca_name]) + + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'esp-group', esp_group]) + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'ike-group', ike_group]) + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'local-address', local_address]) + self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'pool', ip_pool_name]) + + for ns in name_servers: + self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'name-server', ns]) + self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'prefix', prefix]) + + self.cli_commit() + + # verify applied configuration + swanctl_conf = read_file(swanctl_file) + swanctl_lines = [ + f'{conn_name}', + f'remote_addrs = %any', + f'local_addrs = {local_address}', + f'proposals = aes256-sha512-modp2048,aes256-sha256-modp2048,aes256-sha256-modp1024,aes128gcm128-sha256-modp2048', + f'version = 2', + f'send_certreq = no', + f'rekey_time = 0s', + f'keyingtries = 0', + f'pools = {ip_pool_name}', + f'id = "{local_id}"', + f'auth = pubkey', + f'certs = peer1.pem', + f'cacerts = MyVyOS-CA.pem,MyVyOS-IntCA.pem', + f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', + f'life_time = {eap_lifetime}s', + f'rekey_time = 0s', + f'dpd_action = clear', + f'inactivity = 28800', + f'local_ts = 0.0.0.0/0,::/0', + ] + for line in swanctl_lines: + self.assertIn(line, swanctl_conf) + + swanctl_pool_lines = [ + f'{ip_pool_name}', + f'addrs = {prefix}', + f'dns = {",".join(name_servers)}', + ] + for line in swanctl_pool_lines: + self.assertIn(line, swanctl_conf) + + # Check Root CA, Intermediate CA and Peer cert/key pair is present + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}.pem'))) + self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) + + self.tearDownPKI() + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 243397dc2..2bb6c91c1 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -19,14 +19,18 @@ import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM +from json import loads +from jmespath import search from vyos.configsession import ConfigSessionError from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.utils.file import read_file from vyos.utils.network import get_interface_config +from vyos.utils.network import get_vrf_tableid from vyos.utils.network import is_intf_addr_assigned from vyos.utils.network import interface_exists +from vyos.utils.process import cmd from vyos.utils.system import sysctl_read base_path = ['vrf'] @@ -111,8 +115,7 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) - tmp = get_interface_config(vrf) - self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) + self.assertEqual(int(table), get_vrf_tableid(vrf)) # Increment table ID for the next run table = str(int(table) + 1) @@ -266,8 +269,7 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): for address in addresses: self.assertTrue(is_intf_addr_assigned(interface, address)) # Verify VRF table ID - tmp = get_interface_config(vrf) - self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) + self.assertEqual(int(table), get_vrf_tableid(vrf)) # Verify interface is assigned to VRF tmp = get_interface_config(interface) @@ -558,26 +560,39 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): self.assertNotIn(f' no ipv6 nht resolve-via-default', frrconfig) def test_vrf_conntrack(self): - table = '1000' + table = '8710' nftables_rules = { 'vrf_zones_ct_in': ['ct original zone set iifname map @ct_iface_map'], 'vrf_zones_ct_out': ['ct original zone set oifname map @ct_iface_map'] } - self.cli_set(base_path + ['name', 'blue', 'table', table]) + self.cli_set(base_path + ['name', 'randomVRF', 'table', '1000']) self.cli_commit() # Conntrack rules should not be present for chain, rule in nftables_rules.items(): self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=True) + # conntrack is only enabled once NAT, NAT66 or firewalling is enabled self.cli_set(['nat']) - self.cli_commit() + + for vrf in vrfs: + base = base_path + ['name', vrf] + self.cli_set(base + ['table', table]) + table = str(int(table) + 1) + # We need the commit inside the loop to trigger the bug in T6603 + self.cli_commit() # Conntrack rules should now be present for chain, rule in nftables_rules.items(): self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=False) + # T6603: there should be only ONE entry for the iifname/oifname in the chains + tmp = loads(cmd('sudo nft -j list table inet vrf_zones')) + num_rules = len(search("nftables[].rule[].chain", tmp)) + # ['vrf_zones_ct_in', 'vrf_zones_ct_out'] + self.assertEqual(num_rules, 2) + self.cli_delete(['nat']) if __name__ == '__main__': |