#!/usr/bin/env python3
#
# Copyright (C) 2020-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.

import os
import unittest

from base_accel_ppp_test import BasicAccelPPPTest

from configparser import ConfigParser
from vyos.configsession import ConfigSessionError
from vyos.util import process_named_running

# CLI path of the dummy interface created by basic_config() and removed
# again in tearDown()
local_if = ['interfaces', 'dummy', 'dum667']
# Access Concentrator name configured on the PPPoE server
ac_name = 'ACN'
# Interface the PPPoE server is bound to
interface = 'eth0'


class TestServicePPPoEServer(BasicAccelPPPTest.TestCase):
    """Smoketests for the 'service pppoe-server' CLI subtree.

    Each test sets CLI nodes, commits, and then parses the rendered
    accel-pppd configuration file with ConfigParser to check the result.
    """

    def setUp(self):
        """Define PPPoE specific paths/names, then run the common setUp()."""
        # These attributes must be set before the base class setUp() runs
        self._base_path = ['service', 'pppoe-server']
        self._process_name = 'accel-pppd'
        self._config_file = '/run/accel-pppd/pppoe.conf'
        self._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets'

        super().setUp()

    def tearDown(self):
        """Remove the dummy interface, then run the common tearDown()."""
        self.cli_delete(local_if)
        super().tearDown()

    def verify(self, conf):
        """Validate PPPoE specific defaults, then run the common checks.

        conf is a ConfigParser instance that already read the rendered
        accel-pppd configuration file.
        """
        mtu = '1492'

        # validate some common values in the configuration
        for tmp in ['log_syslog', 'pppoe', 'ippool',
                    'auth_mschap_v2', 'auth_mschap_v1', 'auth_chap_md5',
                    'auth_pap', 'shaper']:
            # Settings without values provide None
            self.assertIsNone(conf['modules'][tmp])

        # check Access Concentrator setting
        self.assertEqual(conf['pppoe']['ac-name'], ac_name)
        self.assertTrue(conf['pppoe'].getboolean('verbose'))
        # assertTrue(value, interface) would treat "interface" as the failure
        # message and succeed for any non-empty value - compare explicitly
        self.assertEqual(conf['pppoe']['interface'], interface)

        # check ppp
        self.assertTrue(conf['ppp'].getboolean('verbose'))
        self.assertTrue(conf['ppp'].getboolean('check-ip'))
        self.assertEqual(conf['ppp']['mtu'], mtu)
        self.assertEqual(conf['ppp']['lcp-echo-interval'], '30')
        self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0')
        self.assertEqual(conf['ppp']['lcp-echo-failure'], '3')

        super().verify(conf)

    def basic_config(self):
        """Set the minimal PPPoE server configuration shared by all tests."""
        self.cli_set(local_if + ['address', '192.0.2.1/32'])
        self.set(['access-concentrator', ac_name])
        self.set(['interface', interface])

        super().basic_config()

    def test_pppoe_server_ppp_options(self):
        # Test configuration of PPP options (ccp, mppe, min-mtu, mru,
        # interface-cache) and the connection limit
        self.basic_config()

        # other settings
        mppe = 'require'
        self.set(['ppp-options', 'ccp'])
        self.set(['ppp-options', 'mppe', mppe])
        self.set(['limits', 'connection-limit', '20/min'])

        # min-mtu
        min_mtu = '1400'
        self.set(['ppp-options', 'min-mtu', min_mtu])

        # mru
        mru = '9000'
        self.set(['ppp-options', 'mru', mru])

        # interface-cache
        interface_cache = '128000'
        self.set(['ppp-options', 'interface-cache', interface_cache])

        # commit changes
        self.cli_commit()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True, delimiters='=')
        conf.read(self._config_file)

        # basic verification
        self.verify(conf)

        self.assertEqual(conf['chap-secrets']['gw-ip-address'], self._gateway)

        # check ppp
        self.assertEqual(conf['ppp']['mppe'], mppe)
        self.assertEqual(conf['ppp']['min-mtu'], min_mtu)
        self.assertEqual(conf['ppp']['mru'], mru)

        self.assertTrue(conf['ppp'].getboolean('ccp'))

        # check other settings
        self.assertEqual(conf['connlimit']['limit'], '20/min')

        # check interface-cache (rendered under the "unit-cache" key)
        self.assertEqual(conf['ppp']['unit-cache'], interface_cache)

        # Check for running process
        self.assertTrue(process_named_running(self._process_name))

    def test_pppoe_server_authentication_protocols(self):
        # Test configuration of an explicit authentication protocol
        self.basic_config()

        # explicitly test mschap-v2 - no special reason
        self.set(['authentication', 'protocols', 'mschap-v2'])

        # commit changes
        self.cli_commit()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True)
        conf.read(self._config_file)

        self.assertIsNone(conf['modules']['auth_mschap_v2'])

        # Check for running process
        self.assertTrue(process_named_running(self._process_name))

    def test_pppoe_server_client_ip_pool(self):
        # Test configuration of IPv4 client pools (subnet and start/stop
        # range)
        self.basic_config()

        subnet = '172.18.0.0/24'
        fwmark = '223'

        self.set(['client-ip-pool', 'subnet', subnet])

        start = '192.0.2.10'
        stop = '192.0.2.20'
        # the range is expected as "<start>-<last octet of stop>"
        stop_octet = stop.split('.')[3]
        start_stop = f'{start}-{stop_octet}'
        self.set(['client-ip-pool', 'start', start])
        self.set(['client-ip-pool', 'stop', stop])
        self.set(['shaper', 'fwmark', fwmark])

        # commit changes
        self.cli_commit()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True)
        conf.read(self._config_file)

        # check configured subnet
        self.assertIsNone(conf['ip-pool'][subnet])
        self.assertIsNone(conf['ip-pool'][start_stop])
        self.assertEqual(conf['ip-pool']['gw-ip-address'], self._gateway)

        # Check for running process
        self.assertTrue(process_named_running(self._process_name))

    def test_pppoe_server_client_ip_pool_name(self):
        # Test configuration of named client pools
        self.basic_config()

        subnet = '192.0.2.0/24'
        gateway = '192.0.2.1'
        pool = 'VYOS'

        subnet_name = f'{subnet},name'
        gw_ip_prefix = f'{gateway}/24'

        self.set(['client-ip-pool', 'name', pool, 'subnet', subnet])
        self.set(['client-ip-pool', 'name', pool, 'gateway-address', gateway])
        # drop the global gateway-address so only the per-pool one is set
        self.cli_delete(self._base_path + ['gateway-address'])

        # commit changes
        self.cli_commit()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True, delimiters='=')
        conf.read(self._config_file)

        # Validate configuration
        self.assertEqual(conf['ip-pool'][subnet_name], pool)
        self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway)
        self.assertEqual(conf['pppoe']['ip-pool'], pool)
        self.assertEqual(conf['pppoe']['gw-ip-address'], gw_ip_prefix)

    def test_pppoe_server_client_ipv6_pool(self):
        # Test configuration of IPv6 client pools
        self.basic_config()

        # Enable IPv6
        allow_ipv6 = 'allow'
        random = 'random'
        self.set(['ppp-options', 'ipv6', allow_ipv6])
        self.set(['ppp-options', 'ipv6-intf-id', random])
        self.set(['ppp-options', 'ipv6-accept-peer-intf-id'])
        self.set(['ppp-options', 'ipv6-peer-intf-id', random])

        prefix = '2001:db8:ffff::/64'
        prefix_mask = '128'
        client_prefix = f'{prefix},{prefix_mask}'
        self.set(['client-ipv6-pool', 'prefix', prefix, 'mask', prefix_mask])

        delegate_prefix = '2001:db8::/40'
        delegate_mask = '56'
        self.set(['client-ipv6-pool', 'delegate', delegate_prefix,
                  'delegation-prefix', delegate_mask])

        # commit changes
        self.cli_commit()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True, delimiters='=')
        conf.read(self._config_file)

        for tmp in ['ipv6pool', 'ipv6_nd', 'ipv6_dhcp']:
            self.assertIsNone(conf['modules'][tmp])

        self.assertEqual(conf['ppp']['ipv6'], allow_ipv6)
        self.assertEqual(conf['ppp']['ipv6-intf-id'], random)
        self.assertEqual(conf['ppp']['ipv6-peer-intf-id'], random)
        self.assertTrue(conf['ppp'].getboolean('ipv6-accept-peer-intf-id'))

        self.assertIsNone(conf['ipv6-pool'][client_prefix])
        self.assertEqual(conf['ipv6-pool']['delegate'],
                         f'{delegate_prefix},{delegate_mask}')

        # Check for running process
        self.assertTrue(process_named_running(self._process_name))

    def test_accel_radius_authentication(self):
        # Extend the common RADIUS test with PPPoE specific options
        radius_called_sid = 'ifname:mac'
        radius_acct_interim_jitter = '9'

        self.set(['authentication', 'radius', 'called-sid-format',
                  radius_called_sid])
        self.set(['authentication', 'radius', 'acct-interim-jitter',
                  radius_acct_interim_jitter])

        # run common tests
        super().test_accel_radius_authentication()

        # Validate configuration values
        conf = ConfigParser(allow_no_value=True, delimiters='=')
        conf.read(self._config_file)

        # Validate configuration
        self.assertEqual(conf['pppoe']['called-sid'], radius_called_sid)
        self.assertEqual(conf['radius']['acct-interim-jitter'],
                         radius_acct_interim_jitter)


if __name__ == '__main__':
    unittest.main(verbosity=2)