summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 6f58ce3d3..6d6c362a3 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -14,10 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import paramiko
import re
import os
import unittest
+from pwd import getpwall
+
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
@@ -156,5 +160,51 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
# delete VRF
self.cli_delete(['vrf', 'name', vrf])
+ def test_ssh_login(self):
+ # Perform SSH login and command execution with a predefined user. The
+ # result (output of uname -a) must match the output if the command is
+ # run natively.
+ #
+ # We also try to login as an invalid user - this is not allowed to work.
+
+ def ssh_send_cmd(command, username, password, host='localhost'):
+ """ SSH command execution helper """
+ # Try to login via SSH
+ ssh_client = paramiko.SSHClient()
+ ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh_client.connect(hostname='localhost', username=username, password=password)
+ _, stdout, stderr = ssh_client.exec_command(command)
+ output = stdout.read().decode().strip()
+ error = stderr.read().decode().strip()
+ ssh_client.close()
+ return output, error
+
+ test_user = 'ssh_test'
+ test_pass = 'v2i57DZs8idUwMN3VC92'
+ test_command = 'uname -a'
+
+ self.cli_set(base_path)
+ self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass])
+
+ # commit changes
+ self.cli_commit()
+
+ # Login with proper credentials
+ output, error = ssh_send_cmd(test_command, test_user, test_pass)
+ # verify login
+ self.assertFalse(error)
+ self.assertEqual(output, cmd(test_command))
+
+ # Login with invalid credentials
+ with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
+ output, error = ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')
+
+ self.cli_delete(['system', 'login', 'user', test_user])
+ self.cli_commit()
+
+ # After deletion the test user is not allowed to remain in /etc/passwd
+ usernames = [x[0] for x in getpwall()]
+ self.assertNotIn(test_user, usernames)
+
if __name__ == '__main__':
unittest.main(verbosity=2)