summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/test_service_ssh.py
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli/test_service_ssh.py')
-rw-r--r--smoketest/scripts/cli/test_service_ssh.py325
1 files changed, 325 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
new file mode 100644
index 0000000..d8e325e
--- /dev/null
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -0,0 +1,325 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import paramiko
+import re
+import unittest
+
+from pwd import getpwall
+
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.utils.process import cmd
+from vyos.utils.process import is_systemd_service_running
+from vyos.utils.process import process_named_running
+from vyos.utils.file import read_file
+from vyos.xml_ref import default_value
+
+PROCESS_NAME = 'sshd'
+SSHD_CONF = '/run/sshd/sshd_config'
+base_path = ['service', 'ssh']
+
+key_rsa = '/etc/ssh/ssh_host_rsa_key'
+key_dsa = '/etc/ssh/ssh_host_dsa_key'
+key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
+
+def get_config_value(key):
+ tmp = read_file(SSHD_CONF)
+ tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
+ return tmp
+
+class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestServiceSSH, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, ['vrf'])
+
+ def tearDown(self):
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ # delete testing SSH config
+ self.cli_delete(base_path)
+ self.cli_delete(['vrf'])
+ self.cli_commit()
+
+ self.assertTrue(os.path.isfile(key_rsa))
+ self.assertTrue(os.path.isfile(key_dsa))
+ self.assertTrue(os.path.isfile(key_ed25519))
+
+ # Established SSH connections remains running after service is stopped.
+ # We can not use process_named_running here - we rather need to check
+ # that the systemd service is no longer running
+ self.assertFalse(is_systemd_service_running(PROCESS_NAME))
+
+ def test_ssh_default(self):
+ # Check if SSH service runs with default settings - used for checking
+ # behavior of <defaultValue> in XML definition
+ self.cli_set(base_path)
+
+ # commit changes
+ self.cli_commit()
+
+ # Check configured port agains CLI default value
+ port = get_config_value('Port')
+ cli_default = default_value(base_path + ['port'])
+ self.assertEqual(port, cli_default)
+
+ def test_ssh_single_listen_address(self):
+ # Check if SSH service can be configured and runs
+ self.cli_set(base_path + ['port', '1234'])
+ self.cli_set(base_path + ['disable-host-validation'])
+ self.cli_set(base_path + ['disable-password-authentication'])
+ self.cli_set(base_path + ['loglevel', 'verbose'])
+ self.cli_set(base_path + ['client-keepalive-interval', '100'])
+ self.cli_set(base_path + ['listen-address', '127.0.0.1'])
+
+ # commit changes
+ self.cli_commit()
+
+ # Check configured port
+ port = get_config_value('Port')[0]
+ self.assertTrue("1234" in port)
+
+ # Check DNS usage
+ dns = get_config_value('UseDNS')[0]
+ self.assertTrue("no" in dns)
+
+ # Check PasswordAuthentication
+ pwd = get_config_value('PasswordAuthentication')[0]
+ self.assertTrue("no" in pwd)
+
+ # Check loglevel
+ loglevel = get_config_value('LogLevel')[0]
+ self.assertTrue("VERBOSE" in loglevel)
+
+ # Check listen address
+ address = get_config_value('ListenAddress')[0]
+ self.assertTrue("127.0.0.1" in address)
+
+ # Check keepalive
+ keepalive = get_config_value('ClientAliveInterval')[0]
+ self.assertTrue("100" in keepalive)
+
+ def test_ssh_multiple_listen_addresses(self):
+ # Check if SSH service can be configured and runs with multiple
+ # listen ports and listen-addresses
+ ports = ['22', '2222', '2223', '2224']
+ for port in ports:
+ self.cli_set(base_path + ['port', port])
+
+ addresses = ['127.0.0.1', '::1']
+ for address in addresses:
+ self.cli_set(base_path + ['listen-address', address])
+
+ # commit changes
+ self.cli_commit()
+
+ # Check configured port
+ tmp = get_config_value('Port')
+ for port in ports:
+ self.assertIn(port, tmp)
+
+ # Check listen address
+ tmp = get_config_value('ListenAddress')
+ for address in addresses:
+ self.assertIn(address, tmp)
+
+ def test_ssh_vrf_single(self):
+ vrf = 'mgmt'
+ # Check if SSH service can be bound to given VRF
+ self.cli_set(base_path + ['vrf', vrf])
+
+ # VRF does yet not exist - an error must be thrown
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ self.cli_set(['vrf', 'name', vrf, 'table', '1338'])
+
+ # commit changes
+ self.cli_commit()
+
+ # Check for process in VRF
+ tmp = cmd(f'ip vrf pids {vrf}')
+ self.assertIn(PROCESS_NAME, tmp)
+
+ def test_ssh_vrf_multi(self):
+ # Check if SSH service can be bound to multiple VRFs
+ vrfs = ['red', 'blue', 'green']
+ for vrf in vrfs:
+ self.cli_set(base_path + ['vrf', vrf])
+
+ # VRF does yet not exist - an error must be thrown
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ table = 12345
+ for vrf in vrfs:
+ self.cli_set(['vrf', 'name', vrf, 'table', str(table)])
+ table += 1
+
+ # commit changes
+ self.cli_commit()
+
+ # Check for process in VRF
+ for vrf in vrfs:
+ tmp = cmd(f'ip vrf pids {vrf}')
+ self.assertIn(PROCESS_NAME, tmp)
+
+ def test_ssh_login(self):
+ # Perform SSH login and command execution with a predefined user. The
+ # result (output of uname -a) must match the output if the command is
+ # run natively.
+ #
+ # We also try to login as an invalid user - this is not allowed to work.
+
+ test_user = 'ssh_test'
+ test_pass = 'v2i57DZs8idUwMN3VC92'
+ test_command = 'uname -a'
+
+ self.cli_set(base_path)
+ self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass])
+
+ # commit changes
+ self.cli_commit()
+
+ # Login with proper credentials
+ output, error = self.ssh_send_cmd(test_command, test_user, test_pass)
+ # verify login
+ self.assertFalse(error)
+ self.assertEqual(output, cmd(test_command))
+
+ # Login with invalid credentials
+ with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
+ output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')
+
+ self.cli_delete(['system', 'login', 'user', test_user])
+ self.cli_commit()
+
+ # After deletion the test user is not allowed to remain in /etc/passwd
+ usernames = [x[0] for x in getpwall()]
+ self.assertNotIn(test_user, usernames)
+
+ def test_ssh_dynamic_protection(self):
+ # check sshguard service
+
+ SSHGUARD_CONFIG = '/etc/sshguard/sshguard.conf'
+ SSHGUARD_WHITELIST = '/etc/sshguard/whitelist'
+ SSHGUARD_PROCESS = 'sshguard'
+ block_time = '123'
+ detect_time = '1804'
+ port = '22'
+ threshold = '10'
+ allow_list = ['192.0.2.0/24', '2001:db8::/48']
+
+ self.cli_set(base_path + ['dynamic-protection', 'block-time', block_time])
+ self.cli_set(base_path + ['dynamic-protection', 'detect-time', detect_time])
+ self.cli_set(base_path + ['dynamic-protection', 'threshold', threshold])
+ for allow in allow_list:
+ self.cli_set(base_path + ['dynamic-protection', 'allow-from', allow])
+
+ # commit changes
+ self.cli_commit()
+
+ # Check configured port
+ tmp = get_config_value('Port')
+ self.assertIn(port, tmp)
+
+ # Check sshgurad service
+ self.assertTrue(process_named_running(SSHGUARD_PROCESS))
+
+ sshguard_lines = [
+ f'THRESHOLD={threshold}',
+ f'BLOCK_TIME={block_time}',
+ f'DETECTION_TIME={detect_time}'
+ ]
+
+ tmp_sshguard_conf = read_file(SSHGUARD_CONFIG)
+ for line in sshguard_lines:
+ self.assertIn(line, tmp_sshguard_conf)
+
+ tmp_whitelist_conf = read_file(SSHGUARD_WHITELIST)
+ for allow in allow_list:
+ self.assertIn(allow, tmp_whitelist_conf)
+
+ # Delete service ssh dynamic-protection
+ # but not service ssh itself
+ self.cli_delete(base_path + ['dynamic-protection'])
+ self.cli_commit()
+
+ self.assertFalse(process_named_running(SSHGUARD_PROCESS))
+
+
+ # Network Device Collaborative Protection Profile
+ def test_ssh_ndcpp(self):
+ ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr']
+ host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519']
+ kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521']
+ macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512']
+ rekey_time = '60'
+ rekey_data = '1024'
+
+ for cipher in ciphers:
+ self.cli_set(base_path + ['ciphers', cipher])
+ for host_key in host_key_algs:
+ self.cli_set(base_path + ['hostkey-algorithm', host_key])
+ for kex in kexes:
+ self.cli_set(base_path + ['key-exchange', kex])
+ for mac in macs:
+ self.cli_set(base_path + ['mac', mac])
+ # Optional rekey parameters
+ self.cli_set(base_path + ['rekey', 'data', rekey_data])
+ self.cli_set(base_path + ['rekey', 'time', rekey_time])
+
+ # commit changes
+ self.cli_commit()
+
+ ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr',
+ 'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519',
+ 'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512',
+ 'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521',
+ 'RekeyLimit 1024M 60M'
+ ]
+ tmp_sshd_conf = read_file(SSHD_CONF)
+
+ for line in ssh_lines:
+ self.assertIn(line, tmp_sshd_conf)
+
+ def test_ssh_pubkey_accepted_algorithm(self):
+ algs = ['ssh-ed25519', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
+ 'ecdsa-sha2-nistp521', 'ssh-dss', 'ssh-rsa', 'rsa-sha2-256',
+ 'rsa-sha2-512'
+ ]
+
+ expected = 'PubkeyAcceptedAlgorithms '
+ for alg in algs:
+ self.cli_set(base_path + ['pubkey-accepted-algorithm', alg])
+ expected = f'{expected}{alg},'
+ expected = expected[:-1]
+
+ self.cli_commit()
+ tmp_sshd_conf = read_file(SSHD_CONF)
+ self.assertIn(expected, tmp_sshd_conf)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)