summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/base_interfaces_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli/base_interfaces_test.py')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py390
1 files changed, 283 insertions, 107 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 8ee5395d0..f897088ef 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -12,16 +12,17 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
import unittest
-import json
from binascii import hexlify
-from netifaces import ifaddresses
from netifaces import AF_INET
from netifaces import AF_INET6
+from netifaces import ifaddresses
+from netifaces import interfaces
+
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.ifconfig import Interface
@@ -30,6 +31,7 @@ from vyos.util import read_file
from vyos.util import cmd
from vyos.util import dict_search
from vyos.util import process_named_running
+from vyos.util import get_interface_config
from vyos.validate import is_intf_addr_assigned
from vyos.validate import is_ipv6_link_local
@@ -51,24 +53,15 @@ def is_mirrored_to(interface, mirror_if, qdisc):
ret_val = True
return ret_val
-
-dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf'
-def get_dhcp6c_config_value(interface, key):
- tmp = read_file(dhcp6c_config_file.format(interface))
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
-
- out = []
- for item in tmp:
- out.append(item.replace(';',''))
- return out
-
class BasicInterfaceTest:
- class BaseTest(unittest.TestCase):
+ class TestCase(VyOSUnitTestSHIM.TestCase):
_test_ip = False
_test_mtu = False
_test_vlan = False
_test_qinq = False
_test_ipv6 = False
+ _test_ipv6_pd = False
+ _test_ipv6_dhcpc6 = False
_test_mirror = False
_base_path = []
@@ -84,37 +77,35 @@ class BasicInterfaceTest:
_mtu = '1280'
def setUp(self):
- self.session = ConfigSession(os.getpid())
-
# Setup mirror interfaces for SPAN (Switch Port Analyzer)
for span in self._mirror_interfaces:
section = Section.section(span)
- self.session.set(['interfaces', section, span])
+ self.cli_set(['interfaces', section, span])
def tearDown(self):
- # Ethernet is handled in its derived class
- if 'ethernet' not in self._base_path:
- self.session.delete(self._base_path)
-
# Tear down mirror interfaces for SPAN (Switch Port Analyzer)
for span in self._mirror_interfaces:
section = Section.section(span)
- self.session.delete(['interfaces', section, span])
+ self.cli_delete(['interfaces', section, span])
- self.session.commit()
- del self.session
+ self.cli_delete(self._base_path)
+ self.cli_commit()
+
+ # Verify that no previously interface remained on the system
+ for intf in self._interfaces:
+ self.assertNotIn(intf, interfaces())
def test_span_mirror(self):
if not self._mirror_interfaces:
- return None
+ self.skipTest('not supported')
# Check the two-way mirror rules of ingress and egress
for mirror in self._mirror_interfaces:
for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'mirror', 'ingress', mirror])
- self.session.set(self._base_path + [interface, 'mirror', 'egress', mirror])
+ self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror])
+ self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror])
- self.session.commit()
+ self.cli_commit()
# Verify config
for mirror in self._mirror_interfaces:
@@ -122,45 +113,69 @@ class BasicInterfaceTest:
self.assertTrue(is_mirrored_to(interface, mirror, 'ffff'))
self.assertTrue(is_mirrored_to(interface, mirror, '1'))
+ def test_interface_disable(self):
+ # Check if description can be added to interface and
+ # can be read back
+ for intf in self._interfaces:
+ self.cli_set(self._base_path + [intf, 'disable'])
+ for option in self._options.get(intf, []):
+ self.cli_set(self._base_path + [intf] + option.split())
+
+ self.cli_commit()
+
+ # Validate interface description
+ for intf in self._interfaces:
+ self.assertEqual(Interface(intf).get_admin_state(), 'down')
def test_interface_description(self):
# Check if description can be added to interface and
# can be read back
for intf in self._interfaces:
test_string=f'Description-Test-{intf}'
- self.session.set(self._base_path + [intf, 'description', test_string])
+ self.cli_set(self._base_path + [intf, 'description', test_string])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
# Validate interface description
for intf in self._interfaces:
test_string=f'Description-Test-{intf}'
tmp = read_file(f'/sys/class/net/{intf}/ifalias')
- self.assertTrue(tmp, test_string)
+ self.assertEqual(tmp, test_string)
+ self.assertEqual(Interface(intf).get_alias(), test_string)
+ self.cli_delete(self._base_path + [intf, 'description'])
+
+ self.cli_commit()
+
+ # Validate remove interface description "empty"
+ for intf in self._interfaces:
+ tmp = read_file(f'/sys/class/net/{intf}/ifalias')
+ self.assertEqual(tmp, str())
+ self.assertEqual(Interface(intf).get_alias(), str())
def test_add_single_ip_address(self):
addr = '192.0.2.0/31'
for intf in self._interfaces:
- self.session.set(self._base_path + [intf, 'address', addr])
+ self.cli_set(self._base_path + [intf, 'address', addr])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
for intf in self._interfaces:
self.assertTrue(is_intf_addr_assigned(intf, addr))
+ self.assertEqual(Interface(intf).get_admin_state(), 'up')
def test_add_multiple_ip_addresses(self):
# Add address
for intf in self._interfaces:
for addr in self._test_addr:
- self.session.set(self._base_path + [intf, 'address', addr])
+ self.cli_set(self._base_path + [intf, 'address', addr])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
# Validate address
for intf in self._interfaces:
@@ -175,15 +190,15 @@ class BasicInterfaceTest:
def test_ipv6_link_local_address(self):
# Common function for IPv6 link-local address assignemnts
if not self._test_ipv6:
- return None
+ self.skipTest('not supported')
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# after commit we must have an IPv6 link-local address
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
for addr in ifaddresses(interface)[AF_INET6]:
@@ -192,26 +207,26 @@ class BasicInterfaceTest:
# disable IPv6 link-local address assignment
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
# after commit we must have no IPv6 link-local address
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
self.assertTrue(AF_INET6 not in ifaddresses(interface))
def test_interface_mtu(self):
if not self._test_mtu:
- return None
+ self.skipTest('not supported')
for intf in self._interfaces:
base = self._base_path + [intf]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for option in self._options.get(intf, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# commit interface changes
- self.session.commit()
+ self.cli_commit()
# verify changed MTU
for intf in self._interfaces:
@@ -222,21 +237,21 @@ class BasicInterfaceTest:
# Testcase if MTU can be changed to 1200 on non IPv6
# enabled interfaces
if not self._test_mtu:
- return None
+ self.skipTest('not supported')
old_mtu = self._mtu
self._mtu = '1200'
for intf in self._interfaces:
base = self._base_path + [intf]
- self.session.set(base + ['mtu', self._mtu])
- self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+ self.cli_set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
for option in self._options.get(intf, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# commit interface changes
- self.session.commit()
+ self.cli_commit()
# verify changed MTU
for intf in self._interfaces:
@@ -245,22 +260,26 @@ class BasicInterfaceTest:
self._mtu = old_mtu
- def test_8021q_vlan_interfaces(self):
+ def test_vif_8021q_interfaces(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
if not self._test_vlan:
- return None
+ self.skipTest('not supported')
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
for vlan in self._vlan_range:
base = self._base_path + [interface, 'vif', vlan]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for address in self._test_addr:
- self.session.set(base + ['address', address])
+ self.cli_set(base + ['address', address])
- self.session.commit()
+ self.cli_commit()
for intf in self._interfaces:
for vlan in self._vlan_range:
@@ -270,29 +289,70 @@ class BasicInterfaceTest:
tmp = read_file(f'/sys/class/net/{vif}/mtu')
self.assertEqual(tmp, self._mtu)
+ self.assertEqual(Interface(vif).get_admin_state(), 'up')
+
+ def test_vif_8021q_lower_up_down(self):
+ # Testcase for https://phabricator.vyos.net/T3349
+ if not self._test_vlan:
+ self.skipTest('not supported')
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(base + option.split())
+
+ # disable the lower interface
+ self.cli_set(base + ['disable'])
+
+ for vlan in self._vlan_range:
+ vlan_base = self._base_path + [interface, 'vif', vlan]
+ # disable the vlan interface
+ self.cli_set(vlan_base + ['disable'])
+ self.cli_commit()
- def test_8021ad_qinq_vlan_interfaces(self):
+ # re-enable all lower interfaces
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.cli_delete(base + ['disable'])
+
+ self.cli_commit()
+
+ # verify that the lower interfaces are admin up and the vlan
+ # interfaces are all admin down
+ for interface in self._interfaces:
+ self.assertEqual(Interface(interface).get_admin_state(), 'up')
+
+ for vlan in self._vlan_range:
+ ifname = f'{interface}.{vlan}'
+ self.assertEqual(Interface(ifname).get_admin_state(), 'down')
+
+
+ def test_vif_s_8021ad_vlan_interfaces(self):
+ # XXX: This testcase is not allowed to run as first testcase, reason
+ # is the Wireless test will first load the wifi kernel hwsim module
+ # which creates a wlan0 and wlan1 interface which will fail the
+ # tearDown() test in the end that no interface is allowed to survive!
if not self._test_qinq:
- return None
+ self.skipTest('not supported')
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
for vif_s in self._qinq_range:
for vif_c in self._vlan_range:
base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for address in self._test_addr:
- self.session.set(base + ['address', address])
+ self.cli_set(base + ['address', address])
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
for vif_s in self._qinq_range:
- tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0]
+ tmp = get_interface_config(f'{interface}.{vif_s}')
self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')
for vif_c in self._vlan_range:
@@ -305,26 +365,26 @@ class BasicInterfaceTest:
def test_interface_ip_options(self):
if not self._test_ip:
- return None
+ self.skipTest('not supported')
for interface in self._interfaces:
arp_tmo = '300'
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# Options
- self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
- self.session.set(path + ['ip', 'disable-arp-filter'])
- self.session.set(path + ['ip', 'disable-forwarding'])
- self.session.set(path + ['ip', 'enable-arp-accept'])
- self.session.set(path + ['ip', 'enable-arp-announce'])
- self.session.set(path + ['ip', 'enable-arp-ignore'])
- self.session.set(path + ['ip', 'enable-proxy-arp'])
- self.session.set(path + ['ip', 'proxy-arp-pvlan'])
- self.session.set(path + ['ip', 'source-validation', 'loose'])
-
- self.session.commit()
+ self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo])
+ self.cli_set(path + ['ip', 'disable-arp-filter'])
+ self.cli_set(path + ['ip', 'disable-forwarding'])
+ self.cli_set(path + ['ip', 'enable-arp-accept'])
+ self.cli_set(path + ['ip', 'enable-arp-announce'])
+ self.cli_set(path + ['ip', 'enable-arp-ignore'])
+ self.cli_set(path + ['ip', 'enable-proxy-arp'])
+ self.cli_set(path + ['ip', 'proxy-arp-pvlan'])
+ self.cli_set(path + ['ip', 'source-validation', 'loose'])
+
+ self.cli_commit()
for interface in self._interfaces:
tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
@@ -356,19 +416,19 @@ class BasicInterfaceTest:
def test_interface_ipv6_options(self):
if not self._test_ipv6:
- return None
+ self.skipTest('not supported')
for interface in self._interfaces:
dad_transmits = '10'
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# Options
- self.session.set(path + ['ipv6', 'disable-forwarding'])
- self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
+ self.cli_set(path + ['ipv6', 'disable-forwarding'])
+ self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding')
@@ -377,40 +437,156 @@ class BasicInterfaceTest:
tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/dad_transmits')
self.assertEqual(dad_transmits, tmp)
+ def test_dhcpv6_clinet_options(self):
+ if not self._test_ipv6_dhcpc6:
+ self.skipTest('not supported')
- def test_ipv6_dhcpv6_prefix_delegation(self):
- if not self._test_ipv6:
- return None
+ duid_base = 10
+ for interface in self._interfaces:
+ duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # Enable DHCPv6 client
+ self.cli_set(path + ['address', 'dhcpv6'])
+ self.cli_set(path + ['dhcpv6-options', 'rapid-commit'])
+ self.cli_set(path + ['dhcpv6-options', 'parameters-only'])
+ self.cli_set(path + ['dhcpv6-options', 'duid', duid])
+ duid_base += 1
+
+ self.cli_commit()
+
+ duid_base = 10
+ for interface in self._interfaces:
+ duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+ self.assertIn(f'interface {interface} ' + '{', dhcpc6_config)
+ self.assertIn(f' request domain-name-servers;', dhcpc6_config)
+ self.assertIn(f' request domain-name;', dhcpc6_config)
+ self.assertIn(f' information-only;', dhcpc6_config)
+ self.assertIn(f' send ia-na 0;', dhcpc6_config)
+ self.assertIn(f' send rapid-commit;', dhcpc6_config)
+ self.assertIn(f' send client-id {duid};', dhcpc6_config)
+ self.assertIn('};', dhcpc6_config)
+ duid_base += 1
+
+ # Check for running process
+ self.assertTrue(process_named_running('dhcp6c'))
+
+ def test_dhcpv6pd_auto_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not supported')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344']
- address = '1'
- sla_id = '0'
- sla_len = '8'
for interface in self._interfaces:
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
+ address = '1'
# prefix delegation stuff
pd_base = path + ['dhcpv6-options', 'pd', '0']
- self.session.set(pd_base + ['length', '56'])
- self.session.set(pd_base + ['interface', interface, 'address', address])
- self.session.set(pd_base + ['interface', interface, 'sla-id', sla_id])
+ self.cli_set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.cli_set(['interfaces', section, delegatee])
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
+ # increment interface address
+ address = str(int(address) + 1)
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
# verify DHCPv6 prefix delegation
- # will return: ['delegation', '::/56 infinity;']
- tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
- self.assertEqual(tmp, '::/56')
- tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
- self.assertEqual(tmp, interface)
- tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
- self.assertEqual(tmp, address)
- tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
- self.assertEqual(tmp, sla_id)
- tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
- self.assertEqual(tmp, sla_len)
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ address = '1'
+ sla_id = '0'
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
# Check for running process
self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.cli_delete(['interfaces', section, delegatee])
+
+ def test_dhcpv6pd_manual_sla_id(self):
+ if not self._test_ipv6_pd:
+ self.skipTest('not supported')
+
+ prefix_len = '56'
+ sla_len = str(64 - int(prefix_len))
+
+ delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344']
+
+ for interface in self._interfaces:
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.cli_set(path + option.split())
+
+ # prefix delegation stuff
+ address = '1'
+ sla_id = '1'
+ pd_base = path + ['dhcpv6-options', 'pd', '0']
+ self.cli_set(pd_base + ['length', prefix_len])
+
+ for delegatee in delegatees:
+ section = Section.section(delegatee)
+ self.cli_set(['interfaces', section, delegatee])
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
+ self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
+
+ # increment interface address
+ address = str(int(address) + 1)
+ sla_id = str(int(sla_id) + 1)
+
+ self.cli_commit()
+
+ # Verify dhcpc6 client configuration
+ for interface in self._interfaces:
+ address = '1'
+ sla_id = '1'
+ dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
+
+ # verify DHCPv6 prefix delegation
+ self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config)
+
+ for delegatee in delegatees:
+ self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config)
+ self.assertIn(f'ifid {address};', dhcpc6_config)
+ self.assertIn(f'sla-id {sla_id};', dhcpc6_config)
+ self.assertIn(f'sla-len {sla_len};', dhcpc6_config)
+
+ # increment sla-id
+ sla_id = str(int(sla_id) + 1)
+ # increment interface address
+ address = str(int(address) + 1)
+
+ # Check for running process
+ self.assertTrue(process_named_running('dhcp6c'))
+
+ for delegatee in delegatees:
+ # we can already cleanup the test delegatee interface here
+ # as until commit() is called, nothing happens
+ section = Section.section(delegatee)
+ self.cli_delete(['interfaces', section, delegatee])