diff options
Diffstat (limited to 'smoketest/scripts/cli/test_protocols_bgp.py')
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_bgp.py | 354 |
1 files changed, 222 insertions, 132 deletions
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 9aa1541cf..8ed0f7228 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -14,18 +14,17 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import unittest -from vyos.configsession import ConfigSession +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSessionError from vyos.template import is_ipv6 -from vyos.util import cmd from vyos.util import process_named_running PROCESS_NAME = 'bgpd' ASN = '64512' -base_path = ['protocols', 'bgp', ASN] +base_path = ['protocols', 'bgp'] route_map_in = 'foo-map-in' route_map_out = 'foo-map-out' @@ -103,8 +102,7 @@ peer_group_config = { 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', -# XXX: not available in current Perl backend -# 'ttl_security': '5', + 'ttl_security': '5', }, 'bar' : { 'description' : 'foo peer bar group', @@ -128,45 +126,32 @@ peer_group_config = { }, } -def getFRRBGPconfig(): - return cmd(f'vtysh -c "show run" | sed -n "/^router bgp {ASN}/,/^!/p"') - -def getFRRBgpAfiConfig(afi): - return cmd(f'vtysh -c "show run" | sed -n "/^router bgp {ASN}/,/^!/p" | sed -n "/^ address-family {afi} unicast/,/^ exit-address-family/p"') - -def getFRRBGPVNIconfig(vni): - return cmd(f'vtysh -c "show run" | sed -n "/^ vni {vni}/,/^!/p"') - -def getFRRRPKIconfig(): - return cmd(f'vtysh -c "show run" | sed -n "/^rpki/,/^!/p"') - -class TestProtocolsBGP(unittest.TestCase): +class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) + self.cli_set(['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'prefix', '192.0.2.0/25']) + self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'prefix', '192.0.2.128/25']) - self.session.set(['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'prefix', '192.0.2.0/25']) - self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'prefix', '192.0.2.128/25']) + self.cli_set(['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'prefix', '2001:db8:1000::/64']) + self.cli_set(['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'action', 'deny']) + self.cli_set(['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'prefix', '2001:db8:2000::/64']) - self.session.set(['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'prefix', '2001:db8:1000::/64']) - self.session.set(['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'action', 'deny']) - self.session.set(['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'prefix', '2001:db8:2000::/64']) + self.cli_set(base_path + ['local-as', ASN]) def tearDown(self): - self.session.delete(['policy', 'route-map', route_map_in]) - self.session.delete(['policy', 'route-map', route_map_out]) - self.session.delete(['policy', 'prefix-list', prefix_list_in]) - self.session.delete(['policy', 'prefix-list', prefix_list_out]) - self.session.delete(['policy', 'prefix-list6', prefix_list_in6]) - self.session.delete(['policy', 'prefix-list6', prefix_list_out6]) + self.cli_delete(['policy', 'route-map', route_map_in]) + self.cli_delete(['policy', 'route-map', route_map_out]) + self.cli_delete(['policy', 'prefix-list', prefix_list_in]) + self.cli_delete(['policy', 'prefix-list', prefix_list_out]) + self.cli_delete(['policy', 'prefix-list6', prefix_list_in6]) + self.cli_delete(['policy', 'prefix-list6', prefix_list_out6]) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) @@ -226,27 +211,35 @@ class TestProtocolsBGP(unittest.TestCase): max_path_v6 = '8' max_path_v6ibgp = '16' - self.session.set(base_path + ['parameters', 'router-id', router_id]) - self.session.set(base_path + ['parameters', 'log-neighbor-changes']) + self.cli_set(base_path + ['parameters', 'router-id', router_id]) + self.cli_set(base_path + ['parameters', 'log-neighbor-changes']) + + # Local AS number MUST be defined - as this is set in setUp() we remove + # this once for testing of the proper error + self.cli_delete(base_path + ['local-as']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['local-as', ASN]) + # Default local preference (higher = more preferred, default value is 100) - self.session.set(base_path + ['parameters', 'default', 'local-pref', local_pref]) + self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref]) # Deactivate IPv4 unicast for a peer by default - self.session.set(base_path + ['parameters', 'default', 'no-ipv4-unicast']) - self.session.set(base_path + ['parameters', 'graceful-restart', 'stalepath-time', stalepath_time]) - self.session.set(base_path + ['parameters', 'graceful-shutdown']) - self.session.set(base_path + ['parameters', 'ebgp-requires-policy']) + self.cli_set(base_path + ['parameters', 'default', 'no-ipv4-unicast']) + self.cli_set(base_path + ['parameters', 'graceful-restart', 'stalepath-time', stalepath_time]) + self.cli_set(base_path + ['parameters', 'graceful-shutdown']) + self.cli_set(base_path + ['parameters', 'ebgp-requires-policy']) # AFI maximum path support - self.session.set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ebgp', max_path_v4]) - self.session.set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) - self.session.set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ebgp', max_path_v6]) - self.session.set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ibgp', max_path_v6ibgp]) + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ebgp', max_path_v4]) + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ebgp', max_path_v6]) + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ibgp', max_path_v6ibgp]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' bgp router-id {router_id}', frrconfig) self.assertIn(f' bgp log-neighbor-changes', frrconfig) @@ -256,11 +249,11 @@ class TestProtocolsBGP(unittest.TestCase): self.assertIn(f' bgp graceful-shutdown', frrconfig) self.assertNotIn(f'bgp ebgp-requires-policy', frrconfig) - afiv4_config = getFRRBgpAfiConfig('ipv4') + afiv4_config = self.getFRRconfig(' address-family ipv4 unicast') self.assertIn(f' maximum-paths {max_path_v4}', afiv4_config) self.assertIn(f' maximum-paths ibgp {max_path_v4ibgp}', afiv4_config) - afiv6_config = getFRRBgpAfiConfig('ipv6') + afiv6_config = self.getFRRconfig(' address-family ipv6 unicast') self.assertIn(f' maximum-paths {max_path_v6}', afiv6_config) self.assertIn(f' maximum-paths ibgp {max_path_v6ibgp}', afiv6_config) @@ -274,59 +267,59 @@ class TestProtocolsBGP(unittest.TestCase): afi = 'ipv6-unicast' if 'adv_interv' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'advertisement-interval', peer_config["adv_interv"]]) + self.cli_set(base_path + ['neighbor', peer, 'advertisement-interval', peer_config["adv_interv"]]) if 'cap_dynamic' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'capability', 'dynamic']) + self.cli_set(base_path + ['neighbor', peer, 'capability', 'dynamic']) if 'cap_ext_next' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'capability', 'extended-nexthop']) + self.cli_set(base_path + ['neighbor', peer, 'capability', 'extended-nexthop']) if 'description' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'description', peer_config["description"]]) + self.cli_set(base_path + ['neighbor', peer, 'description', peer_config["description"]]) if 'no_cap_nego' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'disable-capability-negotiation']) + self.cli_set(base_path + ['neighbor', peer, 'disable-capability-negotiation']) if 'multi_hop' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'ebgp-multihop', peer_config["multi_hop"]]) + self.cli_set(base_path + ['neighbor', peer, 'ebgp-multihop', peer_config["multi_hop"]]) if 'local_as' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'local-as', peer_config["local_as"]]) + self.cli_set(base_path + ['neighbor', peer, 'local-as', peer_config["local_as"]]) if 'cap_over' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'override-capability']) + self.cli_set(base_path + ['neighbor', peer, 'override-capability']) if 'passive' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'passive']) + self.cli_set(base_path + ['neighbor', peer, 'passive']) if 'password' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'password', peer_config["password"]]) + self.cli_set(base_path + ['neighbor', peer, 'password', peer_config["password"]]) if 'port' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'port', peer_config["port"]]) + self.cli_set(base_path + ['neighbor', peer, 'port', peer_config["port"]]) if 'remote_as' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'remote-as', peer_config["remote_as"]]) + self.cli_set(base_path + ['neighbor', peer, 'remote-as', peer_config["remote_as"]]) if 'cap_strict' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'strict-capability-match']) + self.cli_set(base_path + ['neighbor', peer, 'strict-capability-match']) if 'shutdown' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'shutdown']) + self.cli_set(base_path + ['neighbor', peer, 'shutdown']) if 'ttl_security' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'ttl-security', 'hops', peer_config["ttl_security"]]) + self.cli_set(base_path + ['neighbor', peer, 'ttl-security', 'hops', peer_config["ttl_security"]]) if 'update_src' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'update-source', peer_config["update_src"]]) + self.cli_set(base_path + ['neighbor', peer, 'update-source', peer_config["update_src"]]) if 'route_map_in' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'import', peer_config["route_map_in"]]) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'import', peer_config["route_map_in"]]) if 'route_map_out' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'export', peer_config["route_map_out"]]) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'export', peer_config["route_map_out"]]) if 'pfx_list_in' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'import', peer_config["pfx_list_in"]]) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'import', peer_config["pfx_list_in"]]) if 'pfx_list_out' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'export', peer_config["pfx_list_out"]]) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'export', peer_config["pfx_list_out"]]) if 'no_send_comm_std' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'standard']) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'standard']) if 'no_send_comm_ext' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'extended']) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'extended']) if 'addpath_all' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-all']) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-all']) if 'addpath_per_as' in peer_config: - self.session.set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-per-as']) + self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-per-as']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in neighbor_config.items(): @@ -343,53 +336,53 @@ class TestProtocolsBGP(unittest.TestCase): # Test out individual peer-group configuration items for peer_group, config in peer_group_config.items(): if 'cap_dynamic' in config: - self.session.set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) + self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) if 'cap_ext_next' in config: - self.session.set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) + self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) if 'description' in config: - self.session.set(base_path + ['peer-group', peer_group, 'description', config["description"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'description', config["description"]]) if 'no_cap_nego' in config: - self.session.set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) + self.cli_set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) if 'multi_hop' in config: - self.session.set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) if 'local_as' in config: - self.session.set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]]) if 'cap_over' in config: - self.session.set(base_path + ['peer-group', peer_group, 'override-capability']) + self.cli_set(base_path + ['peer-group', peer_group, 'override-capability']) if 'passive' in config: - self.session.set(base_path + ['peer-group', peer_group, 'passive']) + self.cli_set(base_path + ['peer-group', peer_group, 'passive']) if 'password' in config: - self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'password', config["password"]]) if 'remote_as' in config: - self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) if 'shutdown' in config: - self.session.set(base_path + ['peer-group', peer_group, 'shutdown']) + self.cli_set(base_path + ['peer-group', peer_group, 'shutdown']) if 'ttl_security' in config: - self.session.set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) if 'update_src' in config: - self.session.set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) if 'route_map_in' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'import', config["route_map_in"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'import', config["route_map_in"]]) if 'route_map_out' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'export', config["route_map_out"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'export', config["route_map_out"]]) if 'pfx_list_in' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'import', config["pfx_list_in"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'import', config["pfx_list_in"]]) if 'pfx_list_out' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'export', config["pfx_list_out"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'export', config["pfx_list_out"]]) if 'no_send_comm_std' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'standard']) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'standard']) if 'no_send_comm_ext' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'extended']) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'extended']) if 'addpath_all' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-all']) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-all']) if 'addpath_per_as' in config: - self.session.set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-per-as']) + self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-per-as']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in peer_group_config.items(): @@ -413,24 +406,24 @@ class TestProtocolsBGP(unittest.TestCase): # We want to redistribute ... redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] for redistribute in redistributes: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'network', network]) if 'as_set' in network_config: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'as-set']) if 'summary_only' in network_config: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'summary-only']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv4 unicast', frrconfig) @@ -459,21 +452,21 @@ class TestProtocolsBGP(unittest.TestCase): # We want to redistribute ... redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static'] for redistribute in redistributes: - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'network', network]) if 'summary_only' in network_config: - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'aggregate-address', network, 'summary-only']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) # T2100: By default ebgp-requires-policy is disabled to keep VyOS @@ -497,24 +490,26 @@ class TestProtocolsBGP(unittest.TestCase): limit = '64' listen_ranges = ['192.0.2.0/25', '192.0.2.128/25'] peer_group = 'listenfoobar' - self.session.set(base_path + ['listen', 'limit', limit]) + + self.cli_set(base_path + ['listen', 'limit', limit]) + for prefix in listen_ranges: - self.session.set(base_path + ['listen', 'range', prefix]) + self.cli_set(base_path + ['listen', 'range', prefix]) # check validate() - peer-group must be defined for range/prefix with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['listen', 'range', prefix, 'peer-group', peer_group]) + self.cli_commit() + self.cli_set(base_path + ['listen', 'range', prefix, 'peer-group', peer_group]) # check validate() - peer-group does yet not exist! with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['peer-group', peer_group, 'remote-as', ASN]) + self.cli_commit() + self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', ASN]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {ASN}', frrconfig) @@ -526,19 +521,20 @@ class TestProtocolsBGP(unittest.TestCase): def test_bgp_07_l2vpn_evpn(self): vnis = ['10010', '10020', '10030'] neighbors = ['192.0.2.10', '192.0.2.20', '192.0.2.30'] - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-all-vni']) - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-default-gw']) - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-svi-ip']) - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'flooding', 'disable']) + + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-all-vni']) + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-default-gw']) + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-svi-ip']) + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'flooding', 'disable']) for vni in vnis: - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-default-gw']) - self.session.set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-svi-ip']) + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-default-gw']) + self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-svi-ip']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration - frrconfig = getFRRBGPconfig() + frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family l2vpn evpn', frrconfig) self.assertIn(f' advertise-all-vni', frrconfig) @@ -546,10 +542,104 @@ class TestProtocolsBGP(unittest.TestCase): self.assertIn(f' advertise-svi-ip', frrconfig) self.assertIn(f' flooding disable', frrconfig) for vni in vnis: - vniconfig = getFRRBGPVNIconfig(vni) + vniconfig = self.getFRRconfig(f' vni {vni}') self.assertIn(f'vni {vni}', vniconfig) self.assertIn(f' advertise-default-gw', vniconfig) self.assertIn(f' advertise-svi-ip', vniconfig) + def test_bgp_08_zebra_route_map(self): + # Implemented because of T3328 + self.cli_set(base_path + ['route-map', route_map_in]) + # commit changes + self.cli_commit() + + # Verify FRR configuration + zebra_route_map = f'ip protocol bgp route-map {route_map_in}' + frrconfig = self.getFRRconfig(zebra_route_map) + self.assertIn(zebra_route_map, frrconfig) + + # Remove the route-map again + self.cli_delete(base_path + ['route-map']) + # commit changes + self.cli_commit() + + # Verify FRR configuration + frrconfig = self.getFRRconfig(zebra_route_map) + self.assertNotIn(zebra_route_map, frrconfig) + + def test_bgp_09_distance_and_flowspec(self): + distance_external = '25' + distance_internal = '30' + distance_local = '35' + distance_v4_prefix = '169.254.0.0/32' + distance_v6_prefix = '2001::/128' + distance_prefix_value = '110' + distance_families = ['ipv4-unicast', 'ipv6-unicast','ipv4-multicast', 'ipv6-multicast'] + verify_families = ['ipv4 unicast', 'ipv6 unicast','ipv4 multicast', 'ipv6 multicast'] + flowspec_families = ['address-family ipv4 flowspec', 'address-family ipv6 flowspec'] + flowspec_int = 'lo' + + # Per family distance support + for family in distance_families: + self.cli_set(base_path + ['address-family', family, 'distance', 'external', distance_external]) + self.cli_set(base_path + ['address-family', family, 'distance', 'internal', distance_internal]) + self.cli_set(base_path + ['address-family', family, 'distance', 'local', distance_local]) + if 'ipv4' in family: + self.cli_set(base_path + ['address-family', family, 'distance', + 'prefix', distance_v4_prefix, 'distance', distance_prefix_value]) + if 'ipv6' in family: + self.cli_set(base_path + ['address-family', family, 'distance', + 'prefix', distance_v6_prefix, 'distance', distance_prefix_value]) + + # IPv4 flowspec interface check + self.cli_set(base_path + ['address-family', 'ipv4-flowspec', 'local-install', 'interface', flowspec_int]) + + # IPv6 flowspec interface check + self.cli_set(base_path + ['address-family', 'ipv6-flowspec', 'local-install', 'interface', flowspec_int]) + + # Commit changes + self.cli_commit() + + # Verify FRR distances configuration + frrconfig = self.getFRRconfig(f'router bgp {ASN}') + self.assertIn(f'router bgp {ASN}', frrconfig) + for family in verify_families: + self.assertIn(f'address-family {family}', frrconfig) + self.assertIn(f'distance bgp {distance_external} {distance_internal} {distance_local}', frrconfig) + if 'ipv4' in family: + self.assertIn(f'distance {distance_prefix_value} {distance_v4_prefix}', frrconfig) + if 'ipv6' in family: + self.assertIn(f'distance {distance_prefix_value} {distance_v6_prefix}', frrconfig) + + # Verify FRR flowspec configuration + for family in flowspec_families: + self.assertIn(f'{family}', frrconfig) + self.assertIn(f'local-install {flowspec_int}', frrconfig) + + def test_bgp_10_vrf_simple(self): + router_id = '127.0.0.3' + vrfs = ['red', 'green', 'blue'] + + # It is safe to assume that when the basic VRF test works, all + # other BGP related features work, as we entirely inherit the CLI + # templates and Jinja2 FRR template. + table = '1000' + + for vrf in vrfs: + vrf_base = ['vrf', 'name', vrf] + self.cli_set(vrf_base + ['table', table]) + self.cli_set(vrf_base + ['protocols', 'bgp', 'local-as', ASN]) + self.cli_set(vrf_base + ['protocols', 'bgp', 'parameters', 'router-id', router_id]) + table = str(int(table) + 1000) + + self.cli_commit() + + for vrf in vrfs: + # Verify FRR bgpd configuration + frrconfig = self.getFRRconfig(f'router bgp {ASN} vrf {vrf}') + + self.assertIn(f'router bgp {ASN} vrf {vrf}', frrconfig) + self.assertIn(f' bgp router-id {router_id}', frrconfig) + if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2)
\ No newline at end of file |