summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py124
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py22
-rwxr-xr-xsmoketest/scripts/cli/test_container.py156
-rwxr-xr-xsmoketest/scripts/cli/test_firewall.py132
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bonding.py13
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bridge.py15
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_loopback.py5
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_virtual-ethernet.py5
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_vxlan.py26
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireless.py25
-rwxr-xr-xsmoketest/scripts/cli/test_load-balancing_haproxy.py186
-rwxr-xr-xsmoketest/scripts/cli/test_load-balancing_wan.py156
-rwxr-xr-xsmoketest/scripts/cli/test_nat66.py29
-rwxr-xr-xsmoketest/scripts/cli/test_policy.py15
-rwxr-xr-xsmoketest/scripts/cli/test_policy_route.py34
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_babel.py4
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bgp.py242
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_isis.py9
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_mpls.py71
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_nhrp.py3
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospf.py4
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rip.py4
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rpki.py36
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcp-server.py1104
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcpv6-server.py5
-rwxr-xr-xsmoketest/scripts/cli/test_service_https.py24
-rwxr-xr-xsmoketest/scripts/cli/test_service_ids_ddos-protection.py116
-rwxr-xr-xsmoketest/scripts/cli/test_service_ipoe-server.py26
-rwxr-xr-xsmoketest/scripts/cli/test_service_lldp.py49
-rwxr-xr-xsmoketest/scripts/cli/test_service_router-advert.py112
-rwxr-xr-xsmoketest/scripts/cli/test_service_webproxy.py18
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py18
-rwxr-xr-xsmoketest/scripts/cli/test_system_option.py62
-rwxr-xr-xsmoketest/scripts/cli/test_system_syslog.py293
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_ipsec.py88
35 files changed, 2595 insertions, 636 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index c19bfcfe2..5348b0cc3 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2024 VyOS maintainers and contributors
+# Copyright (C) 2019-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,9 +14,11 @@
import re
+from json import loads
from netifaces import AF_INET
from netifaces import AF_INET6
from netifaces import ifaddresses
+from systemd import journal
from base_vyostest_shim import VyOSUnitTestSHIM
from base_vyostest_shim import CSTORE_GUARD_TIME
@@ -38,12 +40,15 @@ from vyos.utils.network import is_intf_addr_assigned
from vyos.utils.network import is_ipv6_link_local
from vyos.utils.network import get_nft_vrf_zone_mapping
from vyos.xml_ref import cli_defined
+from vyos.xml_ref import default_value
dhclient_base_dir = directories['isc_dhclient_dir']
dhclient_process_name = 'dhclient'
dhcp6c_base_dir = directories['dhcp6_client_dir']
dhcp6c_process_name = 'dhcp6c'
+MSG_TESTCASE_UNSUPPORTED = 'unsupported on interface family'
+
server_ca_root_cert_data = """
MIIBcTCCARagAwIBAgIUDcAf1oIQV+6WRaW7NPcSnECQ/lUwCgYIKoZIzj0EAwIw
HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjBa
@@ -134,6 +139,7 @@ def is_mirrored_to(interface, mirror_if, qdisc):
if mirror_if in tmp:
ret_val = True
return ret_val
+
class BasicInterfaceTest:
class TestCase(VyOSUnitTestSHIM.TestCase):
_test_dhcp = False
@@ -217,7 +223,7 @@ class BasicInterfaceTest:
def test_dhcp_disable_interface(self):
if not self._test_dhcp:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
# When interface is configured as admin down, it must be admin down
# even when dhcpc starts on the given interface
@@ -240,7 +246,7 @@ class BasicInterfaceTest:
def test_dhcp_client_options(self):
if not self._test_dhcp or not self._test_vrf:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
client_id = 'VyOS-router'
distance = '100'
@@ -280,7 +286,10 @@ class BasicInterfaceTest:
def test_dhcp_vrf(self):
if not self._test_dhcp or not self._test_vrf:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
+
+ cli_default_metric = default_value(self._base_path + [self._interfaces[0],
+ 'dhcp-options', 'default-route-distance'])
vrf_name = 'purple4'
self.cli_set(['vrf', 'name', vrf_name, 'table', '65000'])
@@ -307,13 +316,34 @@ class BasicInterfaceTest:
self.assertIn(str(dhclient_pid), vrf_pids)
# and the commandline has the appropriate options
cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
- self.assertIn('-e\x00IF_METRIC=210', cmdline) # 210 is the default value
+ self.assertIn(f'-e\x00IF_METRIC={cli_default_metric}', cmdline)
+
+ # T5103: remove interface from VRF instance and move DHCP client
+ # back to default VRF. This must restart the DHCP client process
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'vrf'])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, 'default')
+ # Check if dhclient process runs
+ dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(dhclient_pid)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertNotIn(str(dhclient_pid), vrf_pids)
+ # and the commandline has the appropriate options
+ cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
+ self.assertIn(f'-e\x00IF_METRIC={cli_default_metric}', cmdline)
self.cli_delete(['vrf', 'name', vrf_name])
def test_dhcpv6_vrf(self):
if not self._test_ipv6_dhcpc6 or not self._test_vrf:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
vrf_name = 'purple6'
self.cli_set(['vrf', 'name', vrf_name, 'table', '65001'])
@@ -341,11 +371,31 @@ class BasicInterfaceTest:
vrf_pids = cmd(f'ip vrf pids {vrf_name}')
self.assertIn(str(tmp), vrf_pids)
+ # T7135: remove interface from VRF instance and move DHCP client
+ # back to default VRF. This must restart the DHCP client process
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'vrf'])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, 'default')
+
+ # Check if dhclient process runs
+ tmp = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(tmp)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertNotIn(str(tmp), vrf_pids)
+
+
self.cli_delete(['vrf', 'name', vrf_name])
def test_move_interface_between_vrf_instances(self):
if not self._test_vrf:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
vrf1_name = 'smoketest_mgmt1'
vrf1_table = '5424'
@@ -390,7 +440,7 @@ class BasicInterfaceTest:
def test_add_to_invalid_vrf(self):
if not self._test_vrf:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
# move interface into first VRF
for interface in self._interfaces:
@@ -408,7 +458,7 @@ class BasicInterfaceTest:
def test_span_mirror(self):
if not self._mirror_interfaces:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
# Check the two-way mirror rules of ingress and egress
for mirror in self._mirror_interfaces:
@@ -517,7 +567,7 @@ class BasicInterfaceTest:
def test_ipv6_link_local_address(self):
# Common function for IPv6 link-local address assignemnts
if not self._test_ipv6:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -548,7 +598,7 @@ class BasicInterfaceTest:
def test_interface_mtu(self):
if not self._test_mtu:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for intf in self._interfaces:
base = self._base_path + [intf]
@@ -567,8 +617,8 @@ class BasicInterfaceTest:
def test_mtu_1200_no_ipv6_interface(self):
# Testcase if MTU can be changed to 1200 on non IPv6
# enabled interfaces
- if not self._test_mtu:
- self.skipTest('not supported')
+ if not self._test_mtu or not self._test_ipv6:
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
old_mtu = self._mtu
self._mtu = '1200'
@@ -604,7 +654,7 @@ class BasicInterfaceTest:
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if not self._test_vlan:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -649,7 +699,7 @@ class BasicInterfaceTest:
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if not self._test_vlan or not self._test_mtu:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
mtu_1500 = '1500'
mtu_9000 = '9000'
@@ -695,7 +745,7 @@ class BasicInterfaceTest:
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if not self._test_vlan:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -765,7 +815,7 @@ class BasicInterfaceTest:
def test_vif_8021q_lower_up_down(self):
# Testcase for https://vyos.dev/T3349
if not self._test_vlan:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -805,7 +855,7 @@ class BasicInterfaceTest:
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if not self._test_qinq:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -872,7 +922,7 @@ class BasicInterfaceTest:
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if not self._test_qinq:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
for interface in self._interfaces:
base = self._base_path + [interface]
@@ -910,7 +960,7 @@ class BasicInterfaceTest:
def test_interface_ip_options(self):
if not self._test_ip:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
arp_tmo = '300'
mss = '1420'
@@ -1012,12 +1062,13 @@ class BasicInterfaceTest:
def test_interface_ipv6_options(self):
if not self._test_ipv6:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
mss = '1400'
dad_transmits = '10'
accept_dad = '0'
source_validation = 'strict'
+ interface_identifier = '::fffe'
for interface in self._interfaces:
path = self._base_path + [interface]
@@ -1040,6 +1091,9 @@ class BasicInterfaceTest:
if cli_defined(self._base_path + ['ipv6'], 'source-validation'):
self.cli_set(path + ['ipv6', 'source-validation', source_validation])
+ if cli_defined(self._base_path + ['ipv6', 'address'], 'interface-identifier'):
+ self.cli_set(path + ['ipv6', 'address', 'interface-identifier', interface_identifier])
+
self.cli_commit()
for interface in self._interfaces:
@@ -1071,13 +1125,20 @@ class BasicInterfaceTest:
self.assertIn('fib saddr . iif oif 0', line)
self.assertIn('drop', line)
+ if cli_defined(self._base_path + ['ipv6', 'address'], 'interface-identifier'):
+ tmp = cmd(f'ip -j token show dev {interface}')
+ tmp = loads(tmp)[0]
+ self.assertEqual(tmp['token'], interface_identifier)
+ self.assertEqual(tmp['ifname'], interface)
+
+
def test_dhcpv6_client_options(self):
if not self._test_ipv6_dhcpc6:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
duid_base = 10
for interface in self._interfaces:
- duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ duid = f'00:01:00:01:27:71:db:f0:00:50:00:00:00:{duid_base}'
path = self._base_path + [interface]
for option in self._options.get(interface, []):
self.cli_set(path + option.split())
@@ -1094,7 +1155,7 @@ class BasicInterfaceTest:
duid_base = 10
for interface in self._interfaces:
- duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
+ duid = f'00:01:00:01:27:71:db:f0:00:50:00:00:00:{duid_base}'
dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf')
self.assertIn(f'interface {interface} ' + '{', dhcpc6_config)
self.assertIn(f' request domain-name-servers;', dhcpc6_config)
@@ -1106,16 +1167,25 @@ class BasicInterfaceTest:
self.assertIn('};', dhcpc6_config)
duid_base += 1
+ # T7058: verify daemon has no problems understanding the custom DUID option
+ j = journal.Reader()
+ j.this_boot()
+ j.add_match(_SYSTEMD_UNIT=f'dhcp6c@{interface}.service')
+ for entry in j:
+ self.assertNotIn('yyerror0', entry.get('MESSAGE', ''))
+ self.assertNotIn('syntax error', entry.get('MESSAGE', ''))
+
# Better ask the process about it's commandline in the future
pid = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)
self.assertTrue(pid)
+ # DHCPv6 option "no-release" requires "-n" daemon startup option
dhcp6c_options = read_file(f'/proc/{pid}/cmdline')
self.assertIn('-n', dhcp6c_options)
def test_dhcpv6pd_auto_sla_id(self):
if not self._test_ipv6_pd:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
prefix_len = '56'
sla_len = str(64 - int(prefix_len))
@@ -1176,7 +1246,7 @@ class BasicInterfaceTest:
def test_dhcpv6pd_manual_sla_id(self):
if not self._test_ipv6_pd:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
prefix_len = '56'
sla_len = str(64 - int(prefix_len))
@@ -1242,7 +1312,7 @@ class BasicInterfaceTest:
def test_eapol(self):
if not self._test_eapol:
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
cfg_dir = '/run/wpa_supplicant'
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index a54622700..f0674f187 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -75,10 +75,11 @@ class VyOSUnitTestSHIM:
cls._session.discard()
cls.fail(cls)
- def cli_set(self, config):
+ def cli_set(self, path, value=None):
if self.debug:
- print('set ' + ' '.join(config))
- self._session.set(config)
+ str = f'set {" ".join(path)} {value}' if value else f'set {" ".join(path)}'
+ print(str)
+ self._session.set(path, value)
def cli_delete(self, config):
if self.debug:
@@ -93,14 +94,18 @@ class VyOSUnitTestSHIM:
def cli_commit(self):
if self.debug:
print('commit')
- self._session.commit()
# During a commit there is a process opening commit_lock, and run()
# returns 0
while run(f'sudo lsof -nP {commit_lock}') == 0:
sleep(0.250)
+ # Return the output of commit
+ # Necessary for testing Warning cases
+ out = self._session.commit()
# Wait for CStore completion for fast non-interactive commits
sleep(self._commit_guard_time)
+ return out
+
def op_mode(self, path : list) -> None:
"""
Execute OP-mode command and return stdout
@@ -182,6 +187,15 @@ class VyOSUnitTestSHIM:
break
self.assertTrue(not matched if inverse else matched, msg=search)
+ def verify_nftables_chain_exists(self, table, chain, inverse=False):
+ try:
+ cmd(f'sudo nft list chain {table} {chain}')
+ if inverse:
+ self.fail(f'Chain exists: {table} {chain}')
+ except OSError:
+ if not inverse:
+ self.fail(f'Chain does not exist: {table} {chain}')
+
# Verify ip rule output
def verify_rules(self, rules_search, inverse=False, addr_family='inet'):
rule_output = cmd(f'ip -family {addr_family} rule show')
diff --git a/smoketest/scripts/cli/test_container.py b/smoketest/scripts/cli/test_container.py
index 36622cad1..daad3a909 100755
--- a/smoketest/scripts/cli/test_container.py
+++ b/smoketest/scripts/cli/test_container.py
@@ -33,11 +33,13 @@ PROCESS_PIDFILE = '/run/vyos-container-{0}.service.pid'
busybox_image = 'busybox:stable'
busybox_image_path = '/usr/share/vyos/busybox-stable.tar'
+
def cmd_to_json(command):
c = cmd(command + ' --format=json')
data = json.loads(c)[0]
return data
+
class TestContainer(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -73,13 +75,26 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
cont_name = 'c1'
self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '10.0.2.15/24'])
- self.cli_set(['protocols', 'static', 'route', '0.0.0.0/0', 'next-hop', '10.0.2.2'])
+ self.cli_set(
+ ['protocols', 'static', 'route', '0.0.0.0/0', 'next-hop', '10.0.2.2']
+ )
self.cli_set(['system', 'name-server', '1.1.1.1'])
self.cli_set(['system', 'name-server', '8.8.8.8'])
self.cli_set(base_path + ['name', cont_name, 'image', busybox_image])
self.cli_set(base_path + ['name', cont_name, 'allow-host-networks'])
- self.cli_set(base_path + ['name', cont_name, 'sysctl', 'parameter', 'kernel.msgmax', 'value', '4096'])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ cont_name,
+ 'sysctl',
+ 'parameter',
+ 'kernel.msgmax',
+ 'value',
+ '4096',
+ ]
+ )
# commit changes
self.cli_commit()
@@ -95,6 +110,14 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
tmp = cmd(f'sudo podman exec -it {cont_name} sysctl kernel.msgmax')
self.assertEqual(tmp, 'kernel.msgmax = 4096')
+ def test_log_driver(self):
+ self.cli_set(base_path + ['log-driver', 'journald'])
+
+ self.cli_commit()
+
+ tmp = cmd('podman info --format "{{ .Host.LogDriver }}"')
+ self.assertEqual(tmp, 'journald')
+
def test_name_server(self):
cont_name = 'dns-test'
net_name = 'net-test'
@@ -105,7 +128,17 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['name', cont_name, 'image', busybox_image])
self.cli_set(base_path + ['name', cont_name, 'name-server', name_server])
- self.cli_set(base_path + ['name', cont_name, 'network', net_name, 'address', str(ip_interface(prefix).ip + 2)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ cont_name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix).ip + 2),
+ ]
+ )
# verify() - name server has no effect when container network has dns enabled
with self.assertRaises(ConfigSessionError):
@@ -146,7 +179,17 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(1, 6):
name = f'{base_name}-{ii}'
self.cli_set(base_path + ['name', name, 'image', busybox_image])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix).ip + ii),
+ ]
+ )
# verify() - first IP address of a prefix can not be used by a container
with self.assertRaises(ConfigSessionError):
@@ -163,8 +206,14 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(2, 6):
name = f'{base_name}-{ii}'
c = cmd_to_json(f'sudo podman container inspect {name}')
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix).ip + 1))
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'], str(ip_interface(prefix).ip + ii))
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['Gateway'],
+ str(ip_interface(prefix).ip + 1),
+ )
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['IPAddress'],
+ str(ip_interface(prefix).ip + ii),
+ )
def test_ipv6_network(self):
prefix = '2001:db8::/64'
@@ -176,7 +225,17 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(1, 6):
name = f'{base_name}-{ii}'
self.cli_set(base_path + ['name', name, 'image', busybox_image])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix).ip + ii),
+ ]
+ )
# verify() - first IP address of a prefix can not be used by a container
with self.assertRaises(ConfigSessionError):
@@ -193,8 +252,14 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(2, 6):
name = f'{base_name}-{ii}'
c = cmd_to_json(f'sudo podman container inspect {name}')
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix).ip + 1))
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix).ip + ii))
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'],
+ str(ip_interface(prefix).ip + 1),
+ )
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'],
+ str(ip_interface(prefix).ip + ii),
+ )
def test_dual_stack_network(self):
prefix4 = '192.0.2.0/24'
@@ -208,8 +273,28 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(1, 6):
name = f'{base_name}-{ii}'
self.cli_set(base_path + ['name', name, 'image', busybox_image])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix4).ip + ii)])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix6).ip + ii)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix4).ip + ii),
+ ]
+ )
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix6).ip + ii),
+ ]
+ )
# verify() - first IP address of a prefix can not be used by a container
with self.assertRaises(ConfigSessionError):
@@ -227,10 +312,22 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
for ii in range(2, 6):
name = f'{base_name}-{ii}'
c = cmd_to_json(f'sudo podman container inspect {name}')
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix6).ip + 1))
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix6).ip + ii))
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix4).ip + 1))
- self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'] , str(ip_interface(prefix4).ip + ii))
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'],
+ str(ip_interface(prefix6).ip + 1),
+ )
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'],
+ str(ip_interface(prefix6).ip + ii),
+ )
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['Gateway'],
+ str(ip_interface(prefix4).ip + 1),
+ )
+ self.assertEqual(
+ c['NetworkSettings']['Networks'][net_name]['IPAddress'],
+ str(ip_interface(prefix4).ip + ii),
+ )
def test_no_name_server(self):
prefix = '192.0.2.0/24'
@@ -242,7 +339,17 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
name = f'{base_name}-2'
self.cli_set(base_path + ['name', name, 'image', busybox_image])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + 2)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix).ip + 2),
+ ]
+ )
self.cli_commit()
n = cmd_to_json(f'sudo podman network inspect {net_name}')
@@ -258,7 +365,17 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
name = f'{base_name}-2'
self.cli_set(base_path + ['name', name, 'image', busybox_image])
- self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + 2)])
+ self.cli_set(
+ base_path
+ + [
+ 'name',
+ name,
+ 'network',
+ net_name,
+ 'address',
+ str(ip_interface(prefix).ip + 2),
+ ]
+ )
self.cli_commit()
n = cmd_to_json(f'sudo podman network inspect {net_name}')
@@ -298,11 +415,14 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Query API about running containers
- tmp = cmd("sudo curl --unix-socket /run/podman/podman.sock -H 'content-type: application/json' -sf http://localhost/containers/json")
+ tmp = cmd(
+ "sudo curl --unix-socket /run/podman/podman.sock -H 'content-type: application/json' -sf http://localhost/containers/json"
+ )
tmp = json.loads(tmp)
# We expect the same amount of containers from the API that we started above
self.assertEqual(len(container_list), len(tmp))
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py
index 10301831e..851a15f16 100755
--- a/smoketest/scripts/cli/test_firewall.py
+++ b/smoketest/scripts/cli/test_firewall.py
@@ -119,6 +119,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.com'])
self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.org'])
self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'eth0'])
+ self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'pod-smoketest'])
self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'vtun0'])
self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'accept'])
@@ -133,6 +134,9 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'action', 'accept'])
self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'outbound-interface', 'group', '!smoketest_interface'])
+ # Create container network so test won't fail
+ self.cli_set(['container', 'network', 'smoketest', 'prefix', '10.0.0.0/24'])
+
self.cli_commit()
self.wait_for_domain_resolver('ip vyos_filter', 'D_smoketest_domain', '192.0.2.5')
@@ -638,6 +642,10 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.verify_nftables(nftables_search, 'ip6 vyos_filter')
def test_ipv4_global_state(self):
+ self.cli_set(['firewall', 'flowtable', 'smoketest', 'interface', 'eth0'])
+ self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'software'])
+
+ self.cli_set(['firewall', 'global-options', 'state-policy', 'offload', 'offload-target', 'smoketest'])
self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept'])
self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept'])
self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop'])
@@ -647,6 +655,9 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
nftables_search = [
['jump VYOS_STATE_POLICY'],
['chain VYOS_STATE_POLICY'],
+ ['jump VYOS_STATE_POLICY_FORWARD'],
+ ['chain VYOS_STATE_POLICY_FORWARD'],
+ ['flow add @VYOS_FLOWTABLE_smoketest'],
['ct state established', 'accept'],
['ct state invalid', 'drop'],
['ct state related', 'accept']
@@ -654,6 +665,13 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.verify_nftables(nftables_search, 'ip vyos_filter')
+ # T7148 - Ensure bridge rule reject -> drop
+ self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'reject'])
+ self.cli_commit()
+
+ self.verify_nftables([['ct state invalid', 'reject']], 'ip vyos_filter')
+ self.verify_nftables([['ct state invalid', 'drop']], 'bridge vyos_filter')
+
# Check conntrack is enabled from state-policy
self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK')
self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK')
@@ -1167,7 +1185,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4'])
self.cli_set(['firewall', 'ipv4', 'prerouting', 'raw', 'rule', '1', 'action', 'jump'])
self.cli_set(['firewall', 'ipv4', 'prerouting', 'raw', 'rule', '1', 'jump-target', 'smoketest-ipsec-in4'])
-
+
self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'action', 'jump'])
self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '1', 'jump-target', 'smoketest-ipsec-out4'])
self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'jump'])
@@ -1202,8 +1220,8 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'action', 'jump'])
self.cli_set(['firewall', 'ipv4', 'name', 'smoketest-cycle-3', 'rule', '1', 'jump-target', 'smoketest-cycle-1'])
- # nft will fail to load cyclic jumps in any form, whether the rule is reachable or not.
- # It should be caught by conf validation.
+ # nft will fail to load cyclic jumps in any form, whether the rule is reachable or not.
+ # It should be caught by conf validation.
with self.assertRaises(ConfigSessionError):
self.cli_commit()
@@ -1262,5 +1280,113 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
with self.assertRaises(ConfigSessionError):
self.cli_commit()
+ def test_ipv4_remote_group(self):
+ # Setup base config for test
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'url', 'http://127.0.0.1:80/list.txt'])
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'description', 'Example Group 01'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'group', 'remote-group', 'group01'])
+
+ self.cli_commit()
+
+ # Test remote-group had been loaded correctly in nft
+ nftables_search = [
+ ['R_group01'],
+ ['type ipv4_addr'],
+ ['flags interval'],
+ ['meta l4proto', 'daddr @R_group01', 'ipv4-INP-filter-10']
+ ]
+ self.verify_nftables(nftables_search, 'ip vyos_filter')
+
+ # Test remote-group cannot be configured without a URL
+ self.cli_delete(['firewall', 'group', 'remote-group', 'group01', 'url'])
+
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_discard()
+
+ # Test remote-group cannot be set alongside address in rules
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'address', '127.0.0.1'])
+
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_discard()
+
+
+ def test_ipv6_remote_group(self):
+ # Setup base config for test
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'url', 'http://127.0.0.1:80/list.txt'])
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'description', 'Example Group 01'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'destination', 'group', 'remote-group', 'group01'])
+
+ self.cli_commit()
+
+ # Test remote-group had been loaded correctly in nft
+ nftables_search = [
+ ['R6_group01'],
+ ['type ipv6_addr'],
+ ['flags interval'],
+ ['meta l4proto', 'daddr @R6_group01', 'ipv6-INP-filter-10']
+ ]
+ self.verify_nftables(nftables_search, 'ip6 vyos_filter')
+
+ # Test remote-group cannot be configured without a URL
+ self.cli_delete(['firewall', 'group', 'remote-group', 'group01', 'url'])
+
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_discard()
+
+ # Test remote-group cannot be set alongside address in rules
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'destination', 'address', '2001:db8::1'])
+
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_discard()
+
+
+ def test_remote_group(self):
+ # Setup base config for test adding remote group to both ipv4 and ipv6 rules
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'url', 'http://127.0.0.1:80/list.txt'])
+ self.cli_set(['firewall', 'group', 'remote-group', 'group01', 'description', 'Example Group 01'])
+ self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '10', 'destination', 'group', 'remote-group', 'group01'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'source', 'group', 'remote-group', 'group01'])
+ self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '10', 'destination', 'group', 'remote-group', 'group01'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'action', 'drop'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'source', 'group', 'remote-group', 'group01'])
+
+ self.cli_commit()
+
+ # Test remote-group had been loaded correctly in nft ip table
+ nftables_v4_search = [
+ ['R_group01'],
+ ['type ipv4_addr'],
+ ['flags interval'],
+ ['meta l4proto', 'daddr @R_group01', 'ipv4-OUT-filter-10'],
+ ['meta l4proto', 'saddr @R_group01', 'ipv4-INP-filter-10'],
+ ]
+ self.verify_nftables(nftables_v4_search, 'ip vyos_filter')
+
+ # Test remote-group had been loaded correctly in nft ip6 table
+ nftables_v6_search = [
+ ['R6_group01'],
+ ['type ipv6_addr'],
+ ['flags interval'],
+ ['meta l4proto', 'daddr @R6_group01', 'ipv6-OUT-filter-10'],
+ ['meta l4proto', 'saddr @R6_group01', 'ipv6-INP-filter-10'],
+ ]
+ self.verify_nftables(nftables_v6_search, 'ip6 vyos_filter')
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py
index 1a72f9dc4..f99fd0363 100755
--- a/smoketest/scripts/cli/test_interfaces_bonding.py
+++ b/smoketest/scripts/cli/test_interfaces_bonding.py
@@ -167,18 +167,25 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase):
def test_bonding_multi_use_member(self):
# Define available bonding hash policies
- for interface in ['bond10', 'bond20']:
+ bonds = ['bond10', 'bond20', 'bond30']
+ for interface in bonds:
for member in self._members:
self.cli_set(self._base_path + [interface, 'member', 'interface', member])
# check validate() - can not use the same member interfaces multiple times
with self.assertRaises(ConfigSessionError):
self.cli_commit()
-
- self.cli_delete(self._base_path + ['bond20'])
+ # only keep the first bond interface configuration
+ for interface in bonds[1:]:
+ self.cli_delete(self._base_path + [interface])
self.cli_commit()
+ bond = bonds[0]
+ member_ifaces = read_file(f'/sys/class/net/{bond}/bonding/slaves').split()
+ for member in self._members:
+ self.assertIn(member, member_ifaces)
+
def test_bonding_source_interface(self):
# Re-use member interface that is already a source-interface
bond = 'bond99'
diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py
index 54c981adc..4041b3ef3 100755
--- a/smoketest/scripts/cli/test_interfaces_bridge.py
+++ b/smoketest/scripts/cli/test_interfaces_bridge.py
@@ -158,6 +158,21 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase):
# verify member is assigned to the bridge
self.assertEqual(interface, tmp['master'])
+ def test_bridge_multi_use_member(self):
+ # Define available bonding hash policies
+ bridges = ['br10', 'br20', 'br30']
+ for interface in bridges:
+ for member in self._members:
+ self.cli_set(self._base_path + [interface, 'member', 'interface', member])
+
+ # check validate() - can not use the same member interfaces multiple times
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ # only keep the first bond interface configuration
+ for interface in bridges[1:]:
+ self.cli_delete(self._base_path + [interface])
+
+ self.cli_commit()
def test_add_remove_bridge_member(self):
# Add member interfaces to bridge and set STP cost/priority
diff --git a/smoketest/scripts/cli/test_interfaces_loopback.py b/smoketest/scripts/cli/test_interfaces_loopback.py
index 0454dc658..f4b6038c5 100755
--- a/smoketest/scripts/cli/test_interfaces_loopback.py
+++ b/smoketest/scripts/cli/test_interfaces_loopback.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020-2023 VyOS maintainers and contributors
+# Copyright (C) 2020-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -17,6 +17,7 @@
import unittest
from base_interfaces_test import BasicInterfaceTest
+from base_interfaces_test import MSG_TESTCASE_UNSUPPORTED
from netifaces import interfaces
from vyos.utils.network import is_intf_addr_assigned
@@ -53,7 +54,7 @@ class LoopbackInterfaceTest(BasicInterfaceTest.TestCase):
self.assertTrue(is_intf_addr_assigned('lo', addr))
def test_interface_disable(self):
- self.skipTest('not supported')
+ self.skipTest(MSG_TESTCASE_UNSUPPORTED)
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_virtual-ethernet.py b/smoketest/scripts/cli/test_interfaces_virtual-ethernet.py
index c6a4613a7..b2af86139 100755
--- a/smoketest/scripts/cli/test_interfaces_virtual-ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_virtual-ethernet.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2023-2024 VyOS maintainers and contributors
+# Copyright (C) 2023-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -34,9 +34,6 @@ class VEthInterfaceTest(BasicInterfaceTest.TestCase):
# call base-classes classmethod
super(VEthInterfaceTest, cls).setUpClass()
- def test_vif_8021q_mtu_limits(self):
- self.skipTest('not supported')
-
# As we always need a pair of veth interfaces, we can not rely on the base
# class check to determine if there is a dhcp6c or dhclient instance running.
# This test will always fail as there is an instance running on the peer
diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py
index b2076b43b..132496124 100755
--- a/smoketest/scripts/cli/test_interfaces_vxlan.py
+++ b/smoketest/scripts/cli/test_interfaces_vxlan.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020-2023 VyOS maintainers and contributors
+# Copyright (C) 2020-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -114,6 +114,30 @@ class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
self.assertEqual(Interface(interface).get_admin_state(), 'up')
ttl += 10
+
+ def test_vxlan_group_remote_error(self):
+ intf = 'vxlan60'
+ options = [
+ 'group 239.4.4.5',
+ 'mtu 1420',
+ 'remote 192.168.0.254',
+ 'source-address 192.168.0.1',
+ 'source-interface eth0',
+ 'vni 60'
+ ]
+ for option in options:
+ opts = option.split()
+ self.cli_set(self._base_path + [intf] + opts)
+
+ # verify() - Both group and remote cannot be specified
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ # Remove blocking CLI option
+ self.cli_delete(self._base_path + [intf, 'group'])
+ self.cli_commit()
+
+
def test_vxlan_external(self):
interface = 'vxlan0'
source_address = '192.0.2.1'
diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py
index b8b18f30f..1c69c1be5 100755
--- a/smoketest/scripts/cli/test_interfaces_wireless.py
+++ b/smoketest/scripts/cli/test_interfaces_wireless.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020-2024 VyOS maintainers and contributors
+# Copyright (C) 2020-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -64,13 +64,23 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase):
# call base-classes classmethod
super(WirelessInterfaceTest, cls).setUpClass()
- # T5245 - currently testcases are disabled
- cls._test_ipv6 = False
- cls._test_vlan = False
+ # If any wireless interface is based on mac80211_hwsim, disable all
+ # VLAN related testcases. See T5245, T7325
+ tmp = read_file('/proc/modules')
+ if 'mac80211_hwsim' in tmp:
+ cls._test_ipv6 = False
+ cls._test_vlan = False
+ cls._test_qinq = False
+
+ # Loading mac80211_hwsim module created two WIFI Interfaces in the
+ # background (wlan0 and wlan1), remove them to have a clean test start.
+ # This must happen AFTER the above check for unsupported drivers
+ for interface in cls._interfaces:
+ if interface_exists(interface):
+ call(f'sudo iw dev {interface} del')
cls.cli_set(cls, wifi_cc_path + [country])
-
def test_wireless_add_single_ip_address(self):
# derived method to check if member interfaces are enslaved properly
super().test_add_single_ip_address()
@@ -627,9 +637,4 @@ class WirelessInterfaceTest(BasicInterfaceTest.TestCase):
if __name__ == '__main__':
check_kmod('mac80211_hwsim')
- # loading the module created two WIFI Interfaces in the background (wlan0 and wlan1)
- # remove them to have a clean test start
- for interface in ['wlan0', 'wlan1']:
- if interface_exists(interface):
- call(f'sudo iw dev {interface} del')
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_load-balancing_haproxy.py b/smoketest/scripts/cli/test_load-balancing_haproxy.py
index 9f412aa95..833e0a92b 100755
--- a/smoketest/scripts/cli/test_load-balancing_haproxy.py
+++ b/smoketest/scripts/cli/test_load-balancing_haproxy.py
@@ -14,11 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import re
+import textwrap
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
+from vyos.template import get_default_port
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
@@ -131,7 +134,25 @@ ZXLrtgVJR9W020qTurO2f91qfU8646n11hR9ObBB1IYbagOU0Pw1Nrq/FRp/u2tx
7i7xFz2WEiQeSCPaKYOiqM3t
"""
+haproxy_service_name = 'https_front'
+haproxy_backend_name = 'bk-01'
+def parse_haproxy_config() -> dict:
+ config_str = read_file(HAPROXY_CONF)
+ section_pattern = re.compile(r'^(global|defaults|frontend\s+\S+|backend\s+\S+)', re.MULTILINE)
+ sections = {}
+
+ matches = list(section_pattern.finditer(config_str))
+
+ for i, match in enumerate(matches):
+ section_name = match.group(1).strip()
+ start = match.end()
+ end = matches[i + 1].start() if i + 1 < len(matches) else len(config_str)
+ section_body = config_str[start:end]
+ dedented_body = textwrap.dedent(section_body).strip()
+ sections[section_name] = dedented_body
+
+ return sections
class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Check for running process
@@ -146,14 +167,14 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertFalse(process_named_running(PROCESS_NAME))
def base_config(self):
- self.cli_set(base_path + ['service', 'https_front', 'mode', 'http'])
- self.cli_set(base_path + ['service', 'https_front', 'port', '4433'])
- self.cli_set(base_path + ['service', 'https_front', 'backend', 'bk-01'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'mode', 'http'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'port', '4433'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'backend', haproxy_backend_name])
- self.cli_set(base_path + ['backend', 'bk-01', 'mode', 'http'])
- self.cli_set(base_path + ['backend', 'bk-01', 'server', 'bk-01', 'address', '192.0.2.11'])
- self.cli_set(base_path + ['backend', 'bk-01', 'server', 'bk-01', 'port', '9090'])
- self.cli_set(base_path + ['backend', 'bk-01', 'server', 'bk-01', 'send-proxy'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'mode', 'http'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'server', haproxy_backend_name, 'address', '192.0.2.11'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'server', haproxy_backend_name, 'port', '9090'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'server', haproxy_backend_name, 'send-proxy'])
self.cli_set(base_path + ['global-parameters', 'max-connections', '1000'])
@@ -167,15 +188,15 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.cli_set(['pki', 'certificate', 'smoketest', 'certificate', valid_cert.replace('\n','')])
self.cli_set(['pki', 'certificate', 'smoketest', 'private', 'key', valid_cert_private_key.replace('\n','')])
- def test_01_lb_reverse_proxy_domain(self):
+ def test_reverse_proxy_domain(self):
domains_bk_first = ['n1.example.com', 'n2.example.com', 'n3.example.com']
domain_bk_second = 'n5.example.com'
- frontend = 'https_front'
+ frontend = 'vyos_smoketest'
front_port = '4433'
bk_server_first = '192.0.2.11'
bk_server_second = '192.0.2.12'
- bk_first_name = 'bk-01'
- bk_second_name = 'bk-02'
+ bk_first_name = 'vyosbk-01'
+ bk_second_name = 'vyosbk-02'
bk_server_port = '9090'
mode = 'http'
rule_ten = '10'
@@ -241,9 +262,9 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'server {bk_second_name} {bk_server_second}:{bk_server_port}', config)
self.assertIn(f'server {bk_second_name} {bk_server_second}:{bk_server_port} backup', config)
- def test_02_lb_reverse_proxy_cert_not_exists(self):
+ def test_reverse_proxy_cert_not_exists(self):
self.base_config()
- self.cli_set(base_path + ['service', 'https_front', 'ssl', 'certificate', 'cert'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'ssl', 'certificate', 'cert'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
@@ -253,19 +274,19 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.configure_pki()
self.base_config()
- self.cli_set(base_path + ['service', 'https_front', 'ssl', 'certificate', 'cert'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'ssl', 'certificate', 'cert'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
# self.assertIn('\nCertificate "cert" does not exist\n', str(e.exception))
- self.cli_delete(base_path + ['service', 'https_front', 'ssl', 'certificate', 'cert'])
- self.cli_set(base_path + ['service', 'https_front', 'ssl', 'certificate', 'smoketest'])
+ self.cli_delete(base_path + ['service', haproxy_service_name, 'ssl', 'certificate', 'cert'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'ssl', 'certificate', 'smoketest'])
self.cli_commit()
- def test_03_lb_reverse_proxy_ca_not_exists(self):
+ def test_reverse_proxy_ca_not_exists(self):
self.base_config()
- self.cli_set(base_path + ['backend', 'bk-01', 'ssl', 'ca-certificate', 'ca-test'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'ssl', 'ca-certificate', 'ca-test'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
@@ -275,40 +296,40 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.configure_pki()
self.base_config()
- self.cli_set(base_path + ['backend', 'bk-01', 'ssl', 'ca-certificate', 'ca-test'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'ssl', 'ca-certificate', 'ca-test'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
# self.assertIn('\nCA certificate "ca-test" does not exist\n', str(e.exception))
- self.cli_delete(base_path + ['backend', 'bk-01', 'ssl', 'ca-certificate', 'ca-test'])
- self.cli_set(base_path + ['backend', 'bk-01', 'ssl', 'ca-certificate', 'smoketest'])
+ self.cli_delete(base_path + ['backend', haproxy_backend_name, 'ssl', 'ca-certificate', 'ca-test'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'ssl', 'ca-certificate', 'smoketest'])
self.cli_commit()
- def test_04_lb_reverse_proxy_backend_ssl_no_verify(self):
+ def test_reverse_proxy_backend_ssl_no_verify(self):
# Setup base
self.configure_pki()
self.base_config()
# Set no-verify option
- self.cli_set(base_path + ['backend', 'bk-01', 'ssl', 'no-verify'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'ssl', 'no-verify'])
self.cli_commit()
# Test no-verify option
config = read_file(HAPROXY_CONF)
- self.assertIn('server bk-01 192.0.2.11:9090 send-proxy ssl verify none', config)
+ self.assertIn(f'server {haproxy_backend_name} 192.0.2.11:9090 send-proxy ssl verify none', config)
# Test setting ca-certificate alongside no-verify option fails, to test config validation
- self.cli_set(base_path + ['backend', 'bk-01', 'ssl', 'ca-certificate', 'smoketest'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'ssl', 'ca-certificate', 'smoketest'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
- def test_05_lb_reverse_proxy_backend_http_check(self):
+ def test_reverse_proxy_backend_http_check(self):
# Setup base
self.base_config()
# Set http-check
- self.cli_set(base_path + ['backend', 'bk-01', 'http-check', 'method', 'get'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'http-check', 'method', 'get'])
self.cli_commit()
# Test http-check
@@ -317,8 +338,8 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('http-check send meth GET', config)
# Set http-check with uri and status
- self.cli_set(base_path + ['backend', 'bk-01', 'http-check', 'uri', '/health'])
- self.cli_set(base_path + ['backend', 'bk-01', 'http-check', 'expect', 'status', '200'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'http-check', 'uri', '/health'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'http-check', 'expect', 'status', '200'])
self.cli_commit()
# Test http-check with uri and status
@@ -328,8 +349,8 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('http-check expect status 200', config)
# Set http-check with string
- self.cli_delete(base_path + ['backend', 'bk-01', 'http-check', 'expect', 'status', '200'])
- self.cli_set(base_path + ['backend', 'bk-01', 'http-check', 'expect', 'string', 'success'])
+ self.cli_delete(base_path + ['backend', haproxy_backend_name, 'http-check', 'expect', 'status', '200'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'http-check', 'expect', 'string', 'success'])
self.cli_commit()
# Test http-check with string
@@ -339,11 +360,11 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('http-check expect string success', config)
# Test configuring both http-check & health-check fails validation script
- self.cli_set(base_path + ['backend', 'bk-01', 'health-check', 'ldap'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'health-check', 'ldap'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
- def test_06_lb_reverse_proxy_tcp_mode(self):
+ def test_reverse_proxy_tcp_mode(self):
frontend = 'tcp_8443'
mode = 'tcp'
front_port = '8433'
@@ -390,27 +411,27 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'mode {mode}', config)
self.assertIn(f'server {bk_name} {bk_server}:{bk_server_port}', config)
- def test_07_lb_reverse_proxy_http_response_headers(self):
+ def test_reverse_proxy_http_response_headers(self):
# Setup base
self.configure_pki()
self.base_config()
# Set example headers in both frontend and backend
- self.cli_set(base_path + ['service', 'https_front', 'http-response-headers', 'Cache-Control', 'value', 'max-age=604800'])
- self.cli_set(base_path + ['backend', 'bk-01', 'http-response-headers', 'Proxy-Backend-ID', 'value', 'bk-01'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'http-response-headers', 'Cache-Control', 'value', 'max-age=604800'])
+ self.cli_set(base_path + ['backend', haproxy_backend_name, 'http-response-headers', 'Proxy-Backend-ID', 'value', haproxy_backend_name])
self.cli_commit()
# Test headers are present in generated configuration file
config = read_file(HAPROXY_CONF)
self.assertIn('http-response set-header Cache-Control \'max-age=604800\'', config)
- self.assertIn('http-response set-header Proxy-Backend-ID \'bk-01\'', config)
+ self.assertIn(f'http-response set-header Proxy-Backend-ID \'{haproxy_backend_name}\'', config)
# Test setting alongside modes other than http is blocked by validation conditions
- self.cli_set(base_path + ['service', 'https_front', 'mode', 'tcp'])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'mode', 'tcp'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
- def test_08_lb_reverse_proxy_tcp_health_checks(self):
+ def test_reverse_proxy_tcp_health_checks(self):
# Setup PKI
self.configure_pki()
@@ -458,7 +479,7 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
config = read_file(HAPROXY_CONF)
self.assertIn(f'option smtpchk', config)
- def test_09_lb_reverse_proxy_logging(self):
+ def test_reverse_proxy_logging(self):
# Setup base
self.base_config()
self.cli_commit()
@@ -477,7 +498,7 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('log /dev/log local2 warning', config)
# Test backend logging options
- backend_path = base_path + ['backend', 'bk-01']
+ backend_path = base_path + ['backend', haproxy_backend_name]
self.cli_set(backend_path + ['logging', 'facility', 'local3', 'level', 'debug'])
self.cli_set(backend_path + ['logging', 'facility', 'local4', 'level', 'info'])
self.cli_commit()
@@ -488,7 +509,7 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('log /dev/log local4 info', config)
# Test service logging options
- service_path = base_path + ['service', 'https_front']
+ service_path = base_path + ['service', haproxy_service_name]
self.cli_set(service_path + ['logging', 'facility', 'local5', 'level', 'notice'])
self.cli_set(service_path + ['logging', 'facility', 'local6', 'level', 'crit'])
self.cli_commit()
@@ -498,16 +519,17 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('log /dev/log local5 notice', config)
self.assertIn('log /dev/log local6 crit', config)
- def test_10_lb_reverse_proxy_http_compression(self):
+ def test_reverse_proxy_http_compression(self):
# Setup base
self.configure_pki()
self.base_config()
# Configure compression in frontend
- self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'algorithm', 'gzip'])
- self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/html'])
- self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/javascript'])
- self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/plain'])
+ http_comp_path = base_path + ['service', haproxy_service_name, 'http-compression']
+ self.cli_set(http_comp_path + ['algorithm', 'gzip'])
+ self.cli_set(http_comp_path + ['mime-type', 'text/html'])
+ self.cli_set(http_comp_path + ['mime-type', 'text/javascript'])
+ self.cli_set(http_comp_path + ['mime-type', 'text/plain'])
self.cli_commit()
# Test compression is present in generated configuration file
@@ -517,9 +539,77 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('compression type text/html text/javascript text/plain', config)
# Test setting compression without specifying any mime-types fails verification
- self.cli_delete(base_path + ['service', 'https_front', 'http-compression', 'mime-type'])
+ self.cli_delete(base_path + ['service', haproxy_service_name, 'http-compression', 'mime-type'])
with self.assertRaises(ConfigSessionError) as e:
self.cli_commit()
+ def test_reverse_proxy_timeout(self):
+ t_default_check = '5'
+ t_default_client = '50'
+ t_default_connect = '10'
+ t_default_server ='50'
+ t_check = '4'
+ t_client = '300'
+ t_connect = '12'
+ t_server ='120'
+ t_front_client = '600'
+
+ self.base_config()
+ self.cli_commit()
+ # Check default timeout options
+ config_entries = (
+ f'timeout check {t_default_check}s',
+ f'timeout connect {t_default_connect}s',
+ f'timeout client {t_default_client}s',
+ f'timeout server {t_default_server}s',
+ )
+ # Check default timeout options
+ config = read_file(HAPROXY_CONF)
+ for config_entry in config_entries:
+ self.assertIn(config_entry, config)
+
+ # Set custom timeout options
+ self.cli_set(base_path + ['timeout', 'check', t_check])
+ self.cli_set(base_path + ['timeout', 'client', t_client])
+ self.cli_set(base_path + ['timeout', 'connect', t_connect])
+ self.cli_set(base_path + ['timeout', 'server', t_server])
+ self.cli_set(base_path + ['service', haproxy_service_name, 'timeout', 'client', t_front_client])
+
+ self.cli_commit()
+
+ # Check custom timeout options
+ config_entries = (
+ f'timeout check {t_check}s',
+ f'timeout connect {t_connect}s',
+ f'timeout client {t_client}s',
+ f'timeout server {t_server}s',
+ f'timeout client {t_front_client}s',
+ )
+
+ # Check configured options
+ config = read_file(HAPROXY_CONF)
+ for config_entry in config_entries:
+ self.assertIn(config_entry, config)
+
+ def test_reverse_proxy_http_redirect(self):
+ self.base_config()
+ self.cli_set(base_path + ['service', haproxy_service_name, 'redirect-http-to-https'])
+
+ self.cli_commit()
+
+ config = parse_haproxy_config()
+ frontend_name = f'frontend {haproxy_service_name}-http'
+ self.assertIn(frontend_name, config.keys())
+ self.assertIn('mode http', config[frontend_name])
+ self.assertIn('bind [::]:80 v4v6', config[frontend_name])
+ self.assertIn('acl acme_acl path_beg /.well-known/acme-challenge/', config[frontend_name])
+ self.assertIn('use_backend buildin_acme_certbot if acme_acl', config[frontend_name])
+ self.assertIn('redirect scheme https code 301 if !acme_acl', config[frontend_name])
+
+ backend_name = 'backend buildin_acme_certbot'
+ self.assertIn(backend_name, config.keys())
+ port = get_default_port('certbot_haproxy')
+ self.assertIn(f'server localhost 127.0.0.1:{port}', config[backend_name])
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_load-balancing_wan.py b/smoketest/scripts/cli/test_load-balancing_wan.py
index 92b4000b8..32e5f6915 100755
--- a/smoketest/scripts/cli/test_load-balancing_wan.py
+++ b/smoketest/scripts/cli/test_load-balancing_wan.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022-2024 VyOS maintainers and contributors
+# Copyright (C) 2022-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,10 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import unittest
import time
from base_vyostest_shim import VyOSUnitTestSHIM
+from vyos.utils.file import chmod_755
+from vyos.utils.file import write_file
from vyos.utils.process import call
from vyos.utils.process import cmd
@@ -54,6 +57,16 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
+ removed_chains = [
+ 'wlb_mangle_isp_veth1',
+ 'wlb_mangle_isp_veth2',
+ 'wlb_mangle_isp_eth201',
+ 'wlb_mangle_isp_eth202'
+ ]
+
+ for chain in removed_chains:
+ self.verify_nftables_chain_exists('ip vyos_wanloadbalance', chain, inverse=True)
+
def test_table_routes(self):
ns1 = 'ns201'
ns2 = 'ns202'
@@ -93,6 +106,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
cmd_in_netns(ns3, 'ip link set dev eth0 up')
# Set load-balancing configuration
+ self.cli_set(base_path + ['wan', 'hook', '/bin/true'])
self.cli_set(base_path + ['wan', 'interface-health', iface1, 'failure-count', '2'])
self.cli_set(base_path + ['wan', 'interface-health', iface1, 'nexthop', '203.0.113.1'])
self.cli_set(base_path + ['wan', 'interface-health', iface1, 'success-count', '1'])
@@ -102,7 +116,8 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['wan', 'rule', '10', 'inbound-interface', iface3])
self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'address', '198.51.100.0/24'])
-
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface1])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface2])
# commit changes
self.cli_commit()
@@ -127,7 +142,6 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
delete_netns(ns3)
def test_check_chains(self):
-
ns1 = 'nsA'
ns2 = 'nsB'
ns3 = 'nsC'
@@ -137,43 +151,28 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
container_iface1 = 'ceth0'
container_iface2 = 'ceth1'
container_iface3 = 'ceth2'
- mangle_isp1 = """table ip mangle {
- chain ISP_veth1 {
- counter ct mark set 0xc9
- counter meta mark set 0xc9
- counter accept
+ mangle_isp1 = """table ip vyos_wanloadbalance {
+ chain wlb_mangle_isp_veth1 {
+ meta mark set 0x000000c9 ct mark set 0x000000c9 counter accept
}
}"""
- mangle_isp2 = """table ip mangle {
- chain ISP_veth2 {
- counter ct mark set 0xca
- counter meta mark set 0xca
- counter accept
+ mangle_isp2 = """table ip vyos_wanloadbalance {
+ chain wlb_mangle_isp_veth2 {
+ meta mark set 0x000000ca ct mark set 0x000000ca counter accept
}
}"""
- mangle_prerouting = """table ip mangle {
- chain PREROUTING {
+ mangle_prerouting = """table ip vyos_wanloadbalance {
+ chain wlb_mangle_prerouting {
type filter hook prerouting priority mangle; policy accept;
- counter jump WANLOADBALANCE_PRE
- }
-}"""
- mangle_wanloadbalance_pre = """table ip mangle {
- chain WANLOADBALANCE_PRE {
- iifname "veth3" ip saddr 198.51.100.0/24 ct state new meta random & 2147483647 < 1073741824 counter jump ISP_veth1
- iifname "veth3" ip saddr 198.51.100.0/24 ct state new counter jump ISP_veth2
+ iifname "veth3" ip saddr 198.51.100.0/24 ct state new limit rate 5/second burst 5 packets counter numgen random mod 11 vmap { 0 : jump wlb_mangle_isp_veth1, 1-10 : jump wlb_mangle_isp_veth2 }
iifname "veth3" ip saddr 198.51.100.0/24 counter meta mark set ct mark
}
}"""
- nat_wanloadbalance = """table ip nat {
- chain WANLOADBALANCE {
- ct mark 0xc9 counter snat to 203.0.113.10
- ct mark 0xca counter snat to 192.0.2.10
- }
-}"""
- nat_vyos_pre_snat_hook = """table ip nat {
- chain VYOS_PRE_SNAT_HOOK {
+ nat_wanloadbalance = """table ip vyos_wanloadbalance {
+ chain wlb_nat_postrouting {
type nat hook postrouting priority srcnat - 1; policy accept;
- counter jump WANLOADBALANCE
+ ct mark 0x000000c9 counter snat to 203.0.113.10
+ ct mark 0x000000ca counter snat to 192.0.2.10
}
}"""
@@ -214,7 +213,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['wan', 'rule', '10', 'inbound-interface', iface3])
self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'address', '198.51.100.0/24'])
self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface1])
- self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface2])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface2, 'weight', '10'])
# commit changes
self.cli_commit()
@@ -222,25 +221,19 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
time.sleep(5)
# Check mangle chains
- tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface1}')
+ tmp = cmd(f'sudo nft -s list chain ip vyos_wanloadbalance wlb_mangle_isp_{iface1}')
self.assertEqual(tmp, mangle_isp1)
- tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface2}')
+ tmp = cmd(f'sudo nft -s list chain ip vyos_wanloadbalance wlb_mangle_isp_{iface2}')
self.assertEqual(tmp, mangle_isp2)
- tmp = cmd(f'sudo nft -s list chain mangle PREROUTING')
+ tmp = cmd('sudo nft -s list chain ip vyos_wanloadbalance wlb_mangle_prerouting')
self.assertEqual(tmp, mangle_prerouting)
- tmp = cmd(f'sudo nft -s list chain mangle WANLOADBALANCE_PRE')
- self.assertEqual(tmp, mangle_wanloadbalance_pre)
-
# Check nat chains
- tmp = cmd(f'sudo nft -s list chain nat WANLOADBALANCE')
+ tmp = cmd('sudo nft -s list chain ip vyos_wanloadbalance wlb_nat_postrouting')
self.assertEqual(tmp, nat_wanloadbalance)
- tmp = cmd(f'sudo nft -s list chain nat VYOS_PRE_SNAT_HOOK')
- self.assertEqual(tmp, nat_vyos_pre_snat_hook)
-
# Delete veth interfaces and netns
for iface in [iface1, iface2, iface3]:
call(f'sudo ip link del dev {iface}')
@@ -249,6 +242,85 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
delete_netns(ns2)
delete_netns(ns3)
+ def test_criteria_failover_hook(self):
+ isp1_iface = 'eth0'
+ isp2_iface = 'eth1'
+ lan_iface = 'eth2'
+
+ hook_path = '/tmp/wlb_hook.sh'
+ hook_output_path = '/tmp/wlb_hook_output'
+ hook_script = f"""
+#!/bin/sh
+
+ifname=$WLB_INTERFACE_NAME
+state=$WLB_INTERFACE_STATE
+
+echo "$ifname - $state" > {hook_output_path}
+"""
+
+ write_file(hook_path, hook_script)
+ chmod_755(hook_path)
+
+ self.cli_set(['interfaces', 'ethernet', isp1_iface, 'address', '203.0.113.2/30'])
+ self.cli_set(['interfaces', 'ethernet', isp2_iface, 'address', '192.0.2.2/30'])
+ self.cli_set(['interfaces', 'ethernet', lan_iface, 'address', '198.51.100.2/30'])
+
+ self.cli_set(base_path + ['wan', 'hook', hook_path])
+ self.cli_set(base_path + ['wan', 'interface-health', isp1_iface, 'failure-count', '1'])
+ self.cli_set(base_path + ['wan', 'interface-health', isp1_iface, 'nexthop', '203.0.113.2'])
+ self.cli_set(base_path + ['wan', 'interface-health', isp1_iface, 'success-count', '1'])
+ self.cli_set(base_path + ['wan', 'interface-health', isp2_iface, 'failure-count', '1'])
+ self.cli_set(base_path + ['wan', 'interface-health', isp2_iface, 'nexthop', '192.0.2.2'])
+ self.cli_set(base_path + ['wan', 'interface-health', isp2_iface, 'success-count', '1'])
+ self.cli_set(base_path + ['wan', 'rule', '5', 'exclude'])
+ self.cli_set(base_path + ['wan', 'rule', '5', 'inbound-interface', 'eth*'])
+ self.cli_set(base_path + ['wan', 'rule', '5', 'destination', 'address', '10.0.0.0/8'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'failover'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'inbound-interface', lan_iface])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'protocol', 'udp'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'address', '198.51.100.0/24'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'port', '53'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'destination', 'address', '192.0.2.0/24'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'destination', 'port', '53'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', isp1_iface])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', isp1_iface, 'weight', '10'])
+ self.cli_set(base_path + ['wan', 'rule', '10', 'interface', isp2_iface])
+
+ # commit changes
+ self.cli_commit()
+
+ time.sleep(5)
+
+ # Verify isp1 + criteria
+
+ nftables_search = [
+ [f'iifname "eth*"', 'ip daddr 10.0.0.0/8', 'return'],
+ [f'iifname "{lan_iface}"', 'ip saddr 198.51.100.0/24', 'udp sport 53', 'ip daddr 192.0.2.0/24', 'udp dport 53', f'jump wlb_mangle_isp_{isp1_iface}']
+ ]
+
+ self.verify_nftables_chain(nftables_search, 'ip vyos_wanloadbalance', 'wlb_mangle_prerouting')
+
+ # Trigger failure on isp1 health check
+
+ self.cli_delete(['interfaces', 'ethernet', isp1_iface, 'address', '203.0.113.2/30'])
+ self.cli_commit()
+
+ time.sleep(10)
+
+ # Verify failover to isp2
+
+ nftables_search = [
+ [f'iifname "{lan_iface}"', f'jump wlb_mangle_isp_{isp2_iface}']
+ ]
+
+ self.verify_nftables_chain(nftables_search, 'ip vyos_wanloadbalance', 'wlb_mangle_prerouting')
+
+ # Verify hook output
+
+ self.assertTrue(os.path.exists(hook_output_path))
+
+ with open(hook_output_path, 'r') as f:
+ self.assertIn('eth0 - FAILED', f.read())
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py
index 52ad8e3ef..d4b5d6aa4 100755
--- a/smoketest/scripts/cli/test_nat66.py
+++ b/smoketest/scripts/cli/test_nat66.py
@@ -227,6 +227,35 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
self.verify_nftables(nftables_search, 'ip6 vyos_nat')
+ def test_source_nat66_network_group(self):
+ address_group = 'smoketest_addr'
+ address_group_member = 'fc00::1'
+ network_group = 'smoketest_net'
+ network_group_member = 'fc00::/64'
+ translation_prefix = 'fc01::/64'
+
+ self.cli_set(['firewall', 'group', 'ipv6-address-group', address_group, 'address', address_group_member])
+ self.cli_set(['firewall', 'group', 'ipv6-network-group', network_group, 'network', network_group_member])
+
+ self.cli_set(src_path + ['rule', '1', 'destination', 'group', 'address-group', address_group])
+ self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_prefix])
+
+ self.cli_set(src_path + ['rule', '2', 'destination', 'group', 'network-group', network_group])
+ self.cli_set(src_path + ['rule', '2', 'translation', 'address', translation_prefix])
+
+ self.cli_commit()
+
+ nftables_search = [
+ [f'set A6_{address_group}'],
+ [f'elements = {{ {address_group_member} }}'],
+ [f'set N6_{network_group}'],
+ [f'elements = {{ {network_group_member} }}'],
+ ['ip6 daddr', f'@A6_{address_group}', 'snat prefix to fc01::/64'],
+ ['ip6 daddr', f'@N6_{network_group}', 'snat prefix to fc01::/64']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
+
def test_nat66_no_rules(self):
# T3206: deleting all rules but keep the direction 'destination' or
# 'source' resulteds in KeyError: 'rule'.
diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py
index 9d4fc0845..985097726 100755
--- a/smoketest/scripts/cli/test_policy.py
+++ b/smoketest/scripts/cli/test_policy.py
@@ -1149,6 +1149,16 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
},
},
},
+ 'vrf-match': {
+ 'rule': {
+ '10': {
+ 'action': 'permit',
+ 'match': {
+ 'source-vrf': 'TEST',
+ },
+ },
+ },
+ },
}
self.cli_set(['policy', 'access-list', access_list, 'rule', '10', 'action', 'permit'])
@@ -1260,6 +1270,8 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
self.cli_set(path + ['rule', rule, 'match', 'rpki', 'valid'])
if 'protocol' in rule_config['match']:
self.cli_set(path + ['rule', rule, 'match', 'protocol', rule_config['match']['protocol']])
+ if 'source-vrf' in rule_config['match']:
+ self.cli_set(path + ['rule', rule, 'match', 'source-vrf', rule_config['match']['source-vrf']])
if 'tag' in rule_config['match']:
self.cli_set(path + ['rule', rule, 'match', 'tag', rule_config['match']['tag']])
@@ -1438,6 +1450,9 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
if 'rpki-valid' in rule_config['match']:
tmp = f'match rpki valid'
self.assertIn(tmp, config)
+ if 'source-vrf' in rule_config['match']:
+ tmp = f'match source-vrf {rule_config["match"]["source-vrf"]}'
+ self.assertIn(tmp, config)
if 'tag' in rule_config['match']:
tmp = f'match tag {rule_config["match"]["tag"]}'
self.assertIn(tmp, config)
diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py
index 53761b7d6..15ddd857e 100755
--- a/smoketest/scripts/cli/test_policy_route.py
+++ b/smoketest/scripts/cli/test_policy_route.py
@@ -307,5 +307,39 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):
self.verify_nftables(nftables6_search, 'ip6 vyos_mangle')
+ def test_geoip(self):
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'action', 'drop'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'se'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'gb'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'action', 'accept'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'de'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'fr'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'source', 'geoip', 'inverse-match'])
+
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'action', 'drop'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'source', 'geoip', 'country-code', 'se'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'source', 'geoip', 'country-code', 'gb'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'action', 'accept'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'source', 'geoip', 'country-code', 'de'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'source', 'geoip', 'country-code', 'fr'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'source', 'geoip', 'inverse-match'])
+
+ self.cli_commit()
+
+ nftables_search = [
+ ['ip saddr @GEOIP_CC_route_smoketest_1', 'drop'],
+ ['ip saddr != @GEOIP_CC_route_smoketest_2', 'accept'],
+ ]
+
+ # -t prevents 1000+ GeoIP elements being returned
+ self.verify_nftables(nftables_search, 'ip vyos_mangle', args='-t')
+
+ nftables_search = [
+ ['ip6 saddr @GEOIP_CC6_route6_smoketest6_1', 'drop'],
+ ['ip6 saddr != @GEOIP_CC6_route6_smoketest6_2', 'accept'],
+ ]
+
+ self.verify_nftables(nftables_search, 'ip6 vyos_mangle', args='-t')
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_babel.py b/smoketest/scripts/cli/test_protocols_babel.py
index 7ecf54600..3a9ee2d62 100755
--- a/smoketest/scripts/cli/test_protocols_babel.py
+++ b/smoketest/scripts/cli/test_protocols_babel.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2024 VyOS maintainers and contributors
+# Copyright (C) 2024-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -72,7 +72,7 @@ class TestProtocolsBABEL(VyOSUnitTestSHIM.TestCase):
self.assertIn(f' babel smoothing-half-life {smoothing_half_life}', frrconfig)
def test_02_redistribute(self):
- ipv4_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'rip', 'static']
+ ipv4_protos = ['bgp', 'connected', 'isis', 'kernel', 'nhrp', 'ospf', 'rip', 'static']
ipv6_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static']
self.cli_set(base_path + ['interface', self._interfaces[0], 'enable-timestamps'])
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py
index 761eb8bfe..8403dcc37 100755
--- a/smoketest/scripts/cli/test_protocols_bgp.py
+++ b/smoketest/scripts/cli/test_protocols_bgp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2024 VyOS maintainers and contributors
+# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -655,10 +655,71 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
}
# We want to redistribute ...
- redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static']
- for redistribute in redistributes:
- self.cli_set(base_path + ['address-family', 'ipv4-unicast',
- 'redistribute', redistribute])
+ redistributes = {
+ 'babel' : {
+ 'metric' : '100',
+ 'route_map' : 'redistr-ipv4-babel',
+ },
+ 'connected' : {
+ 'metric' : '200',
+ 'route_map' : 'redistr-ipv4-connected',
+ },
+ 'isis' : {
+ 'metric' : '300',
+ 'route_map' : 'redistr-ipv4-isis',
+ },
+ 'kernel' : {
+ 'metric' : '400',
+ 'route_map' : 'redistr-ipv4-kernel',
+ },
+ 'nhrp': {
+ 'metric': '400',
+ 'route_map': 'redistr-ipv4-nhrp',
+ },
+ 'ospf' : {
+ 'metric' : '500',
+ 'route_map' : 'redistr-ipv4-ospf',
+ },
+ 'rip' : {
+ 'metric' : '600',
+ 'route_map' : 'redistr-ipv4-rip',
+ },
+ 'static' : {
+ 'metric' : '700',
+ 'route_map' : 'redistr-ipv4-static',
+ },
+ 'table' : {
+ '10' : {
+ 'metric' : '810',
+ 'route_map' : 'redistr-ipv4-table-10',
+ },
+ '20' : {
+ 'metric' : '820',
+ 'route_map' : 'redistr-ipv4-table-20',
+ },
+ '30' : {
+ 'metric' : '830',
+ 'route_map' : 'redistr-ipv4-table-30',
+ },
+ },
+ }
+ for proto, proto_config in redistributes.items():
+ proto_path = base_path + ['address-family', 'ipv4-unicast', 'redistribute', proto]
+ if proto == 'table':
+ for table, table_config in proto_config.items():
+ self.cli_set(proto_path + [table])
+ if 'metric' in table_config:
+ self.cli_set(proto_path + [table, 'metric'], value=table_config['metric'])
+ if 'route_map' in table_config:
+ self.cli_set(['policy', 'route-map', table_config['route_map'], 'rule', '10', 'action'], value='permit')
+ self.cli_set(proto_path + [table, 'route-map'], value=table_config['route_map'])
+ else:
+ self.cli_set(proto_path)
+ if 'metric' in proto_config:
+ self.cli_set(proto_path + ['metric', proto_config['metric']])
+ if 'route_map' in proto_config:
+ self.cli_set(['policy', 'route-map', proto_config['route_map'], 'rule', '10', 'action', 'permit'])
+ self.cli_set(proto_path + ['route-map', proto_config['route_map']])
for network, network_config in networks.items():
self.cli_set(base_path + ['address-family', 'ipv4-unicast',
@@ -679,10 +740,29 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
# Verify FRR bgpd configuration
frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit')
self.assertIn(f'router bgp {ASN}', frrconfig)
- self.assertIn(f' address-family ipv4 unicast', frrconfig)
-
- for redistribute in redistributes:
- self.assertIn(f' redistribute {redistribute}', frrconfig)
+ self.assertIn(' address-family ipv4 unicast', frrconfig)
+
+ for proto, proto_config in redistributes.items():
+ if proto == 'table':
+ for table, table_config in proto_config.items():
+ tmp = f' redistribute table-direct {table}'
+ if 'metric' in proto_config:
+ metric = proto_config['metric']
+ tmp += f' metric {metric}'
+ if 'route_map' in proto_config:
+ route_map = proto_config['route_map']
+ tmp += f' route-map {route_map}'
+ self.assertIn(tmp, frrconfig)
+ else:
+ tmp = f' redistribute {proto}'
+ if 'metric' in proto_config:
+ metric = proto_config['metric']
+ tmp += f' metric {metric}'
+ if 'route_map' in proto_config:
+ route_map = proto_config['route_map']
+ tmp += f' route-map {route_map}'
+
+ self.assertIn(tmp, frrconfig)
for network, network_config in networks.items():
self.assertIn(f' network {network}', frrconfig)
@@ -695,6 +775,10 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
command = f'{command} route-map {network_config["route_map"]}'
self.assertIn(command, frrconfig)
+ for proto, proto_config in redistributes.items():
+ if 'route_map' in proto_config:
+ self.cli_delete(['policy', 'route-map', proto_config['route_map']])
+
def test_bgp_05_afi_ipv6(self):
networks = {
'2001:db8:100::/48' : {
@@ -707,10 +791,67 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
}
# We want to redistribute ...
- redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static']
- for redistribute in redistributes:
- self.cli_set(base_path + ['address-family', 'ipv6-unicast',
- 'redistribute', redistribute])
+ redistributes = {
+ 'babel' : {
+ 'metric' : '100',
+ 'route_map' : 'redistr-ipv6-babel',
+ },
+ 'connected' : {
+ 'metric' : '200',
+ 'route_map' : 'redistr-ipv6-connected',
+ },
+ 'isis' : {
+ 'metric' : '300',
+ 'route_map' : 'redistr-ipv6-isis',
+ },
+ 'kernel' : {
+ 'metric' : '400',
+ 'route_map' : 'redistr-ipv6-kernel',
+ },
+ 'ospfv3' : {
+ 'metric' : '500',
+ 'route_map' : 'redistr-ipv6-ospfv3',
+ },
+ 'ripng' : {
+ 'metric' : '600',
+ 'route_map' : 'redistr-ipv6-ripng',
+ },
+ 'static' : {
+ 'metric' : '700',
+ 'route_map' : 'redistr-ipv6-static',
+ },
+ 'table' : {
+ '110' : {
+ 'metric' : '811',
+ 'route_map' : 'redistr-ipv6-table-110',
+ },
+ '120' : {
+ 'metric' : '821',
+ 'route_map' : 'redistr-ipv6-table-120',
+ },
+ '130' : {
+ 'metric' : '831',
+ 'route_map' : 'redistr-ipv6-table-130',
+ },
+ },
+ }
+ for proto, proto_config in redistributes.items():
+ proto_path = base_path + ['address-family', 'ipv6-unicast', 'redistribute', proto]
+ if proto == 'table':
+ for table, table_config in proto_config.items():
+ self.cli_set(proto_path + [table])
+ if 'metric' in table_config:
+ self.cli_set(proto_path + [table, 'metric'], value=table_config['metric'])
+ if 'route_map' in table_config:
+ self.cli_set(['policy', 'route-map', table_config['route_map'], 'rule', '10', 'action'], value='permit')
+ self.cli_set(proto_path + [table, 'route-map'], value=table_config['route_map'])
+ else:
+ self.cli_set(proto_path)
+ if 'metric' in proto_config:
+ self.cli_set(proto_path + ['metric', proto_config['metric']])
+ if 'route_map' in proto_config:
+ self.cli_set(['policy', 'route-map', proto_config['route_map'], 'rule', '20', 'action', 'permit'])
+ self.cli_set(proto_path + ['route-map', proto_config['route_map']])
for network, network_config in networks.items():
self.cli_set(base_path + ['address-family', 'ipv6-unicast',
@@ -725,22 +866,45 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
# Verify FRR bgpd configuration
frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit')
self.assertIn(f'router bgp {ASN}', frrconfig)
- self.assertIn(f' address-family ipv6 unicast', frrconfig)
+ self.assertIn(' address-family ipv6 unicast', frrconfig)
# T2100: By default ebgp-requires-policy is disabled to keep VyOS
# 1.3 and 1.2 backwards compatibility
- self.assertIn(f' no bgp ebgp-requires-policy', frrconfig)
-
- for redistribute in redistributes:
- # FRR calls this OSPF6
- if redistribute == 'ospfv3':
- redistribute = 'ospf6'
- self.assertIn(f' redistribute {redistribute}', frrconfig)
+ self.assertIn(' no bgp ebgp-requires-policy', frrconfig)
+
+ for proto, proto_config in redistributes.items():
+ if proto == 'table':
+ for table, table_config in proto_config.items():
+ tmp = f' redistribute table-direct {table}'
+ if 'metric' in proto_config:
+ metric = proto_config['metric']
+ tmp += f' metric {metric}'
+ if 'route_map' in proto_config:
+ route_map = proto_config['route_map']
+ tmp += f' route-map {route_map}'
+ self.assertIn(tmp, frrconfig)
+ else:
+ # FRR calls this OSPF6
+ if proto == 'ospfv3':
+ proto = 'ospf6'
+ tmp = f' redistribute {proto}'
+ if 'metric' in proto_config:
+ metric = proto_config['metric']
+ tmp += f' metric {metric}'
+ if 'route_map' in proto_config:
+ route_map = proto_config['route_map']
+ tmp += f' route-map {route_map}'
+
+ self.assertIn(tmp, frrconfig)
for network, network_config in networks.items():
self.assertIn(f' network {network}', frrconfig)
if 'as_set' in network_config:
self.assertIn(f' aggregate-address {network} summary-only', frrconfig)
+ for proto, proto_config in redistributes.items():
+ if 'route_map' in proto_config:
+ self.cli_delete(['policy', 'route-map', proto_config['route_map']])
+
def test_bgp_06_listen_range(self):
# Implemented via T1875
limit = '64'
@@ -1376,6 +1540,42 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'neighbor OVERLAY remote-as {int(ASN) + 1}', conf)
self.assertIn(f'neighbor OVERLAY local-as {int(ASN) + 1}', conf)
+ def test_bgp_30_import_vrf_routemap(self):
+ router_id = '127.0.0.3'
+ table = '1000'
+ vrf = 'red'
+ vrf_base = ['vrf', 'name', vrf]
+ self.cli_set(vrf_base + ['table', table])
+ self.cli_set(vrf_base + ['protocols', 'bgp', 'system-as', ASN])
+ self.cli_set(
+ vrf_base + ['protocols', 'bgp', 'parameters', 'router-id',
+ router_id])
+
+ self.cli_set(
+ base_path + ['address-family', 'ipv4-unicast', 'import',
+ 'vrf', vrf])
+ self.cli_set(
+ base_path + ['address-family', 'ipv4-unicast', 'route-map',
+ 'vrf', 'import', route_map_in])
+
+ self.cli_commit()
+
+ # Verify FRR bgpd configuration
+ frrconfig = self.getFRRconfig(f'router bgp {ASN}',
+ endsection='^exit')
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f' address-family ipv4 unicast', frrconfig)
+
+ self.assertIn(f' import vrf {vrf}', frrconfig)
+ self.assertIn(f' import vrf route-map {route_map_in}', frrconfig)
+
+ # Verify FRR bgpd configuration
+ frr_vrf_config = self.getFRRconfig(
+ f'router bgp {ASN} vrf {vrf}', endsection='^exit')
+ self.assertIn(f'router bgp {ASN} vrf {vrf}', frr_vrf_config)
+ self.assertIn(f' bgp router-id {router_id}', frr_vrf_config)
+
+
def test_bgp_99_bmp(self):
target_name = 'instance-bmp'
target_address = '127.0.0.1'
diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py
index 598250d28..14e833fd9 100755
--- a/smoketest/scripts/cli/test_protocols_isis.py
+++ b/smoketest/scripts/cli/test_protocols_isis.py
@@ -59,7 +59,7 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
route_map = 'EXPORT-ISIS'
rule = '10'
metric_style = 'transition'
-
+ redistribute = ['babel', 'bgp', 'connected', 'kernel', 'nhrp', 'ospf', 'rip', 'static']
self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'action', 'permit'])
self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'prefix', '203.0.113.0/24'])
self.cli_set(['policy', 'route-map', route_map, 'rule', rule, 'action', 'permit'])
@@ -80,7 +80,9 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- self.cli_set(base_path + ['redistribute', 'ipv4', 'connected', 'level-2', 'route-map', route_map])
+ for proto in redistribute:
+ self.cli_set(base_path + ['redistribute', 'ipv4', proto, 'level-2', 'route-map', route_map])
+
self.cli_set(base_path + ['metric-style', metric_style])
self.cli_set(base_path + ['log-adjacency-changes'])
@@ -92,7 +94,8 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase):
self.assertIn(f' net {net}', tmp)
self.assertIn(f' metric-style {metric_style}', tmp)
self.assertIn(f' log-adjacency-changes', tmp)
- self.assertIn(f' redistribute ipv4 connected level-2 route-map {route_map}', tmp)
+ for proto in redistribute:
+ self.assertIn(f' redistribute ipv4 {proto} level-2 route-map {route_map}', tmp)
for interface in self._interfaces:
tmp = self.getFRRconfig(f'interface {interface}', endsection='^exit')
diff --git a/smoketest/scripts/cli/test_protocols_mpls.py b/smoketest/scripts/cli/test_protocols_mpls.py
index 654f2f099..3840c24f4 100755
--- a/smoketest/scripts/cli/test_protocols_mpls.py
+++ b/smoketest/scripts/cli/test_protocols_mpls.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2024 VyOS maintainers and contributors
+# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -121,5 +121,74 @@ class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase):
for interface in interfaces:
self.assertIn(f' interface {interface}', afiv4_config)
+ def test_02_mpls_disable_establish_hello(self):
+ router_id = '1.2.3.4'
+ transport_ipv4_addr = '5.6.7.8'
+ transport_ipv6_addr = '2001:db8:1111::1111'
+ interfaces = Section.interfaces('ethernet')
+
+ self.cli_set(base_path + ['router-id', router_id])
+
+ # At least one LDP interface must be configured
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'disable-establish-hello'])
+
+ # LDP transport address missing
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_set(base_path + ['discovery', 'transport-ipv4-address', transport_ipv4_addr])
+ self.cli_set(base_path + ['discovery', 'transport-ipv6-address', transport_ipv6_addr])
+
+ # Commit changes
+ self.cli_commit()
+
+ # Validate configuration
+ frrconfig = self.getFRRconfig('mpls ldp', endsection='^exit')
+ self.assertIn(f'mpls ldp', frrconfig)
+ self.assertIn(f' router-id {router_id}', frrconfig)
+
+ # Validate AFI IPv4
+ afiv4_config = self.getFRRconfig('mpls ldp', endsection='^exit',
+ substring=' address-family ipv4',
+ endsubsection='^ exit-address-family')
+ self.assertIn(f' discovery transport-address {transport_ipv4_addr}', afiv4_config)
+ for interface in interfaces:
+ self.assertIn(f' interface {interface}', afiv4_config)
+ self.assertIn(f' disable-establish-hello', afiv4_config)
+
+ # Validate AFI IPv6
+ afiv6_config = self.getFRRconfig('mpls ldp', endsection='^exit',
+ substring=' address-family ipv6',
+ endsubsection='^ exit-address-family')
+ self.assertIn(f' discovery transport-address {transport_ipv6_addr}', afiv6_config)
+ for interface in interfaces:
+ self.assertIn(f' interface {interface}', afiv6_config)
+ self.assertIn(f' disable-establish-hello', afiv6_config)
+
+ # Delete disable-establish-hello
+ for interface in interfaces:
+ self.cli_delete(base_path + ['interface', interface, 'disable-establish-hello'])
+
+ # Commit changes
+ self.cli_commit()
+
+ # Validate AFI IPv4
+ afiv4_config = self.getFRRconfig('mpls ldp', endsection='^exit',
+ substring=' address-family ipv4',
+ endsubsection='^ exit-address-family')
+ # Validate AFI IPv6
+ afiv6_config = self.getFRRconfig('mpls ldp', endsection='^exit',
+ substring=' address-family ipv6',
+ endsubsection='^ exit-address-family')
+ # Check deleted 'disable-establish-hello' option per interface
+ for interface in interfaces:
+ self.assertIn(f' interface {interface}', afiv4_config)
+ self.assertNotIn(f' disable-establish-hello', afiv4_config)
+ self.assertIn(f' interface {interface}', afiv6_config)
+ self.assertNotIn(f' disable-establish-hello', afiv6_config)
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_nhrp.py b/smoketest/scripts/cli/test_protocols_nhrp.py
index f6d1f1da5..73a760945 100755
--- a/smoketest/scripts/cli/test_protocols_nhrp.py
+++ b/smoketest/scripts/cli/test_protocols_nhrp.py
@@ -17,10 +17,7 @@
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
-
-from vyos.firewall import find_nftables_rule
from vyos.utils.process import process_named_running
-from vyos.utils.file import read_file
tunnel_path = ['interfaces', 'tunnel']
nhrp_path = ['protocols', 'nhrp']
diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py
index 77882737f..ea55fa031 100755
--- a/smoketest/scripts/cli/test_protocols_ospf.py
+++ b/smoketest/scripts/cli/test_protocols_ospf.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2024 VyOS maintainers and contributors
+# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -255,7 +255,7 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
def test_ospf_07_redistribute(self):
metric = '15'
metric_type = '1'
- redistribute = ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static']
+ redistribute = ['babel', 'bgp', 'connected', 'isis', 'kernel', 'nhrp', 'rip', 'static']
for protocol in redistribute:
self.cli_set(base_path + ['redistribute', protocol, 'metric', metric])
diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py
index 671ef8cd5..27b543803 100755
--- a/smoketest/scripts/cli/test_protocols_rip.py
+++ b/smoketest/scripts/cli/test_protocols_rip.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2023 VyOS maintainers and contributors
+# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -82,7 +82,7 @@ class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase):
interfaces = Section.interfaces('ethernet')
neighbors = ['1.2.3.4', '1.2.3.5', '1.2.3.6']
networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
- redistribute = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'static']
+ redistribute = ['bgp', 'connected', 'isis', 'kernel', 'nhrp', 'ospf', 'static']
timer_garbage = '888'
timer_timeout = '1000'
timer_update = '90'
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py
index ef2f30d3e..0addf7fee 100755
--- a/smoketest/scripts/cli/test_protocols_rpki.py
+++ b/smoketest/scripts/cli/test_protocols_rpki.py
@@ -248,5 +248,41 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
with self.assertRaises(ConfigSessionError):
self.cli_commit()
+ def test_rpki_source_address(self):
+ peer = '192.0.2.1'
+ port = '8080'
+ preference = '1'
+ username = 'foo'
+ source_address = '100.10.10.1'
+
+ self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', f'{source_address}/24'])
+
+ # Configure a TCP cache server
+ self.cli_set(base_path + ['cache', peer, 'port', port])
+ self.cli_set(base_path + ['cache', peer, 'preference', preference])
+ self.cli_set(base_path + ['cache', peer, 'source-address', source_address])
+ self.cli_commit()
+
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig('rpki')
+ self.assertIn(f'rpki cache tcp {peer} {port} source {source_address} preference {preference}', frrconfig)
+
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n', '')])
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n', '')])
+ self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type])
+
+ # Configure a SSH cache server
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'username', username])
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'key', rpki_key_name])
+ self.cli_commit()
+
+ # Verify FRR configuration
+ frrconfig = self.getFRRconfig('rpki')
+ self.assertIn(
+ f'rpki cache ssh {peer} {port} {username} /run/frr/id_rpki_{peer} /run/frr/id_rpki_{peer}.pub source {source_address} preference {preference}',
+ frrconfig,
+ )
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_dhcp-server.py b/smoketest/scripts/cli/test_service_dhcp-server.py
index f891bf295..e421f04d2 100755
--- a/smoketest/scripts/cli/test_service_dhcp-server.py
+++ b/smoketest/scripts/cli/test_service_dhcp-server.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020-2024 VyOS maintainers and contributors
+# Copyright (C) 2020-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
+import re
import unittest
from json import loads
@@ -22,15 +23,21 @@ from json import loads
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
+from vyos.kea import kea_add_lease
+from vyos.kea import kea_delete_lease
+from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
from vyos.template import inc_ip
from vyos.template import dec_ip
PROCESS_NAME = 'kea-dhcp4'
+D2_PROCESS_NAME = 'kea-dhcp-ddns'
CTRL_PROCESS_NAME = 'kea-ctrl-agent'
KEA4_CONF = '/run/kea/kea-dhcp4.conf'
+KEA4_D2_CONF = '/run/kea/kea-dhcp-ddns.conf'
KEA4_CTRL = '/run/kea/dhcp4-ctrl-socket'
+HOSTSD_CLIENT = '/usr/bin/vyos-hostsd-client'
base_path = ['service', 'dhcp-server']
interface = 'dum8765'
subnet = '192.0.2.0/25'
@@ -39,15 +46,18 @@ dns_1 = inc_ip(subnet, 2)
dns_2 = inc_ip(subnet, 3)
domain_name = 'vyos.net'
+
class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
super(TestServiceDHCPServer, cls).setUpClass()
- # Clear out current configuration to allow running this test on a live system
+ # Clear out current configuration to allow running this test on a live system
cls.cli_delete(cls, base_path)
cidr_mask = subnet.split('/')[-1]
- cls.cli_set(cls, ['interfaces', 'dummy', interface, 'address', f'{router}/{cidr_mask}'])
+ cls.cli_set(
+ cls, ['interfaces', 'dummy', interface, 'address', f'{router}/{cidr_mask}']
+ )
@classmethod
def tearDownClass(cls):
@@ -69,7 +79,7 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
self.assertTrue(isinstance(current, list), msg=f'Failed path: {path}')
self.assertTrue(0 <= key < len(current), msg=f'Failed path: {path}')
else:
- assert False, "Invalid type"
+ assert False, 'Invalid type'
current = current[key]
@@ -88,19 +98,26 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
self.assertTrue(key in base_obj)
self.assertEqual(base_obj[key], value)
+ def verify_service_running(self):
+ tmp = cmd('tail -n 100 /var/log/messages | grep kea')
+ self.assertTrue(process_named_running(PROCESS_NAME), msg=f'Service not running, log: {tmp}')
+
def test_dhcp_single_pool_range(self):
shared_net_name = 'SMOKE-1'
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
range_1_start = inc_ip(subnet, 40)
- range_1_stop = inc_ip(subnet, 50)
+ range_1_stop = inc_ip(subnet, 50)
self.cli_set(base_path + ['listen-interface', interface])
+ self.cli_set(base_path + ['shared-network-name', shared_net_name, 'ping-check'])
+
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
self.cli_set(pool + ['subnet-id', '1'])
self.cli_set(pool + ['ignore-client-id'])
+ self.cli_set(pool + ['ping-check'])
# we use the first subnet IP address as default gateway
self.cli_set(pool + ['option', 'default-router', router])
self.cli_set(pool + ['option', 'name-server', dns_1])
@@ -121,55 +138,90 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [interface])
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'match-client-id', False)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [interface]
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'match-client-id', False
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400
+ )
+
+ # Verify ping-check
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'user-context'],
+ 'enable-ping-check',
+ True
+ )
+
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'user-context'],
+ 'enable-ping-check',
+ True
+ )
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name', 'data': domain_name})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
# Verify pools
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_1_start} - {range_1_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_1_start} - {range_1_stop}'},
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_single_pool_options(self):
shared_net_name = 'SMOKE-0815'
- range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
- smtp_server = '1.2.3.4'
- time_server = '4.3.2.1'
- tftp_server = 'tftp.vyos.io'
- search_domains = ['foo.vyos.net', 'bar.vyos.net']
- bootfile_name = 'vyos'
- bootfile_server = '192.0.2.1'
- wpad = 'http://wpad.vyos.io/foo/bar'
- server_identifier = bootfile_server
+ range_0_start = inc_ip(subnet, 10)
+ range_0_stop = inc_ip(subnet, 20)
+ smtp_server = '1.2.3.4'
+ time_server = '4.3.2.1'
+ tftp_server = 'tftp.vyos.io'
+ search_domains = ['foo.vyos.net', 'bar.vyos.net']
+ bootfile_name = 'vyos'
+ bootfile_server = '192.0.2.1'
+ wpad = 'http://wpad.vyos.io/foo/bar'
+ server_identifier = bootfile_server
ipv6_only_preferred = '300'
+ capwap_access_controller = '192.168.2.125'
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
self.cli_set(pool + ['subnet-id', '1'])
@@ -189,8 +241,16 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
self.cli_set(pool + ['option', 'bootfile-server', bootfile_server])
self.cli_set(pool + ['option', 'wpad-url', wpad])
self.cli_set(pool + ['option', 'server-identifier', server_identifier])
+ self.cli_set(
+ pool + ['option', 'capwap-controller', capwap_access_controller]
+ )
+
+ static_route = '10.0.0.0/24'
+ static_route_nexthop = '192.0.2.1'
- self.cli_set(pool + ['option', 'static-route', '10.0.0.0/24', 'next-hop', '192.0.2.1'])
+ self.cli_set(
+ pool + ['option', 'static-route', static_route, 'next-hop', static_route_nexthop]
+ )
self.cli_set(pool + ['option', 'ipv6-only-preferred', ipv6_only_preferred])
self.cli_set(pool + ['option', 'time-zone', 'Europe/London'])
@@ -203,95 +263,133 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'boot-file-name', bootfile_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'next-server', bootfile_server)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4'],
+ 'boot-file-name',
+ bootfile_name,
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4'],
+ 'next-server',
+ bootfile_server,
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400
+ )
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name', 'data': domain_name})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-search', 'data': ', '.join(search_domains)})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-search', 'data': ', '.join(search_domains)},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'pop-server', 'data': smtp_server})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'pop-server', 'data': smtp_server},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'smtp-server', 'data': smtp_server})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'smtp-server', 'data': smtp_server},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'time-servers', 'data': time_server})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'time-servers', 'data': time_server},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'dhcp-server-identifier', 'data': server_identifier})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'dhcp-server-identifier', 'data': server_identifier},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'tftp-server-name', 'data': tftp_server})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'capwap-ac-v4', 'data': capwap_access_controller},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'wpad-url', 'data': wpad})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'tftp-server-name', 'data': tftp_server},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'rfc3442-static-route', 'data': '24,10,0,0,192,0,2,1, 0,192,0,2,1'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'wpad-url', 'data': wpad},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'windows-static-route', 'data': '24,10,0,0,192,0,2,1'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {
+ 'name': 'classless-static-route',
+ 'data': f'{static_route} - {static_route_nexthop}, 0.0.0.0/0 - {router}',
+ },
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'v6-only-preferred', 'data': ipv6_only_preferred})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'v6-only-preferred', 'data': ipv6_only_preferred},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'ip-forwarding', 'data': "true"})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'ip-forwarding', 'data': 'true'},
+ )
# Time zone
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'pcode', 'data': 'GMT0BST,M3.5.0/1,M10.5.0'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'pcode', 'data': 'GMT0BST,M3.5.0/1,M10.5.0'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'tcode', 'data': 'Europe/London'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'tcode', 'data': 'Europe/London'},
+ )
# Verify pools
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_single_pool_options_scoped(self):
shared_net_name = 'SMOKE-2'
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
range_router = inc_ip(subnet, 5)
range_dns_1 = inc_ip(subnet, 6)
@@ -320,40 +418,58 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400
+ )
# Verify shared-network options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'option-data'],
- {'name': 'domain-name', 'data': domain_name})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
# Verify range options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': f'{range_dns_1}, {range_dns_2}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': f'{range_dns_1}, {range_dns_2}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'],
- {'name': 'routers', 'data': range_router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'],
+ {'name': 'routers', 'data': range_router},
+ )
# Verify pool
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], 'pool', f'{range_0_start} - {range_0_stop}')
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ 'pool',
+ f'{range_0_start} - {range_0_stop}',
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_single_pool_static_mapping(self):
shared_net_name = 'SMOKE-2'
@@ -375,18 +491,31 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
for client in ['client1', 'client2', 'client3']:
mac = '00:50:00:00:00:{}'.format(client_base)
self.cli_set(pool + ['static-mapping', client, 'mac', mac])
- self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
+ self.cli_set(
+ pool
+ + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]
+ )
client_base += 1
# cannot have both mac-address and duid set
with self.assertRaises(ConfigSessionError):
- self.cli_set(pool + ['static-mapping', 'client1', 'duid', '00:01:00:01:12:34:56:78:aa:bb:cc:dd:ee:11'])
+ self.cli_set(
+ pool
+ + [
+ 'static-mapping',
+ 'client1',
+ 'duid',
+ '00:01:00:01:12:34:56:78:aa:bb:cc:dd:ee:11',
+ ]
+ )
self.cli_commit()
self.cli_delete(pool + ['static-mapping', 'client1', 'duid'])
# cannot have mappings with duplicate IP addresses
self.cli_set(pool + ['static-mapping', 'dupe1', 'mac', '00:50:00:00:fe:ff'])
- self.cli_set(pool + ['static-mapping', 'dupe1', 'ip-address', inc_ip(subnet, 10)])
+ self.cli_set(
+ pool + ['static-mapping', 'dupe1', 'ip-address', inc_ip(subnet, 10)]
+ )
with self.assertRaises(ConfigSessionError):
self.cli_commit()
# Should allow disabled duplicate
@@ -396,17 +525,38 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
# cannot have mappings with duplicate MAC addresses
self.cli_set(pool + ['static-mapping', 'dupe2', 'mac', '00:50:00:00:00:10'])
- self.cli_set(pool + ['static-mapping', 'dupe2', 'ip-address', inc_ip(subnet, 120)])
+ self.cli_set(
+ pool + ['static-mapping', 'dupe2', 'ip-address', inc_ip(subnet, 120)]
+ )
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(pool + ['static-mapping', 'dupe2'])
-
# cannot have mappings with duplicate MAC addresses
- self.cli_set(pool + ['static-mapping', 'dupe3', 'duid', '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01'])
- self.cli_set(pool + ['static-mapping', 'dupe3', 'ip-address', inc_ip(subnet, 121)])
- self.cli_set(pool + ['static-mapping', 'dupe4', 'duid', '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01'])
- self.cli_set(pool + ['static-mapping', 'dupe4', 'ip-address', inc_ip(subnet, 121)])
+ self.cli_set(
+ pool
+ + [
+ 'static-mapping',
+ 'dupe3',
+ 'duid',
+ '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01',
+ ]
+ )
+ self.cli_set(
+ pool + ['static-mapping', 'dupe3', 'ip-address', inc_ip(subnet, 121)]
+ )
+ self.cli_set(
+ pool
+ + [
+ 'static-mapping',
+ 'dupe4',
+ 'duid',
+ '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01',
+ ]
+ )
+ self.cli_set(
+ pool + ['static-mapping', 'dupe4', 'ip-address', inc_ip(subnet, 121)]
+ )
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(pool + ['static-mapping', 'dupe3'])
@@ -418,25 +568,38 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400
+ )
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name', 'data': domain_name})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
client_base = 10
for client in ['client1', 'client2', 'client3']:
@@ -444,14 +607,15 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
ip = inc_ip(subnet, client_base)
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'reservations'],
- {'hostname': client, 'hw-address': mac, 'ip-address': ip})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'reservations'],
+ {'hostname': client, 'hw-address': mac, 'ip-address': ip},
+ )
client_base += 1
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_multiple_pools(self):
lease_time = '14400'
@@ -463,11 +627,16 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
dns_1 = inc_ip(subnet, 2)
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
range_1_start = inc_ip(subnet, 30)
- range_1_stop = inc_ip(subnet, 40)
-
- pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
+ range_1_stop = inc_ip(subnet, 40)
+
+ pool = base_path + [
+ 'shared-network-name',
+ shared_net_name,
+ 'subnet',
+ subnet,
+ ]
self.cli_set(pool + ['subnet-id', str(int(network) + 1)])
# we use the first subnet IP address as default gateway
self.cli_set(pool + ['option', 'default-router', router])
@@ -484,7 +653,15 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
for client in ['client1', 'client2', 'client3', 'client4']:
mac = '02:50:00:00:00:{}'.format(client_base)
self.cli_set(pool + ['static-mapping', client, 'mac', mac])
- self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
+ self.cli_set(
+ pool
+ + [
+ 'static-mapping',
+ client,
+ 'ip-address',
+ inc_ip(subnet, client_base),
+ ]
+ )
client_base += 1
# commit changes
@@ -500,37 +677,64 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
dns_1 = inc_ip(subnet, 2)
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
range_1_start = inc_ip(subnet, 30)
- range_1_stop = inc_ip(subnet, 40)
+ range_1_stop = inc_ip(subnet, 40)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'subnet', subnet)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'id', int(network) + 1)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'valid-lifetime', int(lease_time))
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'max-valid-lifetime', int(lease_time))
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4'],
+ 'subnet',
+ subnet,
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4'],
+ 'id',
+ int(network) + 1,
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4'],
+ 'valid-lifetime',
+ int(lease_time),
+ )
+ self.verify_config_value(
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4'],
+ 'max-valid-lifetime',
+ int(lease_time),
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
- {'name': 'domain-name', 'data': domain_name})
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
- {'name': 'domain-name-servers', 'data': dns_1})
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name-servers', 'data': dns_1},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'],
- {'pool': f'{range_1_start} - {range_1_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'],
+ {'pool': f'{range_1_start} - {range_1_stop}'},
+ )
client_base = 60
for client in ['client1', 'client2', 'client3', 'client4']:
@@ -538,20 +742,28 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
ip = inc_ip(subnet, client_base)
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'reservations'],
- {'hostname': client, 'hw-address': mac, 'ip-address': ip})
+ obj,
+ [
+ 'Dhcp4',
+ 'shared-networks',
+ int(network),
+ 'subnet4',
+ 0,
+ 'reservations',
+ ],
+ {'hostname': client, 'hw-address': mac, 'ip-address': ip},
+ )
client_base += 1
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_exclude_not_in_range(self):
# T3180: verify else path when slicing DHCP ranges and exclude address
# is not part of the DHCP range
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
pool = base_path + ['shared-network-name', 'EXCLUDE-TEST', 'subnet', subnet]
self.cli_set(pool + ['subnet-id', '1'])
@@ -567,38 +779,42 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST')
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST'
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
pool_obj = {
'pool': f'{range_0_start} - {range_0_stop}',
- 'option-data': [{'name': 'routers', 'data': router}]
+ 'option-data': [{'name': 'routers', 'data': router}],
}
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
# Verify pools
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- pool_obj)
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], pool_obj
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_exclude_in_range(self):
# T3180: verify else path when slicing DHCP ranges and exclude address
# is not part of the DHCP range
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 100)
+ range_0_stop = inc_ip(subnet, 100)
# the DHCP exclude addresse is blanked out of the range which is done
# by slicing one range into two ranges
- exclude_addr = inc_ip(range_0_start, 20)
+ exclude_addr = inc_ip(range_0_start, 20)
range_0_stop_excl = dec_ip(exclude_addr, 1)
range_0_start_excl = inc_ip(exclude_addr, 1)
@@ -616,37 +832,42 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST-2')
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST-2'
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
pool_obj = {
'pool': f'{range_0_start} - {range_0_stop_excl}',
- 'option-data': [{'name': 'routers', 'data': router}]
+ 'option-data': [{'name': 'routers', 'data': router}],
}
pool_exclude_obj = {
'pool': f'{range_0_start_excl} - {range_0_stop}',
- 'option-data': [{'name': 'routers', 'data': router}]
+ 'option-data': [{'name': 'routers', 'data': router}],
}
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- pool_obj)
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], pool_obj
+ )
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- pool_exclude_obj)
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ pool_exclude_obj,
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_relay_server(self):
# Listen on specific address and return DHCP leases from a non
@@ -657,7 +878,7 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
relay_router = inc_ip(relay_subnet, 1)
range_0_start = '10.0.1.0'
- range_0_stop = '10.0.250.255'
+ range_0_stop = '10.0.250.255'
pool = base_path + ['shared-network-name', 'RELAY', 'subnet', relay_subnet]
self.cli_set(pool + ['subnet-id', '1'])
@@ -671,31 +892,37 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [f'{interface}/{router}'])
+ self.verify_config_value(
+ obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [f'{interface}/{router}']
+ )
self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'RELAY')
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', relay_subnet)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', relay_subnet
+ )
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': relay_router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': relay_router},
+ )
# Verify pools
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_high_availability(self):
shared_net_name = 'FAILOVER'
failover_name = 'VyOS-Failover'
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
self.cli_set(pool + ['subnet-id', '1'])
@@ -712,7 +939,9 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
failover_local = router
failover_remote = inc_ip(router, 1)
- self.cli_set(base_path + ['high-availability', 'source-address', failover_local])
+ self.cli_set(
+ base_path + ['high-availability', 'source-address', failover_local]
+ )
self.cli_set(base_path + ['high-availability', 'name', failover_name])
self.cli_set(base_path + ['high-availability', 'remote', failover_remote])
self.cli_set(base_path + ['high-availability', 'status', 'primary'])
@@ -725,43 +954,78 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
obj = loads(config)
# Verify failover
- self.verify_config_value(obj, ['Dhcp4', 'control-socket'], 'socket-name', KEA4_CTRL)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'control-socket'], 'socket-name', KEA4_CTRL
+ )
self.verify_config_object(
obj,
- ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'],
- {'name': os.uname()[1], 'url': f'http://{failover_local}:647/', 'role': 'primary', 'auto-failover': True})
+ [
+ 'Dhcp4',
+ 'hooks-libraries',
+ 0,
+ 'parameters',
+ 'high-availability',
+ 0,
+ 'peers',
+ ],
+ {
+ 'name': os.uname()[1],
+ 'url': f'http://{failover_local}:647/',
+ 'role': 'primary',
+ 'auto-failover': True,
+ },
+ )
self.verify_config_object(
obj,
- ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'],
- {'name': failover_name, 'url': f'http://{failover_remote}:647/', 'role': 'secondary', 'auto-failover': True})
-
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
- self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
+ [
+ 'Dhcp4',
+ 'hooks-libraries',
+ 0,
+ 'parameters',
+ 'high-availability',
+ 0,
+ 'peers',
+ ],
+ {
+ 'name': failover_name,
+ 'url': f'http://{failover_remote}:647/',
+ 'role': 'secondary',
+ 'auto-failover': True,
+ },
+ )
+
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
# Verify options
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
# Verify pools
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
# Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
- self.assertTrue(process_named_running(CTRL_PROCESS_NAME))
+ self.verify_service_running()
def test_dhcp_high_availability_standby(self):
shared_net_name = 'FAILOVER'
failover_name = 'VyOS-Failover'
range_0_start = inc_ip(subnet, 10)
- range_0_stop = inc_ip(subnet, 20)
+ range_0_stop = inc_ip(subnet, 20)
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
self.cli_set(pool + ['subnet-id', '1'])
@@ -774,7 +1038,9 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
failover_local = router
failover_remote = inc_ip(router, 1)
- self.cli_set(base_path + ['high-availability', 'source-address', failover_local])
+ self.cli_set(
+ base_path + ['high-availability', 'source-address', failover_local]
+ )
self.cli_set(base_path + ['high-availability', 'name', failover_name])
self.cli_set(base_path + ['high-availability', 'remote', failover_remote])
self.cli_set(base_path + ['high-availability', 'status', 'secondary'])
@@ -787,61 +1053,383 @@ class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
obj = loads(config)
# Verify failover
- self.verify_config_value(obj, ['Dhcp4', 'control-socket'], 'socket-name', KEA4_CTRL)
+ self.verify_config_value(
+ obj, ['Dhcp4', 'control-socket'], 'socket-name', KEA4_CTRL
+ )
self.verify_config_object(
obj,
- ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'],
- {'name': os.uname()[1], 'url': f'http://{failover_local}:647/', 'role': 'standby', 'auto-failover': True})
+ [
+ 'Dhcp4',
+ 'hooks-libraries',
+ 0,
+ 'parameters',
+ 'high-availability',
+ 0,
+ 'peers',
+ ],
+ {
+ 'name': os.uname()[1],
+ 'url': f'http://{failover_local}:647/',
+ 'role': 'standby',
+ 'auto-failover': True,
+ },
+ )
self.verify_config_object(
obj,
- ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'],
- {'name': failover_name, 'url': f'http://{failover_remote}:647/', 'role': 'primary', 'auto-failover': True})
+ [
+ 'Dhcp4',
+ 'hooks-libraries',
+ 0,
+ 'parameters',
+ 'high-availability',
+ 0,
+ 'peers',
+ ],
+ {
+ 'name': failover_name,
+ 'url': f'http://{failover_remote}:647/',
+ 'role': 'primary',
+ 'auto-failover': True,
+ },
+ )
+
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+
+ # Verify options
+ self.verify_config_object(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'routers', 'data': router},
+ )
+
+ # Verify pools
+ self.verify_config_object(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{range_0_start} - {range_0_stop}'},
+ )
+
+ # Check for running process
+ self.verify_service_running()
+
+ def test_dhcp_dynamic_dns_update(self):
+ shared_net_name = 'SMOKE-1DDNS'
+
+ range_0_start = inc_ip(subnet, 10)
+ range_0_stop = inc_ip(subnet, 20)
+
+ self.cli_set(base_path + ['listen-interface', interface])
+
+ ddns = base_path + ['dynamic-dns-update']
+
+ self.cli_set(ddns + ['send-updates', 'enable'])
+ self.cli_set(ddns + ['conflict-resolution', 'enable'])
+ self.cli_set(ddns + ['override-no-update', 'enable'])
+ self.cli_set(ddns + ['override-client-update', 'enable'])
+ self.cli_set(ddns + ['replace-client-name', 'always'])
+ self.cli_set(ddns + ['update-on-renew', 'enable'])
+
+ self.cli_set(ddns + ['tsig-key', 'domain-lan-updates', 'algorithm', 'sha256'])
+ self.cli_set(ddns + ['tsig-key', 'domain-lan-updates', 'secret', 'SXQncyBXZWRuZXNkYXkgbWFoIGR1ZGVzIQ=='])
+ self.cli_set(ddns + ['tsig-key', 'reverse-0-168-192', 'algorithm', 'sha256'])
+ self.cli_set(ddns + ['tsig-key', 'reverse-0-168-192', 'secret', 'VGhhbmsgR29kIGl0J3MgRnJpZGF5IQ=='])
+ self.cli_set(ddns + ['forward-domain', 'domain.lan', 'dns-server', '1', 'address', '192.168.0.1'])
+ self.cli_set(ddns + ['forward-domain', 'domain.lan', 'dns-server', '2', 'address', '100.100.0.1'])
+ self.cli_set(ddns + ['forward-domain', 'domain.lan', 'key-name', 'domain-lan-updates'])
+ self.cli_set(ddns + ['reverse-domain', '0.168.192.in-addr.arpa', 'dns-server', '1', 'address', '192.168.0.1'])
+ self.cli_set(ddns + ['reverse-domain', '0.168.192.in-addr.arpa', 'dns-server', '1', 'port', '1053'])
+ self.cli_set(ddns + ['reverse-domain', '0.168.192.in-addr.arpa', 'dns-server', '2', 'address', '100.100.0.1'])
+ self.cli_set(ddns + ['reverse-domain', '0.168.192.in-addr.arpa', 'dns-server', '2', 'port', '1153'])
+ self.cli_set(ddns + ['reverse-domain', '0.168.192.in-addr.arpa', 'key-name', 'reverse-0-168-192'])
+
+ shared = base_path + ['shared-network-name', shared_net_name]
+
+ self.cli_set(shared + ['dynamic-dns-update', 'send-updates', 'enable'])
+ self.cli_set(shared + ['dynamic-dns-update', 'conflict-resolution', 'enable'])
+ self.cli_set(shared + ['dynamic-dns-update', 'ttl-percent', '75'])
+
+ pool = shared + [ 'subnet', subnet]
+ self.cli_set(pool + ['subnet-id', '1'])
+
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
+
+ self.cli_set(pool + ['dynamic-dns-update', 'send-updates', 'enable'])
+ self.cli_set(pool + ['dynamic-dns-update', 'generated-prefix', 'myfunnyprefix'])
+ self.cli_set(pool + ['dynamic-dns-update', 'qualifying-suffix', 'suffix.lan'])
+ self.cli_set(pool + ['dynamic-dns-update', 'hostname-char-set', 'xXyYzZ'])
+ self.cli_set(pool + ['dynamic-dns-update', 'hostname-char-replacement', '_xXx_'])
+
+ self.cli_commit()
+
+ config = read_file(KEA4_CONF)
+ d2_config = read_file(KEA4_D2_CONF)
+
+ obj = loads(config)
+ d2_obj = loads(d2_config)
+
+ # Verify global DDNS parameters in the main config file
+ self.verify_config_value(
+ obj,
+ ['Dhcp4'], 'dhcp-ddns',
+ {'enable-updates': True, 'server-ip': '127.0.0.1', 'server-port': 53001, 'sender-ip': '', 'sender-port': 0,
+ 'max-queue-size': 1024, 'ncr-protocol': 'UDP', 'ncr-format': 'JSON'})
+
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-send-updates', True)
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-use-conflict-resolution', True)
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-override-no-update', True)
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-override-client-update', True)
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-replace-client-name', 'always')
+ self.verify_config_value(obj, ['Dhcp4'], 'ddns-update-on-renew', True)
+
+ # Verify scoped DDNS parameters in the main config file
self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'ddns-send-updates', True)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'ddns-use-conflict-resolution', True)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'ddns-ttl-percent', 0.75)
+
self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'ddns-send-updates', True)
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'ddns-generated-prefix', 'myfunnyprefix')
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'ddns-qualifying-suffix', 'suffix.lan')
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'hostname-char-set', 'xXyYzZ')
+ self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'hostname-char-replacement', '_xXx_')
- # Verify options
+ # Verify keys and domains configuration in the D2 config
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
- {'name': 'routers', 'data': router})
+ d2_obj,
+ ['DhcpDdns', 'tsig-keys'],
+ {'name': 'domain-lan-updates', 'algorithm': 'HMAC-SHA256', 'secret': 'SXQncyBXZWRuZXNkYXkgbWFoIGR1ZGVzIQ=='}
+ )
+ self.verify_config_object(
+ d2_obj,
+ ['DhcpDdns', 'tsig-keys'],
+ {'name': 'reverse-0-168-192', 'algorithm': 'HMAC-SHA256', 'secret': 'VGhhbmsgR29kIGl0J3MgRnJpZGF5IQ=='}
+ )
- # Verify pools
+ self.verify_config_value(d2_obj, ['DhcpDdns', 'forward-ddns', 'ddns-domains', 0], 'name', 'domain.lan')
+ self.verify_config_value(d2_obj, ['DhcpDdns', 'forward-ddns', 'ddns-domains', 0], 'key-name', 'domain-lan-updates')
self.verify_config_object(
- obj,
- ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
- {'pool': f'{range_0_start} - {range_0_stop}'})
+ d2_obj,
+ ['DhcpDdns', 'forward-ddns', 'ddns-domains', 0, 'dns-servers'],
+ {'ip-address': '192.168.0.1'}
+ )
+ self.verify_config_object(
+ d2_obj,
+ ['DhcpDdns', 'forward-ddns', 'ddns-domains', 0, 'dns-servers'],
+ {'ip-address': '100.100.0.1'}
+ )
+
+ self.verify_config_value(d2_obj, ['DhcpDdns', 'reverse-ddns', 'ddns-domains', 0], 'name', '0.168.192.in-addr.arpa')
+ self.verify_config_value(d2_obj, ['DhcpDdns', 'reverse-ddns', 'ddns-domains', 0], 'key-name', 'reverse-0-168-192')
+ self.verify_config_object(
+ d2_obj,
+ ['DhcpDdns', 'reverse-ddns', 'ddns-domains', 0, 'dns-servers'],
+ {'ip-address': '192.168.0.1', 'port': 1053}
+ )
+ self.verify_config_object(
+ d2_obj,
+ ['DhcpDdns', 'reverse-ddns', 'ddns-domains', 0, 'dns-servers'],
+ {'ip-address': '100.100.0.1', 'port': 1153}
+ )
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
- self.assertTrue(process_named_running(CTRL_PROCESS_NAME))
+ self.assertTrue(process_named_running(D2_PROCESS_NAME))
def test_dhcp_on_interface_with_vrf(self):
self.cli_set(['interfaces', 'ethernet', 'eth1', 'address', '10.1.1.1/30'])
self.cli_set(['interfaces', 'ethernet', 'eth1', 'vrf', 'SMOKE-DHCP'])
- self.cli_set(['protocols', 'static', 'route', '10.1.10.0/24', 'interface', 'eth1', 'vrf', 'SMOKE-DHCP'])
- self.cli_set(['vrf', 'name', 'SMOKE-DHCP', 'protocols', 'static', 'route', '10.1.10.0/24', 'next-hop', '10.1.1.2'])
+ self.cli_set(
+ [
+ 'protocols',
+ 'static',
+ 'route',
+ '10.1.10.0/24',
+ 'interface',
+ 'eth1',
+ 'vrf',
+ 'SMOKE-DHCP',
+ ]
+ )
+ self.cli_set(
+ [
+ 'vrf',
+ 'name',
+ 'SMOKE-DHCP',
+ 'protocols',
+ 'static',
+ 'route',
+ '10.1.10.0/24',
+ 'next-hop',
+ '10.1.1.2',
+ ]
+ )
self.cli_set(['vrf', 'name', 'SMOKE-DHCP', 'table', '1000'])
- self.cli_set(base_path + ['shared-network-name', 'SMOKE-DHCP-NETWORK', 'subnet', '10.1.10.0/24', 'subnet-id', '1'])
- self.cli_set(base_path + ['shared-network-name', 'SMOKE-DHCP-NETWORK', 'subnet', '10.1.10.0/24', 'option', 'default-router', '10.1.10.1'])
- self.cli_set(base_path + ['shared-network-name', 'SMOKE-DHCP-NETWORK', 'subnet', '10.1.10.0/24', 'option', 'name-server', '1.1.1.1'])
- self.cli_set(base_path + ['shared-network-name', 'SMOKE-DHCP-NETWORK', 'subnet', '10.1.10.0/24', 'range', '1', 'start', '10.1.10.10'])
- self.cli_set(base_path + ['shared-network-name', 'SMOKE-DHCP-NETWORK', 'subnet', '10.1.10.0/24', 'range', '1', 'stop', '10.1.10.20'])
+ self.cli_set(
+ base_path
+ + [
+ 'shared-network-name',
+ 'SMOKE-DHCP-NETWORK',
+ 'subnet',
+ '10.1.10.0/24',
+ 'subnet-id',
+ '1',
+ ]
+ )
+ self.cli_set(
+ base_path
+ + [
+ 'shared-network-name',
+ 'SMOKE-DHCP-NETWORK',
+ 'subnet',
+ '10.1.10.0/24',
+ 'option',
+ 'default-router',
+ '10.1.10.1',
+ ]
+ )
+ self.cli_set(
+ base_path
+ + [
+ 'shared-network-name',
+ 'SMOKE-DHCP-NETWORK',
+ 'subnet',
+ '10.1.10.0/24',
+ 'option',
+ 'name-server',
+ '1.1.1.1',
+ ]
+ )
+ self.cli_set(
+ base_path
+ + [
+ 'shared-network-name',
+ 'SMOKE-DHCP-NETWORK',
+ 'subnet',
+ '10.1.10.0/24',
+ 'range',
+ '1',
+ 'start',
+ '10.1.10.10',
+ ]
+ )
+ self.cli_set(
+ base_path
+ + [
+ 'shared-network-name',
+ 'SMOKE-DHCP-NETWORK',
+ 'subnet',
+ '10.1.10.0/24',
+ 'range',
+ '1',
+ 'stop',
+ '10.1.10.20',
+ ]
+ )
self.cli_set(base_path + ['listen-address', '10.1.1.1'])
self.cli_commit()
config = read_file(KEA4_CONF)
obj = loads(config)
- self.verify_config_value(obj, ['Dhcp4', 'interfaces-config'], 'interfaces', ['eth1/10.1.1.1'])
+ self.verify_config_value(
+ obj, ['Dhcp4', 'interfaces-config'], 'interfaces', ['eth1/10.1.1.1']
+ )
self.cli_delete(['interfaces', 'ethernet', 'eth1', 'vrf', 'SMOKE-DHCP'])
- self.cli_delete(['protocols', 'static', 'route', '10.1.10.0/24', 'interface', 'eth1', 'vrf'])
+ self.cli_delete(
+ ['protocols', 'static', 'route', '10.1.10.0/24', 'interface', 'eth1', 'vrf']
+ )
self.cli_delete(['vrf', 'name', 'SMOKE-DHCP'])
self.cli_commit()
+ def test_dhcp_hostsd_lease_sync(self):
+ shared_net_name = 'SMOKE-LEASE-SYNC'
+ domain_name = 'sync.private'
+
+ client_range = range(1, 4)
+ subnet_range_start = inc_ip(subnet, 10)
+ subnet_range_stop = inc_ip(subnet, 20)
+
+ def internal_cleanup():
+ for seq in client_range:
+ ip_addr = inc_ip(subnet, seq)
+ kea_delete_lease(4, ip_addr)
+ cmd(
+ f'{HOSTSD_CLIENT} --delete-hosts --tag dhcp-server-{ip_addr} --apply'
+ )
+
+ self.addClassCleanup(internal_cleanup)
+
+ pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
+ self.cli_set(pool + ['subnet-id', '1'])
+ self.cli_set(pool + ['option', 'domain-name', domain_name])
+ self.cli_set(pool + ['range', '0', 'start', subnet_range_start])
+ self.cli_set(pool + ['range', '0', 'stop', subnet_range_stop])
+
+ # commit changes
+ self.cli_commit()
+
+ config = read_file(KEA4_CONF)
+ obj = loads(config)
+
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet
+ )
+ self.verify_config_value(
+ obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1
+ )
+
+ # Verify options
+ self.verify_config_object(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'],
+ {'name': 'domain-name', 'data': domain_name},
+ )
+ self.verify_config_object(
+ obj,
+ ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'],
+ {'pool': f'{subnet_range_start} - {subnet_range_stop}'},
+ )
+
+ # Check for running process
+ self.verify_service_running()
+
+ # All up and running, now test vyos-hostsd store
+
+ # 1. Inject leases into kea
+ for seq in client_range:
+ client = f'client{seq}'
+ mac = f'00:50:00:00:00:{seq:02}'
+ ip = inc_ip(subnet, seq)
+ kea_add_lease(4, ip, host_name=client, mac_address=mac)
+
+ # 2. Verify that leases are not available in vyos-hostsd
+ tag_regex = re.escape(f'dhcp-server-{subnet.rsplit(".", 1)[0]}')
+ host_json = cmd(f'{HOSTSD_CLIENT} --get-hosts {tag_regex}')
+ self.assertFalse(host_json.strip('{}'))
+
+ # 3. Restart the service to trigger vyos-hostsd sync and wait for it to start
+ self.assertTrue(process_named_running(PROCESS_NAME, timeout=30))
+
+ # 4. Verify that leases are synced and available in vyos-hostsd
+ tag_regex = re.escape(f'dhcp-server-{subnet.rsplit(".", 1)[0]}')
+ host_json = cmd(f'{HOSTSD_CLIENT} --get-hosts {tag_regex}')
+ self.assertTrue(host_json)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_dhcpv6-server.py b/smoketest/scripts/cli/test_service_dhcpv6-server.py
index 6ecf6c1cf..6535ca72d 100755
--- a/smoketest/scripts/cli/test_service_dhcpv6-server.py
+++ b/smoketest/scripts/cli/test_service_dhcpv6-server.py
@@ -108,6 +108,7 @@ class TestServiceDHCPv6Server(VyOSUnitTestSHIM.TestCase):
self.cli_set(pool + ['lease-time', 'default', lease_time])
self.cli_set(pool + ['lease-time', 'maximum', max_lease_time])
self.cli_set(pool + ['lease-time', 'minimum', min_lease_time])
+ self.cli_set(pool + ['option', 'capwap-controller', dns_1])
self.cli_set(pool + ['option', 'name-server', dns_1])
self.cli_set(pool + ['option', 'name-server', dns_2])
self.cli_set(pool + ['option', 'name-server', dns_2])
@@ -157,6 +158,10 @@ class TestServiceDHCPv6Server(VyOSUnitTestSHIM.TestCase):
self.verify_config_object(
obj,
['Dhcp6', 'shared-networks', 0, 'subnet6', 0, 'option-data'],
+ {'name': 'capwap-ac-v6', 'data': dns_1})
+ self.verify_config_object(
+ obj,
+ ['Dhcp6', 'shared-networks', 0, 'subnet6', 0, 'option-data'],
{'name': 'dns-servers', 'data': f'{dns_1}, {dns_2}'})
self.verify_config_object(
obj,
diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py
index 04c4a2e51..b4fe35d81 100755
--- a/smoketest/scripts/cli/test_service_https.py
+++ b/smoketest/scripts/cli/test_service_https.py
@@ -16,6 +16,7 @@
import unittest
import json
+import psutil
from requests import request
from urllib3.exceptions import InsecureRequestWarning
@@ -113,6 +114,29 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
# Check for stopped process
self.assertFalse(process_named_running(PROCESS_NAME))
+ def test_listen_address(self):
+ test_prefix = ['192.0.2.1/26', '2001:db8:1::ffff/64']
+ test_addr = [ i.split('/')[0] for i in test_prefix ]
+ for i, addr in enumerate(test_prefix):
+ self.cli_set(['interfaces', 'dummy', f'dum{i}', 'address', addr])
+
+ key = 'MySuperSecretVyOS'
+ self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key])
+ # commit base config first, for testing update of listen-address
+ self.cli_commit()
+
+ for addr in test_addr:
+ self.cli_set(base_path + ['listen-address', addr])
+ self.cli_commit()
+
+ res = set()
+ t = psutil.net_connections(kind="tcp")
+ for c in t:
+ if c.laddr.port == 443:
+ res.add(c.laddr.ip)
+
+ self.assertEqual(res, set(test_addr))
+
def test_certificate(self):
cert_name = 'test_https'
dh_name = 'dh-test'
diff --git a/smoketest/scripts/cli/test_service_ids_ddos-protection.py b/smoketest/scripts/cli/test_service_ids_ddos-protection.py
deleted file mode 100755
index 91b056eea..000000000
--- a/smoketest/scripts/cli/test_service_ids_ddos-protection.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2022 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import unittest
-
-from base_vyostest_shim import VyOSUnitTestSHIM
-
-from vyos.configsession import ConfigSessionError
-from vyos.utils.process import process_named_running
-from vyos.utils.file import read_file
-
-PROCESS_NAME = 'fastnetmon'
-FASTNETMON_CONF = '/run/fastnetmon/fastnetmon.conf'
-NETWORKS_CONF = '/run/fastnetmon/networks_list'
-EXCLUDED_NETWORKS_CONF = '/run/fastnetmon/excluded_networks_list'
-base_path = ['service', 'ids', 'ddos-protection']
-
-class TestServiceIDS(VyOSUnitTestSHIM.TestCase):
- @classmethod
- def setUpClass(cls):
- super(TestServiceIDS, cls).setUpClass()
-
- # ensure we can also run this test on a live system - so lets clean
- # out the current configuration :)
- cls.cli_delete(cls, base_path)
-
- def tearDown(self):
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
- # delete test config
- self.cli_delete(base_path)
- self.cli_commit()
-
- self.assertFalse(os.path.exists(FASTNETMON_CONF))
- self.assertFalse(process_named_running(PROCESS_NAME))
-
- def test_fastnetmon(self):
- networks = ['10.0.0.0/24', '10.5.5.0/24', '2001:db8:10::/64', '2001:db8:20::/64']
- excluded_networks = ['10.0.0.1/32', '2001:db8:10::1/128']
- interfaces = ['eth0', 'eth1']
- fps = '3500'
- mbps = '300'
- pps = '60000'
-
- self.cli_set(base_path + ['mode', 'mirror'])
- # Required network!
- with self.assertRaises(ConfigSessionError):
- self.cli_commit()
- for tmp in networks:
- self.cli_set(base_path + ['network', tmp])
-
- # optional excluded-network!
- with self.assertRaises(ConfigSessionError):
- self.cli_commit()
- for tmp in excluded_networks:
- self.cli_set(base_path + ['excluded-network', tmp])
-
- # Required interface(s)!
- with self.assertRaises(ConfigSessionError):
- self.cli_commit()
- for tmp in interfaces:
- self.cli_set(base_path + ['listen-interface', tmp])
-
- self.cli_set(base_path + ['direction', 'in'])
- self.cli_set(base_path + ['threshold', 'general', 'fps', fps])
- self.cli_set(base_path + ['threshold', 'general', 'pps', pps])
- self.cli_set(base_path + ['threshold', 'general', 'mbps', mbps])
-
- # commit changes
- self.cli_commit()
-
- # Check configured port
- config = read_file(FASTNETMON_CONF)
- self.assertIn(f'mirror_afpacket = on', config)
- self.assertIn(f'process_incoming_traffic = on', config)
- self.assertIn(f'process_outgoing_traffic = off', config)
- self.assertIn(f'ban_for_flows = on', config)
- self.assertIn(f'threshold_flows = {fps}', config)
- self.assertIn(f'ban_for_bandwidth = on', config)
- self.assertIn(f'threshold_mbps = {mbps}', config)
- self.assertIn(f'ban_for_pps = on', config)
- self.assertIn(f'threshold_pps = {pps}', config)
- # default
- self.assertIn(f'enable_ban = on', config)
- self.assertIn(f'enable_ban_ipv6 = on', config)
- self.assertIn(f'ban_time = 1900', config)
-
- tmp = ','.join(interfaces)
- self.assertIn(f'interfaces = {tmp}', config)
-
-
- network_config = read_file(NETWORKS_CONF)
- for tmp in networks:
- self.assertIn(f'{tmp}', network_config)
-
- excluded_network_config = read_file(EXCLUDED_NETWORKS_CONF)
- for tmp in excluded_networks:
- self.assertIn(f'{tmp}', excluded_network_config)
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py
index ab0898d17..3b3c205cd 100755
--- a/smoketest/scripts/cli/test_service_ipoe-server.py
+++ b/smoketest/scripts/cli/test_service_ipoe-server.py
@@ -260,7 +260,7 @@ delegate={delegate_2_prefix},{delegate_mask},name={pool_name}"""
tmp = ','.join(vlans)
self.assertIn(f'{interface},{tmp}', conf['ipoe']['vlan-mon'])
- def test_ipoe_server_static_client_ip(self):
+ def test_ipoe_server_static_client_ip_address(self):
mac_address = '08:00:27:2f:d8:06'
ip_address = '192.0.2.100'
@@ -274,7 +274,7 @@ delegate={delegate_2_prefix},{delegate_mask},name={pool_name}"""
interface,
'mac',
mac_address,
- 'static-ip',
+ 'ip-address',
ip_address,
]
)
@@ -295,6 +295,28 @@ delegate={delegate_2_prefix},{delegate_mask},name={pool_name}"""
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
+ def test_ipoe_server_start_session(self):
+ start_session = 'auto'
+
+ # Configuration of local authentication for PPPoE server
+ self.basic_config()
+ self.cli_commit()
+
+ # Validate configuration values
+ conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False)
+ conf.read(self._config_file)
+ # if 'start-session' option is not set the default value is 'dhcp'
+ self.assertIn(f'start=dhcpv4', conf['ipoe']['interface'])
+
+ # change 'start-session' option to 'auto'
+ self.set(['interface', interface, 'start-session', start_session])
+ self.cli_commit()
+
+ # Validate changed configuration values
+ conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False)
+ conf.read(self._config_file)
+ self.assertIn(f'start={start_session}', conf['ipoe']['interface'])
+
@unittest.skip("PPP is not a part of IPoE")
def test_accel_ppp_options(self):
pass
diff --git a/smoketest/scripts/cli/test_service_lldp.py b/smoketest/scripts/cli/test_service_lldp.py
index 9d72ef78f..c73707e0d 100755
--- a/smoketest/scripts/cli/test_service_lldp.py
+++ b/smoketest/scripts/cli/test_service_lldp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022-2024 VyOS maintainers and contributors
+# Copyright (C) 2022-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -117,6 +117,8 @@ class TestServiceLLDP(VyOSUnitTestSHIM.TestCase):
config = read_file(LLDPD_CONF)
self.assertIn(f'configure ports {interface} med location elin "{elin}"', config)
+ # This is the CLI default mode
+ self.assertIn(f'configure ports {interface} lldp status rx-and-tx', config)
self.assertIn(f'configure system interface pattern "{interface}"', config)
def test_06_lldp_snmp(self):
@@ -134,5 +136,50 @@ class TestServiceLLDP(VyOSUnitTestSHIM.TestCase):
self.cli_delete(['service', 'snmp'])
+ def test_07_lldp_interface_mode(self):
+ interfaces = Section.interfaces('ethernet', vlan=False)
+
+ # set interface mode to 'tx'
+ self.cli_set(base_path + ['interface', 'all'])
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'mode', 'disable'])
+ # commit changes
+ self.cli_commit()
+
+ # verify configuration
+ config = read_file(LLDPD_CONF)
+ for interface in interfaces:
+ self.assertIn(f'configure ports {interface} lldp status disable', config)
+
+ # Change configuration to rx-only
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'mode', 'rx'])
+ # commit changes
+ self.cli_commit()
+ # verify configuration
+ config = read_file(LLDPD_CONF)
+ for interface in interfaces:
+ self.assertIn(f'configure ports {interface} lldp status rx-only', config)
+
+ # Change configuration to tx-only
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'mode', 'tx'])
+ # commit changes
+ self.cli_commit()
+ # verify configuration
+ config = read_file(LLDPD_CONF)
+ for interface in interfaces:
+ self.assertIn(f'configure ports {interface} lldp status tx-only', config)
+
+ # Change configuration to rx-only
+ for interface in interfaces:
+ self.cli_set(base_path + ['interface', interface, 'mode', 'rx-tx'])
+ # commit changes
+ self.cli_commit()
+ # verify configuration
+ config = read_file(LLDPD_CONF)
+ for interface in interfaces:
+ self.assertIn(f'configure ports {interface} lldp status rx-and-tx', config)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py
index 6dbb6add4..33ce93ef4 100755
--- a/smoketest/scripts/cli/test_service_router-advert.py
+++ b/smoketest/scripts/cli/test_service_router-advert.py
@@ -252,6 +252,118 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value('AdvIntervalOpt')
self.assertEqual(tmp, 'off')
+ def test_auto_ignore(self):
+ isp_prefix = '2001:db8::/64'
+ ula_prefixes = ['fd00::/64', 'fd01::/64']
+
+ # configure wildcard prefix
+ self.cli_set(base_path + ['prefix', '::/64'])
+
+ # test auto-ignore CLI behaviors with no prefix overrides
+ # set auto-ignore for all three prefixes
+ self.cli_set(base_path + ['auto-ignore', isp_prefix])
+
+ for ula_prefix in ula_prefixes:
+ self.cli_set(base_path + ['auto-ignore', ula_prefix])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # ensure autoignoreprefixes block is generated in config file
+ tmp = f'autoignoreprefixes' + ' {'
+ self.assertIn(tmp, config)
+
+ # ensure all three prefixes are contained in the block
+ self.assertIn(f' {isp_prefix};', config)
+ for ula_prefix in ula_prefixes:
+ self.assertIn(f' {ula_prefix};', config)
+
+ # remove a prefix and verify it's gone
+ self.cli_delete(base_path + ['auto-ignore', ula_prefixes[1]])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ self.assertNotIn(f' {ula_prefixes[1]};', config)
+
+ # ensure remaining two prefixes are still present
+ self.assertIn(f' {ula_prefixes[0]};', config)
+ self.assertIn(f' {isp_prefix};', config)
+
+ # remove the remaining two prefixes and verify the config block is gone
+ self.cli_delete(base_path + ['auto-ignore', ula_prefixes[0]])
+ self.cli_delete(base_path + ['auto-ignore', isp_prefix])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ tmp = f'autoignoreprefixes' + ' {'
+ self.assertNotIn(tmp, config)
+
+ # test wildcard prefix overrides, with and without auto-ignore CLI configuration
+ newline = '\n'
+ left_curly = '{'
+ right_curly = '}'
+
+ # override ULA prefixes
+ for ula_prefix in ula_prefixes:
+ self.cli_set(base_path + ['prefix', ula_prefix])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # ensure autoignoreprefixes block is generated in config file with both prefixes
+ tmp = f'autoignoreprefixes' + f' {left_curly}{newline} {ula_prefixes[0]};{newline} {ula_prefixes[1]};{newline} {right_curly};'
+ self.assertIn(tmp, config)
+
+ # remove a ULA prefix and ensure there is only one prefix in the config block
+ self.cli_delete(base_path + ['prefix', ula_prefixes[0]])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # ensure autoignoreprefixes block is generated in config file with only one prefix
+ tmp = f'autoignoreprefixes' + f' {left_curly}{newline} {ula_prefixes[1]};{newline} {right_curly};'
+ self.assertIn(tmp, config)
+
+ # exclude a prefix with auto-ignore CLI syntax
+ self.cli_set(base_path + ['auto-ignore', ula_prefixes[0]])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # verify that both prefixes appear in config block once again
+ tmp = f'autoignoreprefixes' + f' {left_curly}{newline} {ula_prefixes[0]};{newline} {ula_prefixes[1]};{newline} {right_curly};'
+ self.assertIn(tmp, config)
+
+ # override first ULA prefix again
+ # first ULA is auto-ignored in CLI, it must appear only once in config
+ self.cli_set(base_path + ['prefix', ula_prefixes[0]])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # verify that both prefixes appear uniquely
+ tmp = f'autoignoreprefixes' + f' {left_curly}{newline} {ula_prefixes[0]};{newline} {ula_prefixes[1]};{newline} {right_curly};'
+ self.assertIn(tmp, config)
+
+ # remove wildcard prefix and verify config block is gone
+ self.cli_delete(base_path + ['prefix', '::/64'])
+
+ # commit and reload config
+ self.cli_commit()
+ config = read_file(RADVD_CONF)
+
+ # verify config block is gone
+ tmp = f'autoignoreprefixes' + ' {'
+ self.assertNotIn(tmp, config)
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_webproxy.py b/smoketest/scripts/cli/test_service_webproxy.py
index 2b3f6d21c..ab4707a61 100755
--- a/smoketest/scripts/cli/test_service_webproxy.py
+++ b/smoketest/scripts/cli/test_service_webproxy.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020-2022 VyOS maintainers and contributors
+# Copyright (C) 2020-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -297,6 +297,22 @@ class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase):
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
+ def test_06_nocache_domain_proxy(self):
+ domains_nocache = ['test1.net', 'test2.net']
+ self.cli_set(base_path + ['listen-address', listen_ip])
+ for domain in domains_nocache:
+ self.cli_set(base_path + ['domain-noncache', domain])
+ # commit changes
+ self.cli_commit()
+
+ config = read_file(PROXY_CONF)
+
+ for domain in domains_nocache:
+ self.assertIn(f'acl NOCACHE dstdomain {domain}', config)
+ self.assertIn(f'no_cache deny NOCACHE', config)
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index d79f5521c..71dec68d8 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -42,6 +42,7 @@ from vyos.xml_ref import default_value
base_path = ['system', 'login']
users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice']
+weak_passwd_user = ['test_user', 'passWord1']
ssh_test_command = '/opt/vyatta/bin/vyatta-op-cmd-wrapper show version'
@@ -194,18 +195,20 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
def test_system_login_user(self):
for user in users:
name = f'VyOS Roxx {user}'
+ passwd = f'{user}-pSWd-t3st'
home_dir = f'/tmp/smoketest/{user}'
- self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user])
+ self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', passwd])
self.cli_set(base_path + ['user', user, 'full-name', name])
self.cli_set(base_path + ['user', user, 'home-directory', home_dir])
self.cli_commit()
for user in users:
+ passwd = f'{user}-pSWd-t3st'
tmp = ['su','-', user]
proc = Popen(tmp, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- tmp = f'{user}\nuname -a'
+ tmp = f'{passwd}\nuname -a'
proc.stdin.write(tmp.encode())
proc.stdin.flush()
(stdout, stderr) = proc.communicate()
@@ -229,6 +232,17 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
tmp = cmd(f'sudo passwd -S {locked_user}')
self.assertIn(f'{locked_user} P ', tmp)
+ def test_system_login_weak_password_warning(self):
+ self.cli_set(base_path + [
+ 'user', weak_passwd_user[0], 'authentication',
+ 'plaintext-password', weak_passwd_user[1]
+ ])
+
+ out = self.cli_commit().strip()
+
+ self.assertIn('WARNING: The password complexity is too low', out)
+ self.cli_delete(base_path + ['user', weak_passwd_user[0]])
+
def test_system_login_otp(self):
otp_user = 'otp-test_user'
otp_password = 'SuperTestPassword'
diff --git a/smoketest/scripts/cli/test_system_option.py b/smoketest/scripts/cli/test_system_option.py
index f3112cf0b..c7f8c1f3e 100755
--- a/smoketest/scripts/cli/test_system_option.py
+++ b/smoketest/scripts/cli/test_system_option.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2024 VyOS maintainers and contributors
+# Copyright (C) 2024-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -16,14 +16,18 @@
import os
import unittest
+
from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.utils.cpu import get_cpus
from vyos.utils.file import read_file
from vyos.utils.process import is_systemd_service_active
from vyos.utils.system import sysctl_read
+from vyos.system import image
base_path = ['system', 'option']
-
class TestSystemOption(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
self.cli_delete(base_path)
@@ -96,6 +100,60 @@ class TestSystemOption(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
self.assertFalse(os.path.exists(ssh_client_opt_file))
+ def test_kernel_options(self):
+ amd_pstate_mode = 'active'
+ isolate_cpus = '1,2,3'
+ nohz_full = '2'
+ rcu_no_cbs = '1,2,4-5'
+ default_hp_size = '2M'
+ hp_size_1g = '1G'
+ hp_size_2m = '2M'
+ hp_count_1g = '2'
+ hp_count_2m = '512'
+
+ self.cli_set(['system', 'option', 'kernel', 'cpu', 'disable-nmi-watchdog'])
+ self.cli_set(['system', 'option', 'kernel', 'cpu', 'isolate-cpus', isolate_cpus])
+ self.cli_set(['system', 'option', 'kernel', 'cpu', 'nohz-full', nohz_full])
+ self.cli_set(['system', 'option', 'kernel', 'cpu', 'rcu-no-cbs', rcu_no_cbs])
+ self.cli_set(['system', 'option', 'kernel', 'disable-hpet'])
+ self.cli_set(['system', 'option', 'kernel', 'disable-mce'])
+ self.cli_set(['system', 'option', 'kernel', 'disable-mitigations'])
+ self.cli_set(['system', 'option', 'kernel', 'disable-power-saving'])
+ self.cli_set(['system', 'option', 'kernel', 'disable-softlockup'])
+ self.cli_set(['system', 'option', 'kernel', 'memory', 'disable-numa-balancing'])
+ self.cli_set(['system', 'option', 'kernel', 'memory', 'default-hugepage-size', default_hp_size])
+ self.cli_set(['system', 'option', 'kernel', 'memory', 'hugepage-size', hp_size_1g, 'hugepage-count', hp_count_1g])
+ self.cli_set(['system', 'option', 'kernel', 'memory', 'hugepage-size', hp_size_2m, 'hugepage-count', hp_count_2m])
+ self.cli_set(['system', 'option', 'kernel', 'quiet'])
+
+ self.cli_set(['system', 'option', 'kernel', 'amd-pstate-driver', amd_pstate_mode])
+ cpu_vendor = get_cpus()[0]['vendor_id']
+ if cpu_vendor != 'AuthenticAMD':
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(['system', 'option', 'kernel', 'amd-pstate-driver'])
+
+ self.cli_commit()
+
+ # Read GRUB config file for current running image
+ tmp = read_file(f'{image.grub.GRUB_DIR_VYOS_VERS}/{image.get_running_image()}.cfg')
+ self.assertIn(' mitigations=off', tmp)
+ self.assertIn(' intel_idle.max_cstate=0 processor.max_cstate=1', tmp)
+ self.assertIn(' quiet', tmp)
+ self.assertIn(' nmi_watchdog=0', tmp)
+ self.assertIn(' hpet=disable', tmp)
+ self.assertIn(' mce=off', tmp)
+ self.assertIn(' nosoftlockup', tmp)
+ self.assertIn(f' isolcpus={isolate_cpus}', tmp)
+ self.assertIn(f' nohz_full={nohz_full}', tmp)
+ self.assertIn(f' rcu_nocbs={rcu_no_cbs}', tmp)
+ self.assertIn(f' default_hugepagesz={default_hp_size}', tmp)
+ self.assertIn(f' hugepagesz={hp_size_1g} hugepages={hp_count_1g}', tmp)
+ self.assertIn(f' hugepagesz={hp_size_2m} hugepages={hp_count_2m}', tmp)
+ self.assertIn(' numa_balancing=disable', tmp)
+
+ if cpu_vendor == 'AuthenticAMD':
+ self.assertIn(f' initcall_blacklist=acpi_cpufreq_init amd_pstate={amd_pstate_mode}', tmp)
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_syslog.py b/smoketest/scripts/cli/test_system_syslog.py
index a86711119..f3e1f65ea 100755
--- a/smoketest/scripts/cli/test_system_syslog.py
+++ b/smoketest/scripts/cli/test_system_syslog.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2024 VyOS maintainers and contributors
+# Copyright (C) 2019-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,24 +14,33 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
+import os
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
+from vyos.configsession import ConfigSessionError
from vyos.utils.file import read_file
+from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
from vyos.xml_ref import default_value
PROCESS_NAME = 'rsyslogd'
-RSYSLOG_CONF = '/etc/rsyslog.d/00-vyos.conf'
+RSYSLOG_CONF = '/run/rsyslog/rsyslog.conf'
base_path = ['system', 'syslog']
-def get_config_value(key):
- tmp = read_file(RSYSLOG_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp[0]
+dummy_interface = 'dum372874'
+
+def get_config(string=''):
+ """
+ Retrieve current "running configuration" from FRR
+ string: search for a specific start string in the configuration
+ """
+ command = 'cat /run/rsyslog/rsyslog.conf'
+ if string:
+ command += f' | sed -n "/^{string}$/,/}}/p"' # }} required to escape } in f-string
+ return cmd(command)
class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -41,6 +50,7 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, ['vrf'])
def tearDown(self):
# Check for running process
@@ -50,82 +60,249 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
+ # The default syslog implementation should make syslog.service a
+ # symlink to itself
+ self.assertEqual(os.readlink('/etc/systemd/system/syslog.service'),
+ '/lib/systemd/system/rsyslog.service')
+
# Check for running process
self.assertFalse(process_named_running(PROCESS_NAME))
- def test_syslog_basic(self):
- host1 = '127.0.0.10'
- host2 = '127.0.0.20'
-
- self.cli_set(base_path + ['host', host1, 'port', '999'])
- self.cli_set(base_path + ['host', host1, 'facility', 'all', 'level', 'all'])
- self.cli_set(base_path + ['host', host2, 'facility', 'kern', 'level', 'err'])
- self.cli_set(base_path + ['console', 'facility', 'all', 'level', 'warning'])
-
+ def test_console(self):
+ level = 'warning'
+ self.cli_set(base_path + ['console', 'facility', 'all', 'level'], value=level)
self.cli_commit()
- # verify log level and facilities in config file
- # *.warning /dev/console
- # *.* @198.51.100.1:999
- # kern.err @192.0.2.1:514
+
+ rsyslog_conf = get_config()
config = [
- get_config_value('\*.\*'),
- get_config_value('kern.err'),
- get_config_value('\*.warning'),
+ f'if prifilt("*.{level}") then {{', # {{ required to escape { in f-string
+ 'action(type="omfile" file="/dev/console")',
]
- expected = [f'@{host1}:999', f'@{host2}:514', '/dev/console']
+ for tmp in config:
+ self.assertIn(tmp, rsyslog_conf)
- for i in range(0, 3):
- self.assertIn(expected[i], config[i])
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
- def test_syslog_global(self):
+ def test_basic(self):
hostname = 'vyos123'
- domainname = 'example.local'
- self.cli_set(['system', 'host-name', hostname])
- self.cli_set(['system', 'domain-name', domainname])
- self.cli_set(base_path + ['global', 'marker', 'interval', '600'])
- self.cli_set(base_path + ['global', 'preserve-fqdn'])
- self.cli_set(base_path + ['global', 'facility', 'kern', 'level', 'err'])
+ domain_name = 'example.local'
+ default_marker_interval = default_value(base_path + ['marker', 'interval'])
+
+ facility = {
+ 'auth': {'level': 'info'},
+ 'kern': {'level': 'debug'},
+ 'all': {'level': 'notice'},
+ }
+
+ self.cli_set(['system', 'host-name'], value=hostname)
+ self.cli_set(['system', 'domain-name'], value=domain_name)
+ self.cli_set(base_path + ['preserve-fqdn'])
+
+ for tmp, tmp_options in facility.items():
+ level = tmp_options['level']
+ self.cli_set(base_path + ['local', 'facility', tmp, 'level'], value=level)
self.cli_commit()
- config = read_file(RSYSLOG_CONF)
+ config = get_config('')
expected = [
- '$MarkMessagePeriod 600',
- '$PreserveFQDN on',
- 'kern.err',
- f'$LocalHostName {hostname}.{domainname}',
+ f'module(load="immark" interval="{default_marker_interval}")',
+ 'global(preserveFQDN="on")',
+ f'global(localHostname="{hostname}.{domain_name}")',
]
-
for e in expected:
self.assertIn(e, config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
- def test_syslog_remote(self):
- rhost = '169.254.0.1'
- default_port = default_value(base_path + ['host', rhost, 'port'])
+ config = get_config('#### GLOBAL LOGGING ####')
+ prifilt = []
+ for tmp, tmp_options in facility.items():
+ if tmp == 'all':
+ tmp = '*'
+ level = tmp_options['level']
+ prifilt.append(f'{tmp}.{level}')
+
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
- self.cli_set(base_path + ['global', 'facility', 'all', 'level', 'info'])
- self.cli_set(base_path + ['global', 'facility', 'local7', 'level', 'debug'])
- self.cli_set(base_path + ['host', rhost, 'facility', 'all', 'level', 'all'])
- self.cli_set(base_path + ['host', rhost, 'protocol', 'tcp'])
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' action(', config)
+ self.assertIn( ' type="omfile"', config)
+ self.assertIn( ' file="/var/log/messages"', config)
+ self.assertIn( ' rotation.sizeLimit="524288"', config)
+ self.assertIn( ' rotation.sizeLimitCommand="/usr/sbin/logrotate /etc/logrotate.d/vyos-rsyslog"', config)
+ self.cli_set(base_path + ['marker', 'disable'])
self.cli_commit()
- config = read_file(RSYSLOG_CONF)
- self.assertIn(f'*.* @@{rhost}:{default_port}', config)
+ config = get_config('')
+ self.assertNotIn('module(load="immark"', config)
+
+ def test_remote(self):
+ dummy_if_path = ['interfaces', 'dummy', dummy_interface]
+ rhosts = {
+ '169.254.0.1': {
+ 'facility': {'auth' : {'level': 'info'}},
+ 'protocol': 'udp',
+ },
+ '2001:db8::1': {
+ 'facility': {'all' : {'level': 'debug'}},
+ 'port': '1514',
+ 'protocol': 'udp',
+ },
+ 'syslog.vyos.net': {
+ 'facility': {'all' : {'level': 'debug'}},
+ 'port': '1515',
+ 'protocol': 'tcp',
+ },
+ '169.254.0.3': {
+ 'facility': {'auth' : {'level': 'info'},
+ 'kern' : {'level': 'debug'},
+ 'all' : {'level': 'notice'},
+ },
+ 'format': ['include-timezone', 'octet-counted'],
+ 'protocol': 'tcp',
+ 'port': '10514',
+ },
+ }
+ default_port = default_value(base_path + ['remote', next(iter(rhosts)), 'port'])
+ default_protocol = default_value(base_path + ['remote', next(iter(rhosts)), 'protocol'])
+
+ for remote, remote_options in rhosts.items():
+ remote_base = base_path + ['remote', remote]
+
+ if 'port' in remote_options:
+ self.cli_set(remote_base + ['port'], value=remote_options['port'])
+
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ self.cli_set(remote_base + ['facility', facility, 'level'],
+ value=level)
+
+ if 'format' in remote_options:
+ for format in remote_options['format']:
+ self.cli_set(remote_base + ['format'], value=format)
+
+ if 'protocol' in remote_options:
+ protocol = remote_options['protocol']
+ self.cli_set(remote_base + ['protocol'], value=protocol)
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ self.cli_set(remote_base + ['source-address', source_address])
+
+ # check validate() - source address does not exist
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_set(dummy_if_path + ['address', f'{source_address}/32'])
- # Change default port and enable "octet-counting" mode
- port = '10514'
- self.cli_set(base_path + ['host', rhost, 'port', port])
- self.cli_set(base_path + ['host', rhost, 'format', 'octet-counted'])
self.cli_commit()
config = read_file(RSYSLOG_CONF)
- self.assertIn(f'*.* @@(o){rhost}:{port}', config)
+ for remote, remote_options in rhosts.items():
+ config = get_config(f'# Remote syslog to {remote}')
+ prifilt = []
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ if facility == 'all':
+ facility = '*'
+ prifilt.append(f'{facility}.{level}')
+
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
+ if not prifilt:
+ # Skip test - as we do not render anything if no facility is set
+ continue
+
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' type="omfwd"', config)
+ self.assertIn(f' target="{remote}"', config)
+
+ port = default_port
+ if 'port' in remote_options:
+ port = remote_options['port']
+ self.assertIn(f'port="{port}"', config)
+
+ protocol = default_protocol
+ if 'protocol' in remote_options:
+ protocol = remote_options['protocol']
+ self.assertIn(f'protocol="{protocol}"', config)
+
+ if 'format' in remote_options:
+ if 'include-timezone' in remote_options['format']:
+ self.assertIn( ' template="RSYSLOG_SyslogProtocol23Format"', config)
+
+ if 'octet-counted' in remote_options['format']:
+ self.assertIn( ' TCP_Framing="octet-counted"', config)
+ else:
+ self.assertIn( ' TCP_Framing="traditional"', config)
+
+ # cleanup dummy interface
+ self.cli_delete(dummy_if_path)
+
+ def test_vrf_source_address(self):
+ rhosts = {
+ '169.254.0.10': { },
+ '169.254.0.11': {
+ 'vrf': {'name' : 'red', 'table' : '12321'},
+ 'source_address' : '169.254.0.11',
+ },
+ '169.254.0.12': {
+ 'vrf': {'name' : 'green', 'table' : '12322'},
+ 'source_address' : '169.254.0.12',
+ },
+ '169.254.0.13': {
+ 'vrf': {'name' : 'blue', 'table' : '12323'},
+ 'source_address' : '169.254.0.13',
+ },
+ }
+
+ for remote, remote_options in rhosts.items():
+ remote_base = base_path + ['remote', remote]
+ self.cli_set(remote_base + ['facility', 'all'])
+
+ vrf = None
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.cli_set(['vrf', 'name', vrf, 'table'],
+ value=remote_options['vrf']['table'])
+ self.cli_set(remote_base + ['vrf'], value=vrf)
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ self.cli_set(remote_base + ['source-address'],
+ value=source_address)
+
+ idx = source_address.split('.')[-1]
+ self.cli_set(['interfaces', 'dummy', f'dum{idx}', 'address'],
+ value=f'{source_address}/32')
+ if vrf:
+ self.cli_set(['interfaces', 'dummy', f'dum{idx}', 'vrf'],
+ value=vrf)
+
+ self.cli_commit()
+
+ for remote, remote_options in rhosts.items():
+ config = get_config(f'# Remote syslog to {remote}')
+
+ self.assertIn(f'target="{remote}"', config)
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.assertIn(f'Device="{vrf}"', config)
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ self.assertIn(f'Address="{source_address}"', config)
+
+ # Cleanup VRF/Dummy interfaces
+ for remote, remote_options in rhosts.items():
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.cli_delete(['vrf', 'name', vrf])
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ idx = source_address.split('.')[-1]
+ self.cli_delete(['interfaces', 'dummy', f'dum{idx}'])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py
index 91a76e6f6..c1d943bde 100755
--- a/smoketest/scripts/cli/test_vpn_ipsec.py
+++ b/smoketest/scripts/cli/test_vpn_ipsec.py
@@ -352,6 +352,94 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase):
self.tearDownPKI()
+ def test_site_to_site_vti_ts_afi(self):
+ local_address = '192.0.2.10'
+ vti = 'vti10'
+ # IKE
+ self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2'])
+ self.cli_set(base_path + ['ike-group', ike_group, 'disable-mobike'])
+ # ESP
+ self.cli_set(base_path + ['esp-group', esp_group, 'compression'])
+ # VTI interface
+ self.cli_set(vti_path + [vti, 'address', '10.1.1.1/24'])
+
+ # vpn ipsec auth psk <tag> id <x.x.x.x>
+ self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_id])
+ self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', remote_id])
+ self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', peer_ip])
+ self.cli_set(base_path + ['authentication', 'psk', connection_name, 'secret', secret])
+
+ # Site to site
+ peer_base_path = base_path + ['site-to-site', 'peer', connection_name]
+ self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret'])
+ self.cli_set(peer_base_path + ['connection-type', 'none'])
+ self.cli_set(peer_base_path + ['force-udp-encapsulation'])
+ self.cli_set(peer_base_path + ['ike-group', ike_group])
+ self.cli_set(peer_base_path + ['default-esp-group', esp_group])
+ self.cli_set(peer_base_path + ['local-address', local_address])
+ self.cli_set(peer_base_path + ['remote-address', peer_ip])
+ self.cli_set(peer_base_path + ['vti', 'bind', vti])
+ self.cli_set(peer_base_path + ['vti', 'esp-group', esp_group])
+ self.cli_set(peer_base_path + ['vti', 'traffic-selector', 'local', 'prefix', '0.0.0.0/0'])
+ self.cli_set(peer_base_path + ['vti', 'traffic-selector', 'remote', 'prefix', '192.0.2.1/32'])
+ self.cli_set(peer_base_path + ['vti', 'traffic-selector', 'remote', 'prefix', '192.0.2.3/32'])
+
+ self.cli_commit()
+
+ swanctl_conf = read_file(swanctl_file)
+ if_id = vti.lstrip('vti')
+ # The key defaults to 0 and will match any policies which similarly do
+ # not have a lookup key configuration - thus we shift the key by one
+ # to also support a vti0 interface
+ if_id = str(int(if_id) +1)
+ swanctl_conf_lines = [
+ f'version = 2',
+ f'auth = psk',
+ f'proposals = aes128-sha1-modp1024',
+ f'esp_proposals = aes128-sha1-modp1024',
+ f'local_addrs = {local_address} # dhcp:no',
+ f'mobike = no',
+ f'remote_addrs = {peer_ip}',
+ f'mode = tunnel',
+ f'local_ts = 0.0.0.0/0',
+ f'remote_ts = 192.0.2.1/32,192.0.2.3/32',
+ f'ipcomp = yes',
+ f'start_action = none',
+ f'replay_window = 32',
+ f'if_id_in = {if_id}', # will be 11 for vti10 - shifted by one
+ f'if_id_out = {if_id}',
+ f'updown = "/etc/ipsec.d/vti-up-down {vti}"'
+ ]
+ for line in swanctl_conf_lines:
+ self.assertIn(line, swanctl_conf)
+
+ # Check IPv6 TS
+ self.cli_delete(peer_base_path + ['vti', 'traffic-selector'])
+ self.cli_set(peer_base_path + ['vti', 'traffic-selector', 'local', 'prefix', '::/0'])
+ self.cli_set(peer_base_path + ['vti', 'traffic-selector', 'remote', 'prefix', '::/0'])
+ self.cli_commit()
+ swanctl_conf = read_file(swanctl_file)
+ swanctl_conf_lines = [
+ f'local_ts = ::/0',
+ f'remote_ts = ::/0',
+ f'updown = "/etc/ipsec.d/vti-up-down {vti}"'
+ ]
+ for line in swanctl_conf_lines:
+ self.assertIn(line, swanctl_conf)
+
+ # Check both TS (IPv4 + IPv6)
+ self.cli_delete(peer_base_path + ['vti', 'traffic-selector'])
+ self.cli_commit()
+ swanctl_conf = read_file(swanctl_file)
+ swanctl_conf_lines = [
+ f'local_ts = 0.0.0.0/0,::/0',
+ f'remote_ts = 0.0.0.0/0,::/0',
+ f'updown = "/etc/ipsec.d/vti-up-down {vti}"'
+ ]
+ for line in swanctl_conf_lines:
+ self.assertIn(line, swanctl_conf)
+
+
def test_dmvpn(self):
ike_lifetime = '3600'
esp_lifetime = '1800'