summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py23
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py5
-rwxr-xr-xsmoketest/scripts/cli/test_firewall.py7
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_macsec.py35
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_pppoe.py67
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_vxlan.py103
-rwxr-xr-xsmoketest/scripts/cli/test_policy.py27
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bfd.py18
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bgp.py18
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_igmp-proxy.py29
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_isis.py19
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_mpls.py10
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospf.py10
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospfv3.py10
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_pim.py192
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_pim6.py84
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rip.py9
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ripng.py51
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rpki.py16
-rwxr-xr-xsmoketest/scripts/cli/test_service_dns_dynamic.py130
-rwxr-xr-xsmoketest/scripts/cli/test_service_https.py176
-rwxr-xr-xsmoketest/scripts/cli/test_vrf.py2
22 files changed, 830 insertions, 211 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 51ccbc9e6..3f42196f7 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -12,9 +12,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import unittest
-
from binascii import hexlify
from netifaces import AF_INET
@@ -24,7 +21,6 @@ from netifaces import interfaces
from base_vyostest_shim import VyOSUnitTestSHIM
-from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.defaults import directories
from vyos.ifconfig import Interface
@@ -162,14 +158,22 @@ class BasicInterfaceTest:
if not self._test_dhcp or not self._test_vrf:
self.skipTest('not supported')
+ client_id = 'VyOS-router'
distance = '100'
+ hostname = 'vyos'
+ vendor_class_id = 'vyos-vendor'
+ user_class = 'vyos'
for interface in self._interfaces:
for option in self._options.get(interface, []):
self.cli_set(self._base_path + [interface] + option.split())
self.cli_set(self._base_path + [interface, 'address', 'dhcp'])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'client-id', client_id])
self.cli_set(self._base_path + [interface, 'dhcp-options', 'default-route-distance', distance])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'host-name', hostname])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'vendor-class-id', vendor_class_id])
+ self.cli_set(self._base_path + [interface, 'dhcp-options', 'user-class', user_class])
self.cli_commit()
@@ -179,8 +183,12 @@ class BasicInterfaceTest:
self.assertTrue(dhclient_pid)
dhclient_config = read_file(f'{dhclient_base_dir}/dhclient_{interface}.conf')
- self.assertIn('request subnet-mask, broadcast-address, routers, domain-name-servers', dhclient_config)
- self.assertIn('require subnet-mask;', dhclient_config)
+ self.assertIn(f'request subnet-mask, broadcast-address, routers, domain-name-servers', dhclient_config)
+ self.assertIn(f'require subnet-mask;', dhclient_config)
+ self.assertIn(f'send host-name "{hostname}";', dhclient_config)
+ self.assertIn(f'send dhcp-client-identifier "{client_id}";', dhclient_config)
+ self.assertIn(f'send vendor-class-identifier "{vendor_class_id}";', dhclient_config)
+ self.assertIn(f'send user-class "{user_class}";', dhclient_config)
# and the commandline has the appropriate options
cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
@@ -404,10 +412,9 @@ class BasicInterfaceTest:
for intf in self._interfaces:
base = self._base_path + [intf]
- self.cli_set(base + ['mtu', self._mtu])
-
for option in self._options.get(intf, []):
self.cli_set(base + option.split())
+ self.cli_set(base + ['mtu', self._mtu])
# check validate() - can not set low MTU if 'no-default-link-local'
# is not set on CLI
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index f694f539d..140642806 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -78,9 +78,10 @@ class VyOSUnitTestSHIM:
while run(f'sudo lsof -nP {commit_lock}') == 0:
sleep(0.250)
- def getFRRconfig(self, string, end='$', endsection='^!', daemon=''):
+ def getFRRconfig(self, string=None, end='$', endsection='^!', daemon=''):
""" Retrieve current "running configuration" from FRR """
- command = f'vtysh -c "show run {daemon} no-header" | sed -n "/^{string}{end}/,/{endsection}/p"'
+ command = f'vtysh -c "show run {daemon} no-header"'
+ if string: command += f' | sed -n "/^{string}{end}/,/{endsection}/p"'
out = cmd(command)
if self.debug:
import pprint
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py
index 8c3e00a2a..cffa1c0be 100755
--- a/smoketest/scripts/cli/test_firewall.py
+++ b/smoketest/scripts/cli/test_firewall.py
@@ -586,6 +586,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'log-options', 'level', 'crit'])
self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'default-action', 'drop'])
+ self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'enable-default-log'])
self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'action', 'accept'])
self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'vlan', 'id', vlan_id])
self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'action', 'jump'])
@@ -596,11 +597,13 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
nftables_search = [
['chain VYOS_FORWARD_filter'],
- ['type filter hook forward priority filter; policy drop;'],
+ ['type filter hook forward priority filter; policy accept;'],
[f'vlan id {vlan_id}', 'accept'],
[f'vlan pcp {vlan_prior}', f'jump NAME_{name}'],
+ ['log prefix "[bri-FWD-filter-default-D]"', 'drop', 'FWD-filter default-action drop'],
[f'chain NAME_{name}'],
- [f'ether saddr {mac_address}', f'iifname "{interface_in}"', f'log prefix "[bri-NAM-{name}-1-A]" log level crit', 'accept']
+ [f'ether saddr {mac_address}', f'iifname "{interface_in}"', f'log prefix "[bri-NAM-{name}-1-A]" log level crit', 'accept'],
+ ['accept', f'{name} default-action accept']
]
self.verify_nftables(nftables_search, 'bridge vyos_filter')
diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py
index ea0f00071..d8d564792 100755
--- a/smoketest/scripts/cli/test_interfaces_macsec.py
+++ b/smoketest/scripts/cli/test_interfaces_macsec.py
@@ -14,7 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import unittest
@@ -23,9 +22,10 @@ from netifaces import interfaces
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
-from vyos.utils.process import cmd
from vyos.utils.file import read_file
from vyos.utils.network import get_interface_config
+from vyos.utils.network import interface_exists
+from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
PROCESS_NAME = 'wpa_supplicant'
@@ -35,10 +35,6 @@ def get_config_value(interface, key):
tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
return tmp[0]
-def get_cipher(interface):
- tmp = get_interface_config(interface)
- return tmp['linkinfo']['info_data']['cipher_suite'].lower()
-
class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
@@ -117,6 +113,10 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
tmp = read_file(f'/sys/class/net/{interface}/mtu')
self.assertEqual(tmp, '1460')
+ # Encryption enabled?
+ tmp = get_interface_config(interface)
+ self.assertTrue(tmp['linkinfo']['info_data']['encrypt'])
+
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
@@ -138,10 +138,11 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
# final commit and verify
self.cli_commit()
- self.assertIn(interface, interfaces())
+ self.assertTrue(interface_exists(interface))
# Verify proper cipher suite (T4537)
- self.assertEqual(cipher, get_cipher(interface))
+ tmp = get_interface_config(interface)
+ self.assertEqual(cipher, tmp['linkinfo']['info_data']['cipher_suite'].lower())
def test_macsec_gcm_aes_256(self):
src_interface = 'eth0'
@@ -161,10 +162,11 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
# final commit and verify
self.cli_commit()
- self.assertIn(interface, interfaces())
+ self.assertTrue(interface_exists(interface))
# Verify proper cipher suite (T4537)
- self.assertEqual(cipher, get_cipher(interface))
+ tmp = get_interface_config(interface)
+ self.assertEqual(cipher, tmp['linkinfo']['info_data']['cipher_suite'].lower())
def test_macsec_source_interface(self):
# Ensure source-interface can bot be part of any other bond or bridge
@@ -191,7 +193,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
# final commit and verify
self.cli_commit()
- self.assertIn(interface, interfaces())
+ self.assertTrue(interface_exists(interface))
def test_macsec_static_keys(self):
src_interface = 'eth0'
@@ -205,7 +207,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
peer_mac = '00:11:22:33:44:55'
self.cli_set(self._base_path + [interface])
- # Encrypt link
+ # Encrypt link
self.cli_set(self._base_path + [interface, 'security', 'encrypt'])
# check validate() - source interface is mandatory
@@ -261,9 +263,12 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
# final commit and verify
self.cli_commit()
- self.assertIn(interface, interfaces())
- self.assertEqual(cipher2, get_cipher(interface))
- self.assertTrue(os.path.isdir(f'/sys/class/net/{interface}'))
+
+ self.assertTrue(interface_exists(interface))
+ tmp = get_interface_config(interface)
+ self.assertEqual(cipher2, tmp['linkinfo']['info_data']['cipher_suite'].lower())
+ # Encryption enabled?
+ self.assertTrue(tmp['linkinfo']['info_data']['encrypt'])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_pppoe.py b/smoketest/scripts/cli/test_interfaces_pppoe.py
index 7b702759f..e99d8b3d1 100755
--- a/smoketest/scripts/cli/test_interfaces_pppoe.py
+++ b/smoketest/scripts/cli/test_interfaces_pppoe.py
@@ -36,6 +36,9 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
super(PPPoEInterfaceTest, cls).setUpClass()
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
cls._interfaces = ['pppoe10', 'pppoe20', 'pppoe30']
cls._source_interface = 'eth0'
@@ -53,18 +56,16 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
- def test_01_pppoe_client(self):
+ def test_pppoe_client(self):
# Check if PPPoE dialer can be configured and runs
for interface in self._interfaces:
user = f'VyOS-user-{interface}'
passwd = f'VyOS-passwd-{interface}'
mtu = '1400'
- mru = '1300'
self.cli_set(base_path + [interface, 'authentication', 'username', user])
self.cli_set(base_path + [interface, 'authentication', 'password', passwd])
self.cli_set(base_path + [interface, 'mtu', mtu])
- self.cli_set(base_path + [interface, 'mru', '9000'])
self.cli_set(base_path + [interface, 'no-peer-dns'])
# check validate() - a source-interface is required
@@ -72,11 +73,6 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
- # check validate() - MRU needs to be less or equal then MTU
- with self.assertRaises(ConfigSessionError):
- self.cli_commit()
- self.cli_set(base_path + [interface, 'mru', mru])
-
# commit changes
self.cli_commit()
@@ -87,8 +83,9 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value(interface, 'mtu')[1]
self.assertEqual(tmp, mtu)
+ # MRU must default to MTU if not specified on CLI
tmp = get_config_value(interface, 'mru')[1]
- self.assertEqual(tmp, mru)
+ self.assertEqual(tmp, mtu)
tmp = get_config_value(interface, 'user')[1].replace('"', '')
self.assertEqual(tmp, user)
tmp = get_config_value(interface, 'password')[1].replace('"', '')
@@ -96,7 +93,7 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value(interface, 'ifname')[1]
self.assertEqual(tmp, interface)
- def test_02_pppoe_client_disabled_interface(self):
+ def test_pppoe_client_disabled_interface(self):
# Check if PPPoE Client can be disabled
for interface in self._interfaces:
user = f'VyOS-user-{interface}'
@@ -125,16 +122,16 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
- def test_03_pppoe_authentication(self):
+ def test_pppoe_authentication(self):
# When username or password is set - so must be the other
for interface in self._interfaces:
user = f'VyOS-user-{interface}'
passwd = f'VyOS-passwd-{interface}'
- self.cli_set(base_path + [interface, 'authentication', 'username', user])
self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf'])
+ self.cli_set(base_path + [interface, 'authentication', 'username', user])
# check validate() - if user is set, so must be the password
with self.assertRaises(ConfigSessionError):
self.cli_commit()
@@ -143,7 +140,7 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
- def test_04_pppoe_dhcpv6pd(self):
+ def test_pppoe_dhcpv6pd(self):
# Check if PPPoE dialer can be configured with DHCPv6-PD
address = '1'
sla_id = '0'
@@ -183,7 +180,7 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value(interface, '+ipv6 ipv6cp-use-ipaddr')
self.assertListEqual(tmp, ['+ipv6', 'ipv6cp-use-ipaddr'])
- def test_05_pppoe_options(self):
+ def test_pppoe_options(self):
# Check if PPPoE dialer can be configured with DHCPv6-PD
for interface in self._interfaces:
user = f'VyOS-user-{interface}'
@@ -215,5 +212,47 @@ class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value(interface, 'pppoe-host-uniq')[1]
self.assertEqual(tmp, f'"{host_uniq}"')
+ def test_pppoe_mtu_mru(self):
+ # Check if PPPoE dialer can be configured and runs
+ for interface in self._interfaces:
+ user = f'VyOS-user-{interface}'
+ passwd = f'VyOS-passwd-{interface}'
+ mtu = '1400'
+ mru = '1300'
+
+ self.cli_set(base_path + [interface, 'authentication', 'username', user])
+ self.cli_set(base_path + [interface, 'authentication', 'password', passwd])
+ self.cli_set(base_path + [interface, 'mtu', mtu])
+ self.cli_set(base_path + [interface, 'mru', '9000'])
+
+ # check validate() - a source-interface is required
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
+
+ # check validate() - MRU needs to be less or equal then MTU
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_set(base_path + [interface, 'mru', mru])
+
+ # commit changes
+ self.cli_commit()
+
+ # verify configuration file(s)
+ for interface in self._interfaces:
+ user = f'VyOS-user-{interface}'
+ passwd = f'VyOS-passwd-{interface}'
+
+ tmp = get_config_value(interface, 'mtu')[1]
+ self.assertEqual(tmp, mtu)
+ tmp = get_config_value(interface, 'mru')[1]
+ self.assertEqual(tmp, mru)
+ tmp = get_config_value(interface, 'user')[1].replace('"', '')
+ self.assertEqual(tmp, user)
+ tmp = get_config_value(interface, 'password')[1].replace('"', '')
+ self.assertEqual(tmp, passwd)
+ tmp = get_config_value(interface, 'ifname')[1]
+ self.assertEqual(tmp, interface)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py
index 17e4fc36f..18676491b 100755
--- a/smoketest/scripts/cli/test_interfaces_vxlan.py
+++ b/smoketest/scripts/cli/test_interfaces_vxlan.py
@@ -18,10 +18,12 @@ import unittest
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Interface
+from vyos.ifconfig import Section
from vyos.utils.network import get_bridge_fdb
from vyos.utils.network import get_interface_config
from vyos.utils.network import interface_exists
from vyos.utils.network import get_vxlan_vlan_tunnels
+from vyos.utils.network import get_vxlan_vni_filter
from vyos.template import is_ipv6
from base_interfaces_test import BasicInterfaceTest
@@ -31,12 +33,13 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
cls._base_path = ['interfaces', 'vxlan']
cls._options = {
'vxlan10': ['vni 10', 'remote 127.0.0.2'],
- 'vxlan20': ['vni 20', 'group 239.1.1.1', 'source-interface eth0'],
+ 'vxlan20': ['vni 20', 'group 239.1.1.1', 'source-interface eth0', 'mtu 1450'],
'vxlan30': ['vni 30', 'remote 2001:db8:2000::1', 'source-address 2001:db8:1000::1', 'parameters ipv6 flowlabel 0x1000'],
'vxlan40': ['vni 40', 'remote 127.0.0.2', 'remote 127.0.0.3'],
'vxlan50': ['vni 50', 'remote 2001:db8:2000::1', 'remote 2001:db8:2000::2', 'parameters ipv6 flowlabel 0x1000'],
}
cls._interfaces = list(cls._options)
+ cls._mtu = '1450'
# call base-classes classmethod
super(VXLANInterfaceTest, cls).setUpClass()
@@ -138,7 +141,7 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
def test_vxlan_vlan_vni_mapping(self):
bridge = 'br0'
interface = 'vxlan0'
- source_interface = 'eth0'
+ source_address = '192.0.2.99'
vlan_to_vni = {
'10': '10010',
@@ -151,7 +154,7 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
}
self.cli_set(self._base_path + [interface, 'parameters', 'external'])
- self.cli_set(self._base_path + [interface, 'source-interface', source_interface])
+ self.cli_set(self._base_path + [interface, 'source-address', source_address])
for vlan, vni in vlan_to_vni.items():
self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni])
@@ -187,11 +190,12 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
def test_vxlan_neighbor_suppress(self):
bridge = 'br555'
interface = 'vxlan555'
- source_interface = 'eth0'
+ source_interface = 'dum0'
+
+ self.cli_set(['interfaces', Section.section(source_interface), source_interface, 'mtu', '9000'])
self.cli_set(self._base_path + [interface, 'parameters', 'external'])
self.cli_set(self._base_path + [interface, 'source-interface', source_interface])
-
self.cli_set(self._base_path + [interface, 'parameters', 'neighbor-suppress'])
# This must fail as this VXLAN interface is not associated with any bridge
@@ -221,6 +225,95 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
self.assertTrue(tmp['linkinfo']['info_slave_data']['learning'])
self.cli_delete(['interfaces', 'bridge', bridge])
+ self.cli_delete(['interfaces', Section.section(source_interface), source_interface])
+
+ def test_vxlan_vni_filter(self):
+ interfaces = ['vxlan987', 'vxlan986', 'vxlan985']
+ source_address = '192.0.2.77'
+
+ for interface in interfaces:
+ self.cli_set(self._base_path + [interface, 'parameters', 'external'])
+ self.cli_set(self._base_path + [interface, 'source-address', source_address])
+
+ # This must fail as there can only be one "external" VXLAN device unless "vni-filter" is defined
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ # Enable "vni-filter" on the first VXLAN interface
+ self.cli_set(self._base_path + [interfaces[0], 'parameters', 'vni-filter'])
+
+ # This must fail as if it's enabled on one VXLAN interface, it must be enabled on all
+ # VXLAN interfaces
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ for interface in interfaces:
+ self.cli_set(self._base_path + [interface, 'parameters', 'vni-filter'])
+
+ # commit configuration
+ self.cli_commit()
+
+ for interface in interfaces:
+ self.assertTrue(interface_exists(interface))
+
+ tmp = get_interface_config(interface)
+ self.assertTrue(tmp['linkinfo']['info_data']['vnifilter'])
+
+ def test_vxlan_vni_filter_add_remove(self):
+ interface = 'vxlan987'
+ source_address = '192.0.2.66'
+ bridge = 'br0'
+
+ self.cli_set(self._base_path + [interface, 'parameters', 'external'])
+ self.cli_set(self._base_path + [interface, 'source-address', source_address])
+ self.cli_set(self._base_path + [interface, 'parameters', 'vni-filter'])
+
+ # commit configuration
+ self.cli_commit()
+
+ # Check if VXLAN interface got created
+ self.assertTrue(interface_exists(interface))
+
+ # VNI filter configured?
+ tmp = get_interface_config(interface)
+ self.assertTrue(tmp['linkinfo']['info_data']['vnifilter'])
+
+ # Now create some VLAN mappings and VNI filter
+ vlan_to_vni = {
+ '50': '10050',
+ '51': '10051',
+ '52': '10052',
+ '53': '10053',
+ '54': '10054',
+ '60': '10060',
+ '69': '10069',
+ }
+ for vlan, vni in vlan_to_vni.items():
+ self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni])
+ # we need a bridge ...
+ self.cli_set(['interfaces', 'bridge', bridge, 'member', 'interface', interface])
+ # commit configuration
+ self.cli_commit()
+
+ # All VNIs configured?
+ tmp = get_vxlan_vni_filter(interface)
+ self.assertListEqual(list(vlan_to_vni.values()), tmp)
+
+ #
+ # Delete a VLAN mappings and check if all VNIs are properly set up
+ #
+ vlan_to_vni.popitem()
+ self.cli_delete(self._base_path + [interface, 'vlan-to-vni'])
+ for vlan, vni in vlan_to_vni.items():
+ self.cli_set(self._base_path + [interface, 'vlan-to-vni', vlan, 'vni', vni])
+
+ # commit configuration
+ self.cli_commit()
+
+ # All VNIs configured?
+ tmp = get_vxlan_vni_filter(interface)
+ self.assertListEqual(list(vlan_to_vni.values()), tmp)
+
+ self.cli_delete(['interfaces', 'bridge', bridge])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py
index 51a33f978..c21d8af4e 100755
--- a/smoketest/scripts/cli/test_policy.py
+++ b/smoketest/scripts/cli/test_policy.py
@@ -1107,6 +1107,33 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
'metric' : '-20',
},
},
+ '30': {
+ 'action': 'permit',
+ 'match': {
+ 'ip-nexthop-addr': ipv4_nexthop_address,
+ },
+ 'set': {
+ 'metric': 'rtt',
+ },
+ },
+ '40': {
+ 'action': 'permit',
+ 'match': {
+ 'ip-nexthop-addr': ipv4_nexthop_address,
+ },
+ 'set': {
+ 'metric': '+rtt',
+ },
+ },
+ '50': {
+ 'action': 'permit',
+ 'match': {
+ 'ip-nexthop-addr': ipv4_nexthop_address,
+ },
+ 'set': {
+ 'metric': '-rtt',
+ },
+ },
},
},
}
diff --git a/smoketest/scripts/cli/test_protocols_bfd.py b/smoketest/scripts/cli/test_protocols_bfd.py
index 451565664..f209eae3a 100755
--- a/smoketest/scripts/cli/test_protocols_bfd.py
+++ b/smoketest/scripts/cli/test_protocols_bfd.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -77,11 +77,23 @@ profiles = {
}
class TestProtocolsBFD(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestProtocolsBFD, cls).setUpClass()
+
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
def tearDown(self):
self.cli_delete(base_path)
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def test_bfd_peer(self):
self.cli_set(['vrf', 'name', vrf_name, 'table', '1000'])
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py
index 23e138ebe..71e2142f9 100755
--- a/smoketest/scripts/cli/test_protocols_bgp.py
+++ b/smoketest/scripts/cli/test_protocols_bgp.py
@@ -174,9 +174,16 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
def setUpClass(cls):
super(TestProtocolsBGP, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, ['policy', 'route-map'])
+ cls.cli_delete(cls, ['policy', 'prefix-list'])
+ cls.cli_delete(cls, ['policy', 'prefix-list6'])
+ cls.cli_delete(cls, ['vrf'])
cls.cli_set(cls, ['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit'])
cls.cli_set(cls, ['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit'])
@@ -192,18 +199,23 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
@classmethod
def tearDownClass(cls):
- cls.cli_delete(cls, ['policy'])
+ cls.cli_delete(cls, ['policy', 'route-map'])
+ cls.cli_delete(cls, ['policy', 'prefix-list'])
+ cls.cli_delete(cls, ['policy', 'prefix-list6'])
def setUp(self):
self.cli_set(base_path + ['system-as', ASN])
def tearDown(self):
+ # cleanup any possible VRF mess
self.cli_delete(['vrf'])
+ # always destrox the entire bgpd configuration to make the processes
+ # life as hard as possible
self.cli_delete(base_path)
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def create_bgp_instances_for_import_test(self):
table = '1000'
diff --git a/smoketest/scripts/cli/test_protocols_igmp-proxy.py b/smoketest/scripts/cli/test_protocols_igmp-proxy.py
index a75003b12..df10442ea 100755
--- a/smoketest/scripts/cli/test_protocols_igmp-proxy.py
+++ b/smoketest/scripts/cli/test_protocols_igmp-proxy.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -29,14 +29,32 @@ upstream_if = 'eth1'
downstream_if = 'eth2'
class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
- self.cli_set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24'])
+ @classmethod
+ def setUpClass(cls):
+ # call base-classes classmethod
+ super(TestProtocolsIGMPProxy, cls).setUpClass()
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+ cls.cli_set(cls, ['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24'])
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.cli_delete(cls, ['interfaces', 'ethernet', upstream_if, 'address'])
+
+ # call base-classes classmethod
+ super(TestProtocolsIGMPProxy, cls).tearDownClass()
def tearDown(self):
- self.cli_delete(['interfaces', 'ethernet', upstream_if, 'address'])
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
self.cli_delete(base_path)
self.cli_commit()
+ # Check for no longer running process
+ self.assertFalse(process_named_running(PROCESS_NAME))
+
def test_igmpproxy(self):
threshold = '20'
altnet = '192.0.2.0/24'
@@ -74,8 +92,5 @@ class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'whitelist {whitelist}', config)
self.assertIn(f'phyint {downstream_if} downstream ratelimit 0 threshold 1', config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py
index 8b423dbea..aa5f2f38c 100755
--- a/smoketest/scripts/cli/test_protocols_isis.py
+++ b/smoketest/scripts/cli/test_protocols_isis.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2022 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -31,20 +31,25 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
cls._interfaces = Section.interfaces('ethernet')
-
# call base-classes classmethod
super(TestProtocolsISIS, cls).setUpClass()
-
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, ['vrf'])
def tearDown(self):
+ # cleanup any possible VRF mess
+ self.cli_delete(['vrf'])
+ # always destrox the entire isisd configuration to make the processes
+ # life as hard as possible
self.cli_delete(base_path)
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def isis_base_config(self):
self.cli_set(base_path + ['net', net])
@@ -333,7 +338,7 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['interface', interface])
self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'action', 'permit'])
self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'prefix', prefix_list_address])
-
+
# Commit main ISIS changes
self.cli_commit()
@@ -385,4 +390,4 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
if __name__ == '__main__':
- unittest.main(verbosity=2) \ No newline at end of file
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_mpls.py b/smoketest/scripts/cli/test_protocols_mpls.py
index 06f21c6e1..0c1599f9b 100755
--- a/smoketest/scripts/cli/test_protocols_mpls.py
+++ b/smoketest/scripts/cli/test_protocols_mpls.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -70,6 +70,9 @@ class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase):
def setUpClass(cls):
super(TestProtocolsMPLS, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
@@ -77,8 +80,9 @@ class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
self.cli_delete(base_path)
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def test_mpls_basic(self):
router_id = '1.2.3.4'
diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py
index a6850db71..6bffc7c45 100755
--- a/smoketest/scripts/cli/test_protocols_ospf.py
+++ b/smoketest/scripts/cli/test_protocols_ospf.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2022 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -32,6 +32,9 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
def setUpClass(cls):
super(TestProtocolsOSPF, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+
cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit'])
@@ -45,11 +48,12 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
super(TestProtocolsOSPF, cls).tearDownClass()
def tearDown(self):
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
self.cli_delete(base_path)
self.cli_commit()
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
+
def test_ospf_01_defaults(self):
# commit changes
self.cli_set(base_path)
diff --git a/smoketest/scripts/cli/test_protocols_ospfv3.py b/smoketest/scripts/cli/test_protocols_ospfv3.py
index 0d6c6c691..4ae7f05d9 100755
--- a/smoketest/scripts/cli/test_protocols_ospfv3.py
+++ b/smoketest/scripts/cli/test_protocols_ospfv3.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2022 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -35,6 +35,9 @@ class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase):
def setUpClass(cls):
super(TestProtocolsOSPFv3, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+
cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit'])
@@ -48,11 +51,12 @@ class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase):
super(TestProtocolsOSPFv3, cls).tearDownClass()
def tearDown(self):
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
self.cli_delete(base_path)
self.cli_commit()
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
+
def test_ospfv3_01_basic(self):
seq = '10'
prefix = '2001:db8::/32'
diff --git a/smoketest/scripts/cli/test_protocols_pim.py b/smoketest/scripts/cli/test_protocols_pim.py
new file mode 100755
index 000000000..ccfced138
--- /dev/null
+++ b/smoketest/scripts/cli/test_protocols_pim.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import unittest
+
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.ifconfig import Section
+from vyos.utils.process import process_named_running
+
+PROCESS_NAME = 'pimd'
+base_path = ['protocols', 'pim']
+
+class TestProtocolsPIM(VyOSUnitTestSHIM.TestCase):
+ def tearDown(self):
+ # pimd process must be running
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ self.cli_delete(base_path)
+ self.cli_commit()
+
+ # pimd process must be stopped by now
+ self.assertFalse(process_named_running(PROCESS_NAME))
+
+ def test_01_pim_basic(self):
+ rp = '127.0.0.1'
+ group = '224.0.0.0/4'
+ hello = '100'
+ dr_priority = '64'
+
+ self.cli_set(base_path + ['rp', 'address', rp, 'group', group])
+
+ interfaces = Section.interfaces('ethernet')
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface , 'bfd'])
+ self.cli_set(base_path + ['interface', interface , 'dr-priority', dr_priority])
+ self.cli_set(base_path + ['interface', interface , 'hello', hello])
+ self.cli_set(base_path + ['interface', interface , 'no-bsm'])
+ self.cli_set(base_path + ['interface', interface , 'no-unicast-bsm'])
+ self.cli_set(base_path + ['interface', interface , 'passive'])
+
+ # commit changes
+ self.cli_commit()
+
+ # Verify FRR pimd configuration
+ frrconfig = self.getFRRconfig(daemon=PROCESS_NAME)
+ self.assertIn(f'ip pim rp {rp} {group}', frrconfig)
+
+ for interface in interfaces:
+ frrconfig = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
+ self.assertIn(f'interface {interface}', frrconfig)
+ self.assertIn(f' ip pim', frrconfig)
+ self.assertIn(f' ip pim bfd', frrconfig)
+ self.assertIn(f' ip pim drpriority {dr_priority}', frrconfig)
+ self.assertIn(f' ip pim hello {hello}', frrconfig)
+ self.assertIn(f' no ip pim bsm', frrconfig)
+ self.assertIn(f' no ip pim unicast-bsm', frrconfig)
+ self.assertIn(f' ip pim passive', frrconfig)
+
+ self.cli_commit()
+
+ def test_02_pim_advanced(self):
+ rp = '127.0.0.2'
+ group = '224.0.0.0/4'
+ join_prune_interval = '123'
+ rp_keep_alive_timer = '190'
+ keep_alive_timer = '180'
+ packets = '10'
+ prefix_list = 'pim-test'
+ register_suppress_time = '300'
+
+ self.cli_set(base_path + ['rp', 'address', rp, 'group', group])
+ self.cli_set(base_path + ['rp', 'keep-alive-timer', rp_keep_alive_timer])
+
+ self.cli_set(base_path + ['ecmp', 'rebalance'])
+ self.cli_set(base_path + ['join-prune-interval', join_prune_interval])
+ self.cli_set(base_path + ['keep-alive-timer', keep_alive_timer])
+ self.cli_set(base_path + ['packets', packets])
+ self.cli_set(base_path + ['register-accept-list', 'prefix-list', prefix_list])
+ self.cli_set(base_path + ['register-suppress-time', register_suppress_time])
+ self.cli_set(base_path + ['no-v6-secondary'])
+ self.cli_set(base_path + ['spt-switchover', 'infinity-and-beyond', 'prefix-list', prefix_list])
+ self.cli_set(base_path + ['ssm', 'prefix-list', prefix_list])
+
+ # check validate() - PIM require defined interfaces!
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ interfaces = Section.interfaces('ethernet')
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface])
+
+ # commit changes
+ self.cli_commit()
+
+ # Verify FRR pimd configuration
+ frrconfig = self.getFRRconfig(daemon=PROCESS_NAME)
+ self.assertIn(f'ip pim rp {rp} {group}', frrconfig)
+ self.assertIn(f'ip pim rp keep-alive-timer {rp_keep_alive_timer}', frrconfig)
+ self.assertIn(f'ip pim ecmp rebalance', frrconfig)
+ self.assertIn(f'ip pim join-prune-interval {join_prune_interval}', frrconfig)
+ self.assertIn(f'ip pim keep-alive-timer {keep_alive_timer}', frrconfig)
+ self.assertIn(f'ip pim packets {packets}', frrconfig)
+ self.assertIn(f'ip pim register-accept-list {prefix_list}', frrconfig)
+ self.assertIn(f'ip pim register-suppress-time {register_suppress_time}', frrconfig)
+ self.assertIn(f'no ip pim send-v6-secondary', frrconfig)
+ self.assertIn(f'ip pim spt-switchover infinity-and-beyond prefix-list {prefix_list}', frrconfig)
+ self.assertIn(f'ip pim ssm prefix-list {prefix_list}', frrconfig)
+
+ def test_03_pim_igmp_proxy(self):
+ igmp_proxy = ['protocols', 'igmp-proxy']
+ rp = '127.0.0.1'
+ group = '224.0.0.0/4'
+
+ self.cli_set(base_path)
+ self.cli_set(igmp_proxy)
+
+ # check validate() - can not set both IGMP proxy and PIM
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ self.cli_delete(igmp_proxy)
+
+ self.cli_set(base_path + ['rp', 'address', rp, 'group', group])
+ interfaces = Section.interfaces('ethernet')
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface , 'bfd'])
+
+ # commit changes
+ self.cli_commit()
+
+ def test_04_igmp(self):
+ watermark_warning = '2000'
+ query_interval = '1000'
+ query_max_response_time = '200'
+ version = '2'
+
+ igmp_join = {
+ '224.1.1.1' : { 'source' : ['1.1.1.1', '2.2.2.2', '3.3.3.3'] },
+ '224.1.2.2' : { 'source' : [] },
+ '224.1.3.3' : {},
+ }
+
+ self.cli_set(base_path + ['igmp', 'watermark-warning', watermark_warning])
+ interfaces = Section.interfaces('ethernet')
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface , 'igmp', 'version', version])
+ self.cli_set(base_path + ['interface', interface , 'igmp', 'query-interval', query_interval])
+ self.cli_set(base_path + ['interface', interface , 'igmp', 'query-max-response-time', query_max_response_time])
+
+ for join, join_config in igmp_join.items():
+ self.cli_set(base_path + ['interface', interface , 'igmp', 'join', join])
+ if 'source' in join_config:
+ for source in join_config['source']:
+ self.cli_set(base_path + ['interface', interface , 'igmp', 'join', join, 'source-address', source])
+
+ self.cli_commit()
+
+ frrconfig = self.getFRRconfig(daemon=PROCESS_NAME)
+ self.assertIn(f'ip igmp watermark-warn {watermark_warning}', frrconfig)
+
+ for interface in interfaces:
+ frrconfig = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
+ self.assertIn(f'interface {interface}', frrconfig)
+ self.assertIn(f' ip igmp', frrconfig)
+ self.assertIn(f' ip igmp version {version}', frrconfig)
+ self.assertIn(f' ip igmp query-interval {query_interval}', frrconfig)
+ self.assertIn(f' ip igmp query-max-response-time {query_max_response_time}', frrconfig)
+
+ for join, join_config in igmp_join.items():
+ if 'source' in join_config:
+ for source in join_config['source']:
+ self.assertIn(f' ip igmp join {join} {source}', frrconfig)
+ else:
+ self.assertIn(f' ip igmp join {join}', frrconfig)
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_pim6.py b/smoketest/scripts/cli/test_protocols_pim6.py
index 1be12836d..ba24edca2 100755
--- a/smoketest/scripts/cli/test_protocols_pim6.py
+++ b/smoketest/scripts/cli/test_protocols_pim6.py
@@ -24,18 +24,27 @@ from vyos.utils.process import process_named_running
PROCESS_NAME = 'pim6d'
base_path = ['protocols', 'pim6']
-
class TestProtocolsPIMv6(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # call base-classes classmethod
+ super(TestProtocolsPIMv6, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
def tearDown(self):
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
self.cli_delete(base_path)
self.cli_commit()
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
+
def test_pim6_01_mld_simple(self):
# commit changes
interfaces = Section.interfaces('ethernet')
-
for interface in interfaces:
self.cli_set(base_path + ['interface', interface])
@@ -43,68 +52,95 @@ class TestProtocolsPIMv6(VyOSUnitTestSHIM.TestCase):
# Verify FRR pim6d configuration
for interface in interfaces:
- config = self.getFRRconfig(
- f'interface {interface}', daemon=PROCESS_NAME)
+ config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
self.assertIn(f'interface {interface}', config)
self.assertIn(f' ipv6 mld', config)
self.assertNotIn(f' ipv6 mld version 1', config)
# Change to MLD version 1
for interface in interfaces:
- self.cli_set(base_path + ['interface',
- interface, 'mld', 'version', '1'])
+ self.cli_set(base_path + ['interface', interface, 'mld', 'version', '1'])
self.cli_commit()
# Verify FRR pim6d configuration
for interface in interfaces:
- config = self.getFRRconfig(
- f'interface {interface}', daemon=PROCESS_NAME)
+ config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
self.assertIn(f'interface {interface}', config)
self.assertIn(f' ipv6 mld', config)
self.assertIn(f' ipv6 mld version 1', config)
def test_pim6_02_mld_join(self):
- # commit changes
interfaces = Section.interfaces('ethernet')
-
- # Use an invalid multiple group address
+ # Use an invalid multicast group address
for interface in interfaces:
- self.cli_set(base_path + ['interface',
- interface, 'mld', 'join', 'fd00::1234'])
+ self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'fd00::1234'])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(base_path + ['interface'])
- # Use a valid multiple group address
+ # Use a valid multicast group address
for interface in interfaces:
- self.cli_set(base_path + ['interface',
- interface, 'mld', 'join', 'ff18::1234'])
+ self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'ff18::1234'])
self.cli_commit()
# Verify FRR pim6d configuration
for interface in interfaces:
- config = self.getFRRconfig(
- f'interface {interface}', daemon=PROCESS_NAME)
+ config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
self.assertIn(f'interface {interface}', config)
self.assertIn(f' ipv6 mld join ff18::1234', config)
# Join a source-specific multicast group
for interface in interfaces:
- self.cli_set(base_path + ['interface', interface,
- 'mld', 'join', 'ff38::5678', 'source', '2001:db8::5678'])
+ self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'ff38::5678', 'source', '2001:db8::5678'])
self.cli_commit()
# Verify FRR pim6d configuration
for interface in interfaces:
- config = self.getFRRconfig(
- f'interface {interface}', daemon=PROCESS_NAME)
+ config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
self.assertIn(f'interface {interface}', config)
self.assertIn(f' ipv6 mld join ff38::5678 2001:db8::5678', config)
+ def test_pim6_03_basic(self):
+ interfaces = Section.interfaces('ethernet')
+ join_prune_interval = '123'
+ keep_alive_timer = '77'
+ packets = '5'
+ register_suppress_time = '99'
+ dr_priority = '100'
+ hello = '50'
+
+ self.cli_set(base_path + ['join-prune-interval', join_prune_interval])
+ self.cli_set(base_path + ['keep-alive-timer', keep_alive_timer])
+ self.cli_set(base_path + ['packets', packets])
+ self.cli_set(base_path + ['register-suppress-time', register_suppress_time])
+
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'dr-priority', dr_priority])
+ self.cli_set(base_path + ['interface', interface, 'hello', hello])
+ self.cli_set(base_path + ['interface', interface, 'no-bsm'])
+ self.cli_set(base_path + ['interface', interface, 'no-unicast-bsm'])
+ self.cli_set(base_path + ['interface', interface, 'passive'])
+
+ self.cli_commit()
+
+ # Verify FRR pim6d configuration
+ config = self.getFRRconfig(daemon=PROCESS_NAME)
+ self.assertIn(f'ipv6 pim join-prune-interval {join_prune_interval}', config)
+ self.assertIn(f'ipv6 pim keep-alive-timer {keep_alive_timer}', config)
+ self.assertIn(f'ipv6 pim packets {packets}', config)
+ self.assertIn(f'ipv6 pim register-suppress-time {register_suppress_time}', config)
+
+ for interface in interfaces:
+ config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME)
+ self.assertIn(f' ipv6 pim drpriority {dr_priority}', config)
+ self.assertIn(f' ipv6 pim hello {hello}', config)
+ self.assertIn(f' no ipv6 pim bsm', config)
+ self.assertIn(f' no ipv6 pim unicast-bsm', config)
+ self.assertIn(f' ipv6 pim passive', config)
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py
index 925499fc8..bfc327fd4 100755
--- a/smoketest/scripts/cli/test_protocols_rip.py
+++ b/smoketest/scripts/cli/test_protocols_rip.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -34,7 +34,8 @@ class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
super(TestProtocolsRIP, cls).setUpClass()
-
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
@@ -65,8 +66,8 @@ class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def test_rip_01_parameters(self):
distance = '40'
diff --git a/smoketest/scripts/cli/test_protocols_ripng.py b/smoketest/scripts/cli/test_protocols_ripng.py
index 0a8ce7eef..0cfb065c6 100755
--- a/smoketest/scripts/cli/test_protocols_ripng.py
+++ b/smoketest/scripts/cli/test_protocols_ripng.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -31,28 +31,43 @@ route_map = 'FooBar123'
base_path = ['protocols', 'ripng']
class TestProtocolsRIPng(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
- self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit'])
- self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any'])
- self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny'])
- self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any'])
- self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit'])
- self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32'])
- self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny'])
- self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32'])
- self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
+ @classmethod
+ def setUpClass(cls):
+ # call base-classes classmethod
+ super(TestProtocolsRIPng, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit'])
+ cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any'])
+ cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny'])
+ cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any'])
+ cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit'])
+ cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32'])
+ cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny'])
+ cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32'])
+ cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
+
+ @classmethod
+ def tearDownClass(cls):
+ # call base-classes classmethod
+ super(TestProtocolsRIPng, cls).tearDownClass()
+
+ cls.cli_delete(cls, ['policy', 'access-list6', acl_in])
+ cls.cli_delete(cls, ['policy', 'access-list6', acl_out])
+ cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_in])
+ cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_out])
+ cls.cli_delete(cls, ['policy', 'route-map', route_map])
def tearDown(self):
self.cli_delete(base_path)
- self.cli_delete(['policy', 'access-list6', acl_in])
- self.cli_delete(['policy', 'access-list6', acl_out])
- self.cli_delete(['policy', 'prefix-list6', prefix_list_in])
- self.cli_delete(['policy', 'prefix-list6', prefix_list_out])
- self.cli_delete(['policy', 'route-map', route_map])
self.cli_commit()
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def test_ripng_01_parameters(self):
metric = '8'
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py
index f4aedcbc3..ab3f076ac 100755
--- a/smoketest/scripts/cli/test_protocols_rpki.py
+++ b/smoketest/scripts/cli/test_protocols_rpki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -31,6 +31,16 @@ rpki_ssh_key = '/config/auth/id_rsa_rpki'
rpki_ssh_pub = f'{rpki_ssh_key}.pub'
class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # call base-classes classmethod
+ super(TestProtocolsRPKI, cls).setUpClass()
+ # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same
+ cls.daemon_pid = process_named_running(PROCESS_NAME)
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
def tearDown(self):
self.cli_delete(base_path)
self.cli_commit()
@@ -39,8 +49,8 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
# frrconfig = self.getFRRconfig('rpki')
# self.assertNotIn('rpki', frrconfig)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ # check process health and continuity
+ self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME))
def test_rpki(self):
polling = '7200'
diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py
index fe213a8ae..cb3d90593 100755
--- a/smoketest/scripts/cli/test_service_dns_dynamic.py
+++ b/smoketest/scripts/cli/test_service_dns_dynamic.py
@@ -32,6 +32,7 @@ DDCLIENT_PID = '/run/ddclient/ddclient.pid'
DDCLIENT_PNAME = 'ddclient'
base_path = ['service', 'dns', 'dynamic']
+name_path = base_path + ['name']
server = 'ddns.vyos.io'
hostname = 'test.ddns.vyos.io'
zone = 'vyos.io'
@@ -58,38 +59,38 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
# IPv4 standard DDNS service configuration
def test_01_dyndns_service_standard(self):
- svc_path = ['address', interface, 'service']
services = {'cloudflare': {'protocol': 'cloudflare'},
'freedns': {'protocol': 'freedns', 'username': username},
'zoneedit': {'protocol': 'zoneedit1', 'username': username}}
for svc, details in services.items():
- self.cli_set(base_path + svc_path + [svc, 'host-name', hostname])
- self.cli_set(base_path + svc_path + [svc, 'password', password])
- self.cli_set(base_path + svc_path + [svc, 'zone', zone])
- self.cli_set(base_path + svc_path + [svc, 'ttl', ttl])
+ self.cli_set(name_path + [svc, 'address', interface])
+ self.cli_set(name_path + [svc, 'host-name', hostname])
+ self.cli_set(name_path + [svc, 'password', password])
+ self.cli_set(name_path + [svc, 'zone', zone])
+ self.cli_set(name_path + [svc, 'ttl', ttl])
for opt, value in details.items():
- self.cli_set(base_path + svc_path + [svc, opt, value])
+ self.cli_set(name_path + [svc, opt, value])
# 'zone' option is supported and required by 'cloudfare', but not 'freedns' and 'zoneedit'
- self.cli_set(base_path + svc_path + [svc, 'zone', zone])
+ self.cli_set(name_path + [svc, 'zone', zone])
if details['protocol'] == 'cloudflare':
pass
else:
# exception is raised for unsupported ones
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- self.cli_delete(base_path + svc_path + [svc, 'zone'])
+ self.cli_delete(name_path + [svc, 'zone'])
# 'ttl' option is supported by 'cloudfare', but not 'freedns' and 'zoneedit'
- self.cli_set(base_path + svc_path + [svc, 'ttl', ttl])
+ self.cli_set(name_path + [svc, 'ttl', ttl])
if details['protocol'] == 'cloudflare':
pass
else:
# exception is raised for unsupported ones
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- self.cli_delete(base_path + svc_path + [svc, 'ttl'])
+ self.cli_delete(name_path + [svc, 'ttl'])
# commit changes
self.cli_commit()
@@ -113,7 +114,7 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
# IPv6 only DDNS service configuration
def test_02_dyndns_service_ipv6(self):
interval = '60'
- svc_path = ['address', interface, 'service', 'dynv6']
+ svc_path = name_path + ['dynv6']
proto = 'dyndns2'
ip_version = 'ipv6'
wait_time = '600'
@@ -121,19 +122,20 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
expiry_time_bad = '360'
self.cli_set(base_path + ['interval', interval])
- self.cli_set(base_path + svc_path + ['ip-version', ip_version])
- self.cli_set(base_path + svc_path + ['protocol', proto])
- self.cli_set(base_path + svc_path + ['server', server])
- self.cli_set(base_path + svc_path + ['username', username])
- self.cli_set(base_path + svc_path + ['password', password])
- self.cli_set(base_path + svc_path + ['host-name', hostname])
- self.cli_set(base_path + svc_path + ['wait-time', wait_time])
+ self.cli_set(svc_path + ['address', interface])
+ self.cli_set(svc_path + ['ip-version', ip_version])
+ self.cli_set(svc_path + ['protocol', proto])
+ self.cli_set(svc_path + ['server', server])
+ self.cli_set(svc_path + ['username', username])
+ self.cli_set(svc_path + ['password', password])
+ self.cli_set(svc_path + ['host-name', hostname])
+ self.cli_set(svc_path + ['wait-time', wait_time])
# expiry-time must be greater than wait-time, exception is raised otherwise
- self.cli_set(base_path + svc_path + ['expiry-time', expiry_time_bad])
with self.assertRaises(ConfigSessionError):
+ self.cli_set(svc_path + ['expiry-time', expiry_time_bad])
self.cli_commit()
- self.cli_set(base_path + svc_path + ['expiry-time', expiry_time_good])
+ self.cli_set(svc_path + ['expiry-time', expiry_time_good])
# commit changes
self.cli_commit()
@@ -152,25 +154,25 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
# IPv4+IPv6 dual DDNS service configuration
def test_03_dyndns_service_dual_stack(self):
- svc_path = ['address', interface, 'service']
services = {'cloudflare': {'protocol': 'cloudflare', 'zone': zone},
'freedns': {'protocol': 'freedns', 'username': username},
'google': {'protocol': 'googledomains', 'username': username}}
ip_version = 'both'
for name, details in services.items():
- self.cli_set(base_path + svc_path + [name, 'host-name', hostname])
- self.cli_set(base_path + svc_path + [name, 'password', password])
+ self.cli_set(name_path + [name, 'address', interface])
+ self.cli_set(name_path + [name, 'host-name', hostname])
+ self.cli_set(name_path + [name, 'password', password])
for opt, value in details.items():
- self.cli_set(base_path + svc_path + [name, opt, value])
+ self.cli_set(name_path + [name, opt, value])
# Dual stack is supported by 'cloudfare' and 'freedns' but not 'googledomains'
# exception is raised for unsupported ones
- self.cli_set(base_path + svc_path + [name, 'ip-version', ip_version])
+ self.cli_set(name_path + [name, 'ip-version', ip_version])
if details['protocol'] not in ['cloudflare', 'freedns']:
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- self.cli_delete(base_path + svc_path + [name, 'ip-version'])
+ self.cli_delete(name_path + [name, 'ip-version'])
# commit changes
self.cli_commit()
@@ -197,16 +199,19 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
def test_04_dyndns_rfc2136(self):
# Check if DDNS service can be configured and runs
- svc_path = ['address', interface, 'rfc2136', 'vyos']
+ svc_path = name_path + ['vyos']
+ proto = 'nsupdate'
with tempfile.NamedTemporaryFile(prefix='/config/auth/') as key_file:
key_file.write(b'S3cretKey')
- self.cli_set(base_path + svc_path + ['server', server])
- self.cli_set(base_path + svc_path + ['zone', zone])
- self.cli_set(base_path + svc_path + ['key', key_file.name])
- self.cli_set(base_path + svc_path + ['ttl', ttl])
- self.cli_set(base_path + svc_path + ['host-name', hostname])
+ self.cli_set(svc_path + ['address', interface])
+ self.cli_set(svc_path + ['protocol', proto])
+ self.cli_set(svc_path + ['server', server])
+ self.cli_set(svc_path + ['zone', zone])
+ self.cli_set(svc_path + ['key', key_file.name])
+ self.cli_set(svc_path + ['ttl', ttl])
+ self.cli_set(svc_path + ['host-name', hostname])
# commit changes
self.cli_commit()
@@ -215,7 +220,7 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}')
self.assertIn(f'use=if', ddclient_conf)
self.assertIn(f'if={interface}', ddclient_conf)
- self.assertIn(f'protocol=nsupdate', ddclient_conf)
+ self.assertIn(f'protocol={proto}', ddclient_conf)
self.assertIn(f'server={server}', ddclient_conf)
self.assertIn(f'zone={zone}', ddclient_conf)
self.assertIn(f'password=\'{key_file.name}\'', ddclient_conf)
@@ -223,16 +228,17 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
def test_05_dyndns_hostname(self):
# Check if DDNS service can be configured and runs
- svc_path = ['address', interface, 'service', 'namecheap']
+ svc_path = name_path + ['namecheap']
proto = 'namecheap'
hostnames = ['@', 'www', hostname, f'@.{hostname}']
for name in hostnames:
- self.cli_set(base_path + svc_path + ['protocol', proto])
- self.cli_set(base_path + svc_path + ['server', server])
- self.cli_set(base_path + svc_path + ['username', username])
- self.cli_set(base_path + svc_path + ['password', password])
- self.cli_set(base_path + svc_path + ['host-name', name])
+ self.cli_set(svc_path + ['address', interface])
+ self.cli_set(svc_path + ['protocol', proto])
+ self.cli_set(svc_path + ['server', server])
+ self.cli_set(svc_path + ['username', username])
+ self.cli_set(svc_path + ['password', password])
+ self.cli_set(svc_path + ['host-name', name])
# commit changes
self.cli_commit()
@@ -247,42 +253,32 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
def test_06_dyndns_web_options(self):
# Check if DDNS service can be configured and runs
- base_path_iface = base_path + ['address', interface]
- base_path_web = base_path + ['address', 'web']
- svc_path_iface = base_path_iface + ['service', 'cloudflare']
- svc_path_web = base_path_web + ['service', 'cloudflare']
+ svc_path = name_path + ['cloudflare']
proto = 'cloudflare'
web_url_good = 'https://ifconfig.me/ip'
web_url_bad = 'http:/ifconfig.me/ip'
- self.cli_set(svc_path_iface + ['protocol', proto])
- self.cli_set(svc_path_iface + ['zone', zone])
- self.cli_set(svc_path_iface + ['password', password])
- self.cli_set(svc_path_iface + ['host-name', hostname])
- self.cli_set(base_path_iface + ['web-options', 'url', web_url_good])
+ self.cli_set(svc_path + ['protocol', proto])
+ self.cli_set(svc_path + ['zone', zone])
+ self.cli_set(svc_path + ['password', password])
+ self.cli_set(svc_path + ['host-name', hostname])
+ self.cli_set(svc_path + ['web-options', 'url', web_url_good])
# web-options is supported only with web service based address lookup
# exception is raised for interface based address lookup
with self.assertRaises(ConfigSessionError):
+ self.cli_set(svc_path + ['address', interface])
self.cli_commit()
- self.cli_delete(base_path_iface + ['web-options'])
+ self.cli_set(svc_path + ['address', 'web'])
# commit changes
self.cli_commit()
- # web-options is supported with web service based address lookup
- # this should work, but clear interface based config first
- self.cli_delete(base_path_iface)
- self.cli_set(svc_path_web + ['protocol', proto])
- self.cli_set(svc_path_web + ['zone', zone])
- self.cli_set(svc_path_web + ['password', password])
- self.cli_set(svc_path_web + ['host-name', hostname])
-
# web-options must be a valid URL
- with self.assertRaises(ConfigSessionError) as cm:
- self.cli_set(base_path_web + ['web-options', 'url', web_url_bad])
- self.assertIn(f'"{web_url_bad.removeprefix("http:")}" is not a valid URI', str(cm.exception))
- self.cli_set(base_path_web + ['web-options', 'url', web_url_good])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_set(svc_path + ['web-options', 'url', web_url_bad])
+ self.cli_commit()
+ self.cli_set(svc_path + ['web-options', 'url', web_url_good])
# commit changes
self.cli_commit()
@@ -300,15 +296,17 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
# Table number randomized, but should be within range 100-65535
vrf_table = "".join(random.choices(string.digits, k=4))
vrf_name = f'vyos-test-{vrf_table}'
- svc_path = ['address', interface, 'service', 'cloudflare']
+ svc_path = name_path + ['cloudflare']
+ proto = 'cloudflare'
self.cli_set(['vrf', 'name', vrf_name, 'table', vrf_table])
self.cli_set(base_path + ['vrf', vrf_name])
- self.cli_set(base_path + svc_path + ['protocol', 'cloudflare'])
- self.cli_set(base_path + svc_path + ['host-name', hostname])
- self.cli_set(base_path + svc_path + ['zone', zone])
- self.cli_set(base_path + svc_path + ['password', password])
+ self.cli_set(svc_path + ['address', interface])
+ self.cli_set(svc_path + ['protocol', proto])
+ self.cli_set(svc_path + ['host-name', hostname])
+ self.cli_set(svc_path + ['zone', zone])
+ self.cli_set(svc_path + ['password', password])
# commit changes
self.cli_commit()
diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py
index 1ae5c104c..6cb91bcf1 100755
--- a/smoketest/scripts/cli/test_service_https.py
+++ b/smoketest/scripts/cli/test_service_https.py
@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
+import json
from requests import request
from urllib3.exceptions import InsecureRequestWarning
@@ -22,7 +23,9 @@ from urllib3.exceptions import InsecureRequestWarning
from base_vyostest_shim import VyOSUnitTestSHIM
from base_vyostest_shim import ignore_warning
from vyos.utils.file import read_file
-from vyos.utils.process import run
+from vyos.utils.process import process_named_running
+
+from vyos.configsession import ConfigSessionError
base_path = ['service', 'https']
pki_base = ['pki']
@@ -48,24 +51,25 @@ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx
u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww
"""
+PROCESS_NAME = 'nginx'
+
class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
+ super(TestHTTPSService, cls).setUpClass()
+
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.cli_delete(base_path)
- self.cli_delete(pki_base)
+ cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, pki_base)
def tearDown(self):
self.cli_delete(base_path)
self.cli_delete(pki_base)
self.cli_commit()
- def test_default(self):
- self.cli_set(base_path)
- self.cli_commit()
-
- ret = run('sudo /usr/sbin/nginx -t')
- self.assertEqual(ret, 0)
+ # Check for stopped process
+ self.assertFalse(process_named_running(PROCESS_NAME))
def test_server_block(self):
vhost_id = 'example'
@@ -76,17 +80,15 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
test_path = base_path + ['virtual-host', vhost_id]
self.cli_set(test_path + ['listen-address', address])
- self.cli_set(test_path + ['listen-port', port])
+ self.cli_set(test_path + ['port', port])
self.cli_set(test_path + ['server-name', name])
self.cli_commit()
- ret = run('sudo /usr/sbin/nginx -t')
- self.assertEqual(ret, 0)
-
nginx_config = read_file('/etc/nginx/sites-enabled/default')
self.assertIn(f'listen {address}:{port} ssl;', nginx_config)
self.assertIn(f'ssl_protocols TLSv1.2 TLSv1.3;', nginx_config)
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_certificate(self):
self.cli_set(pki_base + ['certificate', 'test_https', 'certificate', cert_data.replace('\n','')])
@@ -95,24 +97,28 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['certificates', 'certificate', 'test_https'])
self.cli_commit()
+ self.assertTrue(process_named_running(PROCESS_NAME))
- ret = run('sudo /usr/sbin/nginx -t')
- self.assertEqual(ret, 0)
+ def test_api_missing_keys(self):
+ self.cli_set(base_path + ['api'])
+ self.assertRaises(ConfigSessionError, self.cli_commit)
+
+ def test_api_incomplete_key(self):
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01'])
+ self.assertRaises(ConfigSessionError, self.cli_commit)
@ignore_warning(InsecureRequestWarning)
def test_api_auth(self):
vhost_id = 'example'
address = '127.0.0.1'
- port = '443'
+ port = '443' # default value
name = 'localhost'
- self.cli_set(base_path + ['api', 'socket'])
key = 'MySuperSecretVyOS'
self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
test_path = base_path + ['virtual-host', vhost_id]
self.cli_set(test_path + ['listen-address', address])
- self.cli_set(test_path + ['listen-port', port])
self.cli_set(test_path + ['server-name', name])
self.cli_commit()
@@ -138,6 +144,13 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
# Must get HTTP code 401 on missing key (Unauthorized)
self.assertEqual(r.status_code, 401)
+ # Check path config
+ payload = {'data': '{"op": "showConfig", "path": ["system", "login"]}', 'key': f'{key}'}
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ response = r.json()
+ vyos_user_exists = 'vyos' in response.get('data', {}).get('user', {})
+ self.assertTrue(vyos_user_exists, "The 'vyos' user does not exist in the response.")
+
# GraphQL auth test: a missing key will return status code 400, as
# 'key' is a non-nullable field in the schema; an incorrect key is
# caught by the resolver, and returns success 'False', so one must
@@ -240,5 +253,128 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
success = r.json()['data']['ShowVersion']['success']
self.assertTrue(success)
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_add_delete(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/retrieve'
+ payload = {'data': '{"op": "showConfig", "path": []}', 'key': f'{key}'}
+ headers = {}
+
+ self.cli_set(base_path)
+ self.cli_commit()
+
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ # api not configured; expect 503
+ self.assertEqual(r.status_code, 503)
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ # api configured; expect 200
+ self.assertEqual(r.status_code, 200)
+
+ self.cli_delete(base_path + ['api'])
+ self.cli_commit()
+
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ # api deleted; expect 503
+ self.assertEqual(r.status_code, 503)
+
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_show(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/show'
+ headers = {}
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ payload = {
+ 'data': '{"op": "show", "path": ["system", "image"]}',
+ 'key': f'{key}',
+ }
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ self.assertEqual(r.status_code, 200)
+
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_generate(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/generate'
+ headers = {}
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ payload = {
+ 'data': '{"op": "generate", "path": ["macsec", "mka", "cak", "gcm-aes-256"]}',
+ 'key': f'{key}',
+ }
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ self.assertEqual(r.status_code, 200)
+
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_configure(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/configure'
+ headers = {}
+ conf_interface = 'dum0'
+ conf_address = '192.0.2.44/32'
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ payload_path = [
+ "interfaces",
+ "dummy",
+ f"{conf_interface}",
+ "address",
+ f"{conf_address}",
+ ]
+
+ payload = {'data': json.dumps({"op": "set", "path": payload_path}), 'key': key}
+
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ self.assertEqual(r.status_code, 200)
+
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_config_file(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/config-file'
+ headers = {}
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ payload = {
+ 'data': '{"op": "save"}',
+ 'key': f'{key}',
+ }
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ self.assertEqual(r.status_code, 200)
+
+ @ignore_warning(InsecureRequestWarning)
+ def test_api_reset(self):
+ address = '127.0.0.1'
+ key = 'VyOS-key'
+ url = f'https://{address}/reset'
+ headers = {}
+
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ self.cli_commit()
+
+ payload = {
+ 'data': '{"op": "reset", "path": ["ip", "arp", "table"]}',
+ 'key': f'{key}',
+ }
+ r = request('POST', url, verify=False, headers=headers, data=payload)
+ self.assertEqual(r.status_code, 200)
+
+
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=5)
diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py
index 0f658f366..bb91eddea 100755
--- a/smoketest/scripts/cli/test_vrf.py
+++ b/smoketest/scripts/cli/test_vrf.py
@@ -489,4 +489,4 @@ class VRFTest(VyOSUnitTestSHIM.TestCase):
if __name__ == '__main__':
- unittest.main(verbosity=2, failfast=True)
+ unittest.main(verbosity=2)