diff options
Diffstat (limited to 'smoketest/scripts')
-rw-r--r-- | smoketest/scripts/cli/base_vyostest_shim.py | 14 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_ha_vrrp.py | 3 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 16 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 46 |
4 files changed, 60 insertions, 19 deletions
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index 7cfb53045..9415d6f44 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -14,6 +14,7 @@ import os import unittest +import paramiko from time import sleep from typing import Type @@ -87,6 +88,19 @@ class VyOSUnitTestSHIM: pprint.pprint(out) return out + @staticmethod + def ssh_send_cmd(command, username, password, hostname='localhost'): + """ SSH command execution helper """ + # Try to login via SSH + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect(hostname=hostname, username=username, password=password) + _, stdout, stderr = ssh_client.exec_command(command) + output = stdout.read().decode().strip() + error = stderr.read().decode().strip() + ssh_client.close() + return output, error + # standard construction; typing suggestion: https://stackoverflow.com/a/70292317 def ignore_warning(warning: Type[Warning]): import warnings diff --git a/smoketest/scripts/cli/test_ha_vrrp.py b/smoketest/scripts/cli/test_ha_vrrp.py index 3a4de2d8d..03bdbdd61 100755 --- a/smoketest/scripts/cli/test_ha_vrrp.py +++ b/smoketest/scripts/cli/test_ha_vrrp.py @@ -96,6 +96,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase): group_garp_master_delay = '12' group_garp_master_repeat = '13' group_garp_master_refresh = '14' + vrrp_version = '3' for group in groups: vlan_id = group.lstrip('VLAN') @@ -133,6 +134,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase): self.cli_set(global_param_base + ['garp', 'master-repeat', f'{garp_master_repeat}']) self.cli_set(global_param_base + ['garp', 'master-refresh', f'{garp_master_refresh}']) self.cli_set(global_param_base + ['garp', 'master-refresh-repeat', f'{garp_master_refresh_repeat}']) + self.cli_set(global_param_base + ['version', vrrp_version]) # commit changes self.cli_commit() @@ -145,6 +147,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase): self.assertIn(f'vrrp_garp_master_repeat {garp_master_repeat}', config) self.assertIn(f'vrrp_garp_master_refresh {garp_master_refresh}', config) self.assertIn(f'vrrp_garp_master_refresh_repeat {garp_master_refresh_repeat}', config) + self.assertIn(f'vrrp_version {vrrp_version}', config) for group in groups: vlan_id = group.lstrip('VLAN') diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 8de98f34f..e03907dd8 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -174,18 +174,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # # We also try to login as an invalid user - this is not allowed to work. - def ssh_send_cmd(command, username, password, host='localhost'): - """ SSH command execution helper """ - # Try to login via SSH - ssh_client = paramiko.SSHClient() - ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_client.connect(hostname='localhost', username=username, password=password) - _, stdout, stderr = ssh_client.exec_command(command) - output = stdout.read().decode().strip() - error = stderr.read().decode().strip() - ssh_client.close() - return output, error - test_user = 'ssh_test' test_pass = 'v2i57DZs8idUwMN3VC92' test_command = 'uname -a' @@ -197,14 +185,14 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): self.cli_commit() # Login with proper credentials - output, error = ssh_send_cmd(test_command, test_user, test_pass) + output, error = self.ssh_send_cmd(test_command, test_user, test_pass) # verify login self.assertFalse(error) self.assertEqual(output, cmd(test_command)) # Login with invalid credentials with self.assertRaises(paramiko.ssh_exception.AuthenticationException): - output, error = ssh_send_cmd(test_command, 'invalid_user', 'invalid_password') + output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password') self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index a1d2ba2ad..8a4f5fdd1 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -17,13 +17,13 @@ import re import platform import unittest +import paramiko from base_vyostest_shim import VyOSUnitTestSHIM -from distutils.version import LooseVersion -from platform import release as kernel_version from subprocess import Popen, PIPE from pwd import getpwall +from time import sleep from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -53,12 +53,16 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): # ensure we can also run this test on a live system - so lets clean # out the current configuration which will break this test cls.cli_delete(cls, base_path + ['radius']) + cls.cli_delete(cls, base_path + ['tacacs']) def tearDown(self): # Delete individual users from configuration for user in users: self.cli_delete(base_path + ['user', user]) + self.cli_delete(base_path + ['radius']) + self.cli_delete(base_path + ['tacacs']) + self.cli_commit() # After deletion, a user is not allowed to remain in /etc/passwd @@ -149,9 +153,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): # T2886 - RADIUS authentication - check for statically compiled options options = ['CONFIG_AUDIT', 'CONFIG_AUDITSYSCALL', 'CONFIG_AUDIT_ARCH'] - if LooseVersion(kernel_version()) < LooseVersion('5.0'): - options.append('CONFIG_AUDIT_WATCH') - options.append('CONFIG_AUDIT_TREE') for option in options: self.assertIn(f'{option}=y', kernel_config) @@ -284,6 +285,41 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path + ['timeout']) self.cli_delete(base_path + ['max-login-session']) + def test_system_login_tacacs(self): + tacacs_secret = 'tac_plus_key' + tacacs_servers = ['100.64.0.11', '100.64.0.12'] + + # Enable TACACS + for server in tacacs_servers: + self.cli_set(base_path + ['tacacs', 'server', server, 'key', tacacs_secret]) + + self.cli_commit() + + # NSS + nsswitch_conf = read_file('/etc/nsswitch.conf') + tmp = re.findall(r'passwd:\s+tacplus\s+files', nsswitch_conf) + self.assertTrue(tmp) + + tmp = re.findall(r'group:\s+tacplus\s+files', nsswitch_conf) + self.assertTrue(tmp) + + # PAM TACACS configuration + pam_tacacs_conf = read_file('/etc/tacplus_servers') + # NSS TACACS configuration + nss_tacacs_conf = read_file('/etc/tacplus_nss.conf') + # Users have individual home directories + self.assertIn('user_homedir=1', pam_tacacs_conf) + + # specify services + self.assertIn('service=shell', pam_tacacs_conf) + self.assertIn('protocol=ssh', pam_tacacs_conf) + + for server in tacacs_servers: + self.assertIn(f'secret={tacacs_secret}', pam_tacacs_conf) + self.assertIn(f'server={server}', pam_tacacs_conf) + + self.assertIn(f'secret={tacacs_secret}', nss_tacacs_conf) + self.assertIn(f'server={server}', nss_tacacs_conf) if __name__ == '__main__': unittest.main(verbosity=2) |