diff options
| author | Christian Breunig <christian@breunig.cc> | 2025-01-13 19:53:37 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-13 19:53:37 +0100 | 
| commit | 99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8 (patch) | |
| tree | 65b452e14327a50178d79db67519d3cde96a03c5 /smoketest/scripts/cli | |
| parent | e8581721313143c44bd0b19ac5c64b6499f1f605 (diff) | |
| parent | 21b2541d98b02602dc2301e57c2ca7efddbc6cff (diff) | |
| download | vyos-1x-99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8.tar.gz vyos-1x-99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8.zip  | |
Merge pull request #4299 from c-po/radius-smoketest
T7038: T7039: fix broken RADIUS IPv6 source address and add smoketests
Diffstat (limited to 'smoketest/scripts/cli')
| -rwxr-xr-x | smoketest/scripts/cli/test_system_login.py | 210 | 
1 files changed, 130 insertions, 80 deletions
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index f6a2c3cb3..d79f5521c 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -31,17 +31,19 @@ from subprocess import PIPE  from pwd import getpwall  from vyos.configsession import ConfigSessionError +from vyos.configquery import ConfigTreeQuery  from vyos.utils.auth import get_current_user  from vyos.utils.process import cmd -from vyos.utils.process import process_named_running  from vyos.utils.file import read_file  from vyos.utils.file import write_file  from vyos.template import inc_ip +from vyos.template import is_ipv6 +from vyos.xml_ref import default_value  base_path = ['system', 'login']  users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice'] -SSH_PROCESS_NAME = 'sshd' +ssh_test_command = '/opt/vyatta/bin/vyatta-op-cmd-wrapper show version'  ssh_pubkey = """  AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF @@ -57,7 +59,6 @@ TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk=  tac_image = 'docker.io/lfkeitel/tacacs_plus:alpine'  tac_image_path = '/usr/share/vyos/tacplus-alpine.tar' -  TAC_PLUS_TMPL_SRC = """  id = spawnd {      debug redirect = /dev/stdout @@ -100,6 +101,25 @@ id = tac_plus {          member = admin      }  } + +""" + +radius_image = 'docker.io/dchidell/radius-web:latest' +radius_image_path = '/usr/share/vyos/radius-latest.tar' +RADIUS_CLIENTS_TMPL_SRC = """ +client SMOKETEST { +    secret = {{ radius_key }} +    nastype = other +    ipaddr = {{ source_address }} +} + +""" +RADIUS_USERS_TMPL_SRC = """ +# User configuration +{{ username }}  Cleartext-Password := "{{ password }}" +    Service-Type = NAS-Prompt-User, +    Cisco-AVPair = "shell:priv-lvl=15" +  """  class TestSystemLogin(VyOSUnitTestSHIM.TestCase): @@ -112,16 +132,36 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          cls.cli_delete(cls, base_path + ['radius'])          cls.cli_delete(cls, base_path + ['tacacs']) -        # Load image for smoketest provided in vyos-1x-smoketest +        # Load images for smoketest provided in vyos-1x-smoketest          if not os.path.exists(tac_image_path):              cls.fail(cls, f'{tac_image} image not available')          cmd(f'sudo podman load -i {tac_image_path}') +        if not os.path.exists(radius_image_path): +            cls.fail(cls, f'{radius_image} image not available') +        cmd(f'sudo podman load -i {radius_image_path}') + +        cls.ssh_test_command_result = cls.op_mode(cls, ['show', 'version']) + +        # Dynamically start SSH service if it's not running +        config = ConfigTreeQuery() +        cls.is_sshd_pre_test = config.exists(['service', 'sshd']) +        if not cls.is_sshd_pre_test: +            # Start SSH service +            cls.cli_set(cls, ['service', 'ssh']) +      @classmethod      def tearDownClass(cls): +        # Stop SSH service - if it was not running before starting the test +        if not cls.is_sshd_pre_test: +            cls.cli_set(cls, ['service', 'ssh']) +            cls.cli_commit(cls) +          super(TestSystemLogin, cls).tearDownClass() -        # Cleanup podman image + +        # Cleanup container images          cmd(f'sudo podman image rm -f {tac_image}') +        cmd(f'sudo podman image rm -f {radius_image}')      def tearDown(self):          # Delete individual users from configuration @@ -152,9 +192,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          self.cli_delete(base_path + ['user', system_user])      def test_system_login_user(self): -        # Check if user can be created and we can SSH to localhost -        self.cli_set(['service', 'ssh', 'port', '22']) -          for user in users:              name = f'VyOS Roxx {user}'              home_dir = f'/tmp/smoketest/{user}' @@ -240,71 +277,71 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):              self.assertIn(f'{option}=y', kernel_config)      def test_system_login_radius_ipv4(self): -        # Verify generated RADIUS configuration files - -        radius_key = 'VyOSsecretVyOS' -        radius_server = '172.16.100.10' -        radius_source = '127.0.0.1' -        radius_port = '2000' -        radius_timeout = '1' - -        self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) -        self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) -        self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) -        self.cli_set(base_path + ['radius', 'source-address', radius_source]) -        self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) +        radius_servers = ['100.64.0.4', '100.64.0.5'] +        radius_source = '100.64.0.1' +        self._system_login_radius_test_helper(radius_servers, radius_source) -        # check validate() - Only one IPv4 source-address supported -        with self.assertRaises(ConfigSessionError): -            self.cli_commit() -        self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) - -        self.cli_commit() +    def test_system_login_radius_ipv6(self): +        radius_servers = ['2001:db8::4', '2001:db8::5'] +        radius_source = '2001:db8::1' +        self._system_login_radius_test_helper(radius_servers, radius_source) -        # this file must be read with higher permissions -        pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') -        tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server, -                        radius_port, radius_key, radius_timeout, -                        radius_source), pam_radius_auth_conf) -        self.assertTrue(tmp) +    def _system_login_radius_test_helper(self, radius_servers: list, radius_source: str): +        # Verify generated RADIUS configuration files +        radius_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) -        # required, static options -        self.assertIn('priv-lvl 15', pam_radius_auth_conf) -        self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) +        default_port = default_value(base_path + ['radius', 'server', radius_servers[0], 'port']) +        default_timeout = default_value(base_path + ['radius', 'server', radius_servers[0], 'timeout']) -        # PAM -        pam_common_account = read_file('/etc/pam.d/common-account') -        self.assertIn('pam_radius_auth.so', pam_common_account) +        dummy_if = 'dum12760' -        pam_common_auth = read_file('/etc/pam.d/common-auth') -        self.assertIn('pam_radius_auth.so', pam_common_auth) +        # Load container image for FreeRADIUS server +        radius_config = '/tmp/smoketest-radius-server' +        radius_container_path = ['container', 'name', 'radius-1'] -        pam_common_session = read_file('/etc/pam.d/common-session') -        self.assertIn('pam_radius_auth.so', pam_common_session) - -        pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') -        self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) +        # Generate random string with 10 digits +        username = 'radius-admin' +        password = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10)) +        radius_source_mask = '32' +        if is_ipv6(radius_source): +            radius_source_mask = '128' +        radius_test_user = { +            'username' : username, +            'password' : password, +            'radius_key' : radius_key, +            'source_address' : f'{radius_source}/{radius_source_mask}' +        } -        # NSS -        nsswitch_conf = read_file('/etc/nsswitch.conf') -        tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) -        self.assertTrue(tmp) +        tmpl = jinja2.Template(RADIUS_CLIENTS_TMPL_SRC) +        write_file(f'{radius_config}/clients.cfg', tmpl.render(radius_test_user)) -        tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) -        self.assertTrue(tmp) +        tmpl = jinja2.Template(RADIUS_USERS_TMPL_SRC) +        write_file(f'{radius_config}/users', tmpl.render(radius_test_user)) -    def test_system_login_radius_ipv6(self): -        # Verify generated RADIUS configuration files +        # Start tac_plus container +        self.cli_set(radius_container_path + ['allow-host-networks']) +        self.cli_set(radius_container_path + ['image', radius_image]) +        self.cli_set(radius_container_path + ['volume', 'clients', 'destination', '/etc/raddb/clients.conf']) +        self.cli_set(radius_container_path + ['volume', 'clients', 'mode', 'ro']) +        self.cli_set(radius_container_path + ['volume', 'clients', 'source', f'{radius_config}/clients.cfg']) +        self.cli_set(radius_container_path + ['volume', 'users', 'destination', '/etc/raddb/users']) +        self.cli_set(radius_container_path + ['volume', 'users', 'mode', 'ro']) +        self.cli_set(radius_container_path + ['volume', 'users', 'source', f'{radius_config}/users']) -        radius_key = 'VyOS-VyOS' -        radius_server = '2001:db8::1' -        radius_source = '::1' -        radius_port = '4000' -        radius_timeout = '4' +        # Start container +        self.cli_commit() -        self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) -        self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) -        self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) +        # Deinfine RADIUS servers +        for radius_server in radius_servers: +            # Use this system as "remote" RADIUS server +            dummy_address_mask = '32' +            if is_ipv6(radius_server): +                dummy_address_mask = '128' +            self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_server}/{dummy_address_mask}']) +            self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + +        # Define RADIUS traffic source address +        self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_source}/{radius_source_mask}'])          self.cli_set(base_path + ['radius', 'source-address', radius_source])          self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) @@ -317,10 +354,13 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          # this file must be read with higher permissions          pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') -        tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, -                        radius_port, radius_key, radius_timeout, -                        radius_source), pam_radius_auth_conf) -        self.assertTrue(tmp) + +        for radius_server in radius_servers: +            if is_ipv6(radius_server): +                # it is essential to escape the [] brackets when searching with a regex +                radius_server = rf'\[{radius_server}\]' +            tmp = re.findall(rf'\n?{radius_server}:{default_port}\s+{radius_key}\s+{default_timeout}\s+{radius_source}', pam_radius_auth_conf) +            self.assertTrue(tmp)          # required, static options          self.assertIn('priv-lvl 15', pam_radius_auth_conf) @@ -347,6 +387,27 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)          self.assertTrue(tmp) +        # Login with proper credentials +        out, err = self.ssh_send_cmd(ssh_test_command, username, password) +        # verify login +        self.assertFalse(err) +        self.assertEqual(out, self.ssh_test_command_result) + +        # Login with invalid credentials +        with self.assertRaises(paramiko.ssh_exception.AuthenticationException): +            _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1') + +        # Remove RADIUS configuration +        self.cli_delete(base_path + ['radius']) +        # Remove RADIUS container +        self.cli_delete(radius_container_path) +        # Remove dummy interface +        self.cli_delete(['interfaces', 'dummy', dummy_if]) +        self.cli_commit() + +        # Remove rendered tac_plus daemon configuration +        shutil.rmtree(radius_config) +      def test_system_login_max_login_session(self):          max_logins = '2'          timeout = '600' @@ -390,12 +451,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          tmpl = jinja2.Template(TAC_PLUS_TMPL_SRC)          write_file(f'{tac_plus_config}/tac_plus.cfg', tmpl.render(tac_test_user)) -        # Check if SSH service is running -        ssh_running = process_named_running(SSH_PROCESS_NAME) -        if not ssh_running: -            # Start SSH service -            self.cli_set(['service', 'ssh']) -          # Start tac_plus container          self.cli_set(tac_container_path + ['allow-host-networks'])          self.cli_set(tac_container_path + ['image', tac_image]) @@ -450,15 +505,14 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):              self.assertIn(f'server={server}', nss_tacacs_conf)          # Login with proper credentials -        test_command = 'uname -a' -        out, err = self.ssh_send_cmd(test_command, username, password) +        out, err = self.ssh_send_cmd(ssh_test_command, username, password)          # verify login          self.assertFalse(err) -        self.assertEqual(out, cmd(test_command)) +        self.assertEqual(out, self.ssh_test_command_result)          # Login with invalid credentials          with self.assertRaises(paramiko.ssh_exception.AuthenticationException): -            _, _ = self.ssh_send_cmd(test_command, username, f'{password}1') +            _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1')          # Remove TACACS configuration          self.cli_delete(base_path + ['tacacs']) @@ -471,10 +525,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):          # Remove rendered tac_plus daemon configuration          shutil.rmtree(tac_plus_config) -        # Stop SSH service if it was not running before -        if not ssh_running: -            self.cli_delete(['service', 'ssh']) -      def test_delete_current_user(self):          current_user = get_current_user()  | 
