diff options
Diffstat (limited to 'smoketest/scripts/cli/test_system_login.py')
| -rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 354 |
1 files changed, 279 insertions, 75 deletions
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 28abba012..71dec68d8 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2024 VyOS maintainers and contributors +# Copyright (C) 2019-2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -14,23 +14,37 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os import re import unittest +import jinja2 +import secrets +import string +import paramiko +import shutil from base_vyostest_shim import VyOSUnitTestSHIM from gzip import GzipFile -from subprocess import Popen, PIPE +from subprocess import Popen +from subprocess import PIPE from pwd import getpwall from vyos.configsession import ConfigSessionError +from vyos.configquery import ConfigTreeQuery from vyos.utils.auth import get_current_user from vyos.utils.process import cmd from vyos.utils.file import read_file +from vyos.utils.file import write_file from vyos.template import inc_ip +from vyos.template import is_ipv6 +from vyos.xml_ref import default_value base_path = ['system', 'login'] users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice'] +weak_passwd_user = ['test_user', 'passWord1'] + +ssh_test_command = '/opt/vyatta/bin/vyatta-op-cmd-wrapper show version' ssh_pubkey = """ AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF @@ -44,6 +58,71 @@ pHJz8umqkxy3hfw0K7BRFtjWd63sbOP8Q/SDV7LPaIfIxenA9zv2rY7y+AIqTmSr TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk= """ +tac_image = 'docker.io/lfkeitel/tacacs_plus:alpine' +tac_image_path = '/usr/share/vyos/tacplus-alpine.tar' +TAC_PLUS_TMPL_SRC = """ +id = spawnd { + debug redirect = /dev/stdout + listen = { port = 49 } + spawn = { + instances min = 1 + instances max = 10 + } + background = no +} + +id = tac_plus { + debug = ALL + log = stdout { + destination = /dev/stdout + } + authorization log group = yes + authentication log = stdout + authorization log = stdout + accounting log = stdout + + host = smoketest { + address = {{ source_address }}/32 + enable = clear enable + key = {{ tacacs_secret }} + } + + group = admin { + default service = permit + enable = permit + service = shell { + default command = permit + default attribute = permit + set priv-lvl = 15 + } + } + + user = {{ username }} { + password = clear {{ password }} + member = admin + } +} + +""" + +radius_image = 'docker.io/dchidell/radius-web:latest' +radius_image_path = '/usr/share/vyos/radius-latest.tar' +RADIUS_CLIENTS_TMPL_SRC = """ +client SMOKETEST { + secret = {{ radius_key }} + nastype = other + ipaddr = {{ source_address }} +} + +""" +RADIUS_USERS_TMPL_SRC = """ +# User configuration +{{ username }} Cleartext-Password := "{{ password }}" + Service-Type = NAS-Prompt-User, + Cisco-AVPair = "shell:priv-lvl=15" + +""" + class TestSystemLogin(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): @@ -54,6 +133,37 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): cls.cli_delete(cls, base_path + ['radius']) cls.cli_delete(cls, base_path + ['tacacs']) + # Load images for smoketest provided in vyos-1x-smoketest + if not os.path.exists(tac_image_path): + cls.fail(cls, f'{tac_image} image not available') + cmd(f'sudo podman load -i {tac_image_path}') + + if not os.path.exists(radius_image_path): + cls.fail(cls, f'{radius_image} image not available') + cmd(f'sudo podman load -i {radius_image_path}') + + cls.ssh_test_command_result = cls.op_mode(cls, ['show', 'version']) + + # Dynamically start SSH service if it's not running + config = ConfigTreeQuery() + cls.is_sshd_pre_test = config.exists(['service', 'sshd']) + if not cls.is_sshd_pre_test: + # Start SSH service + cls.cli_set(cls, ['service', 'ssh']) + + @classmethod + def tearDownClass(cls): + # Stop SSH service - if it was not running before starting the test + if not cls.is_sshd_pre_test: + cls.cli_set(cls, ['service', 'ssh']) + cls.cli_commit(cls) + + super(TestSystemLogin, cls).tearDownClass() + + # Cleanup container images + cmd(f'sudo podman image rm -f {tac_image}') + cmd(f'sudo podman image rm -f {radius_image}') + def tearDown(self): # Delete individual users from configuration for user in users: @@ -83,29 +193,28 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path + ['user', system_user]) def test_system_login_user(self): - # Check if user can be created and we can SSH to localhost - self.cli_set(['service', 'ssh', 'port', '22']) - for user in users: - name = "VyOS Roxx " + user - home_dir = "/tmp/" + user + name = f'VyOS Roxx {user}' + passwd = f'{user}-pSWd-t3st' + home_dir = f'/tmp/smoketest/{user}' - self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) - self.cli_set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) + self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', passwd]) + self.cli_set(base_path + ['user', user, 'full-name', name]) self.cli_set(base_path + ['user', user, 'home-directory', home_dir]) self.cli_commit() for user in users: + passwd = f'{user}-pSWd-t3st' tmp = ['su','-', user] proc = Popen(tmp, stdin=PIPE, stdout=PIPE, stderr=PIPE) - tmp = "{}\nuname -a".format(user) + tmp = f'{passwd}\nuname -a' proc.stdin.write(tmp.encode()) proc.stdin.flush() (stdout, stderr) = proc.communicate() # stdout is something like this: - # b'Linux LR1.wue3 5.10.61-amd64-vyos #1 SMP Fri Aug 27 08:55:46 UTC 2021 x86_64 GNU/Linux\n' + # b'Linux vyos 6.6.66-vyos 6.6.66-vyos #1 SMP Mon Dec 30 19:05:15 UTC 2024 x86_64 GNU/Linux\n' self.assertTrue(len(stdout) > 40) locked_user = users[0] @@ -123,6 +232,16 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): tmp = cmd(f'sudo passwd -S {locked_user}') self.assertIn(f'{locked_user} P ', tmp) + def test_system_login_weak_password_warning(self): + self.cli_set(base_path + [ + 'user', weak_passwd_user[0], 'authentication', + 'plaintext-password', weak_passwd_user[1] + ]) + + out = self.cli_commit().strip() + + self.assertIn('WARNING: The password complexity is too low', out) + self.cli_delete(base_path + ['user', weak_passwd_user[0]]) def test_system_login_otp(self): otp_user = 'otp-test_user' @@ -172,17 +291,71 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.assertIn(f'{option}=y', kernel_config) def test_system_login_radius_ipv4(self): - # Verify generated RADIUS configuration files + radius_servers = ['100.64.0.4', '100.64.0.5'] + radius_source = '100.64.0.1' + self._system_login_radius_test_helper(radius_servers, radius_source) - radius_key = 'VyOSsecretVyOS' - radius_server = '172.16.100.10' - radius_source = '127.0.0.1' - radius_port = '2000' - radius_timeout = '1' + def test_system_login_radius_ipv6(self): + radius_servers = ['2001:db8::4', '2001:db8::5'] + radius_source = '2001:db8::1' + self._system_login_radius_test_helper(radius_servers, radius_source) + + def _system_login_radius_test_helper(self, radius_servers: list, radius_source: str): + # Verify generated RADIUS configuration files + radius_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) + + default_port = default_value(base_path + ['radius', 'server', radius_servers[0], 'port']) + default_timeout = default_value(base_path + ['radius', 'server', radius_servers[0], 'timeout']) + + dummy_if = 'dum12760' + + # Load container image for FreeRADIUS server + radius_config = '/tmp/smoketest-radius-server' + radius_container_path = ['container', 'name', 'radius-1'] + + # Generate random string with 10 digits + username = 'radius-admin' + password = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) + radius_source_mask = '32' + if is_ipv6(radius_source): + radius_source_mask = '128' + radius_test_user = { + 'username' : username, + 'password' : password, + 'radius_key' : radius_key, + 'source_address' : f'{radius_source}/{radius_source_mask}' + } + + tmpl = jinja2.Template(RADIUS_CLIENTS_TMPL_SRC) + write_file(f'{radius_config}/clients.cfg', tmpl.render(radius_test_user)) + + tmpl = jinja2.Template(RADIUS_USERS_TMPL_SRC) + write_file(f'{radius_config}/users', tmpl.render(radius_test_user)) + + # Start tac_plus container + self.cli_set(radius_container_path + ['allow-host-networks']) + self.cli_set(radius_container_path + ['image', radius_image]) + self.cli_set(radius_container_path + ['volume', 'clients', 'destination', '/etc/raddb/clients.conf']) + self.cli_set(radius_container_path + ['volume', 'clients', 'mode', 'ro']) + self.cli_set(radius_container_path + ['volume', 'clients', 'source', f'{radius_config}/clients.cfg']) + self.cli_set(radius_container_path + ['volume', 'users', 'destination', '/etc/raddb/users']) + self.cli_set(radius_container_path + ['volume', 'users', 'mode', 'ro']) + self.cli_set(radius_container_path + ['volume', 'users', 'source', f'{radius_config}/users']) + + # Start container + self.cli_commit() - self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) - self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) - self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) + # Deinfine RADIUS servers + for radius_server in radius_servers: + # Use this system as "remote" RADIUS server + dummy_address_mask = '32' + if is_ipv6(radius_server): + dummy_address_mask = '128' + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_server}/{dummy_address_mask}']) + self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + + # Define RADIUS traffic source address + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_source}/{radius_source_mask}']) self.cli_set(base_path + ['radius', 'source-address', radius_source]) self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) @@ -195,10 +368,13 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') - tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server, - radius_port, radius_key, radius_timeout, - radius_source), pam_radius_auth_conf) - self.assertTrue(tmp) + + for radius_server in radius_servers: + if is_ipv6(radius_server): + # it is essential to escape the [] brackets when searching with a regex + radius_server = rf'\[{radius_server}\]' + tmp = re.findall(rf'\n?{radius_server}:{default_port}\s+{radius_key}\s+{default_timeout}\s+{radius_source}', pam_radius_auth_conf) + self.assertTrue(tmp) # required, static options self.assertIn('priv-lvl 15', pam_radius_auth_conf) @@ -225,59 +401,26 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) - def test_system_login_radius_ipv6(self): - # Verify generated RADIUS configuration files + # Login with proper credentials + out, err = self.ssh_send_cmd(ssh_test_command, username, password) + # verify login + self.assertFalse(err) + self.assertEqual(out, self.ssh_test_command_result) - radius_key = 'VyOS-VyOS' - radius_server = '2001:db8::1' - radius_source = '::1' - radius_port = '4000' - radius_timeout = '4' - - self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) - self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) - self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) - self.cli_set(base_path + ['radius', 'source-address', radius_source]) - self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) - - # check validate() - Only one IPv4 source-address supported - with self.assertRaises(ConfigSessionError): - self.cli_commit() - self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + # Login with invalid credentials + with self.assertRaises(paramiko.ssh_exception.AuthenticationException): + _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1') + # Remove RADIUS configuration + self.cli_delete(base_path + ['radius']) + # Remove RADIUS container + self.cli_delete(radius_container_path) + # Remove dummy interface + self.cli_delete(['interfaces', 'dummy', dummy_if]) self.cli_commit() - # this file must be read with higher permissions - pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') - tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, - radius_port, radius_key, radius_timeout, - radius_source), pam_radius_auth_conf) - self.assertTrue(tmp) - - # required, static options - self.assertIn('priv-lvl 15', pam_radius_auth_conf) - self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) - - # PAM - pam_common_account = read_file('/etc/pam.d/common-account') - self.assertIn('pam_radius_auth.so', pam_common_account) - - pam_common_auth = read_file('/etc/pam.d/common-auth') - self.assertIn('pam_radius_auth.so', pam_common_auth) - - pam_common_session = read_file('/etc/pam.d/common-session') - self.assertIn('pam_radius_auth.so', pam_common_session) - - pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') - self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) - - # NSS - nsswitch_conf = read_file('/etc/nsswitch.conf') - tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) - self.assertTrue(tmp) - - tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) - self.assertTrue(tmp) + # Remove rendered tac_plus daemon configuration + shutil.rmtree(radius_config) def test_system_login_max_login_session(self): max_logins = '2' @@ -300,11 +443,46 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path + ['max-login-session']) def test_system_login_tacacs(self): - tacacs_secret = 'tac_plus_key' + tacacs_secret = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) tacacs_servers = ['100.64.0.11', '100.64.0.12'] + source_address = '100.64.0.1' + dummy_if = 'dum12759' + + # Load container image for lac_plus daemon + tac_plus_config = '/tmp/smoketest-tacacs-server' + tac_container_path = ['container', 'name', 'tacacs-1'] + + # Generate random string with 10 digits + username = 'tactest' + password = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) + tac_test_user = { + 'username' : username, + 'password' : password, + 'tacacs_secret' : tacacs_secret, + 'source_address' : source_address, + } + + tmpl = jinja2.Template(TAC_PLUS_TMPL_SRC) + write_file(f'{tac_plus_config}/tac_plus.cfg', tmpl.render(tac_test_user)) + + # Start tac_plus container + self.cli_set(tac_container_path + ['allow-host-networks']) + self.cli_set(tac_container_path + ['image', tac_image]) + self.cli_set(tac_container_path + ['volume', 'config', 'destination', '/etc/tac_plus']) + self.cli_set(tac_container_path + ['volume', 'config', 'mode', 'ro']) + self.cli_set(tac_container_path + ['volume', 'config', 'source', tac_plus_config]) + + # Start container + self.cli_commit() + + # Define TACACS traffic source address + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{source_address}/32']) + self.cli_set(base_path + ['tacacs', 'source-address', source_address]) - # Enable TACACS + # Define TACACS servers for server in tacacs_servers: + # Use this system as "remote" TACACS server + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{server}/32']) self.cli_set(base_path + ['tacacs', 'server', server, 'key', tacacs_secret]) self.cli_commit() @@ -328,6 +506,11 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.assertIn('service=shell', pam_tacacs_conf) self.assertIn('protocol=ssh', pam_tacacs_conf) + # Verify configured TACACS source address + self.assertIn(f'source_ip={source_address}', pam_tacacs_conf) + self.assertIn(f'source_ip={source_address}', nss_tacacs_conf) + + # Verify configured TACACS servers for server in tacacs_servers: self.assertIn(f'secret={tacacs_secret}', pam_tacacs_conf) self.assertIn(f'server={server}', pam_tacacs_conf) @@ -335,6 +518,27 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.assertIn(f'secret={tacacs_secret}', nss_tacacs_conf) self.assertIn(f'server={server}', nss_tacacs_conf) + # Login with proper credentials + out, err = self.ssh_send_cmd(ssh_test_command, username, password) + # verify login + self.assertFalse(err) + self.assertEqual(out, self.ssh_test_command_result) + + # Login with invalid credentials + with self.assertRaises(paramiko.ssh_exception.AuthenticationException): + _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1') + + # Remove TACACS configuration + self.cli_delete(base_path + ['tacacs']) + # Remove tac_plus container + self.cli_delete(tac_container_path) + # Remove dummy interface + self.cli_delete(['interfaces', 'dummy', dummy_if]) + self.cli_commit() + + # Remove rendered tac_plus daemon configuration + shutil.rmtree(tac_plus_config) + def test_delete_current_user(self): current_user = get_current_user() |
