summaryrefslogtreecommitdiff
path: root/smoketest
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest')
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py14
-rwxr-xr-xsmoketest/scripts/cli/test_ha_vrrp.py3
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py16
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py46
4 files changed, 60 insertions, 19 deletions
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index 7cfb53045..9415d6f44 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -14,6 +14,7 @@
import os
import unittest
+import paramiko
from time import sleep
from typing import Type
@@ -87,6 +88,19 @@ class VyOSUnitTestSHIM:
pprint.pprint(out)
return out
+ @staticmethod
+ def ssh_send_cmd(command, username, password, hostname='localhost'):
+ """ SSH command execution helper """
+ # Try to login via SSH
+ ssh_client = paramiko.SSHClient()
+ ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh_client.connect(hostname=hostname, username=username, password=password)
+ _, stdout, stderr = ssh_client.exec_command(command)
+ output = stdout.read().decode().strip()
+ error = stderr.read().decode().strip()
+ ssh_client.close()
+ return output, error
+
# standard construction; typing suggestion: https://stackoverflow.com/a/70292317
def ignore_warning(warning: Type[Warning]):
import warnings
diff --git a/smoketest/scripts/cli/test_ha_vrrp.py b/smoketest/scripts/cli/test_ha_vrrp.py
index 3a4de2d8d..03bdbdd61 100755
--- a/smoketest/scripts/cli/test_ha_vrrp.py
+++ b/smoketest/scripts/cli/test_ha_vrrp.py
@@ -96,6 +96,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase):
group_garp_master_delay = '12'
group_garp_master_repeat = '13'
group_garp_master_refresh = '14'
+ vrrp_version = '3'
for group in groups:
vlan_id = group.lstrip('VLAN')
@@ -133,6 +134,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase):
self.cli_set(global_param_base + ['garp', 'master-repeat', f'{garp_master_repeat}'])
self.cli_set(global_param_base + ['garp', 'master-refresh', f'{garp_master_refresh}'])
self.cli_set(global_param_base + ['garp', 'master-refresh-repeat', f'{garp_master_refresh_repeat}'])
+ self.cli_set(global_param_base + ['version', vrrp_version])
# commit changes
self.cli_commit()
@@ -145,6 +147,7 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'vrrp_garp_master_repeat {garp_master_repeat}', config)
self.assertIn(f'vrrp_garp_master_refresh {garp_master_refresh}', config)
self.assertIn(f'vrrp_garp_master_refresh_repeat {garp_master_refresh_repeat}', config)
+ self.assertIn(f'vrrp_version {vrrp_version}', config)
for group in groups:
vlan_id = group.lstrip('VLAN')
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 8de98f34f..e03907dd8 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -174,18 +174,6 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
#
# We also try to login as an invalid user - this is not allowed to work.
- def ssh_send_cmd(command, username, password, host='localhost'):
- """ SSH command execution helper """
- # Try to login via SSH
- ssh_client = paramiko.SSHClient()
- ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh_client.connect(hostname='localhost', username=username, password=password)
- _, stdout, stderr = ssh_client.exec_command(command)
- output = stdout.read().decode().strip()
- error = stderr.read().decode().strip()
- ssh_client.close()
- return output, error
-
test_user = 'ssh_test'
test_pass = 'v2i57DZs8idUwMN3VC92'
test_command = 'uname -a'
@@ -197,14 +185,14 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Login with proper credentials
- output, error = ssh_send_cmd(test_command, test_user, test_pass)
+ output, error = self.ssh_send_cmd(test_command, test_user, test_pass)
# verify login
self.assertFalse(error)
self.assertEqual(output, cmd(test_command))
# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
- output, error = ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')
+ output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')
self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index a1d2ba2ad..8a4f5fdd1 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -17,13 +17,13 @@
import re
import platform
import unittest
+import paramiko
from base_vyostest_shim import VyOSUnitTestSHIM
-from distutils.version import LooseVersion
-from platform import release as kernel_version
from subprocess import Popen, PIPE
from pwd import getpwall
+from time import sleep
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -53,12 +53,16 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
# ensure we can also run this test on a live system - so lets clean
# out the current configuration which will break this test
cls.cli_delete(cls, base_path + ['radius'])
+ cls.cli_delete(cls, base_path + ['tacacs'])
def tearDown(self):
# Delete individual users from configuration
for user in users:
self.cli_delete(base_path + ['user', user])
+ self.cli_delete(base_path + ['radius'])
+ self.cli_delete(base_path + ['tacacs'])
+
self.cli_commit()
# After deletion, a user is not allowed to remain in /etc/passwd
@@ -149,9 +153,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
# T2886 - RADIUS authentication - check for statically compiled options
options = ['CONFIG_AUDIT', 'CONFIG_AUDITSYSCALL', 'CONFIG_AUDIT_ARCH']
- if LooseVersion(kernel_version()) < LooseVersion('5.0'):
- options.append('CONFIG_AUDIT_WATCH')
- options.append('CONFIG_AUDIT_TREE')
for option in options:
self.assertIn(f'{option}=y', kernel_config)
@@ -284,6 +285,41 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path + ['timeout'])
self.cli_delete(base_path + ['max-login-session'])
+ def test_system_login_tacacs(self):
+ tacacs_secret = 'tac_plus_key'
+ tacacs_servers = ['100.64.0.11', '100.64.0.12']
+
+ # Enable TACACS
+ for server in tacacs_servers:
+ self.cli_set(base_path + ['tacacs', 'server', server, 'key', tacacs_secret])
+
+ self.cli_commit()
+
+ # NSS
+ nsswitch_conf = read_file('/etc/nsswitch.conf')
+ tmp = re.findall(r'passwd:\s+tacplus\s+files', nsswitch_conf)
+ self.assertTrue(tmp)
+
+ tmp = re.findall(r'group:\s+tacplus\s+files', nsswitch_conf)
+ self.assertTrue(tmp)
+
+ # PAM TACACS configuration
+ pam_tacacs_conf = read_file('/etc/tacplus_servers')
+ # NSS TACACS configuration
+ nss_tacacs_conf = read_file('/etc/tacplus_nss.conf')
+ # Users have individual home directories
+ self.assertIn('user_homedir=1', pam_tacacs_conf)
+
+ # specify services
+ self.assertIn('service=shell', pam_tacacs_conf)
+ self.assertIn('protocol=ssh', pam_tacacs_conf)
+
+ for server in tacacs_servers:
+ self.assertIn(f'secret={tacacs_secret}', pam_tacacs_conf)
+ self.assertIn(f'server={server}', pam_tacacs_conf)
+
+ self.assertIn(f'secret={tacacs_secret}', nss_tacacs_conf)
+ self.assertIn(f'server={server}', nss_tacacs_conf)
if __name__ == '__main__':
unittest.main(verbosity=2)