diff options
Diffstat (limited to 'smoketest/scripts/cli/base_accel_ppp_test.py')
-rw-r--r-- | smoketest/scripts/cli/base_accel_ppp_test.py | 379 |
1 files changed, 302 insertions, 77 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index 989028f64..1ea5db898 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -11,10 +11,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - import re import unittest + from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser @@ -25,12 +25,12 @@ from vyos.utils.system import get_half_cpus from vyos.utils.process import process_named_running from vyos.utils.process import cmd + class BasicAccelPPPTest: class TestCase(VyOSUnitTestSHIM.TestCase): - @classmethod def setUpClass(cls): - cls._process_name = 'accel-pppd' + cls._process_name = "accel-pppd" super(BasicAccelPPPTest.TestCase, cls).setUpClass() @@ -39,7 +39,7 @@ class BasicAccelPPPTest: cls.cli_delete(cls, cls._base_path) def setUp(self): - self._gateway = '192.0.2.1' + self._gateway = "192.0.2.1" # ensure we can also run this test on a live system - so lets clean # out the current configuration :) self.cli_delete(self._base_path) @@ -60,84 +60,188 @@ class BasicAccelPPPTest: def delete(self, path): self.cli_delete(self._base_path + path) - def basic_config(self): - # PPPoE local auth mode requires local users to be configured! - self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) - self.set(['authentication', 'mode', 'local']) - self.set(['gateway-address', self._gateway]) + def basic_protocol_specific_config(self): + """ + An astract method. + Initialize protocol scpecific configureations. + """ + self.assertFalse(True, msg="Function must be defined") + + def initial_auth_config(self): + """ + Initialization of default authentication for all protocols + """ + self.set( + [ + "authentication", + "local-users", + "username", + "vyos", + "password", + "vyos", + ] + ) + self.set(["authentication", "mode", "local"]) + + def initial_gateway_config(self): + """ + Initialization of default gateway + """ + self.set(["gateway-address", self._gateway]) + + def initial_pool_config(self): + """ + Initialization of default client ip pool + """ + first_pool = "SIMPLE-POOL" + self.set(["client-ip-pool", first_pool, "range", "192.0.2.0/24"]) + self.set(["default-pool", first_pool]) + + def basic_config(self, is_auth=True, is_gateway=True, is_client_pool=True): + """ + Initialization of basic configuration + :param is_auth: authentication initialization + :type is_auth: bool + :param is_gateway: gateway initialization + :type is_gateway: bool + :param is_client_pool: client ip pool initialization + :type is_client_pool: bool + """ + self.basic_protocol_specific_config() + if is_auth: + self.initial_auth_config() + if is_gateway: + self.initial_gateway_config() + if is_client_pool: + self.initial_pool_config() + + def getConfig(self, start, end="cli"): + """ + Return part of configuration from line + where the first injection of start keyword to the line + where the first injection of end keyowrd + :param start: start keyword + :type start: str + :param end: end keyword + :type end: str + :return: part of config + :rtype: str + """ + command = f'cat {self._config_file} | sed -n "/^\[{start}/,/^\[{end}/p"' + out = cmd(command) + return out def verify(self, conf): - self.assertEqual(conf['core']['thread-count'], str(get_half_cpus())) + self.assertEqual(conf["core"]["thread-count"], str(get_half_cpus())) def test_accel_name_servers(self): # Verify proper Name-Server configuration for IPv4 and IPv6 self.basic_config() - nameserver = ['192.0.2.1', '192.0.2.2', '2001:db8::1'] + nameserver = ["192.0.2.1", "192.0.2.2", "2001:db8::1"] for ns in nameserver: - self.set(['name-server', ns]) + self.set(["name-server", ns]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # IPv4 and IPv6 nameservers must be checked individually for ns in nameserver: if is_ipv4(ns): - self.assertIn(ns, [conf['dns']['dns1'], conf['dns']['dns2']]) + self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) else: - self.assertEqual(conf['ipv6-dns'][ns], None) + self.assertEqual(conf["ipv6-dns"][ns], None) def test_accel_local_authentication(self): # Test configuration of local authentication self.basic_config() # upload / download limit - user = 'test' - password = 'test2' - static_ip = '100.100.100.101' - upload = '5000' - download = '10000' - - self.set(['authentication', 'local-users', 'username', user, 'password', password]) - self.set(['authentication', 'local-users', 'username', user, 'static-ip', static_ip]) - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'upload', upload]) + user = "test" + password = "test2" + static_ip = "100.100.100.101" + upload = "5000" + download = "10000" + self.set( + [ + "authentication", + "local-users", + "username", + user, + "password", + password, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "static-ip", + static_ip, + ] + ) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "upload", + upload, + ] + ) # upload rate-limit requires also download rate-limit with self.assertRaises(ConfigSessionError): self.cli_commit() - self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download]) + self.set( + [ + "authentication", + "local-users", + "username", + user, + "rate-limit", + "download", + download, + ] + ) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file - self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) # basic verification self.verify(conf) # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) # Check local-users default value(s) - self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) + self.delete( + ["authentication", "local-users", "username", user, "static-ip"] + ) # commit changes self.cli_commit() # check local users - tmp = cmd(f'sudo cat {self._chap_secrets}') - regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}' + tmp = cmd(f"sudo cat {self._chap_secrets}") + regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) @@ -145,74 +249,195 @@ class BasicAccelPPPTest: # Test configuration of RADIUS authentication for PPPoE server self.basic_config() - radius_server = '192.0.2.22' - radius_key = 'secretVyOS' - radius_port = '2000' - radius_port_acc = '3000' - - self.set(['authentication', 'mode', 'radius']) - self.set(['authentication', 'radius', 'server', radius_server, 'key', radius_key]) - self.set(['authentication', 'radius', 'server', radius_server, 'port', radius_port]) - self.set(['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc]) - - coa_server = '4.4.4.4' - coa_key = 'testCoA' - self.set(['authentication', 'radius', 'dynamic-author', 'server', coa_server]) - self.set(['authentication', 'radius', 'dynamic-author', 'key', coa_key]) - - nas_id = 'VyOS-PPPoE' - nas_ip = '7.7.7.7' - self.set(['authentication', 'radius', 'nas-identifier', nas_id]) - self.set(['authentication', 'radius', 'nas-ip-address', nas_ip]) - - source_address = '1.2.3.4' - self.set(['authentication', 'radius', 'source-address', source_address]) + radius_server = "192.0.2.22" + radius_key = "secretVyOS" + radius_port = "2000" + radius_port_acc = "3000" + acct_interim_jitter = '10' + acct_interim_interval = '10' + acct_timeout = '30' + + self.set(["authentication", "mode", "radius"]) + self.set( + ["authentication", "radius", "server", radius_server, "key", radius_key] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "port", + radius_port, + ] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "acct-port", + radius_port_acc, + ] + ) + self.set( + [ + "authentication", + "radius", + "acct-interim-jitter", + acct_interim_jitter, + ] + ) + self.set( + [ + "authentication", + "radius", + "accounting-interim-interval", + acct_interim_interval, + ] + ) + self.set( + [ + "authentication", + "radius", + "acct-timeout", + acct_timeout, + ] + ) + + coa_server = "4.4.4.4" + coa_key = "testCoA" + self.set( + ["authentication", "radius", "dynamic-author", "server", coa_server] + ) + self.set(["authentication", "radius", "dynamic-author", "key", coa_key]) + + nas_id = "VyOS-PPPoE" + nas_ip = "7.7.7.7" + self.set(["authentication", "radius", "nas-identifier", nas_id]) + self.set(["authentication", "radius", "nas-ip-address", nas_ip]) + + source_address = "1.2.3.4" + self.set(["authentication", "radius", "source-address", source_address]) # commit changes self.cli_commit() # Validate configuration values - conf = ConfigParser(allow_no_value=True, delimiters='=') + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # basic verification self.verify(conf) # check auth - self.assertTrue(conf['radius'].getboolean('verbose')) - self.assertEqual(conf['radius']['acct-timeout'], '3') - self.assertEqual(conf['radius']['timeout'], '3') - self.assertEqual(conf['radius']['max-try'], '3') - - self.assertEqual(conf['radius']['dae-server'], f'{coa_server}:1700,{coa_key}') - self.assertEqual(conf['radius']['nas-identifier'], nas_id) - self.assertEqual(conf['radius']['nas-ip-address'], nas_ip) - self.assertEqual(conf['radius']['bind'], source_address) - - server = conf['radius']['server'].split(',') + self.assertTrue(conf["radius"].getboolean("verbose")) + self.assertEqual(conf["radius"]["acct-timeout"], acct_timeout) + self.assertEqual(conf["radius"]["acct-interim-interval"], acct_interim_interval) + self.assertEqual(conf["radius"]["acct-interim-jitter"], acct_interim_jitter) + self.assertEqual(conf["radius"]["timeout"], "3") + self.assertEqual(conf["radius"]["max-try"], "3") + + self.assertEqual( + conf["radius"]["dae-server"], f"{coa_server}:1700,{coa_key}" + ) + self.assertEqual(conf["radius"]["nas-identifier"], nas_id) + self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) + self.assertEqual(conf["radius"]["bind"], source_address) + + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port={radius_port_acc}', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port={radius_port_acc}", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) # # Disable Radius Accounting # - self.delete(['authentication', 'radius', 'server', radius_server, 'acct-port']) - self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting']) + self.delete( + ["authentication", "radius", "server", radius_server, "acct-port"] + ) + self.set( + [ + "authentication", + "radius", + "server", + radius_server, + "disable-accounting", + ] + ) # commit changes self.cli_commit() conf.read(self._config_file) - server = conf['radius']['server'].split(',') + server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) - self.assertEqual(f'auth-port={radius_port}', server[2]) - self.assertEqual(f'acct-port=0', server[3]) - self.assertEqual(f'req-limit=0', server[4]) - self.assertEqual(f'fail-time=0', server[5]) + self.assertEqual(f"auth-port={radius_port}", server[2]) + self.assertEqual(f"acct-port=0", server[3]) + self.assertEqual(f"req-limit=0", server[4]) + self.assertEqual(f"fail-time=0", server[5]) + + def test_accel_ipv4_pool(self): + self.basic_config(is_gateway=False, is_client_pool=False) + gateway = "192.0.2.1" + subnet = "172.16.0.0/24" + first_pool = "POOL1" + second_pool = "POOL2" + range = "192.0.2.10-192.0.2.20" + range_config = "192.0.2.10-20" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", range]) + self.set(["default-pool", first_pool]) + # commit changes + + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) + conf.read(self._config_file) + + self.assertEqual( + f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] + ) + self.assertEqual(second_pool, conf["ip-pool"][f"{range_config},name"]) + self.assertEqual(gateway, conf["ip-pool"]["gw-ip-address"]) + self.assertEqual(first_pool, conf[self._protocol_section]["ip-pool"]) + + def test_accel_next_pool(self): + # T5099 required specific order + self.basic_config(is_gateway=False, is_client_pool=False) + + gateway = "192.0.2.1" + first_pool = "VyOS-pool1" + first_subnet = "192.0.2.0/25" + second_pool = "Vyos-pool2" + second_subnet = "203.0.113.0/25" + third_pool = "Vyos-pool3" + third_subnet = "198.51.100.0/24" + + self.set(["gateway-address", gateway]) + self.set(["client-ip-pool", first_pool, "range", first_subnet]) + self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) + self.set(["client-ip-pool", second_pool, "range", second_subnet]) + self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) + self.set(["client-ip-pool", third_pool, "range", third_subnet]) + + # commit changes + self.cli_commit() + + config = self.getConfig("ip-pool") + pool_config = f"""gw-ip-address={gateway} +{third_subnet},name={third_pool} +{second_subnet},name={second_pool},next={third_pool} +{first_subnet},name={first_pool},next={second_pool}""" + self.assertIn(pool_config, config) |