diff options
Diffstat (limited to 'smoketest/scripts/cli/test_protocols_rpki.py')
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_rpki.py | 189 |
1 files changed, 114 insertions, 75 deletions
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py index 0addf7fee..50a157fd0 100755 --- a/smoketest/scripts/cli/test_protocols_rpki.py +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021-2024 VyOS maintainers and contributors +# Copyright VyOS maintainers and contributors <maintainers@vyos.io> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -25,6 +25,11 @@ from vyos.utils.file import read_file from vyos.utils.process import process_named_running base_path = ['protocols', 'rpki'] +base_frr_config_args = {'string': 'rpki', 'endsection': '^exit'} +vrf = 'blue' +vrf_path = ['vrf', 'name', vrf] +vrf_frr_config_args = {'string': f'vrf {vrf}', 'endsection':'^exit-vrf', + 'substring': ' rpki', 'endsubsection': '^ exit'} rpki_key_name = 'rpki-smoketest' rpki_key_type = 'ssh-rsa' @@ -112,14 +117,19 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) + cls.cli_delete(cls, vrf_path) # Enable CSTORE guard time required by FRR related tests cls._commit_guard_time = CSTORE_GUARD_TIME def tearDown(self): self.cli_delete(base_path) + self.cli_delete(vrf_path) self.cli_commit() - frrconfig = self.getFRRconfig('rpki', endsection='^exit') + frrconfig = self.getFRRconfig(**base_frr_config_args) + self.assertNotIn(f'rpki', frrconfig) + + frrconfig = self.getFRRconfig(**vrf_frr_config_args) self.assertNotIn(f'rpki', frrconfig) # check process health and continuity @@ -144,27 +154,33 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): }, } - self.cli_set(base_path + ['expire-interval', expire_interval]) - self.cli_set(base_path + ['polling-period', polling_period]) - self.cli_set(base_path + ['retry-interval', retry_interval]) + for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args}, + {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]: - for peer, peer_config in cache.items(): - self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) - self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + if 'vrf' in test_set['path']: + self.cli_set(vrf_path + ['table', '1000']) - # commit changes - self.cli_commit() + self.cli_set(test_set['path'] + ['expire-interval', expire_interval]) + self.cli_set(test_set['path'] + ['polling-period', polling_period]) + self.cli_set(test_set['path'] + ['retry-interval', retry_interval]) + + for peer, peer_config in cache.items(): + self.cli_set(test_set['path'] + ['cache', peer, 'port', peer_config['port']]) + self.cli_set(test_set['path'] + ['cache', peer, 'preference', peer_config['preference']]) + + # commit changes + self.cli_commit() - # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki', endsection='^exit') - self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig) - self.assertIn(f'rpki polling_period {polling_period}', frrconfig) - self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig) + # Verify FRR configuration + frrconfig = self.getFRRconfig(**test_set['frrargs']) + self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig) + self.assertIn(f'rpki polling_period {polling_period}', frrconfig) + self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig) - for peer, peer_config in cache.items(): - port = peer_config['port'] - preference = peer_config['preference'] - self.assertIn(f'rpki cache tcp {peer} {port} preference {preference}', frrconfig) + for peer, peer_config in cache.items(): + port = peer_config['port'] + preference = peer_config['preference'] + self.assertIn(f'rpki cache tcp {peer} {port} preference {preference}', frrconfig) def test_rpki_ssh(self): polling = '7200' @@ -185,28 +201,34 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n','')]) self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type]) - for cache_name, cache_config in cache.items(): - self.cli_set(base_path + ['cache', cache_name, 'port', cache_config['port']]) - self.cli_set(base_path + ['cache', cache_name, 'preference', cache_config['preference']]) - self.cli_set(base_path + ['cache', cache_name, 'ssh', 'username', cache_config['username']]) - self.cli_set(base_path + ['cache', cache_name, 'ssh', 'key', rpki_key_name]) + for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args}, + {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]: - # commit changes - self.cli_commit() + if 'vrf' in test_set['path']: + self.cli_set(vrf_path + ['table', '1000']) + + for cache_name, cache_config in cache.items(): + self.cli_set(test_set['path'] + ['cache', cache_name, 'port', cache_config['port']]) + self.cli_set(test_set['path'] + ['cache', cache_name, 'preference', cache_config['preference']]) + self.cli_set(test_set['path'] + ['cache', cache_name, 'ssh', 'username', cache_config['username']]) + self.cli_set(test_set['path'] + ['cache', cache_name, 'ssh', 'key', rpki_key_name]) + + # commit changes + self.cli_commit() - # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki', endsection='^exit') - for cache_name, cache_config in cache.items(): - port = cache_config['port'] - preference = cache_config['preference'] - username = cache_config['username'] - self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) + # Verify FRR configuration + frrconfig = self.getFRRconfig(**test_set['frrargs']) + for cache_name, cache_config in cache.items(): + port = cache_config['port'] + preference = cache_config['preference'] + username = cache_config['username'] + self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) - # Verify content of SSH keys - tmp = read_file(f'/run/frr/id_rpki_{cache_name}') - self.assertIn(rpki_ssh_key.replace('\n',''), tmp) - tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') - self.assertIn(rpki_ssh_pub.replace('\n',''), tmp) + # Verify content of SSH keys + tmp = read_file(f'/run/frr/id_rpki_{cache_name}') + self.assertIn(rpki_ssh_key.replace('\n',''), tmp) + tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') + self.assertIn(rpki_ssh_pub.replace('\n',''), tmp) # Change OpenSSH key and verify it was properly written to filesystem self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key_replacement.replace('\n','')]) @@ -214,17 +236,21 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): # commit changes self.cli_commit() - for cache_name, cache_config in cache.items(): - port = cache_config['port'] - preference = cache_config['preference'] - username = cache_config['username'] - self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) + for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args}, + {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]: - # Verify content of SSH keys - tmp = read_file(f'/run/frr/id_rpki_{cache_name}') - self.assertIn(rpki_ssh_key_replacement.replace('\n',''), tmp) - tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') - self.assertIn(rpki_ssh_pub_replacement.replace('\n',''), tmp) + frrconfig = self.getFRRconfig(**test_set['frrargs']) + for cache_name, cache_config in cache.items(): + port = cache_config['port'] + preference = cache_config['preference'] + username = cache_config['username'] + self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) + + # Verify content of SSH keys + tmp = read_file(f'/run/frr/id_rpki_{cache_name}') + self.assertIn(rpki_ssh_key_replacement.replace('\n',''), tmp) + tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') + self.assertIn(rpki_ssh_pub_replacement.replace('\n',''), tmp) self.cli_delete(['pki', 'openssh']) @@ -240,13 +266,19 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): }, } - for peer, peer_config in cache.items(): - self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) - self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args}, + {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]: - # check validate() - preferences must be unique - with self.assertRaises(ConfigSessionError): - self.cli_commit() + if 'vrf' in test_set['path']: + self.cli_set(vrf_path + ['table', '1000']) + + for peer, peer_config in cache.items(): + self.cli_set(test_set['path'] + ['cache', peer, 'port', peer_config['port']]) + self.cli_set(test_set['path'] + ['cache', peer, 'preference', peer_config['preference']]) + + # check validate() - preferences must be unique + with self.assertRaises(ConfigSessionError): + self.cli_commit() def test_rpki_source_address(self): peer = '192.0.2.1' @@ -257,31 +289,38 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', f'{source_address}/24']) - # Configure a TCP cache server - self.cli_set(base_path + ['cache', peer, 'port', port]) - self.cli_set(base_path + ['cache', peer, 'preference', preference]) - self.cli_set(base_path + ['cache', peer, 'source-address', source_address]) - self.cli_commit() - # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki') - self.assertIn(f'rpki cache tcp {peer} {port} source {source_address} preference {preference}', frrconfig) + for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args}, + {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]: - self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n', '')]) - self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n', '')]) - self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type]) + if 'vrf' in test_set['path']: + self.cli_set(vrf_path + ['table', '1000']) - # Configure a SSH cache server - self.cli_set(base_path + ['cache', peer, 'ssh', 'username', username]) - self.cli_set(base_path + ['cache', peer, 'ssh', 'key', rpki_key_name]) - self.cli_commit() + # Configure a TCP cache server + self.cli_set(test_set['path'] + ['cache', peer, 'port', port]) + self.cli_set(test_set['path'] + ['cache', peer, 'preference', preference]) + self.cli_set(test_set['path'] + ['cache', peer, 'source-address', source_address]) + self.cli_commit() + + # Verify FRR configuration + frrconfig = self.getFRRconfig(**test_set['frrargs']) + self.assertIn(f'rpki cache tcp {peer} {port} source {source_address} preference {preference}', frrconfig) + + self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n', '')]) + self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n', '')]) + self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type]) + + # Configure a SSH cache server + self.cli_set(test_set['path'] + ['cache', peer, 'ssh', 'username', username]) + self.cli_set(test_set['path'] + ['cache', peer, 'ssh', 'key', rpki_key_name]) + self.cli_commit() - # Verify FRR configuration - frrconfig = self.getFRRconfig('rpki') - self.assertIn( - f'rpki cache ssh {peer} {port} {username} /run/frr/id_rpki_{peer} /run/frr/id_rpki_{peer}.pub source {source_address} preference {preference}', - frrconfig, - ) + # Verify FRR configuration + frrconfig = self.getFRRconfig(**test_set['frrargs']) + self.assertIn( + f'rpki cache ssh {peer} {port} {username} /run/frr/id_rpki_{peer} /run/frr/id_rpki_{peer}.pub source {source_address} preference {preference}', + frrconfig, + ) if __name__ == '__main__': |