diff options
Diffstat (limited to 'smoketest/scripts/cli/test_protocols_rpki.py')
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_rpki.py | 59 |
1 files changed, 50 insertions, 9 deletions
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py index 29f03a26a..0addf7fee 100755 --- a/smoketest/scripts/cli/test_protocols_rpki.py +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -17,14 +17,14 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM +from base_vyostest_shim import CSTORE_GUARD_TIME from vyos.configsession import ConfigSessionError +from vyos.frrender import bgp_daemon from vyos.utils.file import read_file from vyos.utils.process import process_named_running base_path = ['protocols', 'rpki'] -PROCESS_NAME = 'bgpd' - rpki_key_name = 'rpki-smoketest' rpki_key_type = 'ssh-rsa' @@ -108,17 +108,22 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): # call base-classes classmethod super(TestProtocolsRPKI, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(bgp_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) + # Enable CSTORE guard time required by FRR related tests + cls._commit_guard_time = CSTORE_GUARD_TIME def tearDown(self): self.cli_delete(base_path) self.cli_commit() + frrconfig = self.getFRRconfig('rpki', endsection='^exit') + self.assertNotIn(f'rpki', frrconfig) + # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(bgp_daemon)) def test_rpki(self): expire_interval = '3600' @@ -151,7 +156,7 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki') + frrconfig = self.getFRRconfig('rpki', endsection='^exit') self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig) self.assertIn(f'rpki polling_period {polling_period}', frrconfig) self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig) @@ -159,7 +164,7 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): for peer, peer_config in cache.items(): port = peer_config['port'] preference = peer_config['preference'] - self.assertIn(f'rpki cache {peer} {port} preference {preference}', frrconfig) + self.assertIn(f'rpki cache tcp {peer} {port} preference {preference}', frrconfig) def test_rpki_ssh(self): polling = '7200' @@ -190,12 +195,12 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki') + frrconfig = self.getFRRconfig('rpki', endsection='^exit') for cache_name, cache_config in cache.items(): port = cache_config['port'] preference = cache_config['preference'] username = cache_config['username'] - self.assertIn(f'rpki cache {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) + self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) # Verify content of SSH keys tmp = read_file(f'/run/frr/id_rpki_{cache_name}') @@ -213,7 +218,7 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): port = cache_config['port'] preference = cache_config['preference'] username = cache_config['username'] - self.assertIn(f'rpki cache {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) + self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) # Verify content of SSH keys tmp = read_file(f'/run/frr/id_rpki_{cache_name}') @@ -243,5 +248,41 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): with self.assertRaises(ConfigSessionError): self.cli_commit() + def test_rpki_source_address(self): + peer = '192.0.2.1' + port = '8080' + preference = '1' + username = 'foo' + source_address = '100.10.10.1' + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', f'{source_address}/24']) + + # Configure a TCP cache server + self.cli_set(base_path + ['cache', peer, 'port', port]) + self.cli_set(base_path + ['cache', peer, 'preference', preference]) + self.cli_set(base_path + ['cache', peer, 'source-address', source_address]) + self.cli_commit() + + # Verify FRR configuration + frrconfig = self.getFRRconfig('rpki') + self.assertIn(f'rpki cache tcp {peer} {port} source {source_address} preference {preference}', frrconfig) + + self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n', '')]) + self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n', '')]) + self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type]) + + # Configure a SSH cache server + self.cli_set(base_path + ['cache', peer, 'ssh', 'username', username]) + self.cli_set(base_path + ['cache', peer, 'ssh', 'key', rpki_key_name]) + self.cli_commit() + + # Verify FRR configuration + frrconfig = self.getFRRconfig('rpki') + self.assertIn( + f'rpki cache ssh {peer} {port} {username} /run/frr/id_rpki_{peer} /run/frr/id_rpki_{peer}.pub source {source_address} preference {preference}', + frrconfig, + ) + + if __name__ == '__main__': unittest.main(verbosity=2) |