diff options
Diffstat (limited to 'smoketest')
-rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 148 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bonding.py | 1 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_bridge.py | 1 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_dummy.py | 8 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_ethernet.py | 23 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_bgp.py | 239 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 14 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 66 |
8 files changed, 387 insertions, 113 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 8ee5395d0..8b04eb337 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -12,7 +12,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re import os import unittest import json @@ -51,17 +50,6 @@ def is_mirrored_to(interface, mirror_if, qdisc): ret_val = True return ret_val - -dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf' -def get_dhcp6c_config_value(interface, key): - tmp = read_file(dhcp6c_config_file.format(interface)) - tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) - - out = [] - for item in tmp: - out.append(item.replace(';','')) - return out - class BasicInterfaceTest: class BaseTest(unittest.TestCase): _test_ip = False @@ -106,7 +94,7 @@ class BasicInterfaceTest: def test_span_mirror(self): if not self._mirror_interfaces: - return None + self.skipTest('not enabled') # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: @@ -175,7 +163,7 @@ class BasicInterfaceTest: def test_ipv6_link_local_address(self): # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -202,7 +190,7 @@ class BasicInterfaceTest: def test_interface_mtu(self): if not self._test_mtu: - return None + self.skipTest('not enabled') for intf in self._interfaces: base = self._base_path + [intf] @@ -222,7 +210,7 @@ class BasicInterfaceTest: # Testcase if MTU can be changed to 1200 on non IPv6 # enabled interfaces if not self._test_mtu: - return None + self.skipTest('not enabled') old_mtu = self._mtu self._mtu = '1200' @@ -247,7 +235,7 @@ class BasicInterfaceTest: def test_8021q_vlan_interfaces(self): if not self._test_vlan: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -274,7 +262,7 @@ class BasicInterfaceTest: def test_8021ad_qinq_vlan_interfaces(self): if not self._test_qinq: - return None + self.skipTest('not enabled') for interface in self._interfaces: base = self._base_path + [interface] @@ -305,7 +293,7 @@ class BasicInterfaceTest: def test_interface_ip_options(self): if not self._test_ip: - return None + self.skipTest('not enabled') for interface in self._interfaces: arp_tmo = '300' @@ -356,7 +344,7 @@ class BasicInterfaceTest: def test_interface_ipv6_options(self): if not self._test_ipv6: - return None + self.skipTest('not enabled') for interface in self._interfaces: dad_transmits = '10' @@ -378,39 +366,119 @@ class BasicInterfaceTest: self.assertEqual(dad_transmits, tmp) - def test_ipv6_dhcpv6_prefix_delegation(self): + def test_dhcpv6pd_auto_sla_id(self): + if not self._test_ipv6: + self.skipTest('not enabled') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344'] + + for interface in self._interfaces: + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(path + option.split()) + + address = '1' + # prefix delegation stuff + pd_base = path + ['dhcpv6-options', 'pd', '0'] + self.session.set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.session.set(['interfaces', section, delegatee]) + self.session.set(pd_base + ['interface', delegatee, 'address', address]) + # increment interface address + address = str(int(address) + 1) + + self.session.commit() + + for interface in self._interfaces: + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + + # verify DHCPv6 prefix delegation + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + address = '1' + sla_id = '0' + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) + + # Check for running process + self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.session.delete(['interfaces', section, delegatee]) + + def test_dhcpv6pd_manual_sla_id(self): if not self._test_ipv6: - return None + self.skipTest('not enabled') + + prefix_len = '56' + sla_len = str(64 - int(prefix_len)) + + delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344'] - address = '1' - sla_id = '0' - sla_len = '8' for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.session.set(path + option.split()) # prefix delegation stuff + address = '1' + sla_id = '1' pd_base = path + ['dhcpv6-options', 'pd', '0'] - self.session.set(pd_base + ['length', '56']) - self.session.set(pd_base + ['interface', interface, 'address', address]) - self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id]) + self.session.set(pd_base + ['length', prefix_len]) + + for delegatee in delegatees: + section = Section.section(delegatee) + self.session.set(['interfaces', section, delegatee]) + self.session.set(pd_base + ['interface', delegatee, 'address', address]) + self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) + + # increment interface address + address = str(int(address) + 1) + sla_id = str(int(sla_id) + 1) self.session.commit() + # Verify dhcpc6 client configuration for interface in self._interfaces: + address = '1' + sla_id = '1' + dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') + # verify DHCPv6 prefix delegation - # will return: ['delegation', '::/56 infinity;'] - tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace - self.assertEqual(tmp, '::/56') - tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0] - self.assertEqual(tmp, interface) - tmp = get_dhcp6c_config_value(interface, 'ifid')[0] - self.assertEqual(tmp, address) - tmp = get_dhcp6c_config_value(interface, 'sla-id')[0] - self.assertEqual(tmp, sla_id) - tmp = get_dhcp6c_config_value(interface, 'sla-len')[0] - self.assertEqual(tmp, sla_len) + self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) + + for delegatee in delegatees: + self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) + self.assertIn(f'ifid {address};', dhcpc6_config) + self.assertIn(f'sla-id {sla_id};', dhcpc6_config) + self.assertIn(f'sla-len {sla_len};', dhcpc6_config) + + # increment sla-id + sla_id = str(int(sla_id) + 1) + # increment interface address + address = str(int(address) + 1) # Check for running process self.assertTrue(process_named_running('dhcp6c')) + + for delegatee in delegatees: + # we can already cleanup the test delegatee interface here + # as until commit() is called, nothing happens + section = Section.section(delegatee) + self.session.delete(['interfaces', section, delegatee]) diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index a35682b7c..d73ff09e9 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -26,6 +26,7 @@ from vyos.util import read_file class BondingInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True self._test_mtu = True self._test_vlan = True self._test_qinq = True diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 7444701c1..d47d236d0 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -28,6 +28,7 @@ from vyos.util import read_file class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): + self._test_ip = True self._test_ipv6 = True self._test_vlan = True self._test_qinq = True diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py index c482a6f0b..60465a1d5 100755 --- a/smoketest/scripts/cli/test_interfaces_dummy.py +++ b/smoketest/scripts/cli/test_interfaces_dummy.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,9 +20,9 @@ from base_interfaces_test import BasicInterfaceTest class DummyInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): - self._base_path = ['interfaces', 'dummy'] - self._interfaces = ['dum0', 'dum1', 'dum2'] - super().setUp() + self._base_path = ['interfaces', 'dummy'] + self._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089'] + super().setUp() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index 3c4796283..6a0bdf150 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -19,6 +19,7 @@ import re import unittest from base_interfaces_test import BasicInterfaceTest +from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.util import cmd from vyos.util import process_named_running @@ -123,6 +124,28 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}') + def test_non_existing_interface(self): + unknonw_interface = self._base_path + ['eth667'] + self.session.set(unknonw_interface) + + # check validate() - interface does not exist + with self.assertRaises(ConfigSessionError): + self.session.commit() + + # we need to remove this wrong interface from the configuration + # manually, else tearDown() will have problem in commit() + self.session.delete(unknonw_interface) + + def test_speed_duplex_verify(self): + for interface in self._interfaces: + self.session.set(self._base_path + [interface, 'speed', '1000']) + + # check validate() - if either speed or duplex is not auto, the + # other one must be manually configured, too + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [interface, 'speed', 'auto']) + self.session.commit() def test_eapol_support(self): for interface in self._interfaces: diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 941d7828f..1d93aeda4 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -28,6 +28,8 @@ base_path = ['protocols', 'bgp', ASN] neighbor_config = { '192.0.2.1' : { + 'cap_dynamic' : '', + 'cap_ext_next': '', 'remote_as' : '100', 'adv_interv' : '400', 'passive' : '', @@ -35,6 +37,7 @@ neighbor_config = { 'shutdown' : '', 'cap_over' : '', 'ttl_security': '5', + 'local_as' : '300', }, '192.0.2.2' : { 'remote_as' : '200', @@ -49,6 +52,7 @@ neighbor_config = { 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', + 'update_src' : 'lo', }, } @@ -63,18 +67,23 @@ peer_group_config = { # 'ttl_security': '5', }, 'bar' : { +# XXX: not available in current Perl backend +# 'description' : 'foo peer bar group', 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', + 'local_as' : '300', }, 'baz' : { + 'cap_dynamic' : '', + 'cap_ext_next': '', 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', + 'update_src' : 'lo', }, } - def getFRRBGPconfig(): return cmd(f'vtysh -c "show run" | sed -n "/router bgp {ASN}/,/^!/p"') @@ -87,6 +96,35 @@ class TestProtocolsBGP(unittest.TestCase): self.session.commit() del self.session + def verify_frr_config(self, peer, peer_config, frrconfig): + # recurring patterns to verify for both a simple neighbor and a peer-group + if 'cap_dynamic' in peer_config: + self.assertIn(f' neighbor {peer} capability dynamic', frrconfig) + if 'cap_ext_next' in peer_config: + self.assertIn(f' neighbor {peer} capability extended-nexthop', frrconfig) + if 'description' in peer_config: + self.assertIn(f' neighbor {peer} description {peer_config["description"]}', frrconfig) + if 'no_cap_nego' in peer_config: + self.assertIn(f' neighbor {peer} dont-capability-negotiate', frrconfig) + if 'multi_hop' in peer_config: + self.assertIn(f' neighbor {peer} ebgp-multihop {peer_config["multi_hop"]}', frrconfig) + if 'local_as' in peer_config: + self.assertIn(f' neighbor {peer} local-as {peer_config["local_as"]}', frrconfig) + if 'cap_over' in peer_config: + self.assertIn(f' neighbor {peer} override-capability', frrconfig) + if 'passive' in peer_config: + self.assertIn(f' neighbor {peer} passive', frrconfig) + if 'password' in peer_config: + self.assertIn(f' neighbor {peer} password {peer_config["password"]}', frrconfig) + if 'remote_as' in peer_config: + self.assertIn(f' neighbor {peer} remote-as {peer_config["remote_as"]}', frrconfig) + if 'shutdown' in peer_config: + self.assertIn(f' neighbor {peer} shutdown', frrconfig) + if 'ttl_security' in peer_config: + self.assertIn(f' neighbor {peer} ttl-security hops {peer_config["ttl_security"]}', frrconfig) + if 'update_src' in peer_config: + self.assertIn(f' neighbor {peer} update-source {peer_config["update_src"]}', frrconfig) + def test_bgp_01_simple(self): router_id = '127.0.0.1' local_pref = '500' @@ -113,31 +151,41 @@ class TestProtocolsBGP(unittest.TestCase): self.assertTrue(process_named_running(PROCESS_NAME)) def test_bgp_02_neighbors(self): + # Test out individual neighbor configuration items, not all of them are + # also available to a peer-group! for neighbor, config in neighbor_config.items(): - if 'remote_as' in config: - self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]]) - if 'description' in config: - self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]]) - if 'passive' in config: - self.session.set(base_path + ['neighbor', neighbor, 'passive']) - if 'password' in config: - self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]]) - if 'shutdown' in config: - self.session.set(base_path + ['neighbor', neighbor, 'shutdown']) if 'adv_interv' in config: self.session.set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]]) + if 'cap_dynamic' in config: + self.session.set(base_path + ['neighbor', neighbor, 'capability', 'dynamic']) + if 'cap_ext_next' in config: + self.session.set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop']) + if 'description' in config: + self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]]) if 'no_cap_nego' in config: self.session.set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation']) - if 'port' in config: - self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]]) if 'multi_hop' in config: self.session.set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]]) + if 'local_as' in config: + self.session.set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]]) if 'cap_over' in config: self.session.set(base_path + ['neighbor', neighbor, 'override-capability']) + if 'passive' in config: + self.session.set(base_path + ['neighbor', neighbor, 'passive']) + if 'password' in config: + self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]]) + if 'port' in config: + self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]]) + if 'remote_as' in config: + self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]]) if 'cap_strict' in config: self.session.set(base_path + ['neighbor', neighbor, 'strict-capability-match']) + if 'shutdown' in config: + self.session.set(base_path + ['neighbor', neighbor, 'shutdown']) if 'ttl_security' in config: self.session.set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]]) + if 'update_src' in config: + self.session.set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]]) # commit changes self.session.commit() @@ -146,49 +194,45 @@ class TestProtocolsBGP(unittest.TestCase): frrconfig = getFRRBGPconfig() self.assertIn(f'router bgp {ASN}', frrconfig) - for neighbor, config in neighbor_config.items(): - if 'remote_as' in config: - self.assertIn(f' neighbor {neighbor} remote-as {config["remote_as"]}', frrconfig) - if 'description' in config: - self.assertIn(f' neighbor {neighbor} description {config["description"]}', frrconfig) - if 'passive' in config: - self.assertIn(f' neighbor {neighbor} passive', frrconfig) - if 'password' in config: - self.assertIn(f' neighbor {neighbor} password {config["password"]}', frrconfig) - if 'shutdown' in config: - self.assertIn(f' neighbor {neighbor} shutdown', frrconfig) + for peer, peer_config in neighbor_config.items(): if 'adv_interv' in config: - self.assertIn(f' neighbor {neighbor} advertisement-interval {config["adv_interv"]}', frrconfig) - if 'no_cap_nego' in config: - self.assertIn(f' neighbor {neighbor} dont-capability-negotiate', frrconfig) + self.assertIn(f' neighbor {peer} advertisement-interval {peer_config["adv_interv"]}', frrconfig) if 'port' in config: - self.assertIn(f' neighbor {neighbor} port {config["port"]}', frrconfig) - if 'multi_hop' in config: - self.assertIn(f' neighbor {neighbor} ebgp-multihop {config["multi_hop"]}', frrconfig) - if 'cap_over' in config: - self.assertIn(f' neighbor {neighbor} override-capability', frrconfig) + self.assertIn(f' neighbor {peer} port {peer_config["port"]}', frrconfig) if 'cap_strict' in config: - self.assertIn(f' neighbor {neighbor} strict-capability-match', frrconfig) - if 'ttl_security' in config: - self.assertIn(f' neighbor {neighbor} ttl-security hops {config["ttl_security"]}', frrconfig) + self.assertIn(f' neighbor {peer} strict-capability-match', frrconfig) + + self.verify_frr_config(peer, peer_config, frrconfig) def test_bgp_03_peer_groups(self): + # Test out individual peer-group configuration items for peer_group, config in peer_group_config.items(): - self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) - if 'passive' in config: - self.session.set(base_path + ['peer-group', peer_group, 'passive']) - if 'password' in config: - self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]]) - if 'shutdown' in config: - self.session.set(base_path + ['peer-group', peer_group, 'shutdown']) + if 'cap_dynamic' in config: + self.session.set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) + if 'cap_ext_next' in config: + self.session.set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) + if 'description' in config: + self.session.set(base_path + ['peer-group', peer_group, 'description', config["description"]]) if 'no_cap_nego' in config: self.session.set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) if 'multi_hop' in config: self.session.set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) + if 'local_as' in config: + self.session.set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]]) if 'cap_over' in config: self.session.set(base_path + ['peer-group', peer_group, 'override-capability']) + if 'passive' in config: + self.session.set(base_path + ['peer-group', peer_group, 'passive']) + if 'password' in config: + self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]]) + if 'remote_as' in config: + self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) + if 'shutdown' in config: + self.session.set(base_path + ['peer-group', peer_group, 'shutdown']) if 'ttl_security' in config: self.session.set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) + if 'update_src' in config: + self.session.set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) # commit changes self.session.commit() @@ -197,25 +241,102 @@ class TestProtocolsBGP(unittest.TestCase): frrconfig = getFRRBGPconfig() self.assertIn(f'router bgp {ASN}', frrconfig) - for peer_group, config in peer_group_config.items(): + for peer, peer_config in peer_group_config.items(): self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) + self.verify_frr_config(peer, peer_config, frrconfig) + + + def test_bgp_04_afi_ipv4(self): + networks = { + '10.0.0.0/8' : { + 'as_set' : '', + }, + '100.64.0.0/10' : { + 'as_set' : '', + }, + '192.168.0.0/16' : { + 'summary_only' : '', + }, + } + + # We want to redistribute ... + redistributes = ['connected', 'kernel', 'ospf', 'rip', 'static'] + for redistribute in redistributes: + self.session.set(base_path + ['address-family', 'ipv4-unicast', + 'redistribute', redistribute]) + + for network, network_config in networks.items(): + self.session.set(base_path + ['address-family', 'ipv4-unicast', + 'network', network]) + if 'as_set' in network_config: + self.session.set(base_path + ['address-family', 'ipv4-unicast', + 'aggregate-address', network, 'as-set']) + if 'summary_only' in network_config: + self.session.set(base_path + ['address-family', 'ipv4-unicast', + 'aggregate-address', network, 'summary-only']) + + # commit changes + self.session.commit() + + # Verify FRR bgpd configuration + frrconfig = getFRRBGPconfig() + self.assertIn(f'router bgp {ASN}', frrconfig) + self.assertIn(f' address-family ipv4 unicast', frrconfig) + + for redistribute in redistributes: + self.assertIn(f' redistribute {redistribute}', frrconfig) + + for network, network_config in networks.items(): + self.assertIn(f' network {network}', frrconfig) + if 'as_set' in network_config: + self.assertIn(f' aggregate-address {network} as-set', frrconfig) + if 'summary_only' in network_config: + self.assertIn(f' aggregate-address {network} summary-only', frrconfig) + + + def test_bgp_05_afi_ipv6(self): + networks = { + '2001:db8:100::/48' : { + }, + '2001:db8:200::/48' : { + }, + '2001:db8:300::/48' : { + 'summary_only' : '', + }, + } + + # We want to redistribute ... + redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static'] + for redistribute in redistributes: + self.session.set(base_path + ['address-family', 'ipv6-unicast', + 'redistribute', redistribute]) + + for network, network_config in networks.items(): + self.session.set(base_path + ['address-family', 'ipv6-unicast', + 'network', network]) + if 'summary_only' in network_config: + self.session.set(base_path + ['address-family', 'ipv6-unicast', + 'aggregate-address', network, 'summary-only']) + + # commit changes + self.session.commit() + + # Verify FRR bgpd configuration + frrconfig = getFRRBGPconfig() + self.assertIn(f'router bgp {ASN}', frrconfig) + self.assertIn(f' address-family ipv6 unicast', frrconfig) + + for redistribute in redistributes: + # FRR calls this OSPF6 + if redistribute == 'ospfv3': + redistribute = 'ospf6' + self.assertIn(f' redistribute {redistribute}', frrconfig) + + for network, network_config in networks.items(): + self.assertIn(f' network {network}', frrconfig) + if 'as_set' in network_config: + self.assertIn(f' aggregate-address {network} summary-only', frrconfig) - if 'remote_as' in config: - self.assertIn(f' neighbor {peer_group} remote-as {config["remote_as"]}', frrconfig) - if 'passive' in config: - self.assertIn(f' neighbor {peer_group} passive', frrconfig) - if 'password' in config: - self.assertIn(f' neighbor {peer_group} password {config["password"]}', frrconfig) - if 'shutdown' in config: - self.assertIn(f' neighbor {peer_group} shutdown', frrconfig) - if 'no_cap_nego' in config: - self.assertIn(f' neighbor {peer_group} dont-capability-negotiate', frrconfig) - if 'multi_hop' in config: - self.assertIn(f' neighbor {peer_group} ebgp-multihop {config["multi_hop"]}', frrconfig) - if 'cap_over' in config: - self.assertIn(f' neighbor {peer_group} override-capability', frrconfig) - if 'ttl_security' in config: - self.assertIn(f' neighbor {peer_group} ttl-security hops {config["ttl_security"]}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 0bb907c3a..eede042de 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -25,7 +25,7 @@ from vyos.util import process_named_running from vyos.util import read_file PROCESS_NAME = 'sshd' -SSHD_CONF = '/run/ssh/sshd_config' +SSHD_CONF = '/run/sshd/sshd_config' base_path = ['service', 'ssh'] vrf = 'ssh-test' @@ -44,11 +44,6 @@ class TestServiceSSH(unittest.TestCase): def tearDown(self): # delete testing SSH config self.session.delete(base_path) - # restore "plain" SSH access - self.session.set(base_path) - # delete VRF - self.session.delete(['vrf', 'name', vrf]) - self.session.commit() del self.session @@ -109,7 +104,7 @@ class TestServiceSSH(unittest.TestCase): def test_ssh_multiple_listen_addresses(self): # Check if SSH service can be configured and runs with multiple # listen ports and listen-addresses - ports = ['22', '2222'] + ports = ['22', '2222', '2223', '2224'] for port in ports: self.session.set(base_path + ['port', port]) @@ -143,7 +138,7 @@ class TestServiceSSH(unittest.TestCase): with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(['vrf', 'name', vrf, 'table', '1001']) + self.session.set(['vrf', 'name', vrf, 'table', '1338']) # commit changes self.session.commit() @@ -159,5 +154,8 @@ class TestServiceSSH(unittest.TestCase): tmp = cmd(f'ip vrf pids {vrf}') self.assertIn(PROCESS_NAME, tmp) + # delete VRF + self.session.delete(['vrf', 'name', vrf]) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 6188cf38b..bb6f57fc2 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -24,8 +24,10 @@ from platform import release as kernel_version from subprocess import Popen, PIPE from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import cmd from vyos.util import read_file +from vyos.template import inc_ip base_path = ['system', 'login'] users = ['vyos1', 'vyos2'] @@ -42,7 +44,7 @@ class TestSystemLogin(unittest.TestCase): self.session.commit() del self.session - def test_local_user(self): + def test_system_login_user(self): # Check if user can be created and we can SSH to localhost self.session.set(['service', 'ssh', 'port', '22']) @@ -82,7 +84,7 @@ class TestSystemLogin(unittest.TestCase): for option in options: self.assertIn(f'{option}=y', kernel_config) - def test_radius_config(self): + def test_system_login_radius_ipv4(self): # Verify generated RADIUS configuration files radius_key = 'VyOSsecretVyOS' @@ -95,6 +97,12 @@ class TestSystemLogin(unittest.TestCase): self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.session.set(base_path + ['radius', 'source-address', radius_source]) + self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + # check validate() - Only one IPv4 source-address supported + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.session.commit() @@ -130,5 +138,59 @@ class TestSystemLogin(unittest.TestCase): tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) + def test_system_login_radius_ipv6(self): + # Verify generated RADIUS configuration files + + radius_key = 'VyOS-VyOS' + radius_server = '2001:db8::1' + radius_source = '::1' + radius_port = '4000' + radius_timeout = '4' + + self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) + self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) + self.session.set(base_path + ['radius', 'source-address', radius_source]) + self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + # check validate() - Only one IPv4 source-address supported + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + + self.session.commit() + + # this file must be read with higher permissions + pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') + tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, + radius_port, radius_key, radius_timeout, + radius_source), pam_radius_auth_conf) + self.assertTrue(tmp) + + # required, static options + self.assertIn('priv-lvl 15', pam_radius_auth_conf) + self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) + + # PAM + pam_common_account = read_file('/etc/pam.d/common-account') + self.assertIn('pam_radius_auth.so', pam_common_account) + + pam_common_auth = read_file('/etc/pam.d/common-auth') + self.assertIn('pam_radius_auth.so', pam_common_auth) + + pam_common_session = read_file('/etc/pam.d/common-session') + self.assertIn('pam_radius_auth.so', pam_common_session) + + pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') + self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) + + # NSS + nsswitch_conf = read_file('/etc/nsswitch.conf') + tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) + self.assertTrue(tmp) + + tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) + self.assertTrue(tmp) + if __name__ == '__main__': unittest.main(verbosity=2) |