diff options
Diffstat (limited to 'smoketest')
23 files changed, 1251 insertions, 368 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index 989028f64..32624719f 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -11,10 +11,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - import re import unittest + from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser @@ -25,12 +25,12 @@ from vyos.utils.system import get_half_cpus from vyos.utils.process import process_named_running from vyos.utils.process import cmd + class BasicAccelPPPTest: class TestCase(VyOSUnitTestSHIM.TestCase): - @classmethod def setUpClass(cls): - cls._process_name = 'accel-pppd' + cls._process_name = "accel-pppd" super(BasicAccelPPPTest.TestCase, cls).setUpClass() @@ -39,7 +39,7 @@ class BasicAccelPPPTest: cls.cli_delete(cls, cls._base_path) def setUp(self): - self._gateway = '192.0.2.1' + self._gateway = "192.0.2.1" # ensure we can also run this test on a live system - so lets clean # out the current configuration :) self.cli_delete(self._base_path) @@ -60,84 +60,189 @@ class BasicAccelPPPTest: def delete(self, path): self.cli_delete(self._base_path + path) - def basic_config(self): - # PPPoE local auth mode requires local users to be configured! - self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) - self.set(['authentication', 'mode', 'local']) - self.set(['gateway-address', self._gateway]) + def basic_protocol_specific_config(self): + """ + An astract method. + Initialize protocol scpecific configureations. + """ + self.assertFalse(True, msg="Function must be defined") + + def initial_auth_config(self): + """ + Initialization of default authentication for all protocols + """ + self.set( + [ + "authentication", + "local-users", + "username", + "vyos", + "password", + "vyos", + ] + ) + self.set(["authentication", "mode", "local"]) + + def initial_gateway_config(self): + """ + Initialization of default gateway + """ + self.set(["gateway-address", self._gateway]) + + def initial_pool_config(self): + """ + Initialization of default client ip pool + """ + first_pool = "SIMPLE-POOL" + self.set(["client-ip-pool", first_pool, "range", "192.0.2.0/24"]) + self.set(["default-pool", first_pool]) + + def basic_config(self, is_auth=True, is_gateway=True, is_client_pool=True): + """ + Initialization of basic configuration + :param is_auth: authentication initialization + :type is_auth: bool + :param is_gateway: gateway initialization + :type is_gateway: bool + :param is_client_pool: client ip pool initialization + :type is_client_pool: bool + """ + self.basic_protocol_specific_config() + if is_auth: + self.initial_auth_config() + if is_gateway: + self.initial_gateway_config() + if is_client_pool: + self.initial_pool_config() + + def getConfig(self, start, end="cli"): + """ + Return part of configuration from line + where the first injection of start keyword to the line + where the first injection of end keyowrd + :param start: start keyword + :type start: str + :param end: end keyword + :type end: str + :return: part of config + :rtype: str + """ + command = f'cat {self._config_file} | sed -n "/^\[{start}/,/^\[{end}/p"' + out = cmd(command) + return out def verify(self, conf): - self.assertEqual(conf['core']['thread-count'], str(get_half_cpus())) + self.assertEqual(conf["core"]["thread-count"], str(get_half_cpus())) def test_accel_name_servers(self): # Verify proper Name-Server configuration for IPv4 and IPv6 self.basic_config() - nameserver = ['192.0.2.1', '192.0.2.2', '2001:db8::1'] + nameserver = ["192.0.2.1", "192.0.2.2", "2001:db8::1"] for ns in nameserver: - self.set(['name-server', ns]) + self.set(["name-server", ns]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # IPv4 and IPv6 nameservers must be checked individually for ns in nameserver: if is_ipv4(ns): - self.assertIn(ns, [conf['dns']['dns1'], conf['dns']['dns2']]) + self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) else: - self.assertEqual(conf['ipv6-dns'][ns], None) + self.assertEqual(conf["ipv6-dns"][ns], None) def test_accel_local_authentication(self): # Test configuration of local authentication self.basic_config() # upload / download limit - user = 'test' - password = 'test2' - static_ip = '100.100.100.101' - upload = '5000' - download = '10000' - - self.set(['authentication', 'local-users', 'username', user, 'password', password]) - self.set(['authentication', 'local-users', 'username', user, 'static-ip', static_ip]) - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'upload', upload]) + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "upload", + upload, + ] + ) # upload rate-limit requires also download rate-limit with self.assertRaises(ConfigSessionError): self.cli_commit() - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download]) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "download", + download, + ] + ) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file - self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) # basic verification self.verify(conf) # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) # Check local-users default value(s) - self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) + self.delete( + ["authentication", "local-users", "username", user, "static-ip"] + ) # commit changes self.cli_commit() # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) @@ -145,74 +250,170 @@ class BasicAccelPPPTest: # Test configuration of RADIUS authentication for PPPoE server self.basic_config() - radius_server = '192.0.2.22' - radius_key = 'secretVyOS' - radius_port = '2000' - radius_port_acc = '3000' - - self.set(['authentication', 'mode', 'radius']) - self.set(['authentication', 'radius', 'server', radius_server, 'key', radius_key]) - self.set(['authentication', 'radius', 'server', radius_server, 'port', radius_port]) - self.set(['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) - - coa_server = '4.4.4.4' - coa_key = 'testCoA' - self.set(['authentication', 'radius', 'dynamic-author', 'server', coa_server]) - self.set(['authentication', 'radius', 'dynamic-author', 'key', coa_key]) - - nas_id = 'VyOS-PPPoE' - nas_ip = '7.7.7.7' - self.set(['authentication', 'radius', 'nas-identifier', nas_id]) - self.set(['authentication', 'radius', 'nas-ip-address', nas_ip]) - - source_address = '1.2.3.4' - self.set(['authentication', 'radius', 'source-address', source_address]) + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + radius_port_acc = "3000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "acct-port", + radius_port_acc, + ] + ) + + coa_server = "4.4.4.4" + coa_key = "testCoA" + self.set( + ["authentication", "radius", "dynamic-author", "server", coa_server] + ) + self.set(["authentication", "radius", "dynamic-author", "key", coa_key]) + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # basic verification self.verify(conf) # check auth - self.assertTrue(conf['radius'].getboolean('verbose')) - self.assertEqual(conf['radius']['acct-timeout'], '3') - self.assertEqual(conf['radius']['timeout'], '3') - self.assertEqual(conf['radius']['max-try'], '3') - - self.assertEqual(conf['radius']['dae-server'], f'{coa_server}:1700,{coa_key}') - self.assertEqual(conf['radius']['nas-identifier'], nas_id) - self.assertEqual(conf['radius']['nas-ip-address'], nas_ip) - self.assertEqual(conf['radius']['bind'], source_address) - - server = conf['radius']['server'].split(',') + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "3") + self.assertEqual(conf["radius"]["timeout"], "3") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual( + conf["radius"]["dae-server"], f"{coa_server}:1700,{coa_key}" + ) + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port={radius_port_acc}', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port={radius_port_acc}", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) # # Disable Radius Accounting # - self.delete(['authentication', 'radius', 'server', radius_server, 'acct-port']) - self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting']) + self.delete( + ["authentication", "radius", "server", radius_server, "acct-port"] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) # commit changes self.cli_commit() conf.read(self._config_file) - server = conf['radius']['server'].split(',') + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port=0', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + def test_accel_ipv4_pool(self): + """ + Test accel-ppp IPv4 pool + """ + self.basic_config(is_gateway=False, is_client_pool=False) + gateway = "192.0.2.1" + subnet = "172.16.0.0/24" + first_pool = "POOL1" + second_pool = "POOL2" + range = "192.0.2.10-192.0.2.20" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", range]) + self.set(["default-pool", first_pool]) + # commit changes + + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + self.assertEqual( + f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] + ) + self.assertEqual(second_pool, conf["ip-pool"][f"{range},name"]) + self.assertEqual(gateway, conf["ip-pool"]["gw-ip-address"]) + self.assertEqual(first_pool, conf[self._protocol_section]["ip-pool"]) + + def test_accel_next_pool(self): + """ + T5099 required specific order + """ + self.basic_config(is_gateway=False, is_client_pool=False) + + gateway = "192.0.2.1" + first_pool = "VyOS-pool1" + first_subnet = "192.0.2.0/25" + second_pool = "Vyos-pool2" + second_subnet = "203.0.113.0/25" + third_pool = "Vyos-pool3" + third_subnet = "198.51.100.0/24" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", first_subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", second_subnet]) + self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) + self.set(["client-ip-pool", third_pool, "range", third_subnet]) + + # commit changes + self.cli_commit() + + config = self.getConfig("ip-pool") + pool_config = f"""gw-ip-address={gateway} +{third_subnet},name={third_pool} +{second_subnet},name={second_pool},next={third_pool} +{first_subnet},name={first_pool},next={second_pool}""" + self.assertIn(pool_config, config) diff --git a/smoketest/scripts/cli/test_nat64.py b/smoketest/scripts/cli/test_nat64.py new file mode 100755 index 000000000..b5723ac7e --- /dev/null +++ b/smoketest/scripts/cli/test_nat64.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import os +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.configsession import ConfigSessionError +from vyos.utils.process import cmd +from vyos.utils.dict import dict_search + +base_path = ['nat64'] +src_path = base_path + ['source'] + +jool_nat64_config = '/run/jool/instance-100.json' + + +class TestNAT64(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestNAT64, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + self.assertFalse(os.path.exists(jool_nat64_config)) + + def test_snat64(self): + rule = '100' + translation_rule = '10' + prefix_v6 = '64:ff9b::/96' + pool = '192.0.2.10' + pool_port = '1-65535' + + self.cli_set(src_path + ['rule', rule, 'source', 'prefix', prefix_v6]) + self.cli_set( + src_path + + ['rule', rule, 'translation', 'pool', translation_rule, 'address', pool] + ) + self.cli_set( + src_path + + ['rule', rule, 'translation', 'pool', translation_rule, 'port', pool_port] + ) + self.cli_commit() + + # Load the JSON file + with open(f'/run/jool/instance-{rule}.json', 'r') as json_file: + config_data = json.load(json_file) + + # Assertions based on the content of the JSON file + self.assertEqual(config_data['instance'], f'instance-{rule}') + self.assertEqual(config_data['framework'], 'netfilter') + self.assertEqual(config_data['global']['pool6'], prefix_v6) + self.assertTrue(config_data['global']['manually-enabled']) + + # Check the pool4 entries + pool4_entries = config_data.get('pool4', []) + self.assertIsInstance(pool4_entries, list) + self.assertGreater(len(pool4_entries), 0) + + for entry in pool4_entries: + self.assertIn('protocol', entry) + self.assertIn('prefix', entry) + self.assertIn('port range', entry) + + protocol = entry['protocol'] + prefix = entry['prefix'] + port_range = entry['port range'] + + if protocol == 'ICMP': + self.assertEqual(prefix, pool) + self.assertEqual(port_range, pool_port) + elif protocol == 'UDP': + self.assertEqual(prefix, pool) + self.assertEqual(port_range, pool_port) + elif protocol == 'TCP': + self.assertEqual(prefix, pool) + self.assertEqual(port_range, pool_port) + else: + self.fail(f'Unexpected protocol: {protocol}') + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py index 51a33f978..c21d8af4e 100755 --- a/smoketest/scripts/cli/test_policy.py +++ b/smoketest/scripts/cli/test_policy.py @@ -1107,6 +1107,33 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): 'metric' : '-20', }, }, + '30': { + 'action': 'permit', + 'match': { + 'ip-nexthop-addr': ipv4_nexthop_address, + }, + 'set': { + 'metric': 'rtt', + }, + }, + '40': { + 'action': 'permit', + 'match': { + 'ip-nexthop-addr': ipv4_nexthop_address, + }, + 'set': { + 'metric': '+rtt', + }, + }, + '50': { + 'action': 'permit', + 'match': { + 'ip-nexthop-addr': ipv4_nexthop_address, + }, + 'set': { + 'metric': '-rtt', + }, + }, }, }, } diff --git a/smoketest/scripts/cli/test_protocols_bfd.py b/smoketest/scripts/cli/test_protocols_bfd.py index 451565664..f209eae3a 100755 --- a/smoketest/scripts/cli/test_protocols_bfd.py +++ b/smoketest/scripts/cli/test_protocols_bfd.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -77,11 +77,23 @@ profiles = { } class TestProtocolsBFD(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestProtocolsBFD, cls).setUpClass() + + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + def tearDown(self): self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_bfd_peer(self): self.cli_set(['vrf', 'name', vrf_name, 'table', '1000']) diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 23e138ebe..71e2142f9 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -174,9 +174,16 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): def setUpClass(cls): super(TestProtocolsBGP, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) + cls.cli_delete(cls, ['policy', 'route-map']) + cls.cli_delete(cls, ['policy', 'prefix-list']) + cls.cli_delete(cls, ['policy', 'prefix-list6']) + cls.cli_delete(cls, ['vrf']) cls.cli_set(cls, ['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit']) @@ -192,18 +199,23 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): @classmethod def tearDownClass(cls): - cls.cli_delete(cls, ['policy']) + cls.cli_delete(cls, ['policy', 'route-map']) + cls.cli_delete(cls, ['policy', 'prefix-list']) + cls.cli_delete(cls, ['policy', 'prefix-list6']) def setUp(self): self.cli_set(base_path + ['system-as', ASN]) def tearDown(self): + # cleanup any possible VRF mess self.cli_delete(['vrf']) + # always destrox the entire bgpd configuration to make the processes + # life as hard as possible self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def create_bgp_instances_for_import_test(self): table = '1000' diff --git a/smoketest/scripts/cli/test_protocols_igmp-proxy.py b/smoketest/scripts/cli/test_protocols_igmp-proxy.py index a75003b12..df10442ea 100755 --- a/smoketest/scripts/cli/test_protocols_igmp-proxy.py +++ b/smoketest/scripts/cli/test_protocols_igmp-proxy.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -29,14 +29,32 @@ upstream_if = 'eth1' downstream_if = 'eth2' class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase): - def setUp(self): - self.cli_set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24']) + @classmethod + def setUpClass(cls): + # call base-classes classmethod + super(TestProtocolsIGMPProxy, cls).setUpClass() + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + cls.cli_set(cls, ['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24']) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, ['interfaces', 'ethernet', upstream_if, 'address']) + + # call base-classes classmethod + super(TestProtocolsIGMPProxy, cls).tearDownClass() def tearDown(self): - self.cli_delete(['interfaces', 'ethernet', upstream_if, 'address']) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(base_path) self.cli_commit() + # Check for no longer running process + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_igmpproxy(self): threshold = '20' altnet = '192.0.2.0/24' @@ -74,8 +92,5 @@ class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase): self.assertIn(f'whitelist {whitelist}', config) self.assertIn(f'phyint {downstream_if} downstream ratelimit 0 threshold 1', config) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py index 8b423dbea..aa5f2f38c 100755 --- a/smoketest/scripts/cli/test_protocols_isis.py +++ b/smoketest/scripts/cli/test_protocols_isis.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021-2022 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -31,20 +31,25 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet') - # call base-classes classmethod super(TestProtocolsISIS, cls).setUpClass() - + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) + cls.cli_delete(cls, ['vrf']) def tearDown(self): + # cleanup any possible VRF mess + self.cli_delete(['vrf']) + # always destrox the entire isisd configuration to make the processes + # life as hard as possible self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def isis_base_config(self): self.cli_set(base_path + ['net', net]) @@ -333,7 +338,7 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'action', 'permit']) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'prefix', prefix_list_address]) - + # Commit main ISIS changes self.cli_commit() @@ -385,4 +390,4 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.cli_commit() if __name__ == '__main__': - unittest.main(verbosity=2)
\ No newline at end of file + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_mpls.py b/smoketest/scripts/cli/test_protocols_mpls.py index 06f21c6e1..0c1599f9b 100755 --- a/smoketest/scripts/cli/test_protocols_mpls.py +++ b/smoketest/scripts/cli/test_protocols_mpls.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -70,6 +70,9 @@ class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase): def setUpClass(cls): super(TestProtocolsMPLS, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) @@ -77,8 +80,9 @@ class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase): def tearDown(self): self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_mpls_basic(self): router_id = '1.2.3.4' diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py index a6850db71..6bffc7c45 100755 --- a/smoketest/scripts/cli/test_protocols_ospf.py +++ b/smoketest/scripts/cli/test_protocols_ospf.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021-2022 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -32,6 +32,9 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): def setUpClass(cls): super(TestProtocolsOSPF, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) @@ -45,11 +48,12 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): super(TestProtocolsOSPF, cls).tearDownClass() def tearDown(self): - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) self.cli_delete(base_path) self.cli_commit() + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + def test_ospf_01_defaults(self): # commit changes self.cli_set(base_path) diff --git a/smoketest/scripts/cli/test_protocols_ospfv3.py b/smoketest/scripts/cli/test_protocols_ospfv3.py index 0d6c6c691..4ae7f05d9 100755 --- a/smoketest/scripts/cli/test_protocols_ospfv3.py +++ b/smoketest/scripts/cli/test_protocols_ospfv3.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021-2022 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -35,6 +35,9 @@ class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase): def setUpClass(cls): super(TestProtocolsOSPFv3, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) @@ -48,11 +51,12 @@ class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase): super(TestProtocolsOSPFv3, cls).tearDownClass() def tearDown(self): - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) self.cli_delete(base_path) self.cli_commit() + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + def test_ospfv3_01_basic(self): seq = '10' prefix = '2001:db8::/32' diff --git a/smoketest/scripts/cli/test_protocols_pim6.py b/smoketest/scripts/cli/test_protocols_pim6.py index e22a7c722..ba24edca2 100755 --- a/smoketest/scripts/cli/test_protocols_pim6.py +++ b/smoketest/scripts/cli/test_protocols_pim6.py @@ -25,15 +25,22 @@ PROCESS_NAME = 'pim6d' base_path = ['protocols', 'pim6'] class TestProtocolsPIMv6(VyOSUnitTestSHIM.TestCase): - def tearDown(self): - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + @classmethod + def setUpClass(cls): + # call base-classes classmethod + super(TestProtocolsPIMv6, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + def tearDown(self): self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_pim6_01_mld_simple(self): # commit changes diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py index 925499fc8..bfc327fd4 100755 --- a/smoketest/scripts/cli/test_protocols_rip.py +++ b/smoketest/scripts/cli/test_protocols_rip.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -34,7 +34,8 @@ class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsRIP, cls).setUpClass() - + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) @@ -65,8 +66,8 @@ class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_rip_01_parameters(self): distance = '40' diff --git a/smoketest/scripts/cli/test_protocols_ripng.py b/smoketest/scripts/cli/test_protocols_ripng.py index 0a8ce7eef..0cfb065c6 100755 --- a/smoketest/scripts/cli/test_protocols_ripng.py +++ b/smoketest/scripts/cli/test_protocols_ripng.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -31,28 +31,43 @@ route_map = 'FooBar123' base_path = ['protocols', 'ripng'] class TestProtocolsRIPng(VyOSUnitTestSHIM.TestCase): - def setUp(self): - self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit']) - self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any']) - self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny']) - self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any']) - self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit']) - self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32']) - self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny']) - self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32']) - self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) + @classmethod + def setUpClass(cls): + # call base-classes classmethod + super(TestProtocolsRIPng, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit']) + cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any']) + cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny']) + cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any']) + cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit']) + cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32']) + cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny']) + cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32']) + cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) + + @classmethod + def tearDownClass(cls): + # call base-classes classmethod + super(TestProtocolsRIPng, cls).tearDownClass() + + cls.cli_delete(cls, ['policy', 'access-list6', acl_in]) + cls.cli_delete(cls, ['policy', 'access-list6', acl_out]) + cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_in]) + cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_out]) + cls.cli_delete(cls, ['policy', 'route-map', route_map]) def tearDown(self): self.cli_delete(base_path) - self.cli_delete(['policy', 'access-list6', acl_in]) - self.cli_delete(['policy', 'access-list6', acl_out]) - self.cli_delete(['policy', 'prefix-list6', prefix_list_in]) - self.cli_delete(['policy', 'prefix-list6', prefix_list_out]) - self.cli_delete(['policy', 'route-map', route_map]) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_ripng_01_parameters(self): metric = '8' diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py index f4aedcbc3..ab3f076ac 100755 --- a/smoketest/scripts/cli/test_protocols_rpki.py +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -31,6 +31,16 @@ rpki_ssh_key = '/config/auth/id_rsa_rpki' rpki_ssh_pub = f'{rpki_ssh_key}.pub' class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + # call base-classes classmethod + super(TestProtocolsRPKI, cls).setUpClass() + # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same + cls.daemon_pid = process_named_running(PROCESS_NAME) + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + def tearDown(self): self.cli_delete(base_path) self.cli_commit() @@ -39,8 +49,8 @@ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): # frrconfig = self.getFRRconfig('rpki') # self.assertNotIn('rpki', frrconfig) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # check process health and continuity + self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_rpki(self): polling = '7200' diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py index fe213a8ae..cb3d90593 100755 --- a/smoketest/scripts/cli/test_service_dns_dynamic.py +++ b/smoketest/scripts/cli/test_service_dns_dynamic.py @@ -32,6 +32,7 @@ DDCLIENT_PID = '/run/ddclient/ddclient.pid' DDCLIENT_PNAME = 'ddclient' base_path = ['service', 'dns', 'dynamic'] +name_path = base_path + ['name'] server = 'ddns.vyos.io' hostname = 'test.ddns.vyos.io' zone = 'vyos.io' @@ -58,38 +59,38 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): # IPv4 standard DDNS service configuration def test_01_dyndns_service_standard(self): - svc_path = ['address', interface, 'service'] services = {'cloudflare': {'protocol': 'cloudflare'}, 'freedns': {'protocol': 'freedns', 'username': username}, 'zoneedit': {'protocol': 'zoneedit1', 'username': username}} for svc, details in services.items(): - self.cli_set(base_path + svc_path + [svc, 'host-name', hostname]) - self.cli_set(base_path + svc_path + [svc, 'password', password]) - self.cli_set(base_path + svc_path + [svc, 'zone', zone]) - self.cli_set(base_path + svc_path + [svc, 'ttl', ttl]) + self.cli_set(name_path + [svc, 'address', interface]) + self.cli_set(name_path + [svc, 'host-name', hostname]) + self.cli_set(name_path + [svc, 'password', password]) + self.cli_set(name_path + [svc, 'zone', zone]) + self.cli_set(name_path + [svc, 'ttl', ttl]) for opt, value in details.items(): - self.cli_set(base_path + svc_path + [svc, opt, value]) + self.cli_set(name_path + [svc, opt, value]) # 'zone' option is supported and required by 'cloudfare', but not 'freedns' and 'zoneedit' - self.cli_set(base_path + svc_path + [svc, 'zone', zone]) + self.cli_set(name_path + [svc, 'zone', zone]) if details['protocol'] == 'cloudflare': pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_delete(base_path + svc_path + [svc, 'zone']) + self.cli_delete(name_path + [svc, 'zone']) # 'ttl' option is supported by 'cloudfare', but not 'freedns' and 'zoneedit' - self.cli_set(base_path + svc_path + [svc, 'ttl', ttl]) + self.cli_set(name_path + [svc, 'ttl', ttl]) if details['protocol'] == 'cloudflare': pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_delete(base_path + svc_path + [svc, 'ttl']) + self.cli_delete(name_path + [svc, 'ttl']) # commit changes self.cli_commit() @@ -113,7 +114,7 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): # IPv6 only DDNS service configuration def test_02_dyndns_service_ipv6(self): interval = '60' - svc_path = ['address', interface, 'service', 'dynv6'] + svc_path = name_path + ['dynv6'] proto = 'dyndns2' ip_version = 'ipv6' wait_time = '600' @@ -121,19 +122,20 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): expiry_time_bad = '360' self.cli_set(base_path + ['interval', interval]) - self.cli_set(base_path + svc_path + ['ip-version', ip_version]) - self.cli_set(base_path + svc_path + ['protocol', proto]) - self.cli_set(base_path + svc_path + ['server', server]) - self.cli_set(base_path + svc_path + ['username', username]) - self.cli_set(base_path + svc_path + ['password', password]) - self.cli_set(base_path + svc_path + ['host-name', hostname]) - self.cli_set(base_path + svc_path + ['wait-time', wait_time]) + self.cli_set(svc_path + ['address', interface]) + self.cli_set(svc_path + ['ip-version', ip_version]) + self.cli_set(svc_path + ['protocol', proto]) + self.cli_set(svc_path + ['server', server]) + self.cli_set(svc_path + ['username', username]) + self.cli_set(svc_path + ['password', password]) + self.cli_set(svc_path + ['host-name', hostname]) + self.cli_set(svc_path + ['wait-time', wait_time]) # expiry-time must be greater than wait-time, exception is raised otherwise - self.cli_set(base_path + svc_path + ['expiry-time', expiry_time_bad]) with self.assertRaises(ConfigSessionError): + self.cli_set(svc_path + ['expiry-time', expiry_time_bad]) self.cli_commit() - self.cli_set(base_path + svc_path + ['expiry-time', expiry_time_good]) + self.cli_set(svc_path + ['expiry-time', expiry_time_good]) # commit changes self.cli_commit() @@ -152,25 +154,25 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): # IPv4+IPv6 dual DDNS service configuration def test_03_dyndns_service_dual_stack(self): - svc_path = ['address', interface, 'service'] services = {'cloudflare': {'protocol': 'cloudflare', 'zone': zone}, 'freedns': {'protocol': 'freedns', 'username': username}, 'google': {'protocol': 'googledomains', 'username': username}} ip_version = 'both' for name, details in services.items(): - self.cli_set(base_path + svc_path + [name, 'host-name', hostname]) - self.cli_set(base_path + svc_path + [name, 'password', password]) + self.cli_set(name_path + [name, 'address', interface]) + self.cli_set(name_path + [name, 'host-name', hostname]) + self.cli_set(name_path + [name, 'password', password]) for opt, value in details.items(): - self.cli_set(base_path + svc_path + [name, opt, value]) + self.cli_set(name_path + [name, opt, value]) # Dual stack is supported by 'cloudfare' and 'freedns' but not 'googledomains' # exception is raised for unsupported ones - self.cli_set(base_path + svc_path + [name, 'ip-version', ip_version]) + self.cli_set(name_path + [name, 'ip-version', ip_version]) if details['protocol'] not in ['cloudflare', 'freedns']: with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_delete(base_path + svc_path + [name, 'ip-version']) + self.cli_delete(name_path + [name, 'ip-version']) # commit changes self.cli_commit() @@ -197,16 +199,19 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def test_04_dyndns_rfc2136(self): # Check if DDNS service can be configured and runs - svc_path = ['address', interface, 'rfc2136', 'vyos'] + svc_path = name_path + ['vyos'] + proto = 'nsupdate' with tempfile.NamedTemporaryFile(prefix='/config/auth/') as key_file: key_file.write(b'S3cretKey') - self.cli_set(base_path + svc_path + ['server', server]) - self.cli_set(base_path + svc_path + ['zone', zone]) - self.cli_set(base_path + svc_path + ['key', key_file.name]) - self.cli_set(base_path + svc_path + ['ttl', ttl]) - self.cli_set(base_path + svc_path + ['host-name', hostname]) + self.cli_set(svc_path + ['address', interface]) + self.cli_set(svc_path + ['protocol', proto]) + self.cli_set(svc_path + ['server', server]) + self.cli_set(svc_path + ['zone', zone]) + self.cli_set(svc_path + ['key', key_file.name]) + self.cli_set(svc_path + ['ttl', ttl]) + self.cli_set(svc_path + ['host-name', hostname]) # commit changes self.cli_commit() @@ -215,7 +220,7 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'use=if', ddclient_conf) self.assertIn(f'if={interface}', ddclient_conf) - self.assertIn(f'protocol=nsupdate', ddclient_conf) + self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'zone={zone}', ddclient_conf) self.assertIn(f'password=\'{key_file.name}\'', ddclient_conf) @@ -223,16 +228,17 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def test_05_dyndns_hostname(self): # Check if DDNS service can be configured and runs - svc_path = ['address', interface, 'service', 'namecheap'] + svc_path = name_path + ['namecheap'] proto = 'namecheap' hostnames = ['@', 'www', hostname, f'@.{hostname}'] for name in hostnames: - self.cli_set(base_path + svc_path + ['protocol', proto]) - self.cli_set(base_path + svc_path + ['server', server]) - self.cli_set(base_path + svc_path + ['username', username]) - self.cli_set(base_path + svc_path + ['password', password]) - self.cli_set(base_path + svc_path + ['host-name', name]) + self.cli_set(svc_path + ['address', interface]) + self.cli_set(svc_path + ['protocol', proto]) + self.cli_set(svc_path + ['server', server]) + self.cli_set(svc_path + ['username', username]) + self.cli_set(svc_path + ['password', password]) + self.cli_set(svc_path + ['host-name', name]) # commit changes self.cli_commit() @@ -247,42 +253,32 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def test_06_dyndns_web_options(self): # Check if DDNS service can be configured and runs - base_path_iface = base_path + ['address', interface] - base_path_web = base_path + ['address', 'web'] - svc_path_iface = base_path_iface + ['service', 'cloudflare'] - svc_path_web = base_path_web + ['service', 'cloudflare'] + svc_path = name_path + ['cloudflare'] proto = 'cloudflare' web_url_good = 'https://ifconfig.me/ip' web_url_bad = 'http:/ifconfig.me/ip' - self.cli_set(svc_path_iface + ['protocol', proto]) - self.cli_set(svc_path_iface + ['zone', zone]) - self.cli_set(svc_path_iface + ['password', password]) - self.cli_set(svc_path_iface + ['host-name', hostname]) - self.cli_set(base_path_iface + ['web-options', 'url', web_url_good]) + self.cli_set(svc_path + ['protocol', proto]) + self.cli_set(svc_path + ['zone', zone]) + self.cli_set(svc_path + ['password', password]) + self.cli_set(svc_path + ['host-name', hostname]) + self.cli_set(svc_path + ['web-options', 'url', web_url_good]) # web-options is supported only with web service based address lookup # exception is raised for interface based address lookup with self.assertRaises(ConfigSessionError): + self.cli_set(svc_path + ['address', interface]) self.cli_commit() - self.cli_delete(base_path_iface + ['web-options']) + self.cli_set(svc_path + ['address', 'web']) # commit changes self.cli_commit() - # web-options is supported with web service based address lookup - # this should work, but clear interface based config first - self.cli_delete(base_path_iface) - self.cli_set(svc_path_web + ['protocol', proto]) - self.cli_set(svc_path_web + ['zone', zone]) - self.cli_set(svc_path_web + ['password', password]) - self.cli_set(svc_path_web + ['host-name', hostname]) - # web-options must be a valid URL - with self.assertRaises(ConfigSessionError) as cm: - self.cli_set(base_path_web + ['web-options', 'url', web_url_bad]) - self.assertIn(f'"{web_url_bad.removeprefix("http:")}" is not a valid URI', str(cm.exception)) - self.cli_set(base_path_web + ['web-options', 'url', web_url_good]) + with self.assertRaises(ConfigSessionError): + self.cli_set(svc_path + ['web-options', 'url', web_url_bad]) + self.cli_commit() + self.cli_set(svc_path + ['web-options', 'url', web_url_good]) # commit changes self.cli_commit() @@ -300,15 +296,17 @@ class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): # Table number randomized, but should be within range 100-65535 vrf_table = "".join(random.choices(string.digits, k=4)) vrf_name = f'vyos-test-{vrf_table}' - svc_path = ['address', interface, 'service', 'cloudflare'] + svc_path = name_path + ['cloudflare'] + proto = 'cloudflare' self.cli_set(['vrf', 'name', vrf_name, 'table', vrf_table]) self.cli_set(base_path + ['vrf', vrf_name]) - self.cli_set(base_path + svc_path + ['protocol', 'cloudflare']) - self.cli_set(base_path + svc_path + ['host-name', hostname]) - self.cli_set(base_path + svc_path + ['zone', zone]) - self.cli_set(base_path + svc_path + ['password', password]) + self.cli_set(svc_path + ['address', interface]) + self.cli_set(svc_path + ['protocol', proto]) + self.cli_set(svc_path + ['host-name', hostname]) + self.cli_set(svc_path + ['zone', zone]) + self.cli_set(svc_path + ['password', password]) # commit changes self.cli_commit() diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py index 24e1f1299..6cb91bcf1 100755 --- a/smoketest/scripts/cli/test_service_https.py +++ b/smoketest/scripts/cli/test_service_https.py @@ -254,6 +254,35 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase): self.assertTrue(success) @ignore_warning(InsecureRequestWarning) + def test_api_add_delete(self): + address = '127.0.0.1' + key = 'VyOS-key' + url = f'https://{address}/retrieve' + payload = {'data': '{"op": "showConfig", "path": []}', 'key': f'{key}'} + headers = {} + + self.cli_set(base_path) + self.cli_commit() + + r = request('POST', url, verify=False, headers=headers, data=payload) + # api not configured; expect 503 + self.assertEqual(r.status_code, 503) + + self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) + self.cli_commit() + + r = request('POST', url, verify=False, headers=headers, data=payload) + # api configured; expect 200 + self.assertEqual(r.status_code, 200) + + self.cli_delete(base_path + ['api']) + self.cli_commit() + + r = request('POST', url, verify=False, headers=headers, data=payload) + # api deleted; expect 503 + self.assertEqual(r.status_code, 503) + + @ignore_warning(InsecureRequestWarning) def test_api_show(self): address = '127.0.0.1' key = 'VyOS-key' diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py index 4dd3e761c..358668e0d 100755 --- a/smoketest/scripts/cli/test_service_ipoe-server.py +++ b/smoketest/scripts/cli/test_service_ipoe-server.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022 VyOS maintainers and contributors +# Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -17,28 +17,35 @@ import re import unittest +from collections import OrderedDict from base_accel_ppp_test import BasicAccelPPPTest from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd - from configparser import ConfigParser +from configparser import RawConfigParser -ac_name = 'ACN' -interface = 'eth0' +ac_name = "ACN" +interface = "eth0" -def getConfig(string, end='cli'): - command = f'cat /run/accel-pppd/ipoe.conf | sed -n "/^{string}/,/^{end}/p"' - out = cmd(command) - return out +class MultiOrderedDict(OrderedDict): + # Accel-ppp has duplicate keys in config file (gw-ip-address) + # This class is used to define dictionary which can contain multiple values + # in one key. + def __setitem__(self, key, value): + if isinstance(value, list) and key in self: + self[key].extend(value) + else: + super(OrderedDict, self).__setitem__(key, value) class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): @classmethod def setUpClass(cls): - cls._base_path = ['service', 'ipoe-server'] - cls._config_file = '/run/accel-pppd/ipoe.conf' - cls._chap_secrets = '/run/accel-pppd/ipoe.chap-secrets' + cls._base_path = ["service", "ipoe-server"] + cls._config_file = "/run/accel-pppd/ipoe.conf" + cls._chap_secrets = "/run/accel-pppd/ipoe.chap-secrets" + cls._protocol_section = "ipoe" # call base-classes classmethod super(TestServiceIPoEServer, cls).setUpClass() @@ -47,22 +54,29 @@ class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): super().verify(conf) # Validate configuration values - accel_modules = list(conf['modules'].keys()) - self.assertIn('log_syslog', accel_modules) - self.assertIn('ipoe', accel_modules) - self.assertIn('shaper', accel_modules) - self.assertIn('ipv6pool', accel_modules) - self.assertIn('ipv6_nd', accel_modules) - self.assertIn('ipv6_dhcp', accel_modules) - self.assertIn('ippool', accel_modules) - - def basic_config(self): - self.set(['interface', interface, 'client-subnet', '192.168.0.0/24']) + accel_modules = list(conf["modules"].keys()) + self.assertIn("log_syslog", accel_modules) + self.assertIn("ipoe", accel_modules) + self.assertIn("shaper", accel_modules) + self.assertIn("ipv6pool", accel_modules) + self.assertIn("ipv6_nd", accel_modules) + self.assertIn("ipv6_dhcp", accel_modules) + self.assertIn("ippool", accel_modules) + + def initial_gateway_config(self): + self._gateway = "192.0.2.1/24" + super().initial_gateway_config() + + def initial_auth_config(self): + self.set(["authentication", "mode", "noauth"]) + + def basic_protocol_specific_config(self): + self.set(["interface", interface, "client-subnet", "192.168.0.0/24"]) def test_accel_local_authentication(self): - mac_address = '08:00:27:2f:d8:06' - self.set(['authentication', 'interface', interface, 'mac', mac_address]) - self.set(['authentication', 'mode', 'local']) + mac_address = "08:00:27:2f:d8:06" + self.set(["authentication", "interface", interface, "mac", mac_address]) + self.set(["authentication", "mode", "local"]) # No IPoE interface configured with self.assertRaises(ConfigSessionError): @@ -70,115 +84,109 @@ class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): # Test configuration of local authentication for PPPoE server self.basic_config() - + # Rewrite authentication from basic_config + self.set(["authentication", "interface", interface, "mac", mac_address]) + self.set(["authentication", "mode", "local"]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file - self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) - accel_modules = list(conf['modules'].keys()) - self.assertIn('chap-secrets', accel_modules) + accel_modules = list(conf["modules"].keys()) + self.assertIn("chap-secrets", accel_modules) # basic verification self.verify(conf) # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{interface}\s+\*\s+{mac_address}\s+\*' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{interface}\s+\*\s+{mac_address}\s+\*" tmp = re.findall(regex, tmp) self.assertTrue(tmp) - def test_accel_named_pool(self): - first_pool = 'VyOS-pool1' - first_subnet = '192.0.2.0/25' - first_gateway = '192.0.2.1' - second_pool = 'Vyos-pool2' - second_subnet = '203.0.113.0/25' - second_gateway = '203.0.113.1' - - self.set(['authentication', 'mode', 'noauth']) - self.set(['client-ip-pool', 'name', first_pool, 'gateway-address', first_gateway]) - self.set(['client-ip-pool', 'name', first_pool, 'subnet', first_subnet]) - self.set(['client-ip-pool', 'name', second_pool, 'gateway-address', second_gateway]) - self.set(['client-ip-pool', 'name', second_pool, 'subnet', second_subnet]) - self.set(['interface', interface]) + def test_accel_ipv4_pool(self): + self.basic_config(is_gateway=False, is_client_pool=False) + gateway = ["172.16.0.1/25", "192.0.2.1/24"] + subnet = "172.16.0.0/24" + first_pool = "POOL1" + second_pool = "POOL2" + range = "192.0.2.10-192.0.2.20" + + for gw in gateway: + self.set(["gateway-address", gw]) + + self.set(["client-ip-pool", first_pool, "range", subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", range]) + self.set(["default-pool", first_pool]) # commit changes - self.cli_commit() + self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False) + conf = RawConfigParser( + allow_no_value=True, + delimiters="=", + strict=False, + dict_type=MultiOrderedDict, + ) conf.read(self._config_file) - self.assertTrue(conf['ipoe']['interface'], f'{interface},shared=1,mode=L2,ifcfg=1,start=dhcpv4,ipv6=1') - self.assertTrue(conf['ipoe']['noauth'], '1') - self.assertTrue(conf['ipoe']['ip-pool'], first_pool) - self.assertTrue(conf['ipoe']['ip-pool'], second_pool) - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{first_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{second_gateway}/25') - - config = getConfig('[ip-pool]') - pool_config = f'''{second_subnet},name={second_pool} -{first_subnet},name={first_pool} -gw-ip-address={second_gateway}/25 -gw-ip-address={first_gateway}/25''' - self.assertIn(pool_config, config) + self.assertIn( + f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] + ) + self.assertIn(second_pool, conf["ip-pool"][f"{range},name"]) + + gw_pool_config_list = conf.get("ip-pool", "gw-ip-address") + gw_ipoe_config_list = conf.get(self._protocol_section, "gw-ip-address") + for gw in gateway: + self.assertIn(gw.split("/")[0], gw_pool_config_list) + self.assertIn(gw, gw_ipoe_config_list) + self.assertIn(first_pool, conf[self._protocol_section]["ip-pool"]) def test_accel_next_pool(self): - first_pool = 'VyOS-pool1' - first_subnet = '192.0.2.0/25' - first_gateway = '192.0.2.1' - second_pool = 'Vyos-pool2' - second_subnet = '203.0.113.0/25' - second_gateway = '203.0.113.1' - third_pool = 'Vyos-pool3' - third_subnet = '198.51.100.0/24' - third_gateway = '198.51.100.1' - - self.set(['authentication', 'mode', 'noauth']) - self.set(['client-ip-pool', 'name', first_pool, 'gateway-address', first_gateway]) - self.set(['client-ip-pool', 'name', first_pool, 'subnet', first_subnet]) - self.set(['client-ip-pool', 'name', first_pool, 'next-pool', second_pool]) - self.set(['client-ip-pool', 'name', second_pool, 'gateway-address', second_gateway]) - self.set(['client-ip-pool', 'name', second_pool, 'subnet', second_subnet]) - self.set(['client-ip-pool', 'name', second_pool, 'next-pool', third_pool]) - self.set(['client-ip-pool', 'name', third_pool, 'gateway-address', third_gateway]) - self.set(['client-ip-pool', 'name', third_pool, 'subnet', third_subnet]) - self.set(['interface', interface]) + self.basic_config(is_gateway=False, is_client_pool=False) + + first_pool = "VyOS-pool1" + first_subnet = "192.0.2.0/25" + first_gateway = "192.0.2.1/24" + second_pool = "Vyos-pool2" + second_subnet = "203.0.113.0/25" + second_gateway = "203.0.113.1/24" + third_pool = "Vyos-pool3" + third_subnet = "198.51.100.0/24" + third_gateway = "198.51.100.1/24" + + self.set(["gateway-address", f"{first_gateway}"]) + self.set(["gateway-address", f"{second_gateway}"]) + self.set(["gateway-address", f"{third_gateway}"]) + + self.set(["client-ip-pool", first_pool, "range", first_subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", second_subnet]) + self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) + self.set(["client-ip-pool", third_pool, "range", third_subnet]) # commit changes self.cli_commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False) - conf.read(self._config_file) - - self.assertTrue(conf['ipoe']['interface'], f'{interface},shared=1,mode=L2,ifcfg=1,start=dhcpv4,ipv6=1') - self.assertTrue(conf['ipoe']['noauth'], '1') - self.assertTrue(conf['ipoe']['ip-pool'], first_pool) - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{first_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{second_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{third_gateway}/24') - - config = getConfig('[ip-pool]') + config = self.getConfig("ip-pool") # T5099 required specific order - pool_config = f'''{third_subnet},name={third_pool} + pool_config = f"""gw-ip-address={first_gateway.split('/')[0]} +gw-ip-address={second_gateway.split('/')[0]} +gw-ip-address={third_gateway.split('/')[0]} +{third_subnet},name={third_pool} {second_subnet},name={second_pool},next={third_pool} -{first_subnet},name={first_pool},next={second_pool} -gw-ip-address={third_gateway}/24 -gw-ip-address={second_gateway}/25 -gw-ip-address={first_gateway}/25''' +{first_subnet},name={first_pool},next={second_pool}""" self.assertIn(pool_config, config) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(verbosity=2) - diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index 969abd3d5..3001e71bf 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -32,7 +32,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): cls._base_path = ['service', 'pppoe-server'] cls._config_file = '/run/accel-pppd/pppoe.conf' cls._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets' - + cls._protocol_section = 'pppoe' # call base-classes classmethod super(TestServicePPPoEServer, cls).setUpClass() @@ -65,13 +65,11 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): super().verify(conf) - def basic_config(self): + def basic_protocol_specific_config(self): self.cli_set(local_if + ['address', '192.0.2.1/32']) - self.set(['access-concentrator', ac_name]) self.set(['interface', interface]) - super().basic_config() def test_pppoe_server_ppp_options(self): # Test configuration of local authentication for PPPoE server @@ -120,7 +118,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): # check interface-cache self.assertEqual(conf['ppp']['unit-cache'], interface_cache) - def test_pppoe_server_authentication_protocols(self): # Test configuration of local authentication for PPPoE server self.basic_config() @@ -137,68 +134,25 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['modules']['auth_mschap_v2'], None) - - def test_pppoe_server_client_ip_pool(self): - # Test configuration of IPv6 client pools - self.basic_config() - - subnet = '172.18.0.0/24' + def test_pppoe_server_shaper(self): fwmark = '223' limiter = 'tbf' + self.basic_config() - self.set(['client-ip-pool', 'subnet', subnet]) - - start = '192.0.2.10' - stop = '192.0.2.20' - stop_octet = stop.split('.')[3] - start_stop = f'{start}-{stop_octet}' - self.set(['client-ip-pool', 'start', start]) - self.set(['client-ip-pool', 'stop', stop]) self.set(['shaper', 'fwmark', fwmark]) - # commit changes - self.cli_commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True) - conf.read(self._config_file) - # check configured subnet - self.assertEqual(conf['ip-pool'][subnet], None) - self.assertEqual(conf['ip-pool'][start_stop], None) - self.assertEqual(conf['ip-pool']['gw-ip-address'], self._gateway) - self.assertEqual(conf['shaper']['fwmark'], fwmark) - self.assertEqual(conf['shaper']['down-limiter'], limiter) - - - def test_pppoe_server_client_ip_pool_name(self): - # Test configuration of named client pools - self.basic_config() - - subnet = '192.0.2.0/24' - gateway = '192.0.2.1' - pool = 'VYOS' - - subnet_name = f'{subnet},name' - gw_ip_prefix = f'{gateway}/24' - - self.set(['client-ip-pool', 'name', pool, 'subnet', subnet]) - self.set(['client-ip-pool', 'name', pool, 'gateway-address', gateway]) - self.cli_delete(self._base_path + ['gateway-address']) - - # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') conf.read(self._config_file) - # Validate configuration - self.assertEqual(conf['ip-pool'][subnet_name], pool) - self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) - self.assertEqual(conf['pppoe']['ip-pool'], pool) - self.assertEqual(conf['pppoe']['gw-ip-address'], gw_ip_prefix) + # basic verification + self.verify(conf) + self.assertEqual(conf['shaper']['fwmark'], fwmark) + self.assertEqual(conf['shaper']['down-limiter'], limiter) def test_pppoe_server_client_ipv6_pool(self): # Test configuration of IPv6 client pools @@ -239,7 +193,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['ipv6-pool'][client_prefix], None) self.assertEqual(conf['ipv6-pool']['delegate'], f'{delegate_prefix},{delegate_mask}') - def test_accel_radius_authentication(self): radius_called_sid = 'ifname:mac' radius_acct_interim_jitter = '9' @@ -261,7 +214,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['radius']['acct-interim-jitter'], radius_acct_interim_jitter) self.assertEqual(conf['radius']['acct-interim-interval'], radius_acct_interim_interval) - def test_pppoe_server_vlan(self): vlans = ['100', '200', '300-310'] @@ -284,5 +236,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): tmp = ','.join(vlans) self.assertIn(f'vlan-mon={interface},{tmp}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index 7657ab724..0dbc97d49 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -297,5 +297,49 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): self.cli_delete(['firewall']) + def test_conntrack_timeout_custom(self): + + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'source', 'address', '192.0.2.1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'address', '192.0.2.2']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'port', '22']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'syn-sent', '77']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'close', '88']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'established', '99']) + + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'inbound-interface', 'eth1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'source', 'address', '198.51.100.1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'protocol', 'udp', 'unreplied', '55']) + + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'source', 'address', '2001:db8::1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'inbound-interface', 'eth2']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'time-wait', '22']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'last-ack', '33']) + + self.cli_commit() + + nftables_search = [ + ['ct timeout ct-timeout-1 {'], + ['protocol tcp'], + ['policy = { syn_sent : 77, established : 99, close : 88 }'], + ['ct timeout ct-timeout-2 {'], + ['protocol udp'], + ['policy = { unreplied : 55 }'], + ['chain VYOS_CT_TIMEOUT {'], + ['ip saddr 192.0.2.1', 'ip daddr 192.0.2.2', 'tcp dport 22', 'ct timeout set "ct-timeout-1"'], + ['iifname "eth1"', 'meta l4proto udp', 'ip saddr 198.51.100.1', 'ct timeout set "ct-timeout-2"'] + ] + + nftables6_search = [ + ['ct timeout ct-timeout-1 {'], + ['protocol tcp'], + ['policy = { last_ack : 33, time_wait : 22 }'], + ['chain VYOS_CT_TIMEOUT {'], + ['iifname "eth2"', 'meta l4proto tcp', 'ip6 saddr 2001:db8::1', 'ct timeout set "ct-timeout-1"'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_conntrack') + self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') + + self.cli_delete(['firewall']) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_l2tp.py b/smoketest/scripts/cli/test_vpn_l2tp.py new file mode 100755 index 000000000..05ffb6bb5 --- /dev/null +++ b/smoketest/scripts/cli/test_vpn_l2tp.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import re +import unittest + +from base_accel_ppp_test import BasicAccelPPPTest +from configparser import ConfigParser +from vyos.utils.process import cmd + + +class TestVPNL2TPServer(BasicAccelPPPTest.TestCase): + @classmethod + def setUpClass(cls): + cls._base_path = ['vpn', 'l2tp', 'remote-access'] + cls._config_file = '/run/accel-pppd/l2tp.conf' + cls._chap_secrets = '/run/accel-pppd/l2tp.chap-secrets' + cls._protocol_section = 'l2tp' + # call base-classes classmethod + super(TestVPNL2TPServer, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestVPNL2TPServer, cls).tearDownClass() + + def basic_protocol_specific_config(self): + pass + + def test_accel_local_authentication(self): + # Test configuration of local authentication + self.basic_config() + + # upload / download limit + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "upload", + upload, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "download", + download, + ] + ) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # check proper path to chap-secrets file + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) + + # basic verification + self.verify(conf) + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + # Check local-users default value(s) + self.delete(["authentication", "local-users", "username", user, "static-ip"]) + # commit changes + self.cli_commit() + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + def test_accel_radius_authentication(self): + # Test configuration of RADIUS authentication for PPPoE server + self.basic_config() + + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "3") + self.assertEqual(conf["radius"]["timeout"], "3") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + # + # Disable Radius Accounting + # + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) + + # commit changes + self.cli_commit() + + conf.read(self._config_file) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_pptp.py b/smoketest/scripts/cli/test_vpn_pptp.py new file mode 100755 index 000000000..0d9ea312e --- /dev/null +++ b/smoketest/scripts/cli/test_vpn_pptp.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import unittest + +from configparser import ConfigParser +from vyos.utils.process import cmd +from base_accel_ppp_test import BasicAccelPPPTest +from vyos.template import is_ipv4 + + +class TestVPNPPTPServer(BasicAccelPPPTest.TestCase): + @classmethod + def setUpClass(cls): + cls._base_path = ['vpn', 'pptp', 'remote-access'] + cls._config_file = '/run/accel-pppd/pptp.conf' + cls._chap_secrets = '/run/accel-pppd/pptp.chap-secrets' + cls._protocol_section = 'pptp' + # call base-classes classmethod + super(TestVPNPPTPServer, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestVPNPPTPServer, cls).tearDownClass() + + def basic_protocol_specific_config(self): + pass + + def test_accel_name_servers(self): + # Verify proper Name-Server configuration for IPv4 + self.basic_config() + + nameserver = ["192.0.2.1", "192.0.2.2"] + for ns in nameserver: + self.set(["name-server", ns]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # IPv4 and IPv6 nameservers must be checked individually + for ns in nameserver: + self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) + + def test_accel_local_authentication(self): + # Test configuration of local authentication + self.basic_config() + + # upload / download limit + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # check proper path to chap-secrets file + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) + + # basic verification + self.verify(conf) + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + # Check local-users default value(s) + self.delete(["authentication", "local-users", "username", user, "static-ip"]) + # commit changes + self.cli_commit() + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + def test_accel_radius_authentication(self): + # Test configuration of RADIUS authentication for PPPoE server + self.basic_config() + + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + radius_port_acc = "3000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "acct-port", + radius_port_acc, + ] + ) + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "30") + self.assertEqual(conf["radius"]["timeout"], "30") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port={radius_port_acc}", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + # + # Disable Radius Accounting + # + self.delete(["authentication", "radius", "server", radius_server, "acct-port"]) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) + + # commit changes + self.cli_commit() + + conf.read(self._config_file) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py index 232eafcf2..f0695d577 100755 --- a/smoketest/scripts/cli/test_vpn_sstp.py +++ b/smoketest/scripts/cli/test_vpn_sstp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2022 VyOS maintainers and contributors +# Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -47,7 +47,7 @@ class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): cls._base_path = ['vpn', 'sstp'] cls._config_file = '/run/accel-pppd/sstp.conf' cls._chap_secrets = '/run/accel-pppd/sstp.chap-secrets' - + cls._protocol_section = 'sstp' # call base-classes classmethod super(TestVPNSSTPServer, cls).setUpClass() @@ -58,26 +58,23 @@ class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): @classmethod def tearDownClass(cls): cls.cli_delete(cls, pki_path) - super(TestVPNSSTPServer, cls).tearDownClass() - def basic_config(self): - # SSL is mandatory + def basic_protocol_specific_config(self): self.set(['ssl', 'ca-certificate', 'sstp']) self.set(['ssl', 'certificate', 'sstp']) - self.set(['client-ip-pool', 'subnet', '192.0.2.0/24']) - - super().basic_config() def test_accel_local_authentication(self): # Change default port port = '8443' self.set(['port', port]) + self.basic_config() super().test_accel_local_authentication() config = read_file(self._config_file) self.assertIn(f'port={port}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 0f658f366..bb91eddea 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -489,4 +489,4 @@ class VRFTest(VyOSUnitTestSHIM.TestCase): if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) |