#!/usr/bin/env python3
#
# Copyright (C) 2019-2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
import os
import unittest

from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file

PROCESS_NAME = 'sshd'
SSHD_CONF = '/run/ssh/sshd_config'
base_path = ['service', 'ssh']

vrf = 'ssh-test'


def get_config_value(key):
    """Return every value found for option ``key`` in the rendered sshd config.

    The file at SSHD_CONF is read and searched for ``key`` followed by
    whitespace; the remainder of each matching line is collected.

    Returns:
        A list of strings, one per occurrence (empty if ``key`` is absent).
    """
    tmp = read_file(SSHD_CONF)
    # Raw f-string: '\s' is not a valid *string* escape and triggers a
    # warning on recent Python versions when used in a non-raw literal.
    return re.findall(rf'\n?{key}\s+(.*)', tmp)


class TestServiceSSH(unittest.TestCase):
    """Smoke tests for the 'service ssh' configuration subtree."""

    def setUp(self):
        """Open a config session and stage removal of any existing SSH config."""
        self.session = ConfigSession(os.getpid())
        # ensure we can also run this test on a live system - so lets clean
        # out the current configuration :)
        self.session.delete(base_path)

    def tearDown(self):
        """Remove the test configuration and restore a plain SSH service."""
        # delete testing SSH config
        self.session.delete(base_path)
        # restore "plain" SSH access
        self.session.set(base_path)
        # delete VRF
        self.session.delete(['vrf', 'name', vrf])

        self.session.commit()
        del self.session

    def test_ssh_default(self):
        """Service without explicit options must end up on port 22 and run."""
        # Check if SSH service runs with default settings - used for checking
        # behavior of <defaultValue> in XML definition
        self.session.set(base_path)

        # commit changes
        self.session.commit()

        # Check configured port
        port = get_config_value('Port')[0]
        self.assertEqual('22', port)

        # Check for running process
        self.assertTrue(process_named_running(PROCESS_NAME))

    def test_ssh_single_listen_address(self):
        """Configure a single port/address plus options and verify the config."""
        # Check if SSH service can be configured and runs
        self.session.set(base_path + ['port', '1234'])
        self.session.set(base_path + ['disable-host-validation'])
        self.session.set(base_path + ['disable-password-authentication'])
        self.session.set(base_path + ['loglevel', 'verbose'])
        self.session.set(base_path + ['client-keepalive-interval', '100'])
        self.session.set(base_path + ['listen-address', '127.0.0.1'])

        # commit changes
        self.session.commit()

        # assertIn performs the same substring check as assertTrue(a in b)
        # but reports both operands when the check fails.

        # Check configured port
        port = get_config_value('Port')[0]
        self.assertIn('1234', port)

        # Check DNS usage
        dns = get_config_value('UseDNS')[0]
        self.assertIn('no', dns)

        # Check PasswordAuthentication
        pwd = get_config_value('PasswordAuthentication')[0]
        self.assertIn('no', pwd)

        # Check loglevel
        loglevel = get_config_value('LogLevel')[0]
        self.assertIn('VERBOSE', loglevel)

        # Check listen address
        address = get_config_value('ListenAddress')[0]
        self.assertIn('127.0.0.1', address)

        # Check keepalive
        keepalive = get_config_value('ClientAliveInterval')[0]
        self.assertIn('100', keepalive)

        # Check for running process
        self.assertTrue(process_named_running(PROCESS_NAME))

    def test_ssh_multiple_listen_addresses(self):
        """Configure several ports and listen addresses and verify each one."""
        # Check if SSH service can be configured and runs with multiple
        # listen ports and listen-addresses
        ports = ['22', '2222']
        for port in ports:
            self.session.set(base_path + ['port', port])

        addresses = ['127.0.0.1', '::1']
        for address in addresses:
            self.session.set(base_path + ['listen-address', address])

        # commit changes
        self.session.commit()

        # Check configured port
        tmp = get_config_value('Port')
        for port in ports:
            self.assertIn(port, tmp)

        # Check listen address
        tmp = get_config_value('ListenAddress')
        for address in addresses:
            self.assertIn(address, tmp)

        # Check for running process
        self.assertTrue(process_named_running(PROCESS_NAME))

    def test_ssh_vrf(self):
        """Bind the service to a VRF; committing before the VRF exists must fail."""
        # Check if SSH service can be bound to given VRF
        port = '22'
        self.session.set(base_path + ['port', port])
        self.session.set(base_path + ['vrf', vrf])

        # VRF does not exist yet - an error must be thrown
        with self.assertRaises(ConfigSessionError):
            self.session.commit()

        self.session.set(['vrf', 'name', vrf, 'table', '1001'])

        # commit changes
        self.session.commit()

        # Check configured port
        tmp = get_config_value('Port')
        self.assertIn(port, tmp)

        # Check for running process
        self.assertTrue(process_named_running(PROCESS_NAME))

        # Check for process in VRF
        tmp = cmd(f'ip vrf pids {vrf}')
        self.assertIn(PROCESS_NAME, tmp)


if __name__ == '__main__':
    unittest.main(verbosity=2)