#!/usr/bin/env python3
#
# Copyright (C) 2019-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import os
import paramiko
import re
import unittest
from pwd import getpwall
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
from vyos.utils.process import cmd
from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
PROCESS_NAME = 'sshd'
SSHD_CONF = '/run/sshd/sshd_config'
base_path = ['service', 'ssh']
key_rsa = '/etc/ssh/ssh_host_rsa_key'
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
def get_config_value(key):
tmp = read_file(SSHD_CONF)
tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
return tmp
class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
super(TestServiceSSH, cls).setUpClass()
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
cls.cli_delete(cls, ['vrf'])
def tearDown(self):
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
# delete testing SSH config
self.cli_delete(base_path)
self.cli_delete(['vrf'])
self.cli_commit()
self.assertTrue(os.path.isfile(key_rsa))
self.assertTrue(os.path.isfile(key_dsa))
self.assertTrue(os.path.isfile(key_ed25519))
# Established SSH connections remains running after service is stopped.
# We can not use process_named_running here - we rather need to check
# that the systemd service is no longer running
self.assertFalse(is_systemd_service_running(PROCESS_NAME))
def test_ssh_default(self):
# Check if SSH service runs with default settings - used for checking
# behavior of in XML definition
self.cli_set(base_path)
# commit changes
self.cli_commit()
# Check configured port
port = get_config_value('Port')[0]
self.assertEqual('22', port) # default value
def test_ssh_single_listen_address(self):
# Check if SSH service can be configured and runs
self.cli_set(base_path + ['port', '1234'])
self.cli_set(base_path + ['disable-host-validation'])
self.cli_set(base_path + ['disable-password-authentication'])
self.cli_set(base_path + ['loglevel', 'verbose'])
self.cli_set(base_path + ['client-keepalive-interval', '100'])
self.cli_set(base_path + ['listen-address', '127.0.0.1'])
# commit changes
self.cli_commit()
# Check configured port
port = get_config_value('Port')[0]
self.assertTrue("1234" in port)
# Check DNS usage
dns = get_config_value('UseDNS')[0]
self.assertTrue("no" in dns)
# Check PasswordAuthentication
pwd = get_config_value('PasswordAuthentication')[0]
self.assertTrue("no" in pwd)
# Check loglevel
loglevel = get_config_value('LogLevel')[0]
self.assertTrue("VERBOSE" in loglevel)
# Check listen address
address = get_config_value('ListenAddress')[0]
self.assertTrue("127.0.0.1" in address)
# Check keepalive
keepalive = get_config_value('ClientAliveInterval')[0]
self.assertTrue("100" in keepalive)
def test_ssh_multiple_listen_addresses(self):
# Check if SSH service can be configured and runs with multiple
# listen ports and listen-addresses
ports = ['22', '2222', '2223', '2224']
for port in ports:
self.cli_set(base_path + ['port', port])
addresses = ['127.0.0.1', '::1']
for address in addresses:
self.cli_set(base_path + ['listen-address', address])
# commit changes
self.cli_commit()
# Check configured port
tmp = get_config_value('Port')
for port in ports:
self.assertIn(port, tmp)
# Check listen address
tmp = get_config_value('ListenAddress')
for address in addresses:
self.assertIn(address, tmp)
def test_ssh_vrf_single(self):
vrf = 'mgmt'
# Check if SSH service can be bound to given VRF
self.cli_set(base_path + ['vrf', vrf])
# VRF does yet not exist - an error must be thrown
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(['vrf', 'name', vrf, 'table', '1338'])
# commit changes
self.cli_commit()
# Check for process in VRF
tmp = cmd(f'ip vrf pids {vrf}')
self.assertIn(PROCESS_NAME, tmp)
def test_ssh_vrf_multi(self):
# Check if SSH service can be bound to multiple VRFs
vrfs = ['red', 'blue', 'green']
for vrf in vrfs:
self.cli_set(base_path + ['vrf', vrf])
# VRF does yet not exist - an error must be thrown
with self.assertRaises(ConfigSessionError):
self.cli_commit()
table = 12345
for vrf in vrfs:
self.cli_set(['vrf', 'name', vrf, 'table', str(table)])
table += 1
# commit changes
self.cli_commit()
# Check for process in VRF
for vrf in vrfs:
tmp = cmd(f'ip vrf pids {vrf}')
self.assertIn(PROCESS_NAME, tmp)
def test_ssh_login(self):
# Perform SSH login and command execution with a predefined user. The
# result (output of uname -a) must match the output if the command is
# run natively.
#
# We also try to login as an invalid user - this is not allowed to work.
test_user = 'ssh_test'
test_pass = 'v2i57DZs8idUwMN3VC92'
test_command = 'uname -a'
self.cli_set(base_path)
self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass])
# commit changes
self.cli_commit()
# Login with proper credentials
output, error = self.ssh_send_cmd(test_command, test_user, test_pass)
# verify login
self.assertFalse(error)
self.assertEqual(output, cmd(test_command))
# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')
self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()
# After deletion the test user is not allowed to remain in /etc/passwd
usernames = [x[0] for x in getpwall()]
self.assertNotIn(test_user, usernames)
def test_ssh_dynamic_protection(self):
# check sshguard service
SSHGUARD_CONFIG = '/etc/sshguard/sshguard.conf'
SSHGUARD_WHITELIST = '/etc/sshguard/whitelist'
SSHGUARD_PROCESS = 'sshguard'
block_time = '123'
detect_time = '1804'
port = '22'
threshold = '10'
allow_list = ['192.0.2.0/24', '2001:db8::/48']
self.cli_set(base_path + ['dynamic-protection', 'block-time', block_time])
self.cli_set(base_path + ['dynamic-protection', 'detect-time', detect_time])
self.cli_set(base_path + ['dynamic-protection', 'threshold', threshold])
for allow in allow_list:
self.cli_set(base_path + ['dynamic-protection', 'allow-from', allow])
# commit changes
self.cli_commit()
# Check configured port
tmp = get_config_value('Port')
self.assertIn(port, tmp)
# Check sshgurad service
self.assertTrue(process_named_running(SSHGUARD_PROCESS))
sshguard_lines = [
f'THRESHOLD={threshold}',
f'BLOCK_TIME={block_time}',
f'DETECTION_TIME={detect_time}'
]
tmp_sshguard_conf = read_file(SSHGUARD_CONFIG)
for line in sshguard_lines:
self.assertIn(line, tmp_sshguard_conf)
tmp_whitelist_conf = read_file(SSHGUARD_WHITELIST)
for allow in allow_list:
self.assertIn(allow, tmp_whitelist_conf)
# Delete service ssh dynamic-protection
# but not service ssh itself
self.cli_delete(base_path + ['dynamic-protection'])
self.cli_commit()
self.assertFalse(process_named_running(SSHGUARD_PROCESS))
# Network Device Collaborative Protection Profile
def test_ssh_ndcpp(self):
ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr']
host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519']
kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521']
macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512']
rekey_time = '60'
rekey_data = '1024'
for cipher in ciphers:
self.cli_set(base_path + ['ciphers', cipher])
for host_key in host_key_algs:
self.cli_set(base_path + ['hostkey-algorithm', host_key])
for kex in kexes:
self.cli_set(base_path + ['key-exchange', kex])
for mac in macs:
self.cli_set(base_path + ['mac', mac])
# Optional rekey parameters
self.cli_set(base_path + ['rekey', 'data', rekey_data])
self.cli_set(base_path + ['rekey', 'time', rekey_time])
# commit changes
self.cli_commit()
ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr',
'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519',
'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512',
'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521',
'RekeyLimit 1024M 60M'
]
tmp_sshd_conf = read_file(SSHD_CONF)
for line in ssh_lines:
self.assertIn(line, tmp_sshd_conf)
if __name__ == '__main__':
unittest.main(verbosity=2)