summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/test_protocols_rpki.py
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli/test_protocols_rpki.py')
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rpki.py192
1 files changed, 136 insertions, 56 deletions
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py
index 29f03a26a..50a157fd0 100755
--- a/smoketest/scripts/cli/test_protocols_rpki.py
+++ b/smoketest/scripts/cli/test_protocols_rpki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2024 VyOS maintainers and contributors
+# Copyright VyOS maintainers and contributors <maintainers@vyos.io>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -17,14 +17,19 @@
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
+from base_vyostest_shim import CSTORE_GUARD_TIME
from vyos.configsession import ConfigSessionError
+from vyos.frrender import bgp_daemon
from vyos.utils.file import read_file
from vyos.utils.process import process_named_running
base_path = ['protocols', 'rpki']
-PROCESS_NAME = 'bgpd'
-
+base_frr_config_args = {'string': 'rpki', 'endsection': '^exit'}
+vrf = 'blue'
+vrf_path = ['vrf', 'name', vrf]
+vrf_frr_config_args = {'string': f'vrf {vrf}', 'endsection':'^exit-vrf',
+ 'substring': ' rpki', 'endsubsection': '^ exit'}
rpki_key_name = 'rpki-smoketest'
rpki_key_type = 'ssh-rsa'
@@ -108,17 +113,27 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
# call base-classes classmethod
super(TestProtocolsRPKI, cls).setUpClass()
# Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
- cls.daemon_pid = process_named_running(PROCESS_NAME)
+ cls.daemon_pid = process_named_running(bgp_daemon)
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, vrf_path)
+ # Enable CSTORE guard time required by FRR related tests
+ cls._commit_guard_time = CSTORE_GUARD_TIME
def tearDown(self):
self.cli_delete(base_path)
+ self.cli_delete(vrf_path)
self.cli_commit()
+ frrconfig = self.getFRRconfig(**base_frr_config_args)
+ self.assertNotIn(f'rpki', frrconfig)
+
+ frrconfig = self.getFRRconfig(**vrf_frr_config_args)
+ self.assertNotIn(f'rpki', frrconfig)
+
# check process health and continuity
- self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
+ self.assertEqual(self.daemon_pid, process_named_running(bgp_daemon))
def test_rpki(self):
expire_interval = '3600'
@@ -139,27 +154,33 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
},
}
- self.cli_set(base_path + ['expire-interval', expire_interval])
- self.cli_set(base_path + ['polling-period', polling_period])
- self.cli_set(base_path + ['retry-interval', retry_interval])
+ for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args},
+ {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]:
- for peer, peer_config in cache.items():
- self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']])
- self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']])
+ if 'vrf' in test_set['path']:
+ self.cli_set(vrf_path + ['table', '1000'])
- # commit changes
- self.cli_commit()
+ self.cli_set(test_set['path'] + ['expire-interval', expire_interval])
+ self.cli_set(test_set['path'] + ['polling-period', polling_period])
+ self.cli_set(test_set['path'] + ['retry-interval', retry_interval])
+
+ for peer, peer_config in cache.items():
+ self.cli_set(test_set['path'] + ['cache', peer, 'port', peer_config['port']])
+ self.cli_set(test_set['path'] + ['cache', peer, 'preference', peer_config['preference']])
+
+ # commit changes
+ self.cli_commit()
- # Verify FRR configuration
- frrconfig = self.getFRRconfig('rpki')
- self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig)
- self.assertIn(f'rpki polling_period {polling_period}', frrconfig)
- self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig)
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig(**test_set['frrargs'])
+ self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig)
+ self.assertIn(f'rpki polling_period {polling_period}', frrconfig)
+ self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig)
- for peer, peer_config in cache.items():
- port = peer_config['port']
- preference = peer_config['preference']
- self.assertIn(f'rpki cache {peer} {port} preference {preference}', frrconfig)
+ for peer, peer_config in cache.items():
+ port = peer_config['port']
+ preference = peer_config['preference']
+ self.assertIn(f'rpki cache tcp {peer} {port} preference {preference}', frrconfig)
def test_rpki_ssh(self):
polling = '7200'
@@ -180,28 +201,34 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n','')])
self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type])
- for cache_name, cache_config in cache.items():
- self.cli_set(base_path + ['cache', cache_name, 'port', cache_config['port']])
- self.cli_set(base_path + ['cache', cache_name, 'preference', cache_config['preference']])
- self.cli_set(base_path + ['cache', cache_name, 'ssh', 'username', cache_config['username']])
- self.cli_set(base_path + ['cache', cache_name, 'ssh', 'key', rpki_key_name])
+ for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args},
+ {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]:
- # commit changes
- self.cli_commit()
+ if 'vrf' in test_set['path']:
+ self.cli_set(vrf_path + ['table', '1000'])
- # Verify FRR configuration
- frrconfig = self.getFRRconfig('rpki')
- for cache_name, cache_config in cache.items():
- port = cache_config['port']
- preference = cache_config['preference']
- username = cache_config['username']
- self.assertIn(f'rpki cache {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig)
+ for cache_name, cache_config in cache.items():
+ self.cli_set(test_set['path'] + ['cache', cache_name, 'port', cache_config['port']])
+ self.cli_set(test_set['path'] + ['cache', cache_name, 'preference', cache_config['preference']])
+ self.cli_set(test_set['path'] + ['cache', cache_name, 'ssh', 'username', cache_config['username']])
+ self.cli_set(test_set['path'] + ['cache', cache_name, 'ssh', 'key', rpki_key_name])
- # Verify content of SSH keys
- tmp = read_file(f'/run/frr/id_rpki_{cache_name}')
- self.assertIn(rpki_ssh_key.replace('\n',''), tmp)
- tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub')
- self.assertIn(rpki_ssh_pub.replace('\n',''), tmp)
+ # commit changes
+ self.cli_commit()
+
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig(**test_set['frrargs'])
+ for cache_name, cache_config in cache.items():
+ port = cache_config['port']
+ preference = cache_config['preference']
+ username = cache_config['username']
+ self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig)
+
+ # Verify content of SSH keys
+ tmp = read_file(f'/run/frr/id_rpki_{cache_name}')
+ self.assertIn(rpki_ssh_key.replace('\n',''), tmp)
+ tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub')
+ self.assertIn(rpki_ssh_pub.replace('\n',''), tmp)
# Change OpenSSH key and verify it was properly written to filesystem
self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key_replacement.replace('\n','')])
@@ -209,17 +236,21 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
# commit changes
self.cli_commit()
- for cache_name, cache_config in cache.items():
- port = cache_config['port']
- preference = cache_config['preference']
- username = cache_config['username']
- self.assertIn(f'rpki cache {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig)
+ for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args},
+ {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]:
+
+ frrconfig = self.getFRRconfig(**test_set['frrargs'])
+ for cache_name, cache_config in cache.items():
+ port = cache_config['port']
+ preference = cache_config['preference']
+ username = cache_config['username']
+ self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig)
- # Verify content of SSH keys
- tmp = read_file(f'/run/frr/id_rpki_{cache_name}')
- self.assertIn(rpki_ssh_key_replacement.replace('\n',''), tmp)
- tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub')
- self.assertIn(rpki_ssh_pub_replacement.replace('\n',''), tmp)
+ # Verify content of SSH keys
+ tmp = read_file(f'/run/frr/id_rpki_{cache_name}')
+ self.assertIn(rpki_ssh_key_replacement.replace('\n',''), tmp)
+ tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub')
+ self.assertIn(rpki_ssh_pub_replacement.replace('\n',''), tmp)
self.cli_delete(['pki', 'openssh'])
@@ -235,13 +266,62 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
},
}
- for peer, peer_config in cache.items():
- self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']])
- self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']])
+ for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args},
+ {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]:
- # check validate() - preferences must be unique
- with self.assertRaises(ConfigSessionError):
+ if 'vrf' in test_set['path']:
+ self.cli_set(vrf_path + ['table', '1000'])
+
+ for peer, peer_config in cache.items():
+ self.cli_set(test_set['path'] + ['cache', peer, 'port', peer_config['port']])
+ self.cli_set(test_set['path'] + ['cache', peer, 'preference', peer_config['preference']])
+
+ # check validate() - preferences must be unique
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_rpki_source_address(self):
+ peer = '192.0.2.1'
+ port = '8080'
+ preference = '1'
+ username = 'foo'
+ source_address = '100.10.10.1'
+
+ self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', f'{source_address}/24'])
+
+
+ for test_set in [ {'path': base_path, 'frrargs': base_frr_config_args},
+ {'path': vrf_path + base_path, 'frrargs': vrf_frr_config_args} ]:
+
+ if 'vrf' in test_set['path']:
+ self.cli_set(vrf_path + ['table', '1000'])
+
+ # Configure a TCP cache server
+ self.cli_set(test_set['path'] + ['cache', peer, 'port', port])
+ self.cli_set(test_set['path'] + ['cache', peer, 'preference', preference])
+ self.cli_set(test_set['path'] + ['cache', peer, 'source-address', source_address])
self.cli_commit()
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig(**test_set['frrargs'])
+ self.assertIn(f'rpki cache tcp {peer} {port} source {source_address} preference {preference}', frrconfig)
+
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n', '')])
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n', '')])
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type])
+
+ # Configure a SSH cache server
+ self.cli_set(test_set['path'] + ['cache', peer, 'ssh', 'username', username])
+ self.cli_set(test_set['path'] + ['cache', peer, 'ssh', 'key', rpki_key_name])
+ self.cli_commit()
+
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig(**test_set['frrargs'])
+ self.assertIn(
+ f'rpki cache ssh {peer} {port} {username} /run/frr/id_rpki_{peer} /run/frr/id_rpki_{peer}.pub source {source_address} preference {preference}',
+ frrconfig,
+ )
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)