summaryrefslogtreecommitdiff
path: root/smoketest
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py148
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bonding.py1
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bridge.py1
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_dummy.py8
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_ethernet.py23
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bgp.py239
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py14
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py66
8 files changed, 387 insertions, 113 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 8ee5395d0..8b04eb337 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -12,7 +12,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
import unittest
import json
@@ -51,17 +50,6 @@ def is_mirrored_to(interface, mirror_if, qdisc):
ret_val = True
return ret_val
-
-dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf'
-def get_dhcp6c_config_value(interface, key):
- tmp = read_file(dhcp6c_config_file.format(interface))
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
-
- out = []
- for item in tmp:
- out.append(item.replace(';',''))
- return out
-
class BasicInterfaceTest:
class BaseTest(unittest.TestCase):
_test_ip = False
@@ -106,7 +94,7 @@ class BasicInterfaceTest:
def test_span_mirror(self):
if not self._mirror_interfaces:
- return None
+ self.skipTest('not enabled')
# Check the two-way mirror rules of ingress and egress
for mirror in self._mirror_interfaces:
@@ -175,7 +163,7 @@ class BasicInterfaceTest:
def test_ipv6_link_local_address(self):
# Common function for IPv6 link-local address assignemnts
if not self._test_ipv6:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -202,7 +190,7 @@ class BasicInterfaceTest:
def test_interface_mtu(self):
if not self._test_mtu:
- return None
+ self.skipTest('not enabled')
for intf in self._interfaces:
base = self._base_path + [intf]
@@ -222,7 +210,7 @@ class BasicInterfaceTest:
# Testcase if MTU can be changed to 1200 on non IPv6
# enabled interfaces
if not self._test_mtu:
- return None
+ self.skipTest('not enabled')
old_mtu = self._mtu
self._mtu = '1200'
@@ -247,7 +235,7 @@ class BasicInterfaceTest:
def test_8021q_vlan_interfaces(self):
if not self._test_vlan:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -274,7 +262,7 @@ class BasicInterfaceTest:
def test_8021ad_qinq_vlan_interfaces(self):
if not self._test_qinq:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -305,7 +293,7 @@ class BasicInterfaceTest:
def test_interface_ip_options(self):
if not self._test_ip:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
arp_tmo = '300'
@@ -356,7 +344,7 @@ class BasicInterfaceTest:
def test_interface_ipv6_options(self):
if not self._test_ipv6:
- return None
+ self.skipTest('not enabled')
for interface in self._interfaces:
dad_transmits = '10'
@@ -378,39 +366,119 @@ class BasicInterfaceTest:
self.assertEqual(dad_transmits, tmp)
- def test_ipv6_dhcpv6_prefix_delegation(self):
+ def test_dhcpv6pd_auto_sla_id(self):
+ if not self._test_ipv6:
+ self.skipTest('not enabled')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344']
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(path + option.split())
+
+ address = '1'
+ # prefix delegation stuff
+ pd_base = path + ['dhcpv6-options', 'pd', '0']
+ self.session.set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.session.set(['interfaces', section, delegatee])
+ self.session.set(pd_base + ['interface', delegatee, 'address', address])
+ # increment interface address
+ address = str(int(address) + 1)
+
+ self.session.commit()
+
+ for interface in self._interfaces:
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
+ # verify DHCPv6 prefix delegation
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ address = '1'
+ sla_id = '0'
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
+
+ # Check for running process
+ self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.session.delete(['interfaces', section, delegatee])
+
+ def test_dhcpv6pd_manual_sla_id(self):
if not self._test_ipv6:
- return None
+ self.skipTest('not enabled')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344']
- address = '1'
- sla_id = '0'
- sla_len = '8'
for interface in self._interfaces:
path = self._base_path + [interface]
for option in self._options.get(interface, []):
self.session.set(path + option.split())
# prefix delegation stuff
+ address = '1'
+ sla_id = '1'
pd_base = path + ['dhcpv6-options', 'pd', '0']
- self.session.set(pd_base + ['length', '56'])
- self.session.set(pd_base + ['interface', interface, 'address', address])
- self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id])
+ self.session.set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.session.set(['interfaces', section, delegatee])
+ self.session.set(pd_base + ['interface', delegatee, 'address', address])
+ self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
+
+ # increment interface address
+ address = str(int(address) + 1)
+ sla_id = str(int(sla_id) + 1)
self.session.commit()
+ # Verify dhcpc6 client configuration
for interface in self._interfaces:
+ address = '1'
+ sla_id = '1'
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
# verify DHCPv6 prefix delegation
- # will return: ['delegation', '::/56 infinity;']
- tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
- self.assertEqual(tmp, '::/56')
- tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
- self.assertEqual(tmp, interface)
- tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
- self.assertEqual(tmp, address)
- tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
- self.assertEqual(tmp, sla_id)
- tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
- self.assertEqual(tmp, sla_len)
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
# Check for running process
self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.session.delete(['interfaces', section, delegatee])
diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py
index a35682b7c..d73ff09e9 100755
--- a/smoketest/scripts/cli/test_interfaces_bonding.py
+++ b/smoketest/scripts/cli/test_interfaces_bonding.py
@@ -26,6 +26,7 @@ from vyos.util import read_file
class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
self._test_mtu = True
self._test_vlan = True
self._test_qinq = True
diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py
index 7444701c1..d47d236d0 100755
--- a/smoketest/scripts/cli/test_interfaces_bridge.py
+++ b/smoketest/scripts/cli/test_interfaces_bridge.py
@@ -28,6 +28,7 @@ from vyos.util import read_file
class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
+ self._test_ip = True
self._test_ipv6 = True
self._test_vlan = True
self._test_qinq = True
diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py
index c482a6f0b..60465a1d5 100755
--- a/smoketest/scripts/cli/test_interfaces_dummy.py
+++ b/smoketest/scripts/cli/test_interfaces_dummy.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -20,9 +20,9 @@ from base_interfaces_test import BasicInterfaceTest
class DummyInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
- self._base_path = ['interfaces', 'dummy']
- self._interfaces = ['dum0', 'dum1', 'dum2']
- super().setUp()
+ self._base_path = ['interfaces', 'dummy']
+ self._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089']
+ super().setUp()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py
index 3c4796283..6a0bdf150 100755
--- a/smoketest/scripts/cli/test_interfaces_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_ethernet.py
@@ -19,6 +19,7 @@ import re
import unittest
from base_interfaces_test import BasicInterfaceTest
+from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
from vyos.util import cmd
from vyos.util import process_named_running
@@ -123,6 +124,28 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}')
+ def test_non_existing_interface(self):
+ unknonw_interface = self._base_path + ['eth667']
+ self.session.set(unknonw_interface)
+
+ # check validate() - interface does not exist
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # we need to remove this wrong interface from the configuration
+ # manually, else tearDown() will have problem in commit()
+ self.session.delete(unknonw_interface)
+
+ def test_speed_duplex_verify(self):
+ for interface in self._interfaces:
+ self.session.set(self._base_path + [interface, 'speed', '1000'])
+
+ # check validate() - if either speed or duplex is not auto, the
+ # other one must be manually configured, too
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(self._base_path + [interface, 'speed', 'auto'])
+ self.session.commit()
def test_eapol_support(self):
for interface in self._interfaces:
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py
index 941d7828f..1d93aeda4 100755
--- a/smoketest/scripts/cli/test_protocols_bgp.py
+++ b/smoketest/scripts/cli/test_protocols_bgp.py
@@ -28,6 +28,8 @@ base_path = ['protocols', 'bgp', ASN]
neighbor_config = {
'192.0.2.1' : {
+ 'cap_dynamic' : '',
+ 'cap_ext_next': '',
'remote_as' : '100',
'adv_interv' : '400',
'passive' : '',
@@ -35,6 +37,7 @@ neighbor_config = {
'shutdown' : '',
'cap_over' : '',
'ttl_security': '5',
+ 'local_as' : '300',
},
'192.0.2.2' : {
'remote_as' : '200',
@@ -49,6 +52,7 @@ neighbor_config = {
'remote_as' : '200',
'passive' : '',
'multi_hop' : '5',
+ 'update_src' : 'lo',
},
}
@@ -63,18 +67,23 @@ peer_group_config = {
# 'ttl_security': '5',
},
'bar' : {
+# XXX: not available in current Perl backend
+# 'description' : 'foo peer bar group',
'remote_as' : '200',
'shutdown' : '',
'no_cap_nego' : '',
+ 'local_as' : '300',
},
'baz' : {
+ 'cap_dynamic' : '',
+ 'cap_ext_next': '',
'remote_as' : '200',
'passive' : '',
'multi_hop' : '5',
+ 'update_src' : 'lo',
},
}
-
def getFRRBGPconfig():
return cmd(f'vtysh -c "show run" | sed -n "/router bgp {ASN}/,/^!/p"')
@@ -87,6 +96,35 @@ class TestProtocolsBGP(unittest.TestCase):
self.session.commit()
del self.session
+ def verify_frr_config(self, peer, peer_config, frrconfig):
+ # recurring patterns to verify for both a simple neighbor and a peer-group
+ if 'cap_dynamic' in peer_config:
+ self.assertIn(f' neighbor {peer} capability dynamic', frrconfig)
+ if 'cap_ext_next' in peer_config:
+ self.assertIn(f' neighbor {peer} capability extended-nexthop', frrconfig)
+ if 'description' in peer_config:
+ self.assertIn(f' neighbor {peer} description {peer_config["description"]}', frrconfig)
+ if 'no_cap_nego' in peer_config:
+ self.assertIn(f' neighbor {peer} dont-capability-negotiate', frrconfig)
+ if 'multi_hop' in peer_config:
+ self.assertIn(f' neighbor {peer} ebgp-multihop {peer_config["multi_hop"]}', frrconfig)
+ if 'local_as' in peer_config:
+ self.assertIn(f' neighbor {peer} local-as {peer_config["local_as"]}', frrconfig)
+ if 'cap_over' in peer_config:
+ self.assertIn(f' neighbor {peer} override-capability', frrconfig)
+ if 'passive' in peer_config:
+ self.assertIn(f' neighbor {peer} passive', frrconfig)
+ if 'password' in peer_config:
+ self.assertIn(f' neighbor {peer} password {peer_config["password"]}', frrconfig)
+ if 'remote_as' in peer_config:
+ self.assertIn(f' neighbor {peer} remote-as {peer_config["remote_as"]}', frrconfig)
+ if 'shutdown' in peer_config:
+ self.assertIn(f' neighbor {peer} shutdown', frrconfig)
+ if 'ttl_security' in peer_config:
+ self.assertIn(f' neighbor {peer} ttl-security hops {peer_config["ttl_security"]}', frrconfig)
+ if 'update_src' in peer_config:
+ self.assertIn(f' neighbor {peer} update-source {peer_config["update_src"]}', frrconfig)
+
def test_bgp_01_simple(self):
router_id = '127.0.0.1'
local_pref = '500'
@@ -113,31 +151,41 @@ class TestProtocolsBGP(unittest.TestCase):
self.assertTrue(process_named_running(PROCESS_NAME))
def test_bgp_02_neighbors(self):
+ # Test out individual neighbor configuration items, not all of them are
+ # also available to a peer-group!
for neighbor, config in neighbor_config.items():
- if 'remote_as' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]])
- if 'description' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]])
- if 'passive' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'passive'])
- if 'password' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]])
- if 'shutdown' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'shutdown'])
if 'adv_interv' in config:
self.session.set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]])
+ if 'cap_dynamic' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'capability', 'dynamic'])
+ if 'cap_ext_next' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop'])
+ if 'description' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]])
if 'no_cap_nego' in config:
self.session.set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation'])
- if 'port' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]])
if 'multi_hop' in config:
self.session.set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]])
+ if 'local_as' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]])
if 'cap_over' in config:
self.session.set(base_path + ['neighbor', neighbor, 'override-capability'])
+ if 'passive' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'passive'])
+ if 'password' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]])
+ if 'port' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]])
+ if 'remote_as' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]])
if 'cap_strict' in config:
self.session.set(base_path + ['neighbor', neighbor, 'strict-capability-match'])
+ if 'shutdown' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'shutdown'])
if 'ttl_security' in config:
self.session.set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]])
+ if 'update_src' in config:
+ self.session.set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]])
# commit changes
self.session.commit()
@@ -146,49 +194,45 @@ class TestProtocolsBGP(unittest.TestCase):
frrconfig = getFRRBGPconfig()
self.assertIn(f'router bgp {ASN}', frrconfig)
- for neighbor, config in neighbor_config.items():
- if 'remote_as' in config:
- self.assertIn(f' neighbor {neighbor} remote-as {config["remote_as"]}', frrconfig)
- if 'description' in config:
- self.assertIn(f' neighbor {neighbor} description {config["description"]}', frrconfig)
- if 'passive' in config:
- self.assertIn(f' neighbor {neighbor} passive', frrconfig)
- if 'password' in config:
- self.assertIn(f' neighbor {neighbor} password {config["password"]}', frrconfig)
- if 'shutdown' in config:
- self.assertIn(f' neighbor {neighbor} shutdown', frrconfig)
+ for peer, peer_config in neighbor_config.items():
if 'adv_interv' in config:
- self.assertIn(f' neighbor {neighbor} advertisement-interval {config["adv_interv"]}', frrconfig)
- if 'no_cap_nego' in config:
- self.assertIn(f' neighbor {neighbor} dont-capability-negotiate', frrconfig)
+ self.assertIn(f' neighbor {peer} advertisement-interval {peer_config["adv_interv"]}', frrconfig)
if 'port' in config:
- self.assertIn(f' neighbor {neighbor} port {config["port"]}', frrconfig)
- if 'multi_hop' in config:
- self.assertIn(f' neighbor {neighbor} ebgp-multihop {config["multi_hop"]}', frrconfig)
- if 'cap_over' in config:
- self.assertIn(f' neighbor {neighbor} override-capability', frrconfig)
+ self.assertIn(f' neighbor {peer} port {peer_config["port"]}', frrconfig)
if 'cap_strict' in config:
- self.assertIn(f' neighbor {neighbor} strict-capability-match', frrconfig)
- if 'ttl_security' in config:
- self.assertIn(f' neighbor {neighbor} ttl-security hops {config["ttl_security"]}', frrconfig)
+ self.assertIn(f' neighbor {peer} strict-capability-match', frrconfig)
+
+ self.verify_frr_config(peer, peer_config, frrconfig)
def test_bgp_03_peer_groups(self):
+ # Test out individual peer-group configuration items
for peer_group, config in peer_group_config.items():
- self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]])
- if 'passive' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'passive'])
- if 'password' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]])
- if 'shutdown' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'shutdown'])
+ if 'cap_dynamic' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'capability', 'dynamic'])
+ if 'cap_ext_next' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop'])
+ if 'description' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'description', config["description"]])
if 'no_cap_nego' in config:
self.session.set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation'])
if 'multi_hop' in config:
self.session.set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]])
+ if 'local_as' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]])
if 'cap_over' in config:
self.session.set(base_path + ['peer-group', peer_group, 'override-capability'])
+ if 'passive' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'passive'])
+ if 'password' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]])
+ if 'remote_as' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]])
+ if 'shutdown' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'shutdown'])
if 'ttl_security' in config:
self.session.set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]])
+ if 'update_src' in config:
+ self.session.set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]])
# commit changes
self.session.commit()
@@ -197,25 +241,102 @@ class TestProtocolsBGP(unittest.TestCase):
frrconfig = getFRRBGPconfig()
self.assertIn(f'router bgp {ASN}', frrconfig)
- for peer_group, config in peer_group_config.items():
+ for peer, peer_config in peer_group_config.items():
self.assertIn(f' neighbor {peer_group} peer-group', frrconfig)
+ self.verify_frr_config(peer, peer_config, frrconfig)
+
+
+ def test_bgp_04_afi_ipv4(self):
+ networks = {
+ '10.0.0.0/8' : {
+ 'as_set' : '',
+ },
+ '100.64.0.0/10' : {
+ 'as_set' : '',
+ },
+ '192.168.0.0/16' : {
+ 'summary_only' : '',
+ },
+ }
+
+ # We want to redistribute ...
+ redistributes = ['connected', 'kernel', 'ospf', 'rip', 'static']
+ for redistribute in redistributes:
+ self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ 'redistribute', redistribute])
+
+ for network, network_config in networks.items():
+ self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ 'network', network])
+ if 'as_set' in network_config:
+ self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ 'aggregate-address', network, 'as-set'])
+ if 'summary_only' in network_config:
+ self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ 'aggregate-address', network, 'summary-only'])
+
+ # commit changes
+ self.session.commit()
+
+ # Verify FRR bgpd configuration
+ frrconfig = getFRRBGPconfig()
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f' address-family ipv4 unicast', frrconfig)
+
+ for redistribute in redistributes:
+ self.assertIn(f' redistribute {redistribute}', frrconfig)
+
+ for network, network_config in networks.items():
+ self.assertIn(f' network {network}', frrconfig)
+ if 'as_set' in network_config:
+ self.assertIn(f' aggregate-address {network} as-set', frrconfig)
+ if 'summary_only' in network_config:
+ self.assertIn(f' aggregate-address {network} summary-only', frrconfig)
+
+
+ def test_bgp_05_afi_ipv6(self):
+ networks = {
+ '2001:db8:100::/48' : {
+ },
+ '2001:db8:200::/48' : {
+ },
+ '2001:db8:300::/48' : {
+ 'summary_only' : '',
+ },
+ }
+
+ # We want to redistribute ...
+ redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static']
+ for redistribute in redistributes:
+ self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ 'redistribute', redistribute])
+
+ for network, network_config in networks.items():
+ self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ 'network', network])
+ if 'summary_only' in network_config:
+ self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ 'aggregate-address', network, 'summary-only'])
+
+ # commit changes
+ self.session.commit()
+
+ # Verify FRR bgpd configuration
+ frrconfig = getFRRBGPconfig()
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f' address-family ipv6 unicast', frrconfig)
+
+ for redistribute in redistributes:
+ # FRR calls this OSPF6
+ if redistribute == 'ospfv3':
+ redistribute = 'ospf6'
+ self.assertIn(f' redistribute {redistribute}', frrconfig)
+
+ for network, network_config in networks.items():
+ self.assertIn(f' network {network}', frrconfig)
+ if 'as_set' in network_config:
+ self.assertIn(f' aggregate-address {network} summary-only', frrconfig)
- if 'remote_as' in config:
- self.assertIn(f' neighbor {peer_group} remote-as {config["remote_as"]}', frrconfig)
- if 'passive' in config:
- self.assertIn(f' neighbor {peer_group} passive', frrconfig)
- if 'password' in config:
- self.assertIn(f' neighbor {peer_group} password {config["password"]}', frrconfig)
- if 'shutdown' in config:
- self.assertIn(f' neighbor {peer_group} shutdown', frrconfig)
- if 'no_cap_nego' in config:
- self.assertIn(f' neighbor {peer_group} dont-capability-negotiate', frrconfig)
- if 'multi_hop' in config:
- self.assertIn(f' neighbor {peer_group} ebgp-multihop {config["multi_hop"]}', frrconfig)
- if 'cap_over' in config:
- self.assertIn(f' neighbor {peer_group} override-capability', frrconfig)
- if 'ttl_security' in config:
- self.assertIn(f' neighbor {peer_group} ttl-security hops {config["ttl_security"]}', frrconfig)
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 0bb907c3a..eede042de 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -25,7 +25,7 @@ from vyos.util import process_named_running
from vyos.util import read_file
PROCESS_NAME = 'sshd'
-SSHD_CONF = '/run/ssh/sshd_config'
+SSHD_CONF = '/run/sshd/sshd_config'
base_path = ['service', 'ssh']
vrf = 'ssh-test'
@@ -44,11 +44,6 @@ class TestServiceSSH(unittest.TestCase):
def tearDown(self):
# delete testing SSH config
self.session.delete(base_path)
- # restore "plain" SSH access
- self.session.set(base_path)
- # delete VRF
- self.session.delete(['vrf', 'name', vrf])
-
self.session.commit()
del self.session
@@ -109,7 +104,7 @@ class TestServiceSSH(unittest.TestCase):
def test_ssh_multiple_listen_addresses(self):
# Check if SSH service can be configured and runs with multiple
# listen ports and listen-addresses
- ports = ['22', '2222']
+ ports = ['22', '2222', '2223', '2224']
for port in ports:
self.session.set(base_path + ['port', port])
@@ -143,7 +138,7 @@ class TestServiceSSH(unittest.TestCase):
with self.assertRaises(ConfigSessionError):
self.session.commit()
- self.session.set(['vrf', 'name', vrf, 'table', '1001'])
+ self.session.set(['vrf', 'name', vrf, 'table', '1338'])
# commit changes
self.session.commit()
@@ -159,5 +154,8 @@ class TestServiceSSH(unittest.TestCase):
tmp = cmd(f'ip vrf pids {vrf}')
self.assertIn(PROCESS_NAME, tmp)
+ # delete VRF
+ self.session.delete(['vrf', 'name', vrf])
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index 6188cf38b..bb6f57fc2 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -24,8 +24,10 @@ from platform import release as kernel_version
from subprocess import Popen, PIPE
from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import read_file
+from vyos.template import inc_ip
base_path = ['system', 'login']
users = ['vyos1', 'vyos2']
@@ -42,7 +44,7 @@ class TestSystemLogin(unittest.TestCase):
self.session.commit()
del self.session
- def test_local_user(self):
+ def test_system_login_user(self):
# Check if user can be created and we can SSH to localhost
self.session.set(['service', 'ssh', 'port', '22'])
@@ -82,7 +84,7 @@ class TestSystemLogin(unittest.TestCase):
for option in options:
self.assertIn(f'{option}=y', kernel_config)
- def test_radius_config(self):
+ def test_system_login_radius_ipv4(self):
# Verify generated RADIUS configuration files
radius_key = 'VyOSsecretVyOS'
@@ -95,6 +97,12 @@ class TestSystemLogin(unittest.TestCase):
self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
self.session.set(base_path + ['radius', 'source-address', radius_source])
+ self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ # check validate() - Only one IPv4 source-address supported
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
self.session.commit()
@@ -130,5 +138,59 @@ class TestSystemLogin(unittest.TestCase):
tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
self.assertTrue(tmp)
+ def test_system_login_radius_ipv6(self):
+ # Verify generated RADIUS configuration files
+
+ radius_key = 'VyOS-VyOS'
+ radius_server = '2001:db8::1'
+ radius_source = '::1'
+ radius_port = '4000'
+ radius_timeout = '4'
+
+ self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
+ self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
+ self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
+ self.session.set(base_path + ['radius', 'source-address', radius_source])
+ self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ # check validate() - Only one IPv4 source-address supported
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+
+ self.session.commit()
+
+ # this file must be read with higher permissions
+ pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
+ tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server,
+ radius_port, radius_key, radius_timeout,
+ radius_source), pam_radius_auth_conf)
+ self.assertTrue(tmp)
+
+ # required, static options
+ self.assertIn('priv-lvl 15', pam_radius_auth_conf)
+ self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf)
+
+ # PAM
+ pam_common_account = read_file('/etc/pam.d/common-account')
+ self.assertIn('pam_radius_auth.so', pam_common_account)
+
+ pam_common_auth = read_file('/etc/pam.d/common-auth')
+ self.assertIn('pam_radius_auth.so', pam_common_auth)
+
+ pam_common_session = read_file('/etc/pam.d/common-session')
+ self.assertIn('pam_radius_auth.so', pam_common_session)
+
+ pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive')
+ self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive)
+
+ # NSS
+ nsswitch_conf = read_file('/etc/nsswitch.conf')
+ tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf)
+ self.assertTrue(tmp)
+
+ tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
+ self.assertTrue(tmp)
+
if __name__ == '__main__':
unittest.main(verbosity=2)