From d5062cb045fae8b0b5d68b3b1198c3b86de4d558 Mon Sep 17 00:00:00 2001 From: aapostoliuk Date: Mon, 13 Nov 2023 11:17:23 +0200 Subject: accel-ppp: T5688: Standardized pool configuration in accel-ppp Standardized pool configuration for all accel-ppp services. 1. Only named pools are used now. 2. Allows all services to use range in x.x.x.x/mask and x.x.x.x-x.x.x.y format 3. next-pool can be used in all services 2. Allows to use in ipoe gw-ip-address without pool configuration which allows to use Fraimed-IP-Address attribute by radius. 3. Default pool name should be explicidly configured with default-pool. 4. In ipoe netmask and range subnet can be different. (cherry picked from commit 422eb463d413da812eabc28706e507a9910d7b53) --- smoketest/scripts/cli/base_accel_ppp_test.py | 355 ++++++++++++++++----- smoketest/scripts/cli/test_service_ipoe-server.py | 210 ++++++------ smoketest/scripts/cli/test_service_pppoe-server.py | 65 +--- smoketest/scripts/cli/test_vpn_l2tp.py | 212 ++++++++++++ smoketest/scripts/cli/test_vpn_pptp.py | 223 +++++++++++++ smoketest/scripts/cli/test_vpn_sstp.py | 13 +- 6 files changed, 836 insertions(+), 242 deletions(-) create mode 100755 smoketest/scripts/cli/test_vpn_l2tp.py create mode 100755 smoketest/scripts/cli/test_vpn_pptp.py (limited to 'smoketest') diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index 989028f64..32624719f 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -11,10 +11,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - import re import unittest + from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser @@ -25,12 +25,12 @@ from vyos.utils.system import get_half_cpus from vyos.utils.process import process_named_running from vyos.utils.process import cmd + class BasicAccelPPPTest: class TestCase(VyOSUnitTestSHIM.TestCase): - @classmethod def setUpClass(cls): - cls._process_name = 'accel-pppd' + cls._process_name = "accel-pppd" super(BasicAccelPPPTest.TestCase, cls).setUpClass() @@ -39,7 +39,7 @@ class BasicAccelPPPTest: cls.cli_delete(cls, cls._base_path) def setUp(self): - self._gateway = '192.0.2.1' + self._gateway = "192.0.2.1" # ensure we can also run this test on a live system - so lets clean # out the current configuration :) self.cli_delete(self._base_path) @@ -60,84 +60,189 @@ class BasicAccelPPPTest: def delete(self, path): self.cli_delete(self._base_path + path) - def basic_config(self): - # PPPoE local auth mode requires local users to be configured! - self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) - self.set(['authentication', 'mode', 'local']) - self.set(['gateway-address', self._gateway]) + def basic_protocol_specific_config(self): + """ + An astract method. + Initialize protocol scpecific configureations. + """ + self.assertFalse(True, msg="Function must be defined") + + def initial_auth_config(self): + """ + Initialization of default authentication for all protocols + """ + self.set( + [ + "authentication", + "local-users", + "username", + "vyos", + "password", + "vyos", + ] + ) + self.set(["authentication", "mode", "local"]) + + def initial_gateway_config(self): + """ + Initialization of default gateway + """ + self.set(["gateway-address", self._gateway]) + + def initial_pool_config(self): + """ + Initialization of default client ip pool + """ + first_pool = "SIMPLE-POOL" + self.set(["client-ip-pool", first_pool, "range", "192.0.2.0/24"]) + self.set(["default-pool", first_pool]) + + def basic_config(self, is_auth=True, is_gateway=True, is_client_pool=True): + """ + Initialization of basic configuration + :param is_auth: authentication initialization + :type is_auth: bool + :param is_gateway: gateway initialization + :type is_gateway: bool + :param is_client_pool: client ip pool initialization + :type is_client_pool: bool + """ + self.basic_protocol_specific_config() + if is_auth: + self.initial_auth_config() + if is_gateway: + self.initial_gateway_config() + if is_client_pool: + self.initial_pool_config() + + def getConfig(self, start, end="cli"): + """ + Return part of configuration from line + where the first injection of start keyword to the line + where the first injection of end keyowrd + :param start: start keyword + :type start: str + :param end: end keyword + :type end: str + :return: part of config + :rtype: str + """ + command = f'cat {self._config_file} | sed -n "/^\[{start}/,/^\[{end}/p"' + out = cmd(command) + return out def verify(self, conf): - self.assertEqual(conf['core']['thread-count'], str(get_half_cpus())) + self.assertEqual(conf["core"]["thread-count"], str(get_half_cpus())) def test_accel_name_servers(self): # Verify proper Name-Server configuration for IPv4 and IPv6 self.basic_config() - nameserver = ['192.0.2.1', '192.0.2.2', '2001:db8::1'] + nameserver = ["192.0.2.1", "192.0.2.2", "2001:db8::1"] for ns in nameserver: - self.set(['name-server', ns]) + self.set(["name-server", ns]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # IPv4 and IPv6 nameservers must be checked individually for ns in nameserver: if is_ipv4(ns): - self.assertIn(ns, [conf['dns']['dns1'], conf['dns']['dns2']]) + self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) else: - self.assertEqual(conf['ipv6-dns'][ns], None) + self.assertEqual(conf["ipv6-dns"][ns], None) def test_accel_local_authentication(self): # Test configuration of local authentication self.basic_config() # upload / download limit - user = 'test' - password = 'test2' - static_ip = '100.100.100.101' - upload = '5000' - download = '10000' - - self.set(['authentication', 'local-users', 'username', user, 'password', password]) - self.set(['authentication', 'local-users', 'username', user, 'static-ip', static_ip]) - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'upload', upload]) + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "upload", + upload, + ] + ) # upload rate-limit requires also download rate-limit with self.assertRaises(ConfigSessionError): self.cli_commit() - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download]) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "download", + download, + ] + ) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file - self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) # basic verification self.verify(conf) # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) # Check local-users default value(s) - self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) + self.delete( + ["authentication", "local-users", "username", user, "static-ip"] + ) # commit changes self.cli_commit() # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) @@ -145,74 +250,170 @@ class BasicAccelPPPTest: # Test configuration of RADIUS authentication for PPPoE server self.basic_config() - radius_server = '192.0.2.22' - radius_key = 'secretVyOS' - radius_port = '2000' - radius_port_acc = '3000' - - self.set(['authentication', 'mode', 'radius']) - self.set(['authentication', 'radius', 'server', radius_server, 'key', radius_key]) - self.set(['authentication', 'radius', 'server', radius_server, 'port', radius_port]) - self.set(['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) - - coa_server = '4.4.4.4' - coa_key = 'testCoA' - self.set(['authentication', 'radius', 'dynamic-author', 'server', coa_server]) - self.set(['authentication', 'radius', 'dynamic-author', 'key', coa_key]) - - nas_id = 'VyOS-PPPoE' - nas_ip = '7.7.7.7' - self.set(['authentication', 'radius', 'nas-identifier', nas_id]) - self.set(['authentication', 'radius', 'nas-ip-address', nas_ip]) - - source_address = '1.2.3.4' - self.set(['authentication', 'radius', 'source-address', source_address]) + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + radius_port_acc = "3000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "acct-port", + radius_port_acc, + ] + ) + + coa_server = "4.4.4.4" + coa_key = "testCoA" + self.set( + ["authentication", "radius", "dynamic-author", "server", coa_server] + ) + self.set(["authentication", "radius", "dynamic-author", "key", coa_key]) + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # basic verification self.verify(conf) # check auth - self.assertTrue(conf['radius'].getboolean('verbose')) - self.assertEqual(conf['radius']['acct-timeout'], '3') - self.assertEqual(conf['radius']['timeout'], '3') - self.assertEqual(conf['radius']['max-try'], '3') - - self.assertEqual(conf['radius']['dae-server'], f'{coa_server}:1700,{coa_key}') - self.assertEqual(conf['radius']['nas-identifier'], nas_id) - self.assertEqual(conf['radius']['nas-ip-address'], nas_ip) - self.assertEqual(conf['radius']['bind'], source_address) - - server = conf['radius']['server'].split(',') + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "3") + self.assertEqual(conf["radius"]["timeout"], "3") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual( + conf["radius"]["dae-server"], f"{coa_server}:1700,{coa_key}" + ) + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port={radius_port_acc}', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port={radius_port_acc}", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) # # Disable Radius Accounting # - self.delete(['authentication', 'radius', 'server', radius_server, 'acct-port']) - self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting']) + self.delete( + ["authentication", "radius", "server", radius_server, "acct-port"] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) # commit changes self.cli_commit() conf.read(self._config_file) - server = conf['radius']['server'].split(',') + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port=0', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + def test_accel_ipv4_pool(self): + """ + Test accel-ppp IPv4 pool + """ + self.basic_config(is_gateway=False, is_client_pool=False) + gateway = "192.0.2.1" + subnet = "172.16.0.0/24" + first_pool = "POOL1" + second_pool = "POOL2" + range = "192.0.2.10-192.0.2.20" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", range]) + self.set(["default-pool", first_pool]) + # commit changes + + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + self.assertEqual( + f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] + ) + self.assertEqual(second_pool, conf["ip-pool"][f"{range},name"]) + self.assertEqual(gateway, conf["ip-pool"]["gw-ip-address"]) + self.assertEqual(first_pool, conf[self._protocol_section]["ip-pool"]) + + def test_accel_next_pool(self): + """ + T5099 required specific order + """ + self.basic_config(is_gateway=False, is_client_pool=False) + + gateway = "192.0.2.1" + first_pool = "VyOS-pool1" + first_subnet = "192.0.2.0/25" + second_pool = "Vyos-pool2" + second_subnet = "203.0.113.0/25" + third_pool = "Vyos-pool3" + third_subnet = "198.51.100.0/24" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", first_subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", second_subnet]) + self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) + self.set(["client-ip-pool", third_pool, "range", third_subnet]) + + # commit changes + self.cli_commit() + + config = self.getConfig("ip-pool") + pool_config = f"""gw-ip-address={gateway} +{third_subnet},name={third_pool} +{second_subnet},name={second_pool},next={third_pool} +{first_subnet},name={first_pool},next={second_pool}""" + self.assertIn(pool_config, config) diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py index 4dd3e761c..358668e0d 100755 --- a/smoketest/scripts/cli/test_service_ipoe-server.py +++ b/smoketest/scripts/cli/test_service_ipoe-server.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022 VyOS maintainers and contributors +# Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -17,28 +17,35 @@ import re import unittest +from collections import OrderedDict from base_accel_ppp_test import BasicAccelPPPTest from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd - from configparser import ConfigParser +from configparser import RawConfigParser -ac_name = 'ACN' -interface = 'eth0' +ac_name = "ACN" +interface = "eth0" -def getConfig(string, end='cli'): - command = f'cat /run/accel-pppd/ipoe.conf | sed -n "/^{string}/,/^{end}/p"' - out = cmd(command) - return out +class MultiOrderedDict(OrderedDict): + # Accel-ppp has duplicate keys in config file (gw-ip-address) + # This class is used to define dictionary which can contain multiple values + # in one key. + def __setitem__(self, key, value): + if isinstance(value, list) and key in self: + self[key].extend(value) + else: + super(OrderedDict, self).__setitem__(key, value) class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): @classmethod def setUpClass(cls): - cls._base_path = ['service', 'ipoe-server'] - cls._config_file = '/run/accel-pppd/ipoe.conf' - cls._chap_secrets = '/run/accel-pppd/ipoe.chap-secrets' + cls._base_path = ["service", "ipoe-server"] + cls._config_file = "/run/accel-pppd/ipoe.conf" + cls._chap_secrets = "/run/accel-pppd/ipoe.chap-secrets" + cls._protocol_section = "ipoe" # call base-classes classmethod super(TestServiceIPoEServer, cls).setUpClass() @@ -47,22 +54,29 @@ class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): super().verify(conf) # Validate configuration values - accel_modules = list(conf['modules'].keys()) - self.assertIn('log_syslog', accel_modules) - self.assertIn('ipoe', accel_modules) - self.assertIn('shaper', accel_modules) - self.assertIn('ipv6pool', accel_modules) - self.assertIn('ipv6_nd', accel_modules) - self.assertIn('ipv6_dhcp', accel_modules) - self.assertIn('ippool', accel_modules) - - def basic_config(self): - self.set(['interface', interface, 'client-subnet', '192.168.0.0/24']) + accel_modules = list(conf["modules"].keys()) + self.assertIn("log_syslog", accel_modules) + self.assertIn("ipoe", accel_modules) + self.assertIn("shaper", accel_modules) + self.assertIn("ipv6pool", accel_modules) + self.assertIn("ipv6_nd", accel_modules) + self.assertIn("ipv6_dhcp", accel_modules) + self.assertIn("ippool", accel_modules) + + def initial_gateway_config(self): + self._gateway = "192.0.2.1/24" + super().initial_gateway_config() + + def initial_auth_config(self): + self.set(["authentication", "mode", "noauth"]) + + def basic_protocol_specific_config(self): + self.set(["interface", interface, "client-subnet", "192.168.0.0/24"]) def test_accel_local_authentication(self): - mac_address = '08:00:27:2f:d8:06' - self.set(['authentication', 'interface', interface, 'mac', mac_address]) - self.set(['authentication', 'mode', 'local']) + mac_address = "08:00:27:2f:d8:06" + self.set(["authentication", "interface", interface, "mac", mac_address]) + self.set(["authentication", "mode", "local"]) # No IPoE interface configured with self.assertRaises(ConfigSessionError): @@ -70,115 +84,109 @@ class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): # Test configuration of local authentication for PPPoE server self.basic_config() - + # Rewrite authentication from basic_config + self.set(["authentication", "interface", interface, "mac", mac_address]) + self.set(["authentication", "mode", "local"]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file - self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) - accel_modules = list(conf['modules'].keys()) - self.assertIn('chap-secrets', accel_modules) + accel_modules = list(conf["modules"].keys()) + self.assertIn("chap-secrets", accel_modules) # basic verification self.verify(conf) # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{interface}\s+\*\s+{mac_address}\s+\*' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{interface}\s+\*\s+{mac_address}\s+\*" tmp = re.findall(regex, tmp) self.assertTrue(tmp) - def test_accel_named_pool(self): - first_pool = 'VyOS-pool1' - first_subnet = '192.0.2.0/25' - first_gateway = '192.0.2.1' - second_pool = 'Vyos-pool2' - second_subnet = '203.0.113.0/25' - second_gateway = '203.0.113.1' - - self.set(['authentication', 'mode', 'noauth']) - self.set(['client-ip-pool', 'name', first_pool, 'gateway-address', first_gateway]) - self.set(['client-ip-pool', 'name', first_pool, 'subnet', first_subnet]) - self.set(['client-ip-pool', 'name', second_pool, 'gateway-address', second_gateway]) - self.set(['client-ip-pool', 'name', second_pool, 'subnet', second_subnet]) - self.set(['interface', interface]) + def test_accel_ipv4_pool(self): + self.basic_config(is_gateway=False, is_client_pool=False) + gateway = ["172.16.0.1/25", "192.0.2.1/24"] + subnet = "172.16.0.0/24" + first_pool = "POOL1" + second_pool = "POOL2" + range = "192.0.2.10-192.0.2.20" + + for gw in gateway: + self.set(["gateway-address", gw]) + + self.set(["client-ip-pool", first_pool, "range", subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", range]) + self.set(["default-pool", first_pool]) # commit changes - self.cli_commit() + self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False) + conf = RawConfigParser( + allow_no_value=True, + delimiters="=", + strict=False, + dict_type=MultiOrderedDict, + ) conf.read(self._config_file) - self.assertTrue(conf['ipoe']['interface'], f'{interface},shared=1,mode=L2,ifcfg=1,start=dhcpv4,ipv6=1') - self.assertTrue(conf['ipoe']['noauth'], '1') - self.assertTrue(conf['ipoe']['ip-pool'], first_pool) - self.assertTrue(conf['ipoe']['ip-pool'], second_pool) - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{first_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{second_gateway}/25') - - config = getConfig('[ip-pool]') - pool_config = f'''{second_subnet},name={second_pool} -{first_subnet},name={first_pool} -gw-ip-address={second_gateway}/25 -gw-ip-address={first_gateway}/25''' - self.assertIn(pool_config, config) + self.assertIn( + f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] + ) + self.assertIn(second_pool, conf["ip-pool"][f"{range},name"]) + + gw_pool_config_list = conf.get("ip-pool", "gw-ip-address") + gw_ipoe_config_list = conf.get(self._protocol_section, "gw-ip-address") + for gw in gateway: + self.assertIn(gw.split("/")[0], gw_pool_config_list) + self.assertIn(gw, gw_ipoe_config_list) + self.assertIn(first_pool, conf[self._protocol_section]["ip-pool"]) def test_accel_next_pool(self): - first_pool = 'VyOS-pool1' - first_subnet = '192.0.2.0/25' - first_gateway = '192.0.2.1' - second_pool = 'Vyos-pool2' - second_subnet = '203.0.113.0/25' - second_gateway = '203.0.113.1' - third_pool = 'Vyos-pool3' - third_subnet = '198.51.100.0/24' - third_gateway = '198.51.100.1' - - self.set(['authentication', 'mode', 'noauth']) - self.set(['client-ip-pool', 'name', first_pool, 'gateway-address', first_gateway]) - self.set(['client-ip-pool', 'name', first_pool, 'subnet', first_subnet]) - self.set(['client-ip-pool', 'name', first_pool, 'next-pool', second_pool]) - self.set(['client-ip-pool', 'name', second_pool, 'gateway-address', second_gateway]) - self.set(['client-ip-pool', 'name', second_pool, 'subnet', second_subnet]) - self.set(['client-ip-pool', 'name', second_pool, 'next-pool', third_pool]) - self.set(['client-ip-pool', 'name', third_pool, 'gateway-address', third_gateway]) - self.set(['client-ip-pool', 'name', third_pool, 'subnet', third_subnet]) - self.set(['interface', interface]) + self.basic_config(is_gateway=False, is_client_pool=False) + + first_pool = "VyOS-pool1" + first_subnet = "192.0.2.0/25" + first_gateway = "192.0.2.1/24" + second_pool = "Vyos-pool2" + second_subnet = "203.0.113.0/25" + second_gateway = "203.0.113.1/24" + third_pool = "Vyos-pool3" + third_subnet = "198.51.100.0/24" + third_gateway = "198.51.100.1/24" + + self.set(["gateway-address", f"{first_gateway}"]) + self.set(["gateway-address", f"{second_gateway}"]) + self.set(["gateway-address", f"{third_gateway}"]) + + self.set(["client-ip-pool", first_pool, "range", first_subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", second_subnet]) + self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) + self.set(["client-ip-pool", third_pool, "range", third_subnet]) # commit changes self.cli_commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=', strict=False) - conf.read(self._config_file) - - self.assertTrue(conf['ipoe']['interface'], f'{interface},shared=1,mode=L2,ifcfg=1,start=dhcpv4,ipv6=1') - self.assertTrue(conf['ipoe']['noauth'], '1') - self.assertTrue(conf['ipoe']['ip-pool'], first_pool) - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{first_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{second_gateway}/25') - self.assertTrue(conf['ipoe']['gw-ip-address'], f'{third_gateway}/24') - - config = getConfig('[ip-pool]') + config = self.getConfig("ip-pool") # T5099 required specific order - pool_config = f'''{third_subnet},name={third_pool} + pool_config = f"""gw-ip-address={first_gateway.split('/')[0]} +gw-ip-address={second_gateway.split('/')[0]} +gw-ip-address={third_gateway.split('/')[0]} +{third_subnet},name={third_pool} {second_subnet},name={second_pool},next={third_pool} -{first_subnet},name={first_pool},next={second_pool} -gw-ip-address={third_gateway}/24 -gw-ip-address={second_gateway}/25 -gw-ip-address={first_gateway}/25''' +{first_subnet},name={first_pool},next={second_pool}""" self.assertIn(pool_config, config) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(verbosity=2) - diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index 969abd3d5..3001e71bf 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -32,7 +32,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): cls._base_path = ['service', 'pppoe-server'] cls._config_file = '/run/accel-pppd/pppoe.conf' cls._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets' - + cls._protocol_section = 'pppoe' # call base-classes classmethod super(TestServicePPPoEServer, cls).setUpClass() @@ -65,13 +65,11 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): super().verify(conf) - def basic_config(self): + def basic_protocol_specific_config(self): self.cli_set(local_if + ['address', '192.0.2.1/32']) - self.set(['access-concentrator', ac_name]) self.set(['interface', interface]) - super().basic_config() def test_pppoe_server_ppp_options(self): # Test configuration of local authentication for PPPoE server @@ -120,7 +118,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): # check interface-cache self.assertEqual(conf['ppp']['unit-cache'], interface_cache) - def test_pppoe_server_authentication_protocols(self): # Test configuration of local authentication for PPPoE server self.basic_config() @@ -137,68 +134,25 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['modules']['auth_mschap_v2'], None) - - def test_pppoe_server_client_ip_pool(self): - # Test configuration of IPv6 client pools - self.basic_config() - - subnet = '172.18.0.0/24' + def test_pppoe_server_shaper(self): fwmark = '223' limiter = 'tbf' + self.basic_config() - self.set(['client-ip-pool', 'subnet', subnet]) - - start = '192.0.2.10' - stop = '192.0.2.20' - stop_octet = stop.split('.')[3] - start_stop = f'{start}-{stop_octet}' - self.set(['client-ip-pool', 'start', start]) - self.set(['client-ip-pool', 'stop', stop]) self.set(['shaper', 'fwmark', fwmark]) - # commit changes - self.cli_commit() - - # Validate configuration values - conf = ConfigParser(allow_no_value=True) - conf.read(self._config_file) - # check configured subnet - self.assertEqual(conf['ip-pool'][subnet], None) - self.assertEqual(conf['ip-pool'][start_stop], None) - self.assertEqual(conf['ip-pool']['gw-ip-address'], self._gateway) - self.assertEqual(conf['shaper']['fwmark'], fwmark) - self.assertEqual(conf['shaper']['down-limiter'], limiter) - - - def test_pppoe_server_client_ip_pool_name(self): - # Test configuration of named client pools - self.basic_config() - - subnet = '192.0.2.0/24' - gateway = '192.0.2.1' - pool = 'VYOS' - - subnet_name = f'{subnet},name' - gw_ip_prefix = f'{gateway}/24' - - self.set(['client-ip-pool', 'name', pool, 'subnet', subnet]) - self.set(['client-ip-pool', 'name', pool, 'gateway-address', gateway]) - self.cli_delete(self._base_path + ['gateway-address']) - - # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') conf.read(self._config_file) - # Validate configuration - self.assertEqual(conf['ip-pool'][subnet_name], pool) - self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway) - self.assertEqual(conf['pppoe']['ip-pool'], pool) - self.assertEqual(conf['pppoe']['gw-ip-address'], gw_ip_prefix) + # basic verification + self.verify(conf) + self.assertEqual(conf['shaper']['fwmark'], fwmark) + self.assertEqual(conf['shaper']['down-limiter'], limiter) def test_pppoe_server_client_ipv6_pool(self): # Test configuration of IPv6 client pools @@ -239,7 +193,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['ipv6-pool'][client_prefix], None) self.assertEqual(conf['ipv6-pool']['delegate'], f'{delegate_prefix},{delegate_mask}') - def test_accel_radius_authentication(self): radius_called_sid = 'ifname:mac' radius_acct_interim_jitter = '9' @@ -261,7 +214,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['radius']['acct-interim-jitter'], radius_acct_interim_jitter) self.assertEqual(conf['radius']['acct-interim-interval'], radius_acct_interim_interval) - def test_pppoe_server_vlan(self): vlans = ['100', '200', '300-310'] @@ -284,5 +236,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): tmp = ','.join(vlans) self.assertIn(f'vlan-mon={interface},{tmp}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_l2tp.py b/smoketest/scripts/cli/test_vpn_l2tp.py new file mode 100755 index 000000000..05ffb6bb5 --- /dev/null +++ b/smoketest/scripts/cli/test_vpn_l2tp.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import re +import unittest + +from base_accel_ppp_test import BasicAccelPPPTest +from configparser import ConfigParser +from vyos.utils.process import cmd + + +class TestVPNL2TPServer(BasicAccelPPPTest.TestCase): + @classmethod + def setUpClass(cls): + cls._base_path = ['vpn', 'l2tp', 'remote-access'] + cls._config_file = '/run/accel-pppd/l2tp.conf' + cls._chap_secrets = '/run/accel-pppd/l2tp.chap-secrets' + cls._protocol_section = 'l2tp' + # call base-classes classmethod + super(TestVPNL2TPServer, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestVPNL2TPServer, cls).tearDownClass() + + def basic_protocol_specific_config(self): + pass + + def test_accel_local_authentication(self): + # Test configuration of local authentication + self.basic_config() + + # upload / download limit + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "upload", + upload, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "download", + download, + ] + ) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # check proper path to chap-secrets file + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) + + # basic verification + self.verify(conf) + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + # Check local-users default value(s) + self.delete(["authentication", "local-users", "username", user, "static-ip"]) + # commit changes + self.cli_commit() + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + def test_accel_radius_authentication(self): + # Test configuration of RADIUS authentication for PPPoE server + self.basic_config() + + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "3") + self.assertEqual(conf["radius"]["timeout"], "3") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + # + # Disable Radius Accounting + # + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) + + # commit changes + self.cli_commit() + + conf.read(self._config_file) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_pptp.py b/smoketest/scripts/cli/test_vpn_pptp.py new file mode 100755 index 000000000..0d9ea312e --- /dev/null +++ b/smoketest/scripts/cli/test_vpn_pptp.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import re +import unittest + +from configparser import ConfigParser +from vyos.utils.process import cmd +from base_accel_ppp_test import BasicAccelPPPTest +from vyos.template import is_ipv4 + + +class TestVPNPPTPServer(BasicAccelPPPTest.TestCase): + @classmethod + def setUpClass(cls): + cls._base_path = ['vpn', 'pptp', 'remote-access'] + cls._config_file = '/run/accel-pppd/pptp.conf' + cls._chap_secrets = '/run/accel-pppd/pptp.chap-secrets' + cls._protocol_section = 'pptp' + # call base-classes classmethod + super(TestVPNPPTPServer, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestVPNPPTPServer, cls).tearDownClass() + + def basic_protocol_specific_config(self): + pass + + def test_accel_name_servers(self): + # Verify proper Name-Server configuration for IPv4 + self.basic_config() + + nameserver = ["192.0.2.1", "192.0.2.2"] + for ns in nameserver: + self.set(["name-server", ns]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # IPv4 and IPv6 nameservers must be checked individually + for ns in nameserver: + self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) + + def test_accel_local_authentication(self): + # Test configuration of local authentication + self.basic_config() + + # upload / download limit + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # check proper path to chap-secrets file + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) + + # basic verification + self.verify(conf) + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + # Check local-users default value(s) + self.delete(["authentication", "local-users", "username", user, "static-ip"]) + # commit changes + self.cli_commit() + + # check local users + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s" + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + def test_accel_radius_authentication(self): + # Test configuration of RADIUS authentication for PPPoE server + self.basic_config() + + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + radius_port_acc = "3000" + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "acct-port", + radius_port_acc, + ] + ) + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + # basic verification + self.verify(conf) + + # check auth + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], "30") + self.assertEqual(conf["radius"]["timeout"], "30") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port={radius_port_acc}", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + # + # Disable Radius Accounting + # + self.delete(["authentication", "radius", "server", radius_server, "acct-port"]) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) + + # commit changes + self.cli_commit() + + conf.read(self._config_file) + + server = conf["radius"]["server"].split(",") + self.assertEqual(radius_server, server[0]) + self.assertEqual(radius_key, server[1]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py index 232eafcf2..f0695d577 100755 --- a/smoketest/scripts/cli/test_vpn_sstp.py +++ b/smoketest/scripts/cli/test_vpn_sstp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2022 VyOS maintainers and contributors +# Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -47,7 +47,7 @@ class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): cls._base_path = ['vpn', 'sstp'] cls._config_file = '/run/accel-pppd/sstp.conf' cls._chap_secrets = '/run/accel-pppd/sstp.chap-secrets' - + cls._protocol_section = 'sstp' # call base-classes classmethod super(TestVPNSSTPServer, cls).setUpClass() @@ -58,26 +58,23 @@ class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): @classmethod def tearDownClass(cls): cls.cli_delete(cls, pki_path) - super(TestVPNSSTPServer, cls).tearDownClass() - def basic_config(self): - # SSL is mandatory + def basic_protocol_specific_config(self): self.set(['ssl', 'ca-certificate', 'sstp']) self.set(['ssl', 'certificate', 'sstp']) - self.set(['client-ip-pool', 'subnet', '192.0.2.0/24']) - - super().basic_config() def test_accel_local_authentication(self): # Change default port port = '8443' self.set(['port', port]) + self.basic_config() super().test_accel_local_authentication() config = read_file(self._config_file) self.assertIn(f'port={port}', config) + if __name__ == '__main__': unittest.main(verbosity=2) -- cgit v1.2.3