diff options
Diffstat (limited to 'smoketest')
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_bgp.py | 39 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_rpki.py | 149 |
2 files changed, 152 insertions, 36 deletions
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 87d10eb85..30d98976d 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -122,6 +122,9 @@ class TestProtocolsBGP(unittest.TestCase): self.session.commit() del self.session + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + def verify_frr_config(self, peer, peer_config, frrconfig): # recurring patterns to verify for both a simple neighbor and a peer-group if 'cap_dynamic' in peer_config: @@ -182,8 +185,6 @@ class TestProtocolsBGP(unittest.TestCase): self.assertIn(f' bgp default local-preference {local_pref}', frrconfig) self.assertIn(f' no bgp default ipv4-unicast', frrconfig) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) def test_bgp_02_neighbors(self): # Test out individual neighbor configuration items, not all of them are @@ -419,39 +420,5 @@ class TestProtocolsBGP(unittest.TestCase): for prefix in listen_ranges: self.assertIn(f' bgp listen range {prefix} peer-group {peer_group}', frrconfig) - - def test_bgp_07_rpki(self): - rpki_path = ['protocols', 'rpki'] - init_tmo = '50' - polling = '400' - preference = '100' - timeout = '900' - - cache = { - 'foo' : { 'address' : '1.1.1.1', 'port' : '8080' }, -# T3253 only one peer supported -# 'bar' : { 'address' : '2.2.2.2', 'port' : '9090' }, - } - - self.session.set(rpki_path + ['polling-period', polling]) - self.session.set(rpki_path + ['preference', preference]) - - for name, config in cache.items(): - self.session.set(rpki_path + ['cache', name, 'address', config['address']]) - self.session.set(rpki_path + ['cache', name, 'port', config['port']]) - - # commit changes - self.session.commit() - - # Verify FRR bgpd configuration - frrconfig = getFRRRPKIconfig() - self.assertIn(f'rpki polling_period {polling}', frrconfig) - - for name, config in cache.items(): - self.assertIn('rpki cache {address} {port} preference 1'.format(**config), frrconfig) - - self.session.delete(rpki_path) - - if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py new file mode 100755 index 000000000..7c65ca26f --- /dev/null +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos.util import cmd +from vyos.util import process_named_running + +base_path = ['protocols', 'rpki'] +PROCESS_NAME = 'bgpd' + +rpki_known_hosts = '/config/auth/known_hosts' +rpki_ssh_key = '/config/auth/id_rsa_rpki' +rpki_ssh_pub = f'{rpki_ssh_key}.pub' + +def getFRRRPKIconfig(): + return cmd(f'vtysh -c "show run" | sed -n "/rpki/,/^!/p"') + +class TestProtocolsRPKI(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + # Nothing RPKI specific should be left over in the config + frrconfig = getFRRRPKIconfig() + self.assertNotIn('rpki', frrconfig) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_rpki(self): + polling = '7200' + cache = { + '192.0.2.1' : { + 'port' : '8080', + 'preference' : '1' + }, + '192.0.2.2' : { + 'port' : '9090', + 'preference' : '2' + }, + } + + self.session.set(base_path + ['polling-period', polling]) + for peer, peer_config in cache.items(): + self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + + # commit changes + self.session.commit() + + # Verify FRR configuration + frrconfig = getFRRRPKIconfig() + self.assertIn(f'rpki polling_period {polling}', frrconfig) + + for peer, peer_config in cache.items(): + port = peer_config['port'] + preference = peer_config['preference'] + self.assertIn(f'rpki cache {peer} {port} preference {preference}', frrconfig) + + def test_rpki_ssh(self): + polling = '7200' + cache = { + '192.0.2.3' : { + 'port' : '1234', + 'username' : 'foo', + 'preference' : '10' + }, + '192.0.2.4' : { + 'port' : '5678', + 'username' : 'bar', + 'preference' : '20' + }, + } + + self.session.set(base_path + ['polling-period', polling]) + + for peer, peer_config in cache.items(): + self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + self.session.set(base_path + ['cache', peer, 'ssh', 'username', peer_config['username']]) + self.session.set(base_path + ['cache', peer, 'ssh', 'public-key-file', rpki_ssh_pub]) + self.session.set(base_path + ['cache', peer, 'ssh', 'private-key-file', rpki_ssh_key]) + self.session.set(base_path + ['cache', peer, 'ssh', 'known-hosts-file', rpki_known_hosts]) + + # commit changes + self.session.commit() + + # Verify FRR configuration + frrconfig = getFRRRPKIconfig() + self.assertIn(f'rpki polling_period {polling}', frrconfig) + + for peer, peer_config in cache.items(): + port = peer_config['port'] + preference = peer_config['preference'] + username = peer_config['username'] + self.assertIn(f'rpki cache {peer} {port} {username} {rpki_ssh_key} {rpki_known_hosts} preference {preference}', frrconfig) + + + def test_rpki_verify_preference(self): + cache = { + '192.0.2.1' : { + 'port' : '8080', + 'preference' : '1' + }, + '192.0.2.2' : { + 'port' : '9090', + 'preference' : '1' + }, + } + + for peer, peer_config in cache.items(): + self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + + # check validate() - preferences must be unique + with self.assertRaises(ConfigSessionError): + self.session.commit() + + +if __name__ == '__main__': + # Create OpenSSH keypair used in RPKI tests + if not os.path.isfile(rpki_ssh_key): + cmd(f'ssh-keygen -t rsa -f {rpki_ssh_key} -N ""') + + if not os.path.isfile(rpki_known_hosts): + cmd(f'touch {rpki_known_hosts}') + + unittest.main(verbosity=2, failfast=True) |