diff options
author | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
---|---|---|
committer | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
commit | a950059053f7394acfb453cc0d8194aa3dc721fa (patch) | |
tree | eb0acf278f649b5d1417e18e34d728efcd16e745 /smoketest/scripts/cli/test_service_ssh.py | |
parent | f0815f3e9b212f424f5adb0c572a71119ad4a8a0 (diff) | |
download | vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.tar.gz vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.zip |
T6732: added same as vyos 1x
Diffstat (limited to 'smoketest/scripts/cli/test_service_ssh.py')
-rw-r--r-- | smoketest/scripts/cli/test_service_ssh.py | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py new file mode 100644 index 0000000..d8e325e --- /dev/null +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import paramiko +import re +import unittest + +from pwd import getpwall + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSessionError +from vyos.utils.process import cmd +from vyos.utils.process import is_systemd_service_running +from vyos.utils.process import process_named_running +from vyos.utils.file import read_file +from vyos.xml_ref import default_value + +PROCESS_NAME = 'sshd' +SSHD_CONF = '/run/sshd/sshd_config' +base_path = ['service', 'ssh'] + +key_rsa = '/etc/ssh/ssh_host_rsa_key' +key_dsa = '/etc/ssh/ssh_host_dsa_key' +key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' + +def get_config_value(key): + tmp = read_file(SSHD_CONF) + tmp = re.findall(f'\n?{key}\s+(.*)', tmp) + return tmp + +class TestServiceSSH(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestServiceSSH, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + cls.cli_delete(cls, ['vrf']) + + def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + # delete testing SSH config + self.cli_delete(base_path) + self.cli_delete(['vrf']) + self.cli_commit() + + self.assertTrue(os.path.isfile(key_rsa)) + self.assertTrue(os.path.isfile(key_dsa)) + self.assertTrue(os.path.isfile(key_ed25519)) + + # Established SSH connections remains running after service is stopped. + # We can not use process_named_running here - we rather need to check + # that the systemd service is no longer running + self.assertFalse(is_systemd_service_running(PROCESS_NAME)) + + def test_ssh_default(self): + # Check if SSH service runs with default settings - used for checking + # behavior of <defaultValue> in XML definition + self.cli_set(base_path) + + # commit changes + self.cli_commit() + + # Check configured port agains CLI default value + port = get_config_value('Port') + cli_default = default_value(base_path + ['port']) + self.assertEqual(port, cli_default) + + def test_ssh_single_listen_address(self): + # Check if SSH service can be configured and runs + self.cli_set(base_path + ['port', '1234']) + self.cli_set(base_path + ['disable-host-validation']) + self.cli_set(base_path + ['disable-password-authentication']) + self.cli_set(base_path + ['loglevel', 'verbose']) + self.cli_set(base_path + ['client-keepalive-interval', '100']) + self.cli_set(base_path + ['listen-address', '127.0.0.1']) + + # commit changes + self.cli_commit() + + # Check configured port + port = get_config_value('Port')[0] + self.assertTrue("1234" in port) + + # Check DNS usage + dns = get_config_value('UseDNS')[0] + self.assertTrue("no" in dns) + + # Check PasswordAuthentication + pwd = get_config_value('PasswordAuthentication')[0] + self.assertTrue("no" in pwd) + + # Check loglevel + loglevel = get_config_value('LogLevel')[0] + self.assertTrue("VERBOSE" in loglevel) + + # Check listen address + address = get_config_value('ListenAddress')[0] + self.assertTrue("127.0.0.1" in address) + + # Check keepalive + keepalive = get_config_value('ClientAliveInterval')[0] + self.assertTrue("100" in keepalive) + + def test_ssh_multiple_listen_addresses(self): + # Check if SSH service can be configured and runs with multiple + # listen ports and listen-addresses + ports = ['22', '2222', '2223', '2224'] + for port in ports: + self.cli_set(base_path + ['port', port]) + + addresses = ['127.0.0.1', '::1'] + for address in addresses: + self.cli_set(base_path + ['listen-address', address]) + + # commit changes + self.cli_commit() + + # Check configured port + tmp = get_config_value('Port') + for port in ports: + self.assertIn(port, tmp) + + # Check listen address + tmp = get_config_value('ListenAddress') + for address in addresses: + self.assertIn(address, tmp) + + def test_ssh_vrf_single(self): + vrf = 'mgmt' + # Check if SSH service can be bound to given VRF + self.cli_set(base_path + ['vrf', vrf]) + + # VRF does yet not exist - an error must be thrown + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(['vrf', 'name', vrf, 'table', '1338']) + + # commit changes + self.cli_commit() + + # Check for process in VRF + tmp = cmd(f'ip vrf pids {vrf}') + self.assertIn(PROCESS_NAME, tmp) + + def test_ssh_vrf_multi(self): + # Check if SSH service can be bound to multiple VRFs + vrfs = ['red', 'blue', 'green'] + for vrf in vrfs: + self.cli_set(base_path + ['vrf', vrf]) + + # VRF does yet not exist - an error must be thrown + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + table = 12345 + for vrf in vrfs: + self.cli_set(['vrf', 'name', vrf, 'table', str(table)]) + table += 1 + + # commit changes + self.cli_commit() + + # Check for process in VRF + for vrf in vrfs: + tmp = cmd(f'ip vrf pids {vrf}') + self.assertIn(PROCESS_NAME, tmp) + + def test_ssh_login(self): + # Perform SSH login and command execution with a predefined user. The + # result (output of uname -a) must match the output if the command is + # run natively. + # + # We also try to login as an invalid user - this is not allowed to work. + + test_user = 'ssh_test' + test_pass = 'v2i57DZs8idUwMN3VC92' + test_command = 'uname -a' + + self.cli_set(base_path) + self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass]) + + # commit changes + self.cli_commit() + + # Login with proper credentials + output, error = self.ssh_send_cmd(test_command, test_user, test_pass) + # verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) + + # Login with invalid credentials + with self.assertRaises(paramiko.ssh_exception.AuthenticationException): + output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password') + + self.cli_delete(['system', 'login', 'user', test_user]) + self.cli_commit() + + # After deletion the test user is not allowed to remain in /etc/passwd + usernames = [x[0] for x in getpwall()] + self.assertNotIn(test_user, usernames) + + def test_ssh_dynamic_protection(self): + # check sshguard service + + SSHGUARD_CONFIG = '/etc/sshguard/sshguard.conf' + SSHGUARD_WHITELIST = '/etc/sshguard/whitelist' + SSHGUARD_PROCESS = 'sshguard' + block_time = '123' + detect_time = '1804' + port = '22' + threshold = '10' + allow_list = ['192.0.2.0/24', '2001:db8::/48'] + + self.cli_set(base_path + ['dynamic-protection', 'block-time', block_time]) + self.cli_set(base_path + ['dynamic-protection', 'detect-time', detect_time]) + self.cli_set(base_path + ['dynamic-protection', 'threshold', threshold]) + for allow in allow_list: + self.cli_set(base_path + ['dynamic-protection', 'allow-from', allow]) + + # commit changes + self.cli_commit() + + # Check configured port + tmp = get_config_value('Port') + self.assertIn(port, tmp) + + # Check sshgurad service + self.assertTrue(process_named_running(SSHGUARD_PROCESS)) + + sshguard_lines = [ + f'THRESHOLD={threshold}', + f'BLOCK_TIME={block_time}', + f'DETECTION_TIME={detect_time}' + ] + + tmp_sshguard_conf = read_file(SSHGUARD_CONFIG) + for line in sshguard_lines: + self.assertIn(line, tmp_sshguard_conf) + + tmp_whitelist_conf = read_file(SSHGUARD_WHITELIST) + for allow in allow_list: + self.assertIn(allow, tmp_whitelist_conf) + + # Delete service ssh dynamic-protection + # but not service ssh itself + self.cli_delete(base_path + ['dynamic-protection']) + self.cli_commit() + + self.assertFalse(process_named_running(SSHGUARD_PROCESS)) + + + # Network Device Collaborative Protection Profile + def test_ssh_ndcpp(self): + ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr'] + host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519'] + kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521'] + macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512'] + rekey_time = '60' + rekey_data = '1024' + + for cipher in ciphers: + self.cli_set(base_path + ['ciphers', cipher]) + for host_key in host_key_algs: + self.cli_set(base_path + ['hostkey-algorithm', host_key]) + for kex in kexes: + self.cli_set(base_path + ['key-exchange', kex]) + for mac in macs: + self.cli_set(base_path + ['mac', mac]) + # Optional rekey parameters + self.cli_set(base_path + ['rekey', 'data', rekey_data]) + self.cli_set(base_path + ['rekey', 'time', rekey_time]) + + # commit changes + self.cli_commit() + + ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr', + 'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519', + 'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512', + 'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521', + 'RekeyLimit 1024M 60M' + ] + tmp_sshd_conf = read_file(SSHD_CONF) + + for line in ssh_lines: + self.assertIn(line, tmp_sshd_conf) + + def test_ssh_pubkey_accepted_algorithm(self): + algs = ['ssh-ed25519', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', + 'ecdsa-sha2-nistp521', 'ssh-dss', 'ssh-rsa', 'rsa-sha2-256', + 'rsa-sha2-512' + ] + + expected = 'PubkeyAcceptedAlgorithms ' + for alg in algs: + self.cli_set(base_path + ['pubkey-accepted-algorithm', alg]) + expected = f'{expected}{alg},' + expected = expected[:-1] + + self.cli_commit() + tmp_sshd_conf = read_file(SSHD_CONF) + self.assertIn(expected, tmp_sshd_conf) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |