summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_static.py409
-rwxr-xr-xsmoketest/scripts/cli/test_vrf.py71
2 files changed, 325 insertions, 155 deletions
diff --git a/smoketest/scripts/cli/test_protocols_static.py b/smoketest/scripts/cli/test_protocols_static.py
index 100fd3387..a4c320a62 100755
--- a/smoketest/scripts/cli/test_protocols_static.py
+++ b/smoketest/scripts/cli/test_protocols_static.py
@@ -14,141 +14,382 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
-import json
import unittest
-from netifaces import interfaces
-
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
-from vyos.ifconfig import Interface
-from vyos.ifconfig import Section
from vyos.template import is_ipv6
from vyos.util import cmd
-from vyos.util import read_file
-from vyos.validate import is_intf_addr_assigned
-dummy_if = 'dum08765'
base_path = ['protocols', 'static']
+vrf_path = ['protocols', 'vrf']
+
+def getFRRCconfig(vrf=None):
+ if vrf:
+ return cmd(f'vtysh -c "show run" | sed -n "/^vrf {vrf}/,/^!/p"')
+ else:
+ return cmd(f'vtysh -c "show run" | sed -n "/^ip route/,/^!/p"')
routes = {
'10.0.0.0/8' : {
- 'next_hop' : '192.0.2.2',
- 'distance' : '200',
+ 'next_hop' : {
+ '192.0.2.100' : { 'distance' : '100' },
+ '192.0.2.110' : { 'distance' : '110', 'interface' : 'eth0' },
+ '192.0.2.120' : { 'distance' : '120', 'disable' : '' },
+ },
+ 'interface' : {
+ 'eth0' : { 'distance' : '130' },
+ 'eth1' : { 'distance' : '140' },
+ },
+ 'blackhole' : { 'distance' : '250', 'tag' : '500' },
},
'172.16.0.0/12' : {
- 'next_hop' : '192.0.2.3',
- },
- '192.168.0.0/16' : {
- 'next_hop' : '192.0.2.3',
- },
- '2001:db8:1000::/48' : {
- 'next_hop' : '2001:db8::1000',
+ 'interface' : {
+ 'eth0' : { 'distance' : '50', 'vrf' : 'black' },
+ 'eth1' : { 'distance' : '60', 'vrf' : 'black' },
+ },
+ 'blackhole' : { 'distance' : '90' },
},
- '2001:db8:2000::/48' : {
- 'next_hop' : '2001:db8::2000',
- },
-}
-
-interface_routes = {
- '10.0.0.0/8' : {
- 'next_hop' : dummy_if,
- 'distance' : '200',
+ '192.0.2.0/24' : {
+ 'interface' : {
+ 'eth0' : { 'distance' : '50', 'vrf' : 'black' },
+ 'eth1' : { 'disable' : '' },
+ },
+ 'blackhole' : { 'distance' : '90' },
},
- '172.16.0.0/12' : {
- 'next_hop' : dummy_if,
+ '100.64.0.0/10' : {
+ 'blackhole' : { },
},
- '192.168.0.0/16' : {
- 'next_hop' : dummy_if,
+ '2001:db8:100::/40' : {
+ 'next_hop' : {
+ '2001:db8::1' : { 'distance' : '10' },
+ '2001:db8::2' : { 'distance' : '20', 'interface' : 'eth0' },
+ '2001:db8::3' : { 'distance' : '30', 'disable' : '' },
+ },
+ 'interface' : {
+ 'eth0' : { 'distance' : '40', 'vrf' : 'black' },
+ 'eth1' : { 'distance' : '50', 'disable' : '' },
+ },
+ 'blackhole' : { 'distance' : '250', 'tag' : '500' },
},
- '2001:db8:1000::/48' : {
- 'next_hop' : dummy_if,
+ '2001:db8:200::/40' : {
+ 'interface' : {
+ 'eth0' : { 'distance' : '40' },
+ 'eth1' : { 'distance' : '50', 'disable' : '' },
+ },
+ 'blackhole' : { 'distance' : '250', 'tag' : '500' },
},
- '2001:db8:2000::/48' : {
- 'next_hop' : dummy_if,
+ '2001:db8::/32' : {
+ 'blackhole' : { 'distance' : '200', 'tag' : '600' },
},
}
+vrfs = ['red', 'green', 'blue']
+tables = ['80', '81', '82']
class StaticRouteTest(unittest.TestCase):
def setUp(self):
self.session = ConfigSession(os.getpid())
- # we need an alive next-hop interface
- self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/24'])
- self.session.set(['interfaces', 'dummy', dummy_if, 'address', '2001:db8::1/64'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', dummy_if])
+ for route, route_config in routes.items():
+ route_type = 'route'
+ if is_ipv6(route):
+ route_type = 'route6'
+ self.session.delete(base_path + [route_type, route])
+
+ for vrf in vrfs:
+ self.session.delete(vrf_path + [vrf])
+
+ for table in tables:
+ self.session.delete(base_path + ['table', table])
+
self.session.commit()
+ del self.session
- def test_static_routes(self):
+ def test_protocols_static(self):
for route, route_config in routes.items():
route_type = 'route'
if is_ipv6(route):
route_type = 'route6'
- self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop']])
- if 'distance' in route_config:
- self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
+ base = base_path + [route_type, route]
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ self.session.set(base + ['next-hop', next_hop])
+ if 'disable' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'disable'])
+ if 'distance' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'distance', next_hop_config['distance']])
+ if 'interface' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'interface', next_hop_config['interface']])
+ if 'vrf' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'vrf', next_hop_config['vrf']])
+
+
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ self.session.set(base + ['interface', interface])
+ if 'disable' in interface_config:
+ self.session.set(base + ['interface', interface, 'disable'])
+ if 'distance' in interface_config:
+ self.session.set(base + ['interface', interface, 'distance', interface_config['distance']])
+ if 'vrf' in interface_config:
+ self.session.set(base + ['interface', interface, 'vrf', interface_config['vrf']])
+
+ if 'blackhole' in route_config:
+ self.session.set(base + ['blackhole'])
+ if 'distance' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'distance', route_config['blackhole']['distance']])
+ if 'tag' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'tag', route_config['blackhole']['tag']])
# commit changes
self.session.commit()
+ # Verify FRR bgpd configuration
+ frrconfig = getFRRCconfig()
+
# Verify routes
for route, route_config in routes.items():
- ip_ver = '-4'
+ ip_ipv6 = 'ip'
if is_ipv6(route):
- ip_ver = '-6'
- tmp = json.loads(cmd(f'ip {ip_ver} -d -j route show {route}'))
+ ip_ipv6 = 'ipv6'
- found = False
- for result in tmp:
- # unfortunately iproute2 does not return the distance
- if 'dst' in result and result['dst'] == route:
- if 'gateway' in result and result['gateway'] == route_config['next_hop']:
- found = True
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ tmp = f'{ip_ipv6} route {route} {next_hop}'
+ if 'interface' in next_hop_config:
+ tmp += ' ' + next_hop_config['interface']
+ if 'distance' in next_hop_config:
+ tmp += ' ' + next_hop_config['distance']
+ if 'vrf' in next_hop_config:
+ tmp += ' nexthop-vrf ' + next_hop_config['vrf']
- self.assertTrue(found)
+ if 'disable' in next_hop_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
- route_type = 'route'
- if is_ipv6(route):
- route_type = 'route6'
- self.session.delete(base_path + [route_type, route])
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ tmp = f'{ip_ipv6} route {route} {interface}'
+ if 'interface' in interface_config:
+ tmp += ' ' + interface_config['interface']
+ if 'distance' in interface_config:
+ tmp += ' ' + interface_config['distance']
+ if 'vrf' in interface_config:
+ tmp += ' nexthop-vrf ' + interface_config['vrf']
- def test_interface_routes(self):
- for route, route_config in interface_routes.items():
- route_type = 'interface-route'
- if is_ipv6(route):
- route_type = 'interface-route6'
- self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop']])
- if 'distance' in route_config:
- self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop'], 'distance', route_config['distance']])
+ if 'disable' in interface_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
+
+ if 'blackhole' in route_config:
+ tmp = f'{ip_ipv6} route {route} blackhole'
+ if 'tag' in route_config['blackhole']:
+ tmp += ' tag ' + route_config['blackhole']['tag']
+ if 'distance' in route_config['blackhole']:
+ tmp += ' ' + route_config['blackhole']['distance']
+
+ self.assertIn(tmp, frrconfig)
+
+ def test_protocols_static_table(self):
+ for table in tables:
+ for route, route_config in routes.items():
+ route_type = 'route'
+ if is_ipv6(route):
+ route_type = 'route6'
+ base = base_path + ['table', table, route_type, route]
+
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ self.session.set(base + ['next-hop', next_hop])
+ if 'disable' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'disable'])
+ if 'distance' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'distance', next_hop_config['distance']])
+ if 'interface' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'interface', next_hop_config['interface']])
+
+ # This is currently not supported because of an FRR issue:
+ # https://github.com/FRRouting/frr/issues/8016
+ # if 'vrf' in next_hop_config:
+ # self.session.set(base + ['next-hop', next_hop, 'vrf', next_hop_config['vrf']])
+
+
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ self.session.set(base + ['interface', interface])
+ if 'disable' in interface_config:
+ self.session.set(base + ['interface', interface, 'disable'])
+ if 'distance' in interface_config:
+ self.session.set(base + ['interface', interface, 'distance', interface_config['distance']])
+
+ # This is currently not supported because of an FRR issue:
+ # https://github.com/FRRouting/frr/issues/8016
+ # if 'vrf' in interface_config:
+ # self.session.set(base + ['interface', interface, 'vrf', interface_config['vrf']])
+
+ if 'blackhole' in route_config:
+ self.session.set(base + ['blackhole'])
+ if 'distance' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'distance', route_config['blackhole']['distance']])
+ if 'tag' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'tag', route_config['blackhole']['tag']])
# commit changes
self.session.commit()
- # Verify routes
- for route, route_config in interface_routes.items():
- ip_ver = '-4'
- if is_ipv6(route):
- ip_ver = '-6'
- tmp = json.loads(cmd(f'ip {ip_ver} -d -j route show {route}'))
+ # Verify FRR bgpd configuration
+ frrconfig = getFRRCconfig()
- found = False
- for result in tmp:
- # unfortunately iproute2 does not return the distance
- if 'dst' in result and result['dst'] == route:
- if 'dev' in result and result['dev'] == route_config['next_hop']:
- found = True
- break
+ for table in tables:
+ # Verify routes
+ for route, route_config in routes.items():
+ ip_ipv6 = 'ip'
+ if is_ipv6(route):
+ ip_ipv6 = 'ipv6'
- self.assertTrue(found)
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ tmp = f'{ip_ipv6} route {route} {next_hop}'
+ if 'interface' in next_hop_config:
+ tmp += ' ' + next_hop_config['interface']
+ if 'distance' in next_hop_config:
+ tmp += ' ' + next_hop_config['distance']
+ # This is currently not supported because of an FRR issue:
+ # https://github.com/FRRouting/frr/issues/8016
+ # if 'vrf' in next_hop_config:
+ # tmp += ' nexthop-vrf ' + next_hop_config['vrf']
+
+ tmp += ' table ' + table
+ if 'disable' in next_hop_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
+
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ tmp = f'{ip_ipv6} route {route} {interface}'
+ if 'interface' in interface_config:
+ tmp += ' ' + interface_config['interface']
+ if 'distance' in interface_config:
+ tmp += ' ' + interface_config['distance']
+ # This is currently not supported because of an FRR issue:
+ # https://github.com/FRRouting/frr/issues/8016
+ # if 'vrf' in interface_config:
+ # tmp += ' nexthop-vrf ' + interface_config['vrf']
+
+ tmp += ' table ' + table
+ if 'disable' in interface_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
+
+ if 'blackhole' in route_config:
+ tmp = f'{ip_ipv6} route {route} blackhole'
+ if 'tag' in route_config['blackhole']:
+ tmp += ' tag ' + route_config['blackhole']['tag']
+ if 'distance' in route_config['blackhole']:
+ tmp += ' ' + route_config['blackhole']['distance']
+
+ tmp += ' table ' + table
+ self.assertIn(tmp, frrconfig)
+
+
+ def test_protocols_vrf_static(self):
+ for vrf in vrfs:
+ for route, route_config in routes.items():
+ route_type = 'route'
+ if is_ipv6(route):
+ route_type = 'route6'
+ base = vrf_path + [vrf, 'static', route_type, route]
+
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ self.session.set(base + ['next-hop', next_hop])
+ if 'disable' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'disable'])
+ if 'distance' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'distance', next_hop_config['distance']])
+ if 'interface' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'interface', next_hop_config['interface']])
+ if 'vrf' in next_hop_config:
+ self.session.set(base + ['next-hop', next_hop, 'vrf', next_hop_config['vrf']])
+
+
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ self.session.set(base + ['interface', interface])
+ if 'disable' in interface_config:
+ self.session.set(base + ['interface', interface, 'disable'])
+ if 'distance' in interface_config:
+ self.session.set(base + ['interface', interface, 'distance', interface_config['distance']])
+ if 'vrf' in interface_config:
+ self.session.set(base + ['interface', interface, 'vrf', interface_config['vrf']])
+
+ if 'blackhole' in route_config:
+ self.session.set(base + ['blackhole'])
+ if 'distance' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'distance', route_config['blackhole']['distance']])
+ if 'tag' in route_config['blackhole']:
+ self.session.set(base + ['blackhole', 'tag', route_config['blackhole']['tag']])
+
+ # commit changes
+ self.session.commit()
+
+ for vrf in vrfs:
+ # Verify FRR bgpd configuration
+ frrconfig = getFRRCconfig(vrf)
+ self.assertIn(f'vrf {vrf}', frrconfig)
+
+ # Verify routes
+ for route, route_config in routes.items():
+ ip_ipv6 = 'ip'
+ if is_ipv6(route):
+ ip_ipv6 = 'ipv6'
+
+ if 'next_hop' in route_config:
+ for next_hop, next_hop_config in route_config['next_hop'].items():
+ tmp = f'{ip_ipv6} route {route} {next_hop}'
+ if 'interface' in next_hop_config:
+ tmp += ' ' + next_hop_config['interface']
+ if 'distance' in next_hop_config:
+ tmp += ' ' + next_hop_config['distance']
+ if 'vrf' in next_hop_config:
+ tmp += ' nexthop-vrf ' + next_hop_config['vrf']
+
+ if 'disable' in next_hop_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
+
+ if 'interface' in route_config:
+ for interface, interface_config in route_config['interface'].items():
+ tmp = f'{ip_ipv6} route {route} {interface}'
+ if 'interface' in interface_config:
+ tmp += ' ' + interface_config['interface']
+ if 'distance' in interface_config:
+ tmp += ' ' + interface_config['distance']
+ if 'vrf' in interface_config:
+ tmp += ' nexthop-vrf ' + interface_config['vrf']
+
+ if 'disable' in interface_config:
+ self.assertNotIn(tmp, frrconfig)
+ else:
+ self.assertIn(tmp, frrconfig)
+
+ if 'blackhole' in route_config:
+ tmp = f'{ip_ipv6} route {route} blackhole'
+ if 'tag' in route_config['blackhole']:
+ tmp += ' tag ' + route_config['blackhole']['tag']
+ if 'distance' in route_config['blackhole']:
+ tmp += ' ' + route_config['blackhole']['distance']
+
+ self.assertIn(tmp, frrconfig)
- route_type = 'interface-route'
- if is_ipv6(route):
- route_type = 'interface-route6'
- self.session.delete(base_path + [route_type, route])
if __name__ == '__main__':
- unittest.main(verbosity=2, failfast=True)
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py
index 856baa070..8e977d407 100755
--- a/smoketest/scripts/cli/test_vrf.py
+++ b/smoketest/scripts/cli/test_vrf.py
@@ -166,76 +166,5 @@ class VRFTest(unittest.TestCase):
section = Section.section(interface)
self.session.delete(['interfaces', section, interface, 'vrf'])
- def test_vrf_static_routes(self):
- routes = {
- '10.0.0.0/8' : {
- 'next_hop' : '192.0.2.2',
- 'distance' : '200',
- 'next_hop_vrf' : 'default',
- },
- '172.16.0.0/12' : {
- 'next_hop' : '192.0.2.3',
- 'next_hop_vrf' : 'default',
- },
- '192.168.0.0/16' : {
- 'next_hop' : '192.0.2.3',
- },
- '2001:db8:1000::/48' : {
- 'next_hop' : '2001:db8::2',
- },
- }
-
- table = '2000'
- for vrf in vrfs:
- base = base_path + ['name', vrf]
- self.session.set(base + ['table', str(table)])
-
- # required interface for leaking to default table
- self.session.set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
-
- # we also need an interface in "UP" state to install routes
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf])
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24'])
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '2001:db8::1/64'])
- table = str(int(table) + 1)
-
- proto_base = ['protocols', 'vrf', vrf, 'static']
- for route, route_config in routes.items():
- route_type = 'route'
- if is_ipv6(route):
- route_type = 'route6'
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop']])
- if 'distance' in route_config:
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
- if 'next_hop_vrf' in route_config:
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'next-hop-vrf', route_config['next_hop_vrf']])
-
- # commit changes
- self.session.commit()
-
- # Verify routes
- table = '2000'
- for vrf in vrfs:
- for route, route_config in routes.items():
- if is_ipv6(route):
- tmp = get_vrf_ipv6_routes(vrf)
- else:
- tmp = get_vrf_ipv4_routes(vrf)
-
- found = False
- for result in tmp:
- if 'dst' in result and result['dst'] == route:
- if 'gateway' in result and result['gateway'] == route_config['next_hop']:
- found = True
-
- self.assertTrue(found)
-
- # Cleanup
- self.session.delete(['protocols', 'vrf', vrf])
- self.session.delete(['interfaces', 'dummy', f'dum{table}'])
- self.session.delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
-
- table = str(int(table) + 1)
-
if __name__ == '__main__':
unittest.main(verbosity=2)