#!/usr/bin/env python3 # # Copyright (C) 2019-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import platform import unittest import paramiko from base_vyostest_shim import VyOSUnitTestSHIM from subprocess import Popen, PIPE from pwd import getpwall from time import sleep from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.file import read_file from vyos.template import inc_ip base_path = ['system', 'login'] users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice'] ssh_pubkey = """ AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF TlXYfVt6Ls2NZZsUSwHbnhK4BKDrPvVZMW/LycjQPzWW6TGtk6UbZP1WqdviQ9hP jsEeKJSTKciMSvQpjBWyEQQPXSKYQC7ryQQilZDqnJgzqwzejKEe+nhhOdBvjuZc uukxjT69E0UmWAwLxzvfiurwiQaC7tG+PwqvtfHOPL3i6yRO2C5ORpFarx8PeGDS IfIXJCr3LoUbLHeuE7T2KaOKQcX0UsWJ4CoCapRLpTVYPDB32BYfgq7cW1Sal1re EGH2PzuXBklinTBgCHA87lHjpwDIAqdmvMj7SXIW9LxazLtP+e37sexE7xEs0cpN l68txdDbY2P2Kbz5mqGFfCvBYKv9V2clM5vyWNy/Xp5TsCis89nn83KJmgFS7sMx pHJz8umqkxy3hfw0K7BRFtjWd63sbOP8Q/SDV7LPaIfIxenA9zv2rY7y+AIqTmSr TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk= """ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestSystemLogin, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration which will break this test cls.cli_delete(cls, base_path + ['radius']) cls.cli_delete(cls, base_path + ['tacacs']) def tearDown(self): # Delete individual users from configuration for user in users: self.cli_delete(base_path + ['user', user]) self.cli_delete(base_path + ['radius']) self.cli_delete(base_path + ['tacacs']) self.cli_commit() # After deletion, a user is not allowed to remain in /etc/passwd usernames = [x[0] for x in getpwall()] for user in users: self.assertNotIn(user, usernames) def test_add_linux_system_user(self): # We are not allowed to re-use a username already taken by the Linux # base system system_user = 'backup' self.cli_set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user]) # check validate() - can not add username which exists on the Debian # base system (UID < 1000) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['user', system_user]) def test_system_login_user(self): # Check if user can be created and we can SSH to localhost self.cli_set(['service', 'ssh', 'port', '22']) for user in users: name = "VyOS Roxx " + user home_dir = "/tmp/" + user self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) self.cli_set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) self.cli_set(base_path + ['user', user, 'home-directory', home_dir]) self.cli_commit() for user in users: cmd = ['su','-', user] proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) tmp = "{}\nuname -a".format(user) proc.stdin.write(tmp.encode()) proc.stdin.flush() (stdout, stderr) = proc.communicate() # stdout is something like this: # b'Linux LR1.wue3 5.10.61-amd64-vyos #1 SMP Fri Aug 27 08:55:46 UTC 2021 x86_64 GNU/Linux\n' self.assertTrue(len(stdout) > 40) def test_system_login_otp(self): otp_user = 'otp-test_user' otp_password = 'SuperTestPassword' otp_key = '76A3ZS6HFHBTOK2H4NDHTIVFPQ' self.cli_set(base_path + ['user', otp_user, 'authentication', 'plaintext-password', otp_password]) self.cli_set(base_path + ['user', otp_user, 'authentication', 'otp', 'key', otp_key]) self.cli_commit() # Check if OTP key was written properly tmp = cmd(f'sudo head -1 /home/{otp_user}/.google_authenticator') self.assertIn(otp_key, tmp) self.cli_delete(base_path + ['user', otp_user]) def test_system_user_ssh_key(self): ssh_user = 'ssh-test_user' public_keys = 'vyos_test@domain-foo.com' type = 'ssh-rsa' self.cli_set(base_path + ['user', ssh_user, 'authentication', 'public-keys', public_keys, 'key', ssh_pubkey.replace('\n','')]) # check validate() - missing type for public-key with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['user', ssh_user, 'authentication', 'public-keys', public_keys, 'type', type]) self.cli_commit() # Check that SSH key was written properly tmp = cmd(f'sudo cat /home/{ssh_user}/.ssh/authorized_keys') key = f'{type} ' + ssh_pubkey.replace('\n','') self.assertIn(key, tmp) self.cli_delete(base_path + ['user', ssh_user]) def test_radius_kernel_features(self): # T2886: RADIUS requires some Kernel options to be present kernel = platform.release() kernel_config = read_file(f'/boot/config-{kernel}') # T2886 - RADIUS authentication - check for statically compiled options options = ['CONFIG_AUDIT', 'CONFIG_AUDITSYSCALL', 'CONFIG_AUDIT_ARCH'] for option in options: self.assertIn(f'{option}=y', kernel_config) def test_system_login_radius_ipv4(self): # Verify generated RADIUS configuration files radius_key = 'VyOSsecretVyOS' radius_server = '172.16.100.10' radius_source = '127.0.0.1' radius_port = '2000' radius_timeout = '1' self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.cli_set(base_path + ['radius', 'source-address', radius_source]) self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server, radius_port, radius_key, radius_timeout, radius_source), pam_radius_auth_conf) self.assertTrue(tmp) # required, static options self.assertIn('priv-lvl 15', pam_radius_auth_conf) self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) # PAM pam_common_account = read_file('/etc/pam.d/common-account') self.assertIn('pam_radius_auth.so', pam_common_account) pam_common_auth = read_file('/etc/pam.d/common-auth') self.assertIn('pam_radius_auth.so', pam_common_auth) pam_common_session = read_file('/etc/pam.d/common-session') self.assertIn('pam_radius_auth.so', pam_common_session) pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) def test_system_login_radius_ipv6(self): # Verify generated RADIUS configuration files radius_key = 'VyOS-VyOS' radius_server = '2001:db8::1' radius_source = '::1' radius_port = '4000' radius_timeout = '4' self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.cli_set(base_path + ['radius', 'source-address', radius_source]) self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, radius_port, radius_key, radius_timeout, radius_source), pam_radius_auth_conf) self.assertTrue(tmp) # required, static options self.assertIn('priv-lvl 15', pam_radius_auth_conf) self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) # PAM pam_common_account = read_file('/etc/pam.d/common-account') self.assertIn('pam_radius_auth.so', pam_common_account) pam_common_auth = read_file('/etc/pam.d/common-auth') self.assertIn('pam_radius_auth.so', pam_common_auth) pam_common_session = read_file('/etc/pam.d/common-session') self.assertIn('pam_radius_auth.so', pam_common_session) pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) def test_system_login_max_login_session(self): max_logins = '2' timeout = '600' self.cli_set(base_path + ['max-login-session', max_logins]) # 'max-login-session' must be only with 'timeout' option with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['timeout', timeout]) self.cli_commit() security_limits = read_file('/etc/security/limits.d/10-vyos.conf') self.assertIn(f'* - maxsyslogins {max_logins}', security_limits) self.cli_delete(base_path + ['timeout']) self.cli_delete(base_path + ['max-login-session']) def test_system_login_tacacs(self): tacacs_secret = 'tac_plus_key' tacacs_servers = ['100.64.0.11', '100.64.0.12'] # Enable TACACS for server in tacacs_servers: self.cli_set(base_path + ['tacacs', 'server', server, 'key', tacacs_secret]) self.cli_commit() # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+tacplus\s+files', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+tacplus\s+files', nsswitch_conf) self.assertTrue(tmp) # PAM TACACS configuration pam_tacacs_conf = read_file('/etc/tacplus_servers') # NSS TACACS configuration nss_tacacs_conf = read_file('/etc/tacplus_nss.conf') # Users have individual home directories self.assertIn('user_homedir=1', pam_tacacs_conf) # specify services self.assertIn('service=shell', pam_tacacs_conf) self.assertIn('protocol=ssh', pam_tacacs_conf) for server in tacacs_servers: self.assertIn(f'secret={tacacs_secret}', pam_tacacs_conf) self.assertIn(f'server={server}', pam_tacacs_conf) self.assertIn(f'secret={tacacs_secret}', nss_tacacs_conf) self.assertIn(f'server={server}', nss_tacacs_conf) if __name__ == '__main__': unittest.main(verbosity=2)