diff options
Diffstat (limited to 'smoketest/scripts/cli')
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bonding.py | 89 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bridge.py | 174 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_macsec.py | 45 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_openvpn.py | 38 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_load_balancning_wan.py | 189 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_isis.py | 69 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_dns_forwarding.py | 47 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_salt.py | 94 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_snmp.py | 47 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 73 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_flow-accounting.py | 38 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ipv6.py | 31 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 8 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ntp.py | 17 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vpn_openconnect.py | 72 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vrf.py | 141 |
16 files changed, 1013 insertions, 159 deletions
diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index 4f2fe979a..ab75cdcc9 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -49,7 +49,7 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): if not '.' in tmp: cls._members.append(tmp) - cls._options['bond0'] = [] + cls._options = {'bond0' : []} for member in cls._members: cls._options['bond0'].append(f'member interface {member}') cls._interfaces = list(cls._options) @@ -136,7 +136,7 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): def test_bonding_hash_policy(self): # Define available bonding hash policies - hash_policies = ['layer2', 'layer2+3', 'layer2+3', 'encap2+3', 'encap3+4'] + hash_policies = ['layer2', 'layer2+3', 'encap2+3', 'encap3+4'] for hash_policy in hash_policies: for interface in self._interfaces: for option in self._options.get(interface, []): @@ -151,6 +151,29 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): defined_policy = read_file(f'/sys/class/net/{interface}/bonding/xmit_hash_policy').split() self.assertEqual(defined_policy[0], hash_policy) + def test_bonding_mii_monitoring_interval(self): + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + + self.cli_commit() + + # verify default + for interface in self._interfaces: + tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() + self.assertIn('100', tmp) + + mii_mon = '250' + for interface in self._interfaces: + self.cli_set(self._base_path + [interface, 'mii-mon-interval', mii_mon]) + + self.cli_commit() + + # verify new CLI value + for interface in self._interfaces: + tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() + self.assertIn(mii_mon, tmp) + def test_bonding_multi_use_member(self): # Define available bonding hash policies for interface in ['bond10', 'bond20']: @@ -165,5 +188,67 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() + def test_bonding_source_interface(self): + # Re-use member interface that is already a source-interface + bond = 'bond99' + pppoe = 'pppoe98756' + member = next(iter(self._members)) + + self.cli_set(self._base_path + [bond, 'member', 'interface', member]) + self.cli_set(['interfaces', 'pppoe', pppoe, 'source-interface', member]) + + # check validate() - can not add interface to bond, it is the source-interface of ... + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['interfaces', 'pppoe', pppoe]) + self.cli_commit() + + # verify config + slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() + self.assertIn(member, slaves) + + def test_bonding_source_bridge_interface(self): + # Re-use member interface that is already a source-interface + bond = 'bond1097' + bridge = 'br6327' + member = next(iter(self._members)) + + self.cli_set(self._base_path + [bond, 'member', 'interface', member]) + self.cli_set(['interfaces', 'bridge', bridge, 'member', 'interface', member]) + + # check validate() - can not add interface to bond, it is a member of bridge ... + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['interfaces', 'bridge', bridge]) + self.cli_commit() + + # verify config + slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() + self.assertIn(member, slaves) + + def test_bonding_uniq_member_description(self): + ethernet_path = ['interfaces', 'ethernet'] + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + + self.cli_commit() + + # Add any changes on bonding members + # For example add description on separate ethX interfaces + for interface in self._interfaces: + for member in self._members: + self.cli_set(ethernet_path + [member, 'description', member + '_interface']) + + self.cli_commit() + + # verify config + for interface in self._interfaces: + slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() + for member in self._members: + self.assertIn(member, slaves) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 09441326a..884fa5c6b 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -19,6 +19,7 @@ import json import unittest from base_interfaces_test import BasicInterfaceTest +from copy import deepcopy from glob import glob from netifaces import interfaces @@ -144,87 +145,123 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): super().test_vif_8021q_mtu_limits() def test_bridge_vlan_filter(self): - vif_vlan = 2 + vifs = ['10', '20', '30', '40'] + native_vlan = '20' + # Add member interface to bridge and set VLAN filter for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['enable-vlan']) self.cli_set(base + ['address', '192.0.2.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'address', '192.0.3.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'mtu', self._mtu]) - vlan_id = 101 - allowed_vlan = 2 - allowed_vlan_range = '4-9' - # assign members to bridge interface + for vif in vifs: + self.cli_set(base + ['vif', vif, 'address', f'192.0.{vif}.1/24']) + self.cli_set(base + ['vif', vif, 'mtu', self._mtu]) + for member in self._members: base_member = base + ['member', 'interface', member] - self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)]) - self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range]) - self.cli_set(base_member + ['native-vlan', str(vlan_id)]) - vlan_id += 1 + self.cli_set(base_member + ['native-vlan', native_vlan]) + for vif in vifs: + self.cli_set(base_member + ['allowed-vlan', vif]) # commit config self.cli_commit() - # Detect the vlan filter function + def _verify_members(interface, members) -> None: + # check member interfaces are added on the bridge + bridge_members = [] + for tmp in glob(f'/sys/class/net/{interface}/lower_*'): + bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + + self.assertListEqual(sorted(members), sorted(bridge_members)) + + def _check_vlan_filter(interface, vifs) -> None: + configured_vlan_ids = [] + + bridge_json = cmd(f'bridge -j vlan show dev {interface}') + bridge_json = json.loads(bridge_json) + self.assertIsNotNone(bridge_json) + + for tmp in bridge_json: + self.assertIn('vlans', tmp) + + for vlan in tmp['vlans']: + self.assertIn('vlan', vlan) + configured_vlan_ids.append(str(vlan['vlan'])) + + # Verify native VLAN ID has 'PVID' flag set on individual member ports + if not interface.startswith('br') and str(vlan['vlan']) == native_vlan: + self.assertIn('flags', vlan) + self.assertIn('PVID', vlan['flags']) + + self.assertListEqual(sorted(configured_vlan_ids), sorted(vifs)) + + # Verify correct setting of VLAN filter function for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bridge/vlan_filtering') self.assertEqual(tmp, '1') - # Execute the program to obtain status information - json_data = cmd('bridge -j vlan show', shell=True) - vlan_filter_status = None - vlan_filter_status = json.loads(json_data) - - if vlan_filter_status is not None: - for interface_status in vlan_filter_status: - ifname = interface_status['ifname'] - for interface in self._members: - vlan_success = 0; - if interface == ifname: - vlans_status = interface_status['vlans'] - for vlan_status in vlans_status: - vlan_id = vlan_status['vlan'] - flag_num = 0 - if 'flags' in vlan_status: - flags = vlan_status['flags'] - for flag in flags: - flag_num = flag_num +1 - if vlan_id == 2: - if flag_num == 0: - vlan_success = vlan_success + 1 - else: - for id in range(4,10): - if vlan_id == id: - if flag_num == 0: - vlan_success = vlan_success + 1 - if vlan_id >= 101: - if flag_num == 2: - vlan_success = vlan_success + 1 - if vlan_success >= 7: - self.assertTrue(True) - else: - self.assertTrue(False) + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) - else: - self.assertTrue(False) + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) - # check member interfaces are added on the bridge + for member in self._members: + _check_vlan_filter(member, vifs) + + # change member interface description to trigger config update, + # VLANs must still exist (T4565) for interface in self._interfaces: - bridge_members = [] - for tmp in glob(f'/sys/class/net/{interface}/lower_*'): - bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + for member in self._members: + self.cli_set(['interfaces', Section.section(member), member, 'description', f'foo {member}']) + + # commit config + self.cli_commit() + + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) + + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) for member in self._members: - self.assertIn(member, bridge_members) + _check_vlan_filter(member, vifs) # delete all members for interface in self._interfaces: self.cli_delete(self._base_path + [interface, 'member']) + # commit config + self.cli_commit() - def test_bridge_vlan_members(self): + # verify member interfaces are no longer assigned on the bridge + for interface in self._interfaces: + bridge_members = [] + for tmp in glob(f'/sys/class/net/{interface}/lower_*'): + bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + + self.assertNotEqual(len(self._members), len(bridge_members)) + for member in self._members: + self.assertNotIn(member, bridge_members) + + def test_bridge_vif_members(self): # T2945: ensure that VIFs are not dropped from bridge vifs = ['300', '400'] for interface in self._interfaces: @@ -249,5 +286,34 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): self.cli_delete(['interfaces', 'ethernet', member, 'vif', vif]) self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) + def test_bridge_vif_s_vif_c_members(self): + # T2945: ensure that VIFs are not dropped from bridge + vifs = ['300', '400'] + vifc = ['301', '401'] + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + for vif_c in vifc: + self.cli_set(['interfaces', 'ethernet', member, 'vif-s', vif_s, 'vif-c', vif_c]) + self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}']) + + self.cli_commit() + + # Verify config + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + for vif_c in vifc: + # member interface must be assigned to the bridge + self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif_s}.{vif_c}')) + + # delete all members + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + self.cli_delete(['interfaces', 'ethernet', member, 'vif-s', vif_s]) + for vif_c in vifc: + self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index 5b10bfa44..64bfa0dc9 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -28,6 +28,8 @@ from vyos.util import read_file from vyos.util import get_interface_config from vyos.util import process_named_running +PROCESS_NAME = 'wpa_supplicant' + def get_config_value(interface, key): tmp = read_file(f'/run/wpa_supplicant/{interface}.conf') tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) @@ -55,6 +57,10 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # call base-classes classmethod super(cls, cls).setUpClass() + def tearDown(self): + super().tearDown() + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_macsec_encryption(self): # MACsec can be operating in authentication and encryption mode - both # using different mandatory settings, lets test encryption as the basic @@ -96,28 +102,29 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() tmp = get_config_value(src_interface, 'macsec_integ_only') - self.assertTrue("0" in tmp) + self.assertIn("0", tmp) tmp = get_config_value(src_interface, 'mka_cak') - self.assertTrue(mak_cak in tmp) + self.assertIn(mak_cak, tmp) tmp = get_config_value(src_interface, 'mka_ckn') - self.assertTrue(mak_ckn in tmp) + self.assertIn(mak_ckn, tmp) # check that the default priority of 255 is programmed tmp = get_config_value(src_interface, 'mka_priority') - self.assertTrue("255" in tmp) + self.assertIn("255", tmp) tmp = get_config_value(src_interface, 'macsec_replay_window') - self.assertTrue(replay_window in tmp) + self.assertIn(replay_window, tmp) tmp = read_file(f'/sys/class/net/{interface}/mtu') self.assertEqual(tmp, '1460') - # Check for running process - self.assertTrue(process_named_running('wpa_supplicant')) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) def test_macsec_gcm_aes_128(self): + src_interface = 'eth0' interface = 'macsec1' cipher = 'gcm-aes-128' self.cli_set(self._base_path + [interface]) @@ -125,7 +132,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): @@ -138,7 +145,15 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) + # check that we use the new macsec_csindex option (T4537) + tmp = get_config_value(src_interface, 'macsec_csindex') + self.assertIn("0", tmp) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + def test_macsec_gcm_aes_256(self): + src_interface = 'eth0' interface = 'macsec4' cipher = 'gcm-aes-256' self.cli_set(self._base_path + [interface]) @@ -146,7 +161,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): @@ -158,6 +173,13 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) + # check that we use the new macsec_csindex option (T4537) + tmp = get_config_value(src_interface, 'macsec_csindex') + self.assertIn("1", tmp) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + def test_macsec_source_interface(self): # Ensure source-interface can bot be part of any other bond or bridge @@ -186,6 +208,9 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() self.assertIn(interface, interfaces()) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2, failfast=True) diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py index 24df0af4d..55218ecac 100755 --- a/smoketest/scripts/cli/test_interfaces_openvpn.py +++ b/smoketest/scripts/cli/test_interfaces_openvpn.py @@ -607,6 +607,44 @@ class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): interface = f'vtun{ii}' self.assertNotIn(interface, interfaces()) + def test_openvpn_options(self): + # Ensure OpenVPN process restart on openvpn-option CLI node change + + interface = 'vtun5001' + path = base_path + [interface] + + self.cli_set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['local-address', '10.0.0.2']) + self.cli_set(path + ['remote-address', '192.168.0.3']) + self.cli_set(path + ['shared-secret-key-file', s2s_key]) + + self.cli_commit() + + # Now verify the OpenVPN "raw" option passing. Once an openvpn-option is + # added, modified or deleted from the CLI, OpenVPN daemon must be restarted + cur_pid = process_named_running('openvpn') + self.cli_set(path + ['openvpn-option', '--persist-tun']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) + cur_pid = new_pid + + self.cli_set(path + ['openvpn-option', '--persist-key']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) + cur_pid = new_pid + + self.cli_delete(path + ['openvpn-option']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) if __name__ == '__main__': # Our SSL certificates need a subject ... diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py new file mode 100755 index 000000000..edc6deb04 --- /dev/null +++ b/smoketest/scripts/cli/test_load_balancning_wan.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest +import time + +from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.configsession import ConfigSessionError +from vyos.ifconfig import Section +from vyos.util import call +from vyos.util import cmd + + +base_path = ['load-balancing'] + + +def create_netns(name): + return call(f'sudo ip netns add {name}') + +def create_veth_pair(local='veth0', peer='ceth0'): + return call(f'sudo ip link add {local} type veth peer name {peer}') + +def move_interface_to_netns(iface, netns_name): + return call(f'sudo ip link set {iface} netns {netns_name}') + +def rename_interface(iface, new_name): + return call(f'sudo ip link set {iface} name {new_name}') + +def cmd_in_netns(netns, cmd): + return call(f'sudo ip netns exec {netns} {cmd}') + +def delete_netns(name): + return call(f'sudo ip netns del {name}') + + +class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestLoadBalancingWan, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + + def test_table_routes(self): + + ns1 = 'ns201' + ns2 = 'ns202' + iface1 = 'eth201' + iface2 = 'eth202' + container_iface1 = 'ceth0' + container_iface2 = 'ceth1' + + # Create network namespeces + create_netns(ns1) + create_netns(ns2) + create_veth_pair(iface1, container_iface1) + create_veth_pair(iface2, container_iface2) + move_interface_to_netns(container_iface1, ns1) + move_interface_to_netns(container_iface2, ns2) + call(f'sudo ip a add 203.0.113.10/24 dev {iface1}') + call(f'sudo ip a add 192.0.2.10/24 dev {iface2}') + call(f'sudo ip link set dev {iface1} up') + call(f'sudo ip link set dev {iface2} up') + cmd_in_netns(ns1, f'ip link set {container_iface1} name eth0') + cmd_in_netns(ns2, f'ip link set {container_iface2} name eth0') + cmd_in_netns(ns1, 'ip a add 203.0.113.1/24 dev eth0') + cmd_in_netns(ns2, 'ip a add 192.0.2.1/24 dev eth0') + cmd_in_netns(ns1, 'ip link set dev eth0 up') + cmd_in_netns(ns2, 'ip link set dev eth0 up') + + # Set load-balancing configuration + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'failure-count', '2']) + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'nexthop', '203.0.113.1']) + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'success-count', '1']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'failure-count', '2']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'nexthop', '192.0.2.1']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'success-count', '1']) + + # commit changes + self.cli_commit() + + time.sleep(5) + # Check default routes in tables 201, 202 + # Expected values + original = 'default via 203.0.113.1 dev eth201' + tmp = cmd('sudo ip route show table 201') + self.assertEqual(tmp, original) + + original = 'default via 192.0.2.1 dev eth202' + tmp = cmd('sudo ip route show table 202') + self.assertEqual(tmp, original) + + # Delete veth interfaces and netns + for iface in [iface1, iface2]: + call(f'sudo ip link del dev {iface}') + + delete_netns(ns1) + delete_netns(ns2) + + def test_check_chains(self): + + ns1 = 'nsA' + ns2 = 'nsB' + iface1 = 'veth1' + iface2 = 'veth2' + container_iface1 = 'ceth0' + container_iface2 = 'ceth1' + mangle_isp1 = """table ip mangle { + chain ISP_veth1 { + counter ct mark set 0xc9 + counter meta mark set 0xc9 + counter accept + } +}""" + mangle_isp2 = """table ip mangle { + chain ISP_veth2 { + counter ct mark set 0xca + counter meta mark set 0xca + counter accept + } +}""" + + # Create network namespeces + create_netns(ns1) + create_netns(ns2) + create_veth_pair(iface1, container_iface1) + create_veth_pair(iface2, container_iface2) + move_interface_to_netns(container_iface1, ns1) + move_interface_to_netns(container_iface2, ns2) + call(f'sudo ip a add 203.0.113.10/24 dev {iface1}') + call(f'sudo ip a add 192.0.2.10/24 dev {iface2}') + call(f'sudo ip link set dev {iface1} up') + call(f'sudo ip link set dev {iface2} up') + cmd_in_netns(ns1, f'ip link set {container_iface1} name eth0') + cmd_in_netns(ns2, f'ip link set {container_iface2} name eth0') + cmd_in_netns(ns1, 'ip a add 203.0.113.1/24 dev eth0') + cmd_in_netns(ns2, 'ip a add 192.0.2.1/24 dev eth0') + cmd_in_netns(ns1, 'ip link set dev eth0 up') + cmd_in_netns(ns2, 'ip link set dev eth0 up') + + # Set load-balancing configuration + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'failure-count', '2']) + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'nexthop', '203.0.113.1']) + self.cli_set(base_path + ['wan', 'interface-health', iface1, 'success-count', '1']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'failure-count', '2']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'nexthop', '192.0.2.1']) + self.cli_set(base_path + ['wan', 'interface-health', iface2, 'success-count', '1']) + + # commit changes + self.cli_commit() + + time.sleep(5) + # Check chains + #call('sudo nft list ruleset') + tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface1}') + self.assertEqual(tmp, mangle_isp1) + + tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface2}') + self.assertEqual(tmp, mangle_isp2) + + # Delete veth interfaces and netns + for iface in [iface1, iface2]: + call(f'sudo ip link del dev {iface}') + + delete_netns(ns1) + delete_netns(ns2) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py index 8abdd6d37..167cd05f8 100755 --- a/smoketest/scripts/cli/test_protocols_isis.py +++ b/smoketest/scripts/cli/test_protocols_isis.py @@ -71,13 +71,13 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify all changes - tmp = self.getFRRconfig(f'router isis {domain}') + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') self.assertIn(f' net {net}', tmp) self.assertIn(f' log-adjacency-changes', tmp) self.assertIn(f' redistribute ipv4 connected level-2 route-map {route_map}', tmp) for interface in self._interfaces: - tmp = self.getFRRconfig(f'interface {interface}') + tmp = self.getFRRconfig(f'interface {interface}', daemon='isisd') self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) @@ -93,22 +93,26 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.isis_base_config() self.cli_set(base_path + ['redistribute', 'ipv4', 'connected', 'level-2', 'route-map', route_map]) self.cli_set(base_path + ['route-map', route_map]) + self.cli_set(base_path + ['level', 'level-2']) # commit changes self.cli_commit() # Verify FRR configuration zebra_route_map = f'ip protocol isis route-map {route_map}' - frrconfig = self.getFRRconfig(zebra_route_map) + frrconfig = self.getFRRconfig(zebra_route_map, daemon='zebra') self.assertIn(zebra_route_map, frrconfig) + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') + self.assertIn(' is-type level-2-only', tmp) + # Remove the route-map again self.cli_delete(base_path + ['route-map']) # commit changes self.cli_commit() # Verify FRR configuration - frrconfig = self.getFRRconfig(zebra_route_map) + frrconfig = self.getFRRconfig(zebra_route_map, daemon='zebra') self.assertNotIn(zebra_route_map, frrconfig) self.cli_delete(['policy', 'route-map', route_map]) @@ -128,7 +132,7 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify all changes - tmp = self.getFRRconfig(f'router isis {domain}') + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') self.assertIn(f' net {net}', tmp) for afi in ['ipv4', 'ipv6']: @@ -140,6 +144,8 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): password = 'foo' self.isis_base_config() + for interface in self._interfaces: + self.cli_set(base_path + ['interface', interface, 'password', 'plaintext-password', f'{password}-{interface}']) self.cli_set(base_path + ['area-password', 'plaintext-password', password]) self.cli_set(base_path + ['area-password', 'md5', password]) @@ -160,10 +166,61 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify all changes - tmp = self.getFRRconfig(f'router isis {domain}') + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') self.assertIn(f' net {net}', tmp) self.assertIn(f' domain-password clear {password}', tmp) self.assertIn(f' area-password clear {password}', tmp) + for interface in self._interfaces: + tmp = self.getFRRconfig(f'interface {interface}', daemon='isisd') + self.assertIn(f' isis password clear {password}-{interface}', tmp) + + def test_isis_06_spf_delay(self): + network = 'point-to-point' + holddown = '10' + init_delay = '50' + long_delay = '200' + short_delay = '100' + time_to_learn = '75' + + self.cli_set(base_path + ['net', net]) + for interface in self._interfaces: + self.cli_set(base_path + ['interface', interface, 'network', network]) + + self.cli_set(base_path + ['spf-delay-ietf', 'holddown', holddown]) + # verify() - All types of spf-delay must be configured + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + ['spf-delay-ietf', 'init-delay', init_delay]) + # verify() - All types of spf-delay must be configured + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + ['spf-delay-ietf', 'long-delay', long_delay]) + # verify() - All types of spf-delay must be configured + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + ['spf-delay-ietf', 'short-delay', short_delay]) + # verify() - All types of spf-delay must be configured + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['spf-delay-ietf', 'time-to-learn', time_to_learn]) + + # Commit all changes + self.cli_commit() + + # Verify all changes + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') + self.assertIn(f' net {net}', tmp) + self.assertIn(f' spf-delay-ietf init-delay {init_delay} short-delay {short_delay} long-delay {long_delay} holddown {holddown} time-to-learn {time_to_learn}', tmp) + + for interface in self._interfaces: + tmp = self.getFRRconfig(f'interface {interface}', daemon='isisd') + self.assertIn(f' ip router isis {domain}', tmp) + self.assertIn(f' ipv6 router isis {domain}', tmp) + self.assertIn(f' isis network {network}', tmp) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py index 44e27828d..bffb1947f 100755 --- a/smoketest/scripts/cli/test_service_dns_forwarding.py +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -26,7 +26,7 @@ from vyos.util import process_named_running CONFIG_FILE = '/run/powerdns/recursor.conf' FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf' HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua' -PROCESS_NAME= 'pdns-r/worker' +PROCESS_NAME= 'pdns_recursor' base_path = ['service', 'dns', 'forwarding'] @@ -39,7 +39,18 @@ def get_config_value(key, file=CONFIG_FILE): return tmp[0] class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestServicePowerDNS, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + # Delete DNS forwarding configuration self.cli_delete(base_path) self.cli_commit() @@ -93,9 +104,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('export-etc-hosts') self.assertEqual(tmp, 'no') - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_dnssec(self): # DNSSEC option testing @@ -114,9 +122,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('dnssec') self.assertEqual(tmp, option) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_external_nameserver(self): # Externe Domain Name Servers (DNS) addresses @@ -140,9 +145,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('export-etc-hosts') self.assertEqual(tmp, 'yes') - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_domain_forwarding(self): for network in allow_from: self.cli_set(base_path + ['allow-from', network]) @@ -179,9 +181,26 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): if domain == domains[1]: self.assertIn(f'addNTA("{domain}", "static")', hosts_conf) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + def test_dns64(self): + dns_prefix = '64:ff9b::/96' + + for network in allow_from: + self.cli_set(base_path + ['allow-from', network]) + for address in listen_adress: + self.cli_set(base_path + ['listen-address', address]) + + # Check dns64-prefix - must be prefix /96 + self.cli_set(base_path + ['dns64-prefix', '2001:db8:aabb::/64']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['dns64-prefix', dns_prefix]) + + # commit changes + self.cli_commit() + + # verify dns64-prefix configuration + tmp = get_config_value('dns64-prefix') + self.assertEqual(tmp, dns_prefix) if __name__ == '__main__': unittest.main(verbosity=2) - diff --git a/smoketest/scripts/cli/test_service_salt.py b/smoketest/scripts/cli/test_service_salt.py new file mode 100755 index 000000000..5b328677e --- /dev/null +++ b/smoketest/scripts/cli/test_service_salt.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from socket import gethostname +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.util import process_named_running +from vyos.util import read_file +from vyos.util import cmd + +PROCESS_NAME = 'salt-minion' +SALT_CONF = '/etc/salt/minion' +base_path = ['service', 'salt-minion'] + +class TestServiceSALT(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestServiceSALT, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + # delete testing SALT config + self.cli_delete(base_path) + self.cli_commit() + + # For an unknown reason on QEMU systems (e.g. where smoketests are executed + # from the CI) salt-minion process is not killed by systemd. Apparently + # no issue on VMWare. + if cmd('systemd-detect-virt') != 'kvm': + self.assertFalse(process_named_running(PROCESS_NAME)) + + def test_default(self): + servers = ['192.0.2.1', '192.0.2.2'] + + for server in servers: + self.cli_set(base_path + ['master', server]) + + self.cli_commit() + + # commiconf = read_file() Check configured port + conf = read_file(SALT_CONF) + self.assertIn(f'- {server}', conf) + + # defaults + hostname = gethostname() + self.assertIn(f'hash_type: sha256', conf) + self.assertIn(f'id: {hostname}', conf) + self.assertIn(f'mine_interval: 60', conf) + + def test_options(self): + server = '192.0.2.3' + hash = 'sha1' + id = 'foo' + interval = '120' + + self.cli_set(base_path + ['master', server]) + self.cli_set(base_path + ['hash', hash]) + self.cli_set(base_path + ['id', id]) + self.cli_set(base_path + ['interval', interval]) + + self.cli_commit() + + # commiconf = read_file() Check configured port + conf = read_file(SALT_CONF) + self.assertIn(f'- {server}', conf) + + # defaults + self.assertIn(f'hash_type: {hash}', conf) + self.assertIn(f'id: {id}', conf) + self.assertIn(f'mine_interval: {interval}', conf) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index 00754e668..864097771 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -94,6 +94,51 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): self.assertTrue(process_named_running(PROCESS_NAME)) self.cli_delete(['interfaces', 'dummy', dummy_if]) + def test_snmp_tcp(self): + dummy_if = 'dum123' + dummy_addr = '100.64.0.1/32' + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', dummy_addr]) + + # Check if SNMP can be configured and service runs + clients = ['192.0.2.1', '2001:db8::1'] + networks = ['192.0.2.128/25', '2001:db8:babe::/48'] + listen = ['127.0.0.1', '::1', address_from_cidr(dummy_addr)] + port = '2325' + + for auth in ['ro', 'rw']: + community = 'VyOS' + auth + self.cli_set(base_path + ['community', community, 'authorization', auth]) + for client in clients: + self.cli_set(base_path + ['community', community, 'client', client]) + for network in networks: + self.cli_set(base_path + ['community', community, 'network', network]) + + for addr in listen: + self.cli_set(base_path + ['listen-address', addr, 'port', port]) + + self.cli_set(base_path + ['contact', 'maintainers@vyos.io']) + self.cli_set(base_path + ['location', 'qemu']) + self.cli_set(base_path + ['protocol', 'tcp']) + + self.cli_commit() + + # verify listen address, it will be returned as + # ['unix:/run/snmpd.socket,tcp:127.0.0.1:161,tcp6:[::1]:161'] + # thus we need to transfor this into a proper list + config = get_config_value('agentaddress') + expected = 'unix:/run/snmpd.socket' + self.assertIn(expected, config) + + for addr in listen: + if is_ipv4(addr): + expected = f'tcp:{addr}:{port}' + else: + expected = f'tcp6:[{addr}]:{port}' + self.assertIn(expected, config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(['interfaces', 'dummy', dummy_if]) def test_snmpv3_sha(self): # Check if SNMPv3 can be configured with SHA authentication diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 6f58ce3d3..49a167e04 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -14,14 +14,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import paramiko import re import os import unittest +from pwd import getpwall + from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.util import cmd +from vyos.util import is_systemd_service_running from vyos.util import process_named_running from vyos.util import read_file @@ -42,10 +47,18 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + # delete testing SSH config self.cli_delete(base_path) self.cli_commit() + # Established SSH connections remains running after service is stopped. + # We can not use process_named_running here - we rather need to check + # that the systemd service is no longer running + self.assertFalse(is_systemd_service_running(PROCESS_NAME)) + def test_ssh_default(self): # Check if SSH service runs with default settings - used for checking # behavior of <defaultValue> in XML definition @@ -58,9 +71,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): port = get_config_value('Port')[0] self.assertEqual('22', port) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_ssh_single_listen_address(self): # Check if SSH service can be configured and runs self.cli_set(base_path + ['port', '1234']) @@ -97,9 +107,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): keepalive = get_config_value('ClientAliveInterval')[0] self.assertTrue("100" in keepalive) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_ssh_multiple_listen_addresses(self): # Check if SSH service can be configured and runs with multiple # listen ports and listen-addresses @@ -124,9 +131,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): for address in addresses: self.assertIn(address, tmp) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_ssh_vrf(self): # Check if SSH service can be bound to given VRF port = '22' @@ -146,9 +150,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('Port') self.assertIn(port, tmp) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - # Check for process in VRF tmp = cmd(f'ip vrf pids {vrf}') self.assertIn(PROCESS_NAME, tmp) @@ -156,5 +157,51 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # delete VRF self.cli_delete(['vrf', 'name', vrf]) + def test_ssh_login(self): + # Perform SSH login and command execution with a predefined user. The + # result (output of uname -a) must match the output if the command is + # run natively. + # + # We also try to login as an invalid user - this is not allowed to work. + + def ssh_send_cmd(command, username, password, host='localhost'): + """ SSH command execution helper """ + # Try to login via SSH + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect(hostname='localhost', username=username, password=password) + _, stdout, stderr = ssh_client.exec_command(command) + output = stdout.read().decode().strip() + error = stderr.read().decode().strip() + ssh_client.close() + return output, error + + test_user = 'ssh_test' + test_pass = 'v2i57DZs8idUwMN3VC92' + test_command = 'uname -a' + + self.cli_set(base_path) + self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass]) + + # commit changes + self.cli_commit() + + # Login with proper credentials + output, error = ssh_send_cmd(test_command, test_user, test_pass) + # verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) + + # Login with invalid credentials + with self.assertRaises(paramiko.ssh_exception.AuthenticationException): + output, error = ssh_send_cmd(test_command, 'invalid_user', 'invalid_password') + + self.cli_delete(['system', 'login', 'user', test_user]) + self.cli_commit() + + # After deletion the test user is not allowed to remain in /etc/passwd + usernames = [x[0] for x in getpwall()] + self.assertNotIn(test_user, usernames) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_flow-accounting.py b/smoketest/scripts/cli/test_system_flow-accounting.py index 57866a198..92a7973fa 100755 --- a/smoketest/scripts/cli/test_system_flow-accounting.py +++ b/smoketest/scripts/cli/test_system_flow-accounting.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -64,7 +64,8 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): # verify configuration tmp = cmd('sudo iptables-save -t raw') for interface in Section.interfaces('ethernet'): - self.assertIn(f'-A VYATTA_CT_PREROUTING_HOOK -i {interface} -m comment --comment FLOW_ACCOUNTING_RULE -j NFLOG --nflog-group 2 --nflog-size 128 --nflog-threshold 100', tmp) + self.assertIn(f'-A VYATTA_CT_PREROUTING_HOOK -i {interface} -m comment --comment FLOW_ACCOUNTING_RULE -j ' + f'NFLOG --nflog-group 2 --nflog-size 128 --nflog-threshold 100', tmp) uacctd = read_file(uacctd_conf) # circular queue size - buffer_size @@ -130,14 +131,15 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): self.assertNotIn(f'plugins: memory', uacctd) for server, server_config in sflow_server.items(): + tmp_srv = server.replace('.', '-') if 'port' in server_config: - self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}', uacctd) + self.assertIn(f'sfprobe_receiver[sf_{tmp_srv}]: {server}', uacctd) else: - self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}:6343', uacctd) + self.assertIn(f'sfprobe_receiver[sf_{tmp_srv}]: {server}:6343', uacctd) - self.assertIn(f'sfprobe_agentip[sf_{server}]: {agent_address}', uacctd) - self.assertIn(f'sampling_rate[sf_{server}]: {sampling_rate}', uacctd) - self.assertIn(f'sfprobe_source_ip[sf_{server}]: {source_address}', uacctd) + self.assertIn(f'sfprobe_agentip[sf_{tmp_srv}]: {agent_address}', uacctd) + self.assertIn(f'sampling_rate[sf_{tmp_srv}]: {sampling_rate}', uacctd) + self.assertIn(f'sfprobe_source_ip[sf_{tmp_srv}]: {source_address}', uacctd) self.cli_delete(['interfaces', 'dummy', dummy_if]) @@ -203,23 +205,27 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): tmp = 'plugins: ' for server, server_config in netflow_server.items(): - tmp += f'nfprobe[nf_{server}],' + tmp_srv = server.replace('.', '-') + tmp += f'nfprobe[nf_{tmp_srv}],' tmp += 'memory' self.assertIn(f'{tmp}', uacctd) for server, server_config in netflow_server.items(): - self.assertIn(f'nfprobe_engine[nf_{server}]: {engine_id}', uacctd) - self.assertIn(f'nfprobe_maxflows[nf_{server}]: {max_flows}', uacctd) - self.assertIn(f'sampling_rate[nf_{server}]: {sampling_rate}', uacctd) - self.assertIn(f'nfprobe_source_ip[nf_{server}]: {source_address}', uacctd) - self.assertIn(f'nfprobe_version[nf_{server}]: {version}', uacctd) + tmp_srv = server.replace('.', '-') + self.assertIn(f'nfprobe_engine[nf_{tmp_srv}]: {engine_id}', uacctd) + self.assertIn(f'nfprobe_maxflows[nf_{tmp_srv}]: {max_flows}', uacctd) + self.assertIn(f'sampling_rate[nf_{tmp_srv}]: {sampling_rate}', uacctd) + self.assertIn(f'nfprobe_source_ip[nf_{tmp_srv}]: {source_address}', uacctd) + self.assertIn(f'nfprobe_version[nf_{tmp_srv}]: {version}', uacctd) if 'port' in server_config: - self.assertIn(f'nfprobe_receiver[nf_{server}]: {server}', uacctd) + self.assertIn(f'nfprobe_receiver[nf_{tmp_srv}]: {server}', uacctd) else: - self.assertIn(f'nfprobe_receiver[nf_{server}]: {server}:2055', uacctd) + self.assertIn(f'nfprobe_receiver[nf_{tmp_srv}]: {server}:2055', uacctd) - self.assertIn(f'nfprobe_timeouts[nf_{server}]: expint={tmo_expiry}:general={tmo_flow}:icmp={tmo_icmp}:maxlife={tmo_max}:tcp.fin={tmo_tcp_fin}:tcp={tmo_tcp_generic}:tcp.rst={tmo_tcp_rst}:udp={tmo_udp}', uacctd) + self.assertIn(f'nfprobe_timeouts[nf_{tmp_srv}]: expint={tmo_expiry}:general={tmo_flow}:' + f'icmp={tmo_icmp}:maxlife={tmo_max}:tcp.fin={tmo_tcp_fin}:tcp={tmo_tcp_generic}:' + f'tcp.rst={tmo_tcp_rst}:udp={tmo_udp}', uacctd) self.cli_delete(['interfaces', 'dummy', dummy_if]) diff --git a/smoketest/scripts/cli/test_system_ipv6.py b/smoketest/scripts/cli/test_system_ipv6.py index 3112d2e46..837d1dc12 100755 --- a/smoketest/scripts/cli/test_system_ipv6.py +++ b/smoketest/scripts/cli/test_system_ipv6.py @@ -17,7 +17,12 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.template import is_ipv4 from vyos.util import read_file +from vyos.util import is_ipv6_enabled +from vyos.util import get_interface_config +from vyos.validate import is_intf_addr_assigned base_path = ['system', 'ipv6'] @@ -42,6 +47,14 @@ class TestSystemIPv6(VyOSUnitTestSHIM.TestCase): self.assertEqual(read_file(file_forwarding), '0') def test_system_ipv6_disable(self): + # Verify previous "enable" state + self.assertEqual(read_file(file_disable), '0') + self.assertTrue(is_ipv6_enabled()) + + loopbacks = ['127.0.0.1', '::1'] + for addr in loopbacks: + self.assertTrue(is_intf_addr_assigned('lo', addr)) + # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) self.cli_set(base_path + ['disable']) @@ -49,6 +62,24 @@ class TestSystemIPv6(VyOSUnitTestSHIM.TestCase): # Verify configuration file self.assertEqual(read_file(file_disable), '1') + self.assertFalse(is_ipv6_enabled()) + + for addr in loopbacks: + if is_ipv4(addr): + self.assertTrue(is_intf_addr_assigned('lo', addr)) + else: + self.assertFalse(is_intf_addr_assigned('lo', addr)) + + # T4330: Verify MTU can be changed with IPv6 disabled + mtu = '1600' + eth_if = 'eth0' + self.cli_set(['interfaces', 'ethernet', eth_if, 'mtu', mtu]) + self.cli_commit() + + tmp = get_interface_config(eth_if) + self.assertEqual(tmp['mtu'], int(mtu)) + + self.cli_delete(['interfaces', 'ethernet', eth_if, 'mtu']) def test_system_ipv6_strict_dad(self): # This defaults to 1 diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 69a06eeac..bc76de0ad 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -23,6 +23,7 @@ from base_vyostest_shim import VyOSUnitTestSHIM from distutils.version import LooseVersion from platform import release as kernel_version from subprocess import Popen, PIPE +from pwd import getpwall from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -52,6 +53,11 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.cli_commit() + # After deletion, a user is not allowed to remain in /etc/passwd + usernames = [x[0] for x in getpwall()] + for user in users: + self.assertNotIn(user, usernames) + def test_add_linux_system_user(self): # We are not allowed to re-use a username already taken by the Linux # base system diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index c8cf04b7d..0ee96157c 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -108,5 +108,22 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase): for listen in listen_address: self.assertIn(f'interface listen {listen}', config) + def test_03_ntp_interface(self): + interfaces = ['eth0', 'eth1'] + for interface in interfaces: + self.cli_set(base_path + ['interface', interface]) + + servers = ['time1.vyos.net', 'time2.vyos.net'] + for server in servers: + self.cli_set(base_path + ['server', server]) + + self.cli_commit() + + # Check generated client address configuration + config = read_file(NTP_CONF) + self.assertIn('interface ignore wildcard', config) + for interface in interfaces: + self.assertIn(f'interface listen {interface}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py index ccac0820d..6db49abab 100755 --- a/smoketest/scripts/cli/test_vpn_openconnect.py +++ b/smoketest/scripts/cli/test_vpn_openconnect.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,36 +19,82 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.util import process_named_running from vyos.util import cmd +from vyos.util import read_file from os import path, mkdir -OCSERV_CONF = '/run/ocserv/ocserv.conf' base_path = ['vpn', 'openconnect'] cert_dir = '/config/auth/' ca_cert = f'{cert_dir}ca.crt' ssl_cert = f'{cert_dir}server.crt' ssl_key = f'{cert_dir}server.key' -class TestVpnOpenconnect(VyOSUnitTestSHIM.TestCase): +PROCESS_NAME = 'ocserv-main' +config_file = '/run/ocserv/ocserv.conf' +auth_file = '/run/ocserv/ocpasswd' +otp_file = '/run/ocserv/users.oath' + +class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestVPNOpenConnect, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + cls.cli_set(cls, base_path + ["ssl", "ca-cert-file", ca_cert]) + cls.cli_set(cls, base_path + ["ssl", "cert-file", ssl_cert]) + cls.cli_set(cls, base_path + ["ssl", "key-file", ssl_key]) + def tearDown(self): + self.assertTrue(process_named_running(PROCESS_NAME)) + # Delete vpn openconnect configuration self.cli_delete(base_path) self.cli_commit() - def test_vpn(self): + self.assertFalse(process_named_running(PROCESS_NAME)) + + def test_ocserv(self): user = 'vyos_user' password = 'vyos_pass' - self.cli_delete(base_path) - self.cli_set(base_path + ["authentication", "local-users", "username", user, "password", password]) - self.cli_set(base_path + ["authentication", "mode", "local"]) - self.cli_set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) - self.cli_set(base_path + ["ssl", "ca-cert-file", ca_cert]) - self.cli_set(base_path + ["ssl", "cert-file", ssl_cert]) - self.cli_set(base_path + ["ssl", "key-file", ssl_key]) + + v4_subnet = '192.0.2.0/24' + v6_prefix = '2001:db8:1000::/64' + v6_len = '126' + name_server = ['1.2.3.4', '1.2.3.5', '2001:db8::1'] + split_dns = ['vyos.net', 'vyos.io'] + + self.cli_set(base_path + ['authentication', 'local-users', 'username', user, 'password', password]) + self.cli_set(base_path + ['authentication', 'mode', "local"]) + self.cli_set(base_path + ["network-settings", "client-ip-settings", "subnet", v4_subnet]) + self.cli_set(base_path + ['network-settings', 'client-ip-settings', 'subnet', v4_subnet]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'prefix', v6_prefix]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'mask', v6_len]) + + for ns in name_server: + self.cli_set(base_path + ['network-settings', 'name-server', ns]) + for domain in split_dns: + self.cli_set(base_path + ['network-settings', 'split-dns', domain]) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running('ocserv-main')) + # Verify configuration + daemon_config = read_file(config_file) + + # authentication mode local password-otp + self.assertIn(f'auth = "plain[/run/ocserv/ocpasswd]"', daemon_config) + self.assertIn(f'ipv4-network = {v4_subnet}', daemon_config) + self.assertIn(f'ipv6-network = {v6_prefix}', daemon_config) + self.assertIn(f'ipv6-subnet-prefix = {v6_len}', daemon_config) + + for ns in name_server: + self.assertIn(f'dns = {ns}', daemon_config) + for domain in split_dns: + self.assertIn(f'split-dns = {domain}', daemon_config) + + auth_config = read_file(auth_file) + self.assertIn(f'{user}:*:$', auth_config) if __name__ == '__main__': if not path.exists(cert_dir): diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 0f006ca3c..6614aeb06 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2021 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -27,8 +27,10 @@ from vyos.configsession import ConfigSessionError from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.template import is_ipv6 +from vyos.template import is_ipv4 from vyos.util import cmd from vyos.util import read_file +from vyos.util import get_interface_config from vyos.validate import is_intf_addr_assigned base_path = ['vrf'] @@ -61,6 +63,8 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): def tearDown(self): # delete all VRFs self.cli_delete(base_path) + self.cli_delete(['interfaces', 'dummy']) + self.cli_delete(['protocols', 'vrf']) self.cli_commit() for vrf in vrfs: self.assertNotIn(vrf, interfaces()) @@ -108,9 +112,14 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): # ... regex = f'{table}\s+{vrf}\s+#\s+{description}' self.assertTrue(re.findall(regex, iproute2_config)) + + tmp = get_interface_config(vrf) + self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) + + # Increment table ID for the next run table = str(int(table) + 1) - def test_vrf_loopback_ips(self): + def test_vrf_loopbacks_ips(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] @@ -121,10 +130,48 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify VRF configuration + loopbacks = ['127.0.0.1', '::1'] for vrf in vrfs: - self.assertTrue(vrf in interfaces()) - self.assertTrue(is_intf_addr_assigned(vrf, '127.0.0.1')) - self.assertTrue(is_intf_addr_assigned(vrf, '::1')) + # Ensure VRF was created + self.assertIn(vrf, interfaces()) + # Test for proper loopback IP assignment + for addr in loopbacks: + self.assertTrue(is_intf_addr_assigned(vrf, addr)) + + def test_vrf_loopbacks_no_ipv6(self): + table = '2002' + for vrf in vrfs: + base = base_path + ['name', vrf] + self.cli_set(base + ['table', str(table)]) + table = str(int(table) + 1) + + # Globally disable IPv6 - this will remove all IPv6 interface addresses + self.cli_set(['system', 'ipv6', 'disable']) + + # commit changes + self.cli_commit() + + # Verify VRF configuration + table = '2002' + loopbacks = ['127.0.0.1', '::1'] + for vrf in vrfs: + # Ensure VRF was created + self.assertIn(vrf, interfaces()) + + # Verify VRF table ID + tmp = get_interface_config(vrf) + self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) + + # Test for proper loopback IP assignment + for addr in loopbacks: + if is_ipv4(addr): + self.assertTrue(is_intf_addr_assigned(vrf, addr)) + else: + self.assertFalse(is_intf_addr_assigned(vrf, addr)) + + table = str(int(table) + 1) + + self.cli_delete(['system', 'ipv6']) def test_vrf_bind_all(self): table = '2000' @@ -176,11 +223,11 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): # commit changes self.cli_commit() - # Verify & cleanup + # Verify VRF assignmant for interface in self._interfaces: - # os.readlink resolves to: '../../../../../virtual/net/foovrf' - tmp = os.readlink(f'/sys/class/net/{interface}/master').split('/')[-1] - self.assertEqual(tmp, vrf) + tmp = get_interface_config(interface) + self.assertEqual(vrf, tmp['master']) + # cleanup section = Section.section(interface) self.cli_delete(['interfaces', section, interface, 'vrf']) @@ -204,14 +251,14 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): }, } + # required interface for leaking to default table + self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) + table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) - # required interface for leaking to default table - self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) - # we also need an interface in "UP" state to install routes self.cli_set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf]) self.cli_set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24']) @@ -233,28 +280,64 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify routes - table = '2000' for vrf in vrfs: - for route, route_config in routes.items(): - if is_ipv6(route): - tmp = get_vrf_ipv6_routes(vrf) - else: - tmp = get_vrf_ipv4_routes(vrf) + self.assertIn(vrf, interfaces()) + frrconfig = self.getFRRconfig(f'vrf {vrf}') + for prefix, prefix_config in routes.items(): + tmp = 'ip' + if is_ipv6(prefix): + tmp += 'v6' - found = False - for result in tmp: - if 'dst' in result and result['dst'] == route: - if 'gateway' in result and result['gateway'] == route_config['next_hop']: - found = True + tmp += f' route {prefix} {prefix_config["next_hop"]}' + if 'distance' in prefix_config: + tmp += ' ' + prefix_config['distance'] + if 'next_hop_vrf' in prefix_config: + tmp += ' nexthop-vrf ' + prefix_config['next_hop_vrf'] - self.assertTrue(found) + self.assertIn(tmp, frrconfig) - # Cleanup - self.cli_delete(['protocols', 'vrf', vrf]) - self.cli_delete(['interfaces', 'dummy', f'dum{table}']) - self.cli_delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) + self.cli_delete(['interfaces', 'ethernet', 'eth0', 'address']) - table = str(int(table) + 1) + + def test_vrf_link_local_ip_addresses(self): + # Testcase for issue T4331 + table = '100' + vrf = 'orange' + interface = 'dum9998' + addresses = ['192.0.2.1/26', '2001:db8:9998::1/64', 'fe80::1/64'] + + for address in addresses: + self.cli_set(['interfaces', 'dummy', interface, 'address', address]) + + # Create dummy interfaces + self.cli_commit() + + # ... and verify IP addresses got assigned + for address in addresses: + self.assertTrue(is_intf_addr_assigned(interface, address)) + + # Move interface to VRF + self.cli_set(base_path + ['name', vrf, 'table', table]) + self.cli_set(['interfaces', 'dummy', interface, 'vrf', vrf]) + + # Apply VRF config + self.cli_commit() + # Ensure VRF got created + self.assertIn(vrf, interfaces()) + # ... and IP addresses are still assigned + for address in addresses: + self.assertTrue(is_intf_addr_assigned(interface, address)) + # Verify VRF table ID + tmp = get_interface_config(vrf) + self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) + + # Verify interface is assigned to VRF + tmp = get_interface_config(interface) + self.assertEqual(vrf, tmp['master']) + + # Delete Interface + self.cli_delete(['interfaces', 'dummy', interface]) + self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) |