summaryrefslogtreecommitdiff
path: root/smoketest/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts')
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_dummy.py1
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_ethernet.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_input.py52
-rwxr-xr-xsmoketest/scripts/cli/test_load_balancing_wan.py (renamed from smoketest/scripts/cli/test_load_balancning_wan.py)15
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bgp.py108
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospf.py15
-rwxr-xr-xsmoketest/scripts/cli/test_qos.py547
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcpv6-relay.py28
-rwxr-xr-xsmoketest/scripts/cli/test_service_ntp.py (renamed from smoketest/scripts/cli/test_system_ntp.py)70
-rwxr-xr-xsmoketest/scripts/cli/test_service_router-advert.py39
-rwxr-xr-xsmoketest/scripts/cli/test_service_tftp-server.py31
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_ipsec.py7
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_openconnect.py15
-rwxr-xr-xsmoketest/scripts/system/test_module_load.py5
14 files changed, 867 insertions, 68 deletions
diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py
index d96ec2c5d..a79e4cb1b 100755
--- a/smoketest/scripts/cli/test_interfaces_dummy.py
+++ b/smoketest/scripts/cli/test_interfaces_dummy.py
@@ -21,6 +21,7 @@ from base_interfaces_test import BasicInterfaceTest
class DummyInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
+ cls._test_mtu = True
cls._base_path = ['interfaces', 'dummy']
cls._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089']
# call base-classes classmethod
diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py
index ed611062a..e53413f0d 100755
--- a/smoketest/scripts/cli/test_interfaces_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_ethernet.py
@@ -160,7 +160,7 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase):
self.assertFalse(is_intf_addr_assigned(intf, addr['addr']))
def test_offloading_rps(self):
- # enable RPS on all available CPUs, RPS works woth a CPU bitmask,
+ # enable RPS on all available CPUs, RPS works with a CPU bitmask,
# where each bit represents a CPU (core/thread). The formula below
# expands to rps_cpus = 255 for a 8 core system
rps_cpus = (1 << os.cpu_count()) -1
diff --git a/smoketest/scripts/cli/test_interfaces_input.py b/smoketest/scripts/cli/test_interfaces_input.py
new file mode 100755
index 000000000..c6d7febec
--- /dev/null
+++ b/smoketest/scripts/cli/test_interfaces_input.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import unittest
+
+from vyos.util import read_file
+from vyos.ifconfig import Interface
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+base_path = ['interfaces', 'input']
+
+# add a classmethod to setup a temporaray PPPoE server for "proper" validation
+class InputInterfaceTest(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(InputInterfaceTest, cls).setUpClass()
+
+ cls._interfaces = ['ifb10', 'ifb20', 'ifb30']
+
+ def tearDown(self):
+ self.cli_delete(base_path)
+ self.cli_commit()
+
+ def test_01_description(self):
+ # Check if PPPoE dialer can be configured and runs
+ for interface in self._interfaces:
+ self.cli_set(base_path + [interface, 'description', f'foo-{interface}'])
+
+ # commit changes
+ self.cli_commit()
+
+ # Validate remove interface description "empty"
+ for interface in self._interfaces:
+ tmp = read_file(f'/sys/class/net/{interface}/ifalias')
+ self.assertEqual(tmp, f'foo-{interface}')
+ self.assertEqual(Interface(interface).get_alias(), f'foo-{interface}')
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancing_wan.py
index 23020b9b1..33c69c595 100755
--- a/smoketest/scripts/cli/test_load_balancning_wan.py
+++ b/smoketest/scripts/cli/test_load_balancing_wan.py
@@ -46,7 +46,6 @@ def cmd_in_netns(netns, cmd):
def delete_netns(name):
return call(f'sudo ip netns del {name}')
-
class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -61,7 +60,6 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
def test_table_routes(self):
-
ns1 = 'ns201'
ns2 = 'ns202'
ns3 = 'ns203'
@@ -79,6 +77,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
create_veth_pair(iface1, container_iface1)
create_veth_pair(iface2, container_iface2)
create_veth_pair(iface3, container_iface3)
+
move_interface_to_netns(container_iface1, ns1)
move_interface_to_netns(container_iface2, ns2)
move_interface_to_netns(container_iface3, ns3)
@@ -125,7 +124,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.assertEqual(tmp, original)
# Delete veth interfaces and netns
- for iface in [iface1, iface2]:
+ for iface in [iface1, iface2, iface3, container_iface1, container_iface2, container_iface3]:
call(f'sudo ip link del dev {iface}')
delete_netns(ns1)
@@ -196,9 +195,10 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
call(f'sudo ip address add 203.0.113.10/24 dev {iface1}')
call(f'sudo ip address add 192.0.2.10/24 dev {iface2}')
call(f'sudo ip address add 198.51.100.10/24 dev {iface3}')
- call(f'sudo ip link set dev {iface1} up')
- call(f'sudo ip link set dev {iface2} up')
- call(f'sudo ip link set dev {iface3} up')
+
+ for iface in [iface1, iface2, iface3]:
+ call(f'sudo ip link set dev {iface} up')
+
cmd_in_netns(ns1, f'ip link set {container_iface1} name eth0')
cmd_in_netns(ns2, f'ip link set {container_iface2} name eth0')
cmd_in_netns(ns3, f'ip link set {container_iface3} name eth0')
@@ -247,12 +247,11 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
self.assertEqual(tmp, nat_vyos_pre_snat_hook)
# Delete veth interfaces and netns
- for iface in [iface1, iface2]:
+ for iface in [iface1, iface2, iface3, container_iface1, container_iface2, container_iface3]:
call(f'sudo ip link del dev {iface}')
delete_netns(ns1)
delete_netns(ns2)
-
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py
index debc8270c..e33be6644 100755
--- a/smoketest/scripts/cli/test_protocols_bgp.py
+++ b/smoketest/scripts/cli/test_protocols_bgp.py
@@ -34,6 +34,10 @@ prefix_list_in6 = 'pfx-foo-in6'
prefix_list_out6 = 'pfx-foo-out6'
bfd_profile = 'foo-bar-baz'
+import_afi = 'ipv4-unicast'
+import_vrf = 'red'
+import_rd = ASN + ':100'
+import_vrf_base = ['vrf', 'name']
neighbor_config = {
'192.0.2.1' : {
'bfd' : '',
@@ -189,6 +193,15 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
+
+ def create_bgp_instances_for_import_test(self):
+ table = '1000'
+ self.cli_set(base_path + ['system-as', ASN])
+ # testing only one AFI is sufficient as it's generic code
+
+ self.cli_set(import_vrf_base + [import_vrf, 'table', table])
+ self.cli_set(import_vrf_base + [import_vrf, 'protocols', 'bgp', 'system-as', ASN])
+
def verify_frr_config(self, peer, peer_config, frrconfig):
# recurring patterns to verify for both a simple neighbor and a peer-group
if 'bfd' in peer_config:
@@ -959,6 +972,101 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
self.assertIn(f' neighbor {neighbor} remote-as {remote_asn}', frrconfig)
self.assertIn(f' neighbor {neighbor} local-as {local_asn}', frrconfig)
+ def test_bgp_16_import_rd_rt_compatibility(self):
+ # Verify if import vrf and rd vpn export
+ # exist in the same address family
+ self.create_bgp_instances_for_import_test()
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'import', 'vrf',
+ import_vrf])
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'rd', 'vpn', 'export',
+ import_rd])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_bgp_17_import_rd_rt_compatibility(self):
+ # Verify if vrf that is in import vrf list contains rd vpn export
+ self.create_bgp_instances_for_import_test()
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'import', 'vrf',
+ import_vrf])
+ self.cli_commit()
+ frrconfig = self.getFRRconfig(f'router bgp {ASN}')
+ frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}')
+
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f'address-family ipv4 unicast', frrconfig)
+ self.assertIn(f' import vrf {import_vrf}', frrconfig)
+ self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf)
+
+ self.cli_set(
+ import_vrf_base + [import_vrf] + base_path + ['address-family',
+ import_afi, 'rd',
+ 'vpn', 'export',
+ import_rd])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_bgp_18_deleting_import_vrf(self):
+ # Verify deleting vrf that is in import vrf list
+ self.create_bgp_instances_for_import_test()
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'import', 'vrf',
+ import_vrf])
+ self.cli_commit()
+ frrconfig = self.getFRRconfig(f'router bgp {ASN}')
+ frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}')
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f'address-family ipv4 unicast', frrconfig)
+ self.assertIn(f' import vrf {import_vrf}', frrconfig)
+ self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf)
+ self.cli_delete(import_vrf_base + [import_vrf])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_bgp_19_deleting_default_vrf(self):
+ # Verify deleting existent vrf default if other vrfs were created
+ self.create_bgp_instances_for_import_test()
+ self.cli_commit()
+ frrconfig = self.getFRRconfig(f'router bgp {ASN}')
+ frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}')
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf)
+ self.cli_delete(base_path)
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_bgp_20_import_rd_rt_compatibility(self):
+ # Verify if vrf that has rd vpn export is in import vrf of other vrfs
+ self.create_bgp_instances_for_import_test()
+ self.cli_set(
+ import_vrf_base + [import_vrf] + base_path + ['address-family',
+ import_afi, 'rd',
+ 'vpn', 'export',
+ import_rd])
+ self.cli_commit()
+ frrconfig = self.getFRRconfig(f'router bgp {ASN}')
+ frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}')
+ self.assertIn(f'router bgp {ASN}', frrconfig)
+ self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf)
+ self.assertIn(f'address-family ipv4 unicast', frrconfig_vrf)
+ self.assertIn(f' rd vpn export {import_rd}', frrconfig_vrf)
+
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'import', 'vrf',
+ import_vrf])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ def test_bgp_21_import_unspecified_vrf(self):
+ # Verify if vrf that is in import is unspecified
+ self.create_bgp_instances_for_import_test()
+ self.cli_set(
+ base_path + ['address-family', import_afi, 'import', 'vrf',
+ 'test'])
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py
index 339713bf6..581959b15 100755
--- a/smoketest/scripts/cli/test_protocols_ospf.py
+++ b/smoketest/scripts/cli/test_protocols_ospf.py
@@ -74,6 +74,11 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['parameters', 'rfc1583-compatibility'])
self.cli_set(base_path + ['log-adjacency-changes', 'detail'])
self.cli_set(base_path + ['default-metric', metric])
+ self.cli_set(base_path + ['passive-interface', 'default'])
+ self.cli_set(base_path + ['area', '10', 'area-type', 'stub'])
+ self.cli_set(base_path + ['area', '10', 'network', '10.0.0.0/16'])
+ self.cli_set(base_path + ['area', '10', 'range', '10.0.1.0/24'])
+ self.cli_set(base_path + ['area', '10', 'range', '10.0.2.0/24', 'not-advertise'])
# commit changes
self.cli_commit()
@@ -88,6 +93,12 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults
self.assertIn(f' capability opaque', frrconfig)
self.assertIn(f' default-metric {metric}', frrconfig)
+ self.assertIn(f' passive-interface default', frrconfig)
+ self.assertIn(f' area 10 stub', frrconfig)
+ self.assertIn(f' network 10.0.0.0/16 area 10', frrconfig)
+ self.assertIn(f' area 10 range 10.0.1.0/24', frrconfig)
+ self.assertNotIn(f' area 10 range 10.0.1.0/24 not-advertise', frrconfig)
+ self.assertIn(f' area 10 range 10.0.2.0/24 not-advertise', frrconfig)
def test_ospf_03_access_list(self):
@@ -272,6 +283,10 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
# commit changes
self.cli_commit()
+ frrconfig = self.getFRRconfig('router ospf')
+ self.assertIn(f'router ospf', frrconfig)
+ self.assertIn(f' passive-interface default', frrconfig)
+
for interface in interfaces:
config = self.getFRRconfig(f'interface {interface}')
self.assertIn(f'interface {interface}', config)
diff --git a/smoketest/scripts/cli/test_qos.py b/smoketest/scripts/cli/test_qos.py
new file mode 100755
index 000000000..0092473d6
--- /dev/null
+++ b/smoketest/scripts/cli/test_qos.py
@@ -0,0 +1,547 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2022 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from json import loads
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.ifconfig import Section
+from vyos.util import cmd
+
+base_path = ['qos']
+
+def get_tc_qdisc_json(interface) -> dict:
+ tmp = cmd(f'tc -detail -json qdisc show dev {interface}')
+ tmp = loads(tmp)
+ return next(iter(tmp))
+
+def get_tc_filter_json(interface, direction) -> list:
+ if direction not in ['ingress', 'egress']:
+ raise ValueError()
+ tmp = cmd(f'tc -detail -json filter show dev {interface} {direction}')
+ tmp = loads(tmp)
+ return tmp
+
+class TestQoS(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestQoS, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ # We only test on physical interfaces and not VLAN (sub-)interfaces
+ cls._interfaces = []
+ if 'TEST_ETH' in os.environ:
+ tmp = os.environ['TEST_ETH'].split()
+ cls._interfaces = tmp
+ else:
+ for tmp in Section.interfaces('ethernet', vlan=False):
+ cls._interfaces.append(tmp)
+
+ def tearDown(self):
+ # delete testing SSH config
+ self.cli_delete(base_path)
+ self.cli_commit()
+
+ def test_01_cake(self):
+ bandwidth = 1000000
+ rtt = 200
+
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', 'cake', policy_name, 'bandwidth', str(bandwidth)])
+ self.cli_set(base_path + ['policy', 'cake', policy_name, 'rtt', str(rtt)])
+ self.cli_set(base_path + ['policy', 'cake', policy_name, 'flow-isolation', 'dual-src-host'])
+
+ bandwidth += 1000000
+ rtt += 20
+
+ # commit changes
+ self.cli_commit()
+
+ bandwidth = 1000000
+ rtt = 200
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+
+ self.assertEqual('cake', tmp['kind'])
+ # TC store rates as a 32-bit unsigned integer in bps (Bytes per second)
+ self.assertEqual(int(bandwidth *125), tmp['options']['bandwidth'])
+ # RTT internally is in us
+ self.assertEqual(int(rtt *1000), tmp['options']['rtt'])
+ self.assertEqual('dual-srchost', tmp['options']['flowmode'])
+ self.assertFalse(tmp['options']['ingress'])
+ self.assertFalse(tmp['options']['nat'])
+ self.assertTrue(tmp['options']['raw'])
+
+ bandwidth += 1000000
+ rtt += 20
+
+ def test_02_drop_tail(self):
+ queue_limit = 50
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', 'drop-tail', policy_name, 'queue-limit', str(queue_limit)])
+
+ queue_limit += 10
+
+ # commit changes
+ self.cli_commit()
+
+ queue_limit = 50
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+
+ self.assertEqual('pfifo', tmp['kind'])
+ self.assertEqual(queue_limit, tmp['options']['limit'])
+
+ queue_limit += 10
+
+ def test_03_fair_queue(self):
+ hash_interval = 10
+ queue_limit = 5
+ policy_type = 'fair-queue'
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'hash-interval', str(hash_interval)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)])
+
+ hash_interval += 1
+ queue_limit += 1
+
+ # commit changes
+ self.cli_commit()
+
+ hash_interval = 10
+ queue_limit = 5
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+
+ self.assertEqual('sfq', tmp['kind'])
+ self.assertEqual(hash_interval, tmp['options']['perturb'])
+ self.assertEqual(queue_limit, tmp['options']['limit'])
+
+ hash_interval += 1
+ queue_limit += 1
+
+ def test_04_fq_codel(self):
+ policy_type = 'fq-codel'
+ codel_quantum = 1500
+ flows = 512
+ interval = 100
+ queue_limit = 2048
+ target = 5
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'codel-quantum', str(codel_quantum)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'flows', str(flows)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'interval', str(interval)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'target', str(target)])
+
+ codel_quantum += 10
+ flows += 2
+ interval += 10
+ queue_limit += 512
+ target += 1
+
+ # commit changes
+ self.cli_commit()
+
+ codel_quantum = 1500
+ flows = 512
+ interval = 100
+ queue_limit = 2048
+ target = 5
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+
+ self.assertEqual('fq_codel', tmp['kind'])
+ self.assertEqual(codel_quantum, tmp['options']['quantum'])
+ self.assertEqual(flows, tmp['options']['flows'])
+ self.assertEqual(queue_limit, tmp['options']['limit'])
+
+ # due to internal rounding we need to substract 1 from interval and target after converting to milliseconds
+ # configuration of:
+ # tc qdisc add dev eth0 root fq_codel quantum 1500 flows 512 interval 100ms limit 2048 target 5ms noecn
+ # results in: tc -j qdisc show dev eth0
+ # [{"kind":"fq_codel","handle":"8046:","root":true,"refcnt":3,"options":{"limit":2048,"flows":512,
+ # "quantum":1500,"target":4999,"interval":99999,"memory_limit":33554432,"drop_batch":64}}]
+ self.assertAlmostEqual(tmp['options']['interval'], interval *1000, delta=1)
+ self.assertAlmostEqual(tmp['options']['target'], target *1000 -1, delta=1)
+
+ codel_quantum += 10
+ flows += 2
+ interval += 10
+ queue_limit += 512
+ target += 1
+
+ def test_05_limiter(self):
+ qos_config = {
+ '1' : {
+ 'bandwidth' : '1000000',
+ 'match4' : {
+ 'ssh' : { 'dport' : '22', },
+ },
+ },
+ '2' : {
+ 'bandwidth' : '1000000',
+ 'match6' : {
+ 'ssh' : { 'dport' : '22', },
+ },
+ },
+ }
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'egress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # set default bandwidth parameter for all remaining connections
+ self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'bandwidth', '500000'])
+
+ for qos_class, qos_class_config in qos_config.items():
+ qos_class_base = base_path + ['policy', 'limiter', policy_name, 'class', qos_class]
+
+ if 'match4' in qos_class_config:
+ for match, match_config in qos_class_config['match4'].items():
+ if 'dport' in match_config:
+ self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']])
+
+ if 'match6' in qos_class_config:
+ for match, match_config in qos_class_config['match6'].items():
+ if 'dport' in match_config:
+ self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']])
+
+ if 'bandwidth' in qos_class_config:
+ self.cli_set(qos_class_base + ['bandwidth', qos_class_config['bandwidth']])
+
+
+ # commit changes
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ for filter in get_tc_filter_json(interface, 'ingress'):
+ # bail out early if filter has no attached action
+ if 'options' not in filter or 'actions' not in filter['options']:
+ continue
+
+ for qos_class, qos_class_config in qos_config.items():
+ # Every flowid starts with ffff and we encopde the class number after the colon
+ if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}':
+ continue
+
+ ip_hdr_offset = 20
+ if 'match6' in qos_class_config:
+ ip_hdr_offset = 40
+
+ self.assertEqual(ip_hdr_offset, filter['options']['match']['off'])
+ if 'dport' in match_config:
+ dport = int(match_config['dport'])
+ self.assertEqual(f'{dport:x}', filter['options']['match']['value'])
+
+ def test_06_network_emulator(self):
+ policy_type = 'network-emulator'
+
+ bandwidth = 1000000
+ corruption = 1
+ delay = 2
+ duplicate = 3
+ loss = 4
+ queue_limit = 5
+ reordering = 6
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'bandwidth', str(bandwidth)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'corruption', str(corruption)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'delay', str(delay)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'duplicate', str(duplicate)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'loss', str(loss)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)])
+ self.cli_set(base_path + ['policy', policy_type, policy_name, 'reordering', str(reordering)])
+
+ bandwidth += 1000000
+ corruption += 1
+ delay += 1
+ duplicate +=1
+ loss += 1
+ queue_limit += 1
+ reordering += 1
+
+ # commit changes
+ self.cli_commit()
+
+ bandwidth = 1000000
+ corruption = 1
+ delay = 2
+ duplicate = 3
+ loss = 4
+ queue_limit = 5
+ reordering = 6
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+ self.assertEqual('netem', tmp['kind'])
+
+ self.assertEqual(int(bandwidth *125), tmp['options']['rate']['rate'])
+ # values are in %
+ self.assertEqual(corruption/100, tmp['options']['corrupt']['corrupt'])
+ self.assertEqual(duplicate/100, tmp['options']['duplicate']['duplicate'])
+ self.assertEqual(loss/100, tmp['options']['loss-random']['loss'])
+ self.assertEqual(reordering/100, tmp['options']['reorder']['reorder'])
+ self.assertEqual(delay/1000, tmp['options']['delay']['delay'])
+
+ self.assertEqual(queue_limit, tmp['options']['limit'])
+
+ bandwidth += 1000000
+ corruption += 1
+ delay += 1
+ duplicate += 1
+ loss += 1
+ queue_limit += 1
+ reordering += 1
+
+ def test_07_priority_queue(self):
+ priorities = ['1', '2', '3', '4', '5']
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', 'priority-queue', policy_name, 'default', 'queue-limit', '10'])
+
+ for priority in priorities:
+ prio_base = base_path + ['policy', 'priority-queue', policy_name, 'class', priority]
+ self.cli_set(prio_base + ['match', f'prio-{priority}', 'ip', 'destination', 'port', str(1000 + int(priority))])
+
+ # commit changes
+ self.cli_commit()
+
+ def test_08_random_detect(self):
+ self.skipTest('tc returns invalid JSON here - needs iproute2 fix')
+ bandwidth = 5000
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', 'random-detect', policy_name, 'bandwidth', str(bandwidth)])
+
+ bandwidth += 1000
+
+ # commit changes
+ self.cli_commit()
+
+ bandwidth = 5000
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+ import pprint
+ pprint.pprint(tmp)
+
+ def test_09_rate_control(self):
+ bandwidth = 5000
+ burst = 20
+ latency = 5
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+ self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'bandwidth', str(bandwidth)])
+ self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'burst', str(burst)])
+ self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'latency', str(latency)])
+
+ bandwidth += 1000
+ burst += 5
+ latency += 1
+ # commit changes
+ self.cli_commit()
+
+ bandwidth = 5000
+ burst = 20
+ latency = 5
+ for interface in self._interfaces:
+ tmp = get_tc_qdisc_json(interface)
+
+ self.assertEqual('tbf', tmp['kind'])
+ self.assertEqual(0, tmp['options']['mpu'])
+ # TC store rates as a 32-bit unsigned integer in bps (Bytes per second)
+ self.assertEqual(int(bandwidth * 125), tmp['options']['rate'])
+
+ bandwidth += 1000
+ burst += 5
+ latency += 1
+
+ def test_10_round_robin(self):
+ qos_config = {
+ '1' : {
+ 'match4' : {
+ 'ssh' : { 'dport' : '22', },
+ },
+ },
+ '2' : {
+ 'match6' : {
+ 'ssh' : { 'dport' : '22', },
+ },
+ },
+ }
+
+ first = True
+ for interface in self._interfaces:
+ policy_name = f'qos-policy-{interface}'
+
+ if first:
+ self.cli_set(base_path + ['interface', interface, 'ingress', policy_name])
+ # verify() - selected QoS policy on interface only supports egress
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name])
+ first = False
+
+ self.cli_set(base_path + ['interface', interface, 'egress', policy_name])
+
+ for qos_class, qos_class_config in qos_config.items():
+ qos_class_base = base_path + ['policy', 'round-robin', policy_name, 'class', qos_class]
+
+ if 'match4' in qos_class_config:
+ for match, match_config in qos_class_config['match4'].items():
+ if 'dport' in match_config:
+ self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']])
+
+ if 'match6' in qos_class_config:
+ for match, match_config in qos_class_config['match6'].items():
+ if 'dport' in match_config:
+ self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']])
+
+
+ # commit changes
+ self.cli_commit()
+
+ for interface in self._interfaces:
+ import pprint
+ tmp = get_tc_qdisc_json(interface)
+ self.assertEqual('drr', tmp['kind'])
+
+ for filter in get_tc_filter_json(interface, 'ingress'):
+ # bail out early if filter has no attached action
+ if 'options' not in filter or 'actions' not in filter['options']:
+ continue
+
+ for qos_class, qos_class_config in qos_config.items():
+ # Every flowid starts with ffff and we encopde the class number after the colon
+ if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}':
+ continue
+
+ ip_hdr_offset = 20
+ if 'match6' in qos_class_config:
+ ip_hdr_offset = 40
+
+ self.assertEqual(ip_hdr_offset, filter['options']['match']['off'])
+ if 'dport' in match_config:
+ dport = int(match_config['dport'])
+ self.assertEqual(f'{dport:x}', filter['options']['match']['value'])
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2, failfast=True)
diff --git a/smoketest/scripts/cli/test_service_dhcpv6-relay.py b/smoketest/scripts/cli/test_service_dhcpv6-relay.py
index fc206435b..8bb58d296 100755
--- a/smoketest/scripts/cli/test_service_dhcpv6-relay.py
+++ b/smoketest/scripts/cli/test_service_dhcpv6-relay.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -34,22 +34,30 @@ listen_addr = '2001:db8:ffff::1/64'
interfaces = []
class TestServiceDHCPv6Relay(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
- for tmp in interfaces:
+ @classmethod
+ def setUpClass(cls):
+ super(TestServiceDHCPv6Relay, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ for tmp in Section.interfaces('ethernet', vlan=False):
+ interfaces.append(tmp)
listen = listen_addr
if tmp == upstream_if:
listen = upstream_if_addr
- self.cli_set(['interfaces', 'ethernet', tmp, 'address', listen])
+ cls.cli_set(cls, ['interfaces', 'ethernet', tmp, 'address', listen])
- def tearDown(self):
- self.cli_delete(base_path)
+ @classmethod
+ def tearDownClass(cls):
for tmp in interfaces:
listen = listen_addr
if tmp == upstream_if:
listen = upstream_if_addr
- self.cli_delete(['interfaces', 'ethernet', tmp, 'address', listen])
+ cls.cli_delete(cls, ['interfaces', 'ethernet', tmp, 'address', listen])
- self.cli_commit()
+ super(TestServiceDHCPv6Relay, cls).tearDownClass()
def test_relay_default(self):
dhcpv6_server = '2001:db8::ffff'
@@ -100,9 +108,5 @@ class TestServiceDHCPv6Relay(VyOSUnitTestSHIM.TestCase):
self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
- for tmp in Section.interfaces('ethernet'):
- if '.' not in tmp:
- interfaces.append(tmp)
-
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_service_ntp.py
index a0806acf0..3ccd19a31 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_service_ntp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2022 VyOS maintainers and contributors
+# Copyright (C) 2019-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -19,14 +19,12 @@ import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
-from vyos.template import address_from_cidr
-from vyos.template import netmask_from_cidr
-from vyos.util import read_file
+from vyos.util import cmd
from vyos.util import process_named_running
-PROCESS_NAME = 'ntpd'
-NTP_CONF = '/run/ntpd/ntpd.conf'
-base_path = ['system', 'ntp']
+PROCESS_NAME = 'chronyd'
+NTP_CONF = '/run/chrony/chrony.conf'
+base_path = ['service', 'ntp']
class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -38,6 +36,8 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
cls.cli_delete(cls, base_path)
def tearDown(self):
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
self.cli_delete(base_path)
self.cli_commit()
@@ -46,7 +46,7 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
def test_01_ntp_options(self):
# Test basic NTP support with multiple servers and their options
servers = ['192.0.2.1', '192.0.2.2']
- options = ['noselect', 'preempt', 'prefer']
+ options = ['noselect', 'prefer']
pools = ['pool.vyos.io']
for server in servers:
@@ -61,12 +61,14 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Check generated configuration
- config = read_file(NTP_CONF)
- self.assertIn('driftfile /var/lib/ntp/ntp.drift', config)
- self.assertIn('restrict default noquery nopeer notrap nomodify', config)
- self.assertIn('restrict source nomodify notrap noquery', config)
- self.assertIn('restrict 127.0.0.1', config)
- self.assertIn('restrict -6 ::1', config)
+ # this file must be read with higher permissions
+ config = cmd(f'sudo cat {NTP_CONF}')
+ self.assertIn('driftfile /run/chrony/drift', config)
+ self.assertIn('dumpdir /run/chrony', config)
+ self.assertIn('clientloglimit 1048576', config)
+ self.assertIn('rtcsync', config)
+ self.assertIn('makestep 1.0 3', config)
+ self.assertIn('leapsectz right/UTC', config)
for server in servers:
self.assertIn(f'server {server} iburst ' + ' '.join(options), config)
@@ -80,9 +82,9 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
for listen in listen_address:
self.cli_set(base_path + ['listen-address', listen])
- networks = ['192.0.2.0/24', '2001:db8:1000::/64']
+ networks = ['192.0.2.0/24', '2001:db8:1000::/64', '100.64.0.0', '2001:db8::ffff']
for network in networks:
- self.cli_set(base_path + ['allow-clients', 'address', network])
+ self.cli_set(base_path + ['allow-client', 'address', network])
# Verify "NTP server not configured" verify() statement
with self.assertRaises(ConfigSessionError):
@@ -95,18 +97,14 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Check generated client address configuration
- config = read_file(NTP_CONF)
- self.assertIn('restrict default ignore', config)
-
+ # this file must be read with higher permissions
+ config = cmd(f'sudo cat {NTP_CONF}')
for network in networks:
- network_address = address_from_cidr(network)
- network_netmask = netmask_from_cidr(network)
- self.assertIn(f'restrict {network_address} mask {network_netmask} nomodify notrap nopeer', config)
+ self.assertIn(f'allow {network}', config)
# Check listen address
- self.assertIn('interface ignore wildcard', config)
for listen in listen_address:
- self.assertIn(f'interface listen {listen}', config)
+ self.assertIn(f'bindaddress {listen}', config)
def test_03_ntp_interface(self):
interfaces = ['eth0', 'eth1']
@@ -120,10 +118,28 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Check generated client address configuration
- config = read_file(NTP_CONF)
- self.assertIn('interface ignore wildcard', config)
+ # this file must be read with higher permissions
+ config = cmd(f'sudo cat {NTP_CONF}')
for interface in interfaces:
- self.assertIn(f'interface listen {interface}', config)
+ self.assertIn(f'binddevice {interface}', config)
+
+ def test_04_ntp_vrf(self):
+ vrf_name = 'vyos-mgmt'
+
+ self.cli_set(['vrf', 'name', vrf_name, 'table', '12345'])
+ self.cli_set(base_path + ['vrf', vrf_name])
+
+ servers = ['time1.vyos.net', 'time2.vyos.net']
+ for server in servers:
+ self.cli_set(base_path + ['server', server])
+
+ self.cli_commit()
+
+ # Check for process in VRF
+ tmp = cmd(f'ip vrf pids {vrf_name}')
+ self.assertIn(PROCESS_NAME, tmp)
+
+ self.cli_delete(['vrf', 'name', vrf_name])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py
index 873be7df0..0169b7934 100755
--- a/smoketest/scripts/cli/test_service_router-advert.py
+++ b/smoketest/scripts/cli/test_service_router-advert.py
@@ -37,7 +37,6 @@ def get_config_value(key):
return tmp[0].split()[0].replace(';','')
class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
-
@classmethod
def setUpClass(cls):
super(TestServiceRADVD, cls).setUpClass()
@@ -114,7 +113,6 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
tmp = get_config_value('DecrementLifetimes')
self.assertEqual(tmp, 'off')
-
def test_dns(self):
nameserver = ['2001:db8::1', '2001:db8::2']
dnssl = ['vyos.net', 'vyos.io']
@@ -150,7 +148,6 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
tmp = 'DNSSL ' + ' '.join(dnssl) + ' {'
self.assertIn(tmp, config)
-
def test_deprecate_prefix(self):
self.cli_set(base_path + ['prefix', prefix, 'valid-lifetime', 'infinity'])
self.cli_set(base_path + ['prefix', prefix, 'deprecate-prefix'])
@@ -159,13 +156,45 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
# commit changes
self.cli_commit()
- config = read_file(RADVD_CONF)
-
tmp = get_config_value('DeprecatePrefix')
self.assertEqual(tmp, 'on')
tmp = get_config_value('DecrementLifetimes')
self.assertEqual(tmp, 'on')
+ def test_route(self):
+ route = '2001:db8:1000::/64'
+
+ self.cli_set(base_path + ['prefix', prefix])
+ self.cli_set(base_path + ['route', route])
+
+ # commit changes
+ self.cli_commit()
+
+ config = read_file(RADVD_CONF)
+
+ tmp = f'route {route}' + ' {'
+ self.assertIn(tmp, config)
+
+ self.assertIn('AdvRouteLifetime 1800;', config)
+ self.assertIn('AdvRoutePreference medium;', config)
+ self.assertIn('RemoveRoute on;', config)
+
+ def test_rasrcaddress(self):
+ ra_src = ['fe80::1', 'fe80::2']
+
+ self.cli_set(base_path + ['prefix', prefix])
+ for src in ra_src:
+ self.cli_set(base_path + ['source-address', src])
+
+ # commit changes
+ self.cli_commit()
+
+ config = read_file(RADVD_CONF)
+ self.assertIn('AdvRASrcAddress {', config)
+ for src in ra_src:
+ self.assertIn(f' {src};', config)
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_tftp-server.py b/smoketest/scripts/cli/test_service_tftp-server.py
index b57c33f26..99d81e203 100755
--- a/smoketest/scripts/cli/test_service_tftp-server.py
+++ b/smoketest/scripts/cli/test_service_tftp-server.py
@@ -33,15 +33,32 @@ address_ipv6 = '2001:db8::1'
vrf = 'mgmt'
class TestServiceTFTPD(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
- self.cli_set(dummy_if_path + ['address', address_ipv4 + '/32'])
- self.cli_set(dummy_if_path + ['address', address_ipv6 + '/128'])
+ @classmethod
+ def setUpClass(cls):
+ super(TestServiceTFTPD, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ cls.cli_set(cls, dummy_if_path + ['address', address_ipv4 + '/32'])
+ cls.cli_set(cls, dummy_if_path + ['address', address_ipv6 + '/128'])
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.cli_delete(cls, dummy_if_path)
+ super(TestServiceTFTPD, cls).tearDownClass()
def tearDown(self):
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
self.cli_delete(base_path)
- self.cli_delete(dummy_if_path)
self.cli_commit()
+ # Check for no longer running process
+ self.assertFalse(process_named_running(PROCESS_NAME))
+
def test_01_tftpd_single(self):
directory = '/tmp'
port = '69' # default port
@@ -61,9 +78,6 @@ class TestServiceTFTPD(VyOSUnitTestSHIM.TestCase):
# verify upload
self.assertIn('--create --umask 000', config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
def test_02_tftpd_multi(self):
directory = '/tmp'
address = [address_ipv4, address_ipv6]
@@ -125,9 +139,6 @@ class TestServiceTFTPD(VyOSUnitTestSHIM.TestCase):
# verify upload
self.assertIn('--create --umask 000', config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
# Check for process in VRF
tmp = cmd(f'ip vrf pids {vrf}')
self.assertIn(PROCESS_NAME, tmp)
diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py
index bd242104f..46db0bbf5 100755
--- a/smoketest/scripts/cli/test_vpn_ipsec.py
+++ b/smoketest/scripts/cli/test_vpn_ipsec.py
@@ -39,6 +39,7 @@ vif = '100'
esp_group = 'MyESPGroup'
ike_group = 'MyIKEGroup'
secret = 'MYSECRETKEY'
+PROCESS_NAME = 'charon'
ca_pem = """
MIIDSzCCAjOgAwIBAgIUQHK+ZgTUYZksvXY2/MyW+Jiels4wDQYJKoZIhvcNAQEL
@@ -137,14 +138,14 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Check for running process
- self.assertTrue(process_named_running('charon'))
+ self.assertTrue(process_named_running(PROCESS_NAME))
self.cli_delete(base_path)
self.cli_delete(tunnel_path)
self.cli_commit()
# Check for no longer running process
- self.assertFalse(process_named_running('charon'))
+ self.assertFalse(process_named_running(PROCESS_NAME))
def test_01_dhcp_fail_handling(self):
# Interface for dhcp-interface
@@ -166,6 +167,8 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase):
dhcp_waiting = read_file(dhcp_waiting_file)
self.assertIn(f'{interface}.{vif}', dhcp_waiting) # Ensure dhcp-failed interface was added for dhclient hook
+ self.cli_delete(ethernet_path + [interface, 'vif', vif, 'address'])
+
def test_02_site_to_site(self):
self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2'])
diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py
index 8572d6d66..ec8ecacb9 100755
--- a/smoketest/scripts/cli/test_vpn_openconnect.py
+++ b/smoketest/scripts/cli/test_vpn_openconnect.py
@@ -18,6 +18,7 @@ import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
+from vyos.template import ip_from_cidr
from vyos.util import process_named_running
from vyos.util import read_file
@@ -52,6 +53,9 @@ config_file = '/run/ocserv/ocserv.conf'
auth_file = '/run/ocserv/ocpasswd'
otp_file = '/run/ocserv/users.oath'
+listen_if = 'dum116'
+listen_address = '100.64.0.1/32'
+
class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -61,6 +65,8 @@ class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase):
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_set(cls, ['interfaces', 'dummy', listen_if, 'address', listen_address])
+
cls.cli_set(cls, pki_path + ['ca', 'openconnect', 'certificate', cert_data.replace('\n','')])
cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'certificate', cert_data.replace('\n','')])
cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'private', 'key', key_data.replace('\n','')])
@@ -68,6 +74,7 @@ class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase):
@classmethod
def tearDownClass(cls):
cls.cli_delete(cls, pki_path)
+ cls.cli_delete(cls, ['interfaces', 'dummy', listen_if])
super(TestVPNOpenConnect, cls).tearDownClass()
def tearDown(self):
@@ -104,6 +111,9 @@ class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['ssl', 'ca-certificate', 'openconnect'])
self.cli_set(base_path + ['ssl', 'certificate', 'openconnect'])
+ listen_ip_no_cidr = ip_from_cidr(listen_address)
+ self.cli_set(base_path + ['listen-address', listen_ip_no_cidr])
+
self.cli_commit()
# Verify configuration
@@ -111,10 +121,15 @@ class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase):
# authentication mode local password-otp
self.assertIn(f'auth = "plain[passwd=/run/ocserv/ocpasswd,otp=/run/ocserv/users.oath]"', daemon_config)
+ self.assertIn(f'listen-host = {listen_ip_no_cidr}', daemon_config)
self.assertIn(f'ipv4-network = {v4_subnet}', daemon_config)
self.assertIn(f'ipv6-network = {v6_prefix}', daemon_config)
self.assertIn(f'ipv6-subnet-prefix = {v6_len}', daemon_config)
+ # defaults
+ self.assertIn(f'tcp-port = 443', daemon_config)
+ self.assertIn(f'udp-port = 443', daemon_config)
+
for ns in name_server:
self.assertIn(f'dns = {ns}', daemon_config)
for domain in split_dns:
diff --git a/smoketest/scripts/system/test_module_load.py b/smoketest/scripts/system/test_module_load.py
index 76a41ac4d..bd30c57ec 100755
--- a/smoketest/scripts/system/test_module_load.py
+++ b/smoketest/scripts/system/test_module_load.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -23,8 +23,7 @@ modules = {
"intel_qat": ["qat_200xx", "qat_200xxvf", "qat_c3xxx", "qat_c3xxxvf",
"qat_c62x", "qat_c62xvf", "qat_d15xx", "qat_d15xxvf",
"qat_dh895xcc", "qat_dh895xccvf"],
- "accel_ppp": ["ipoe", "vlan_mon"],
- "misc": ["wireguard"]
+ "accel_ppp": ["ipoe", "vlan_mon"]
}
class TestKernelModules(unittest.TestCase):