#!/usr/bin/env python3
#
# Copyright (C) 2019-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import paramiko
import re
import unittest

from pwd import getpwall

from base_vyostest_shim import VyOSUnitTestSHIM

from vyos.configsession import ConfigSessionError
from vyos.utils.process import cmd
from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
from vyos.xml_ref import default_value

PROCESS_NAME = 'sshd'
SSHD_CONF = '/run/sshd/sshd_config'
base_path = ['service', 'ssh']

key_rsa = '/etc/ssh/ssh_host_rsa_key'
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'

def get_config_value(key):
    tmp = read_file(SSHD_CONF)
    tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
    return tmp

class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
    @classmethod
    def setUpClass(cls):
        super(TestServiceSSH, cls).setUpClass()

        # ensure we can also run this test on a live system - so lets clean
        # out the current configuration :)
        cls.cli_delete(cls, base_path)
        cls.cli_delete(cls, ['vrf'])

    def tearDown(self):
        # Check for running process
        self.assertTrue(process_named_running(PROCESS_NAME))

        # delete testing SSH config
        self.cli_delete(base_path)
        self.cli_delete(['vrf'])
        self.cli_commit()

        self.assertTrue(os.path.isfile(key_rsa))
        self.assertTrue(os.path.isfile(key_dsa))
        self.assertTrue(os.path.isfile(key_ed25519))

        # Established SSH connections remains running after service is stopped.
        # We can not use process_named_running here - we rather need to check
        # that the systemd service is no longer running
        self.assertFalse(is_systemd_service_running(PROCESS_NAME))

    def test_ssh_default(self):
        # Check if SSH service runs with default settings - used for checking
        # behavior of <defaultValue> in XML definition
        self.cli_set(base_path)

        # commit changes
        self.cli_commit()

        # Check configured port agains CLI default value
        port = get_config_value('Port')
        cli_default = default_value(base_path + ['port'])
        self.assertEqual(port, cli_default)

    def test_ssh_single_listen_address(self):
        # Check if SSH service can be configured and runs
        self.cli_set(base_path + ['port', '1234'])
        self.cli_set(base_path + ['disable-host-validation'])
        self.cli_set(base_path + ['disable-password-authentication'])
        self.cli_set(base_path + ['loglevel', 'verbose'])
        self.cli_set(base_path + ['client-keepalive-interval', '100'])
        self.cli_set(base_path + ['listen-address', '127.0.0.1'])

        # commit changes
        self.cli_commit()

        # Check configured port
        port = get_config_value('Port')[0]
        self.assertTrue("1234" in port)

        # Check DNS usage
        dns = get_config_value('UseDNS')[0]
        self.assertTrue("no" in dns)

        # Check PasswordAuthentication
        pwd = get_config_value('PasswordAuthentication')[0]
        self.assertTrue("no" in pwd)

        # Check loglevel
        loglevel = get_config_value('LogLevel')[0]
        self.assertTrue("VERBOSE" in loglevel)

        # Check listen address
        address = get_config_value('ListenAddress')[0]
        self.assertTrue("127.0.0.1" in address)

        # Check keepalive
        keepalive = get_config_value('ClientAliveInterval')[0]
        self.assertTrue("100" in keepalive)

    def test_ssh_multiple_listen_addresses(self):
        # Check if SSH service can be configured and runs with multiple
        # listen ports and listen-addresses
        ports = ['22', '2222', '2223', '2224']
        for port in ports:
            self.cli_set(base_path + ['port', port])

        addresses = ['127.0.0.1', '::1']
        for address in addresses:
            self.cli_set(base_path + ['listen-address', address])

        # commit changes
        self.cli_commit()

        # Check configured port
        tmp = get_config_value('Port')
        for port in ports:
            self.assertIn(port, tmp)

        # Check listen address
        tmp = get_config_value('ListenAddress')
        for address in addresses:
            self.assertIn(address, tmp)

    def test_ssh_vrf_single(self):
        vrf = 'mgmt'
        # Check if SSH service can be bound to given VRF
        self.cli_set(base_path + ['vrf', vrf])

        # VRF does yet not exist - an error must be thrown
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()

        self.cli_set(['vrf', 'name', vrf, 'table', '1338'])

        # commit changes
        self.cli_commit()

        # Check for process in VRF
        tmp = cmd(f'ip vrf pids {vrf}')
        self.assertIn(PROCESS_NAME, tmp)

    def test_ssh_vrf_multi(self):
        # Check if SSH service can be bound to multiple VRFs
        vrfs = ['red', 'blue', 'green']
        for vrf in vrfs:
            self.cli_set(base_path + ['vrf', vrf])

        # VRF does yet not exist - an error must be thrown
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()

        table = 12345
        for vrf in vrfs:
            self.cli_set(['vrf', 'name', vrf, 'table', str(table)])
            table += 1

        # commit changes
        self.cli_commit()

        # Check for process in VRF
        for vrf in vrfs:
            tmp = cmd(f'ip vrf pids {vrf}')
            self.assertIn(PROCESS_NAME, tmp)

    def test_ssh_login(self):
        # Perform SSH login and command execution with a predefined user. The
        # result (output of uname -a) must match the output if the command is
        # run natively.
        #
        # We also try to login as an invalid user - this is not allowed to work.

        test_user = 'ssh_test'
        test_pass = 'v2i57DZs8idUwMN3VC92'
        test_command = 'uname -a'

        self.cli_set(base_path)
        self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass])

        # commit changes
        self.cli_commit()

        # Login with proper credentials
        output, error = self.ssh_send_cmd(test_command, test_user, test_pass)
        # verify login
        self.assertFalse(error)
        self.assertEqual(output, cmd(test_command))

        # Login with invalid credentials
        with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
            output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password')

        self.cli_delete(['system', 'login', 'user', test_user])
        self.cli_commit()

        # After deletion the test user is not allowed to remain in /etc/passwd
        usernames = [x[0] for x in getpwall()]
        self.assertNotIn(test_user, usernames)

    def test_ssh_dynamic_protection(self):
        # check sshguard service

        SSHGUARD_CONFIG = '/etc/sshguard/sshguard.conf'
        SSHGUARD_WHITELIST = '/etc/sshguard/whitelist'
        SSHGUARD_PROCESS = 'sshguard'
        block_time = '123'
        detect_time = '1804'
        port = '22'
        threshold = '10'
        allow_list = ['192.0.2.0/24', '2001:db8::/48']

        self.cli_set(base_path + ['dynamic-protection', 'block-time', block_time])
        self.cli_set(base_path + ['dynamic-protection', 'detect-time', detect_time])
        self.cli_set(base_path + ['dynamic-protection', 'threshold', threshold])
        for allow in allow_list:
            self.cli_set(base_path + ['dynamic-protection', 'allow-from', allow])

        # commit changes
        self.cli_commit()

        # Check configured port
        tmp = get_config_value('Port')
        self.assertIn(port, tmp)

        # Check sshgurad service
        self.assertTrue(process_named_running(SSHGUARD_PROCESS))

        sshguard_lines = [
            f'THRESHOLD={threshold}',
            f'BLOCK_TIME={block_time}',
            f'DETECTION_TIME={detect_time}'
        ]

        tmp_sshguard_conf = read_file(SSHGUARD_CONFIG)
        for line in sshguard_lines:
            self.assertIn(line, tmp_sshguard_conf)

        tmp_whitelist_conf = read_file(SSHGUARD_WHITELIST)
        for allow in allow_list:
            self.assertIn(allow, tmp_whitelist_conf)

        # Delete service ssh dynamic-protection
        # but not service ssh itself
        self.cli_delete(base_path + ['dynamic-protection'])
        self.cli_commit()

        self.assertFalse(process_named_running(SSHGUARD_PROCESS))


    # Network Device Collaborative Protection Profile
    def test_ssh_ndcpp(self):
        ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr']
        host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519']
        kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521']
        macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512']
        rekey_time = '60'
        rekey_data = '1024'

        for cipher in ciphers:
            self.cli_set(base_path + ['ciphers', cipher])
        for host_key in host_key_algs:
            self.cli_set(base_path + ['hostkey-algorithm', host_key])
        for kex in kexes:
            self.cli_set(base_path + ['key-exchange', kex])
        for mac in macs:
            self.cli_set(base_path + ['mac', mac])
        # Optional rekey parameters
        self.cli_set(base_path + ['rekey', 'data', rekey_data])
        self.cli_set(base_path + ['rekey', 'time', rekey_time])

        # commit changes
        self.cli_commit()

        ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr',
                     'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519',
                     'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512',
                     'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521',
                     'RekeyLimit 1024M 60M'
                     ]
        tmp_sshd_conf = read_file(SSHD_CONF)

        for line in ssh_lines:
            self.assertIn(line, tmp_sshd_conf)

    def test_ssh_pubkey_accepted_algorithm(self):
        algs = ['ssh-ed25519', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
                'ecdsa-sha2-nistp521', 'ssh-dss', 'ssh-rsa', 'rsa-sha2-256',
                'rsa-sha2-512'
                ]

        expected = 'PubkeyAcceptedAlgorithms '
        for alg in algs:
            self.cli_set(base_path + ['pubkey-accepted-algorithm', alg])
            expected = f'{expected}{alg},'
        expected = expected[:-1]

        self.cli_commit()
        tmp_sshd_conf = read_file(SSHD_CONF)
        self.assertIn(expected, tmp_sshd_conf)


if __name__ == '__main__':
    unittest.main(verbosity=2)