summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--data/templates/login/pam_radius_auth.conf.j24
-rwxr-xr-xdebian/vyos-1x-smoketest.postinst6
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py210
3 files changed, 138 insertions, 82 deletions
diff --git a/data/templates/login/pam_radius_auth.conf.j2 b/data/templates/login/pam_radius_auth.conf.j2
index 75437ca71..f9b8d5e87 100644
--- a/data/templates/login/pam_radius_auth.conf.j2
+++ b/data/templates/login/pam_radius_auth.conf.j2
@@ -9,7 +9,7 @@
{% if address | is_ipv4 %}
{% set source_address.ipv4 = address %}
{% elif address | is_ipv6 %}
-{% set source_address.ipv6 = "[" + address + "]" %}
+{% set source_address.ipv6 = address %}
{% endif %}
{% endfor %}
{% endif %}
@@ -21,7 +21,7 @@
{% if server | is_ipv4 %}
{{ server }}:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv4 if source_address.ipv4 is vyos_defined }}
{% else %}
-[{{ server }}]:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv6 if source_address.ipv6 is vyos_defined }}
+{{ server | bracketize_ipv6 }}:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv6 if source_address.ipv6 is vyos_defined }}
{% endif %}
{% endfor %}
{% endif %}
diff --git a/debian/vyos-1x-smoketest.postinst b/debian/vyos-1x-smoketest.postinst
index 08d6d7d4f..bff73796c 100755
--- a/debian/vyos-1x-smoketest.postinst
+++ b/debian/vyos-1x-smoketest.postinst
@@ -11,3 +11,9 @@ TACPLUS_PATH="/usr/share/vyos/tacplus-alpine.tar"
if [[ ! -f $TACPLUS_PATH ]]; then
skopeo copy --additional-tag "$TACPLUS_TAG" "docker://$TACPLUS_TAG" "docker-archive:/$TACPLUS_PATH"
fi
+
+RADIUS_TAG="docker.io/dchidell/radius-web:latest"
+RADIUS_PATH="/usr/share/vyos/radius-latest.tar"
+if [[ ! -f $RADIUS_PATH ]]; then
+ skopeo copy --additional-tag "$RADIUS_TAG" "docker://$RADIUS_TAG" "docker-archive:/$RADIUS_PATH"
+fi
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index f6a2c3cb3..d79f5521c 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -31,17 +31,19 @@ from subprocess import PIPE
from pwd import getpwall
from vyos.configsession import ConfigSessionError
+from vyos.configquery import ConfigTreeQuery
from vyos.utils.auth import get_current_user
from vyos.utils.process import cmd
-from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
from vyos.utils.file import write_file
from vyos.template import inc_ip
+from vyos.template import is_ipv6
+from vyos.xml_ref import default_value
base_path = ['system', 'login']
users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice']
-SSH_PROCESS_NAME = 'sshd'
+ssh_test_command = '/opt/vyatta/bin/vyatta-op-cmd-wrapper show version'
ssh_pubkey = """
AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF
@@ -57,7 +59,6 @@ TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk=
tac_image = 'docker.io/lfkeitel/tacacs_plus:alpine'
tac_image_path = '/usr/share/vyos/tacplus-alpine.tar'
-
TAC_PLUS_TMPL_SRC = """
id = spawnd {
debug redirect = /dev/stdout
@@ -100,6 +101,25 @@ id = tac_plus {
member = admin
}
}
+
+"""
+
+radius_image = 'docker.io/dchidell/radius-web:latest'
+radius_image_path = '/usr/share/vyos/radius-latest.tar'
+RADIUS_CLIENTS_TMPL_SRC = """
+client SMOKETEST {
+ secret = {{ radius_key }}
+ nastype = other
+ ipaddr = {{ source_address }}
+}
+
+"""
+RADIUS_USERS_TMPL_SRC = """
+# User configuration
+{{ username }} Cleartext-Password := "{{ password }}"
+ Service-Type = NAS-Prompt-User,
+ Cisco-AVPair = "shell:priv-lvl=15"
+
"""
class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
@@ -112,16 +132,36 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
cls.cli_delete(cls, base_path + ['radius'])
cls.cli_delete(cls, base_path + ['tacacs'])
- # Load image for smoketest provided in vyos-1x-smoketest
+ # Load images for smoketest provided in vyos-1x-smoketest
if not os.path.exists(tac_image_path):
cls.fail(cls, f'{tac_image} image not available')
cmd(f'sudo podman load -i {tac_image_path}')
+ if not os.path.exists(radius_image_path):
+ cls.fail(cls, f'{radius_image} image not available')
+ cmd(f'sudo podman load -i {radius_image_path}')
+
+ cls.ssh_test_command_result = cls.op_mode(cls, ['show', 'version'])
+
+ # Dynamically start SSH service if it's not running
+ config = ConfigTreeQuery()
+ cls.is_sshd_pre_test = config.exists(['service', 'sshd'])
+ if not cls.is_sshd_pre_test:
+ # Start SSH service
+ cls.cli_set(cls, ['service', 'ssh'])
+
@classmethod
def tearDownClass(cls):
+ # Stop SSH service - if it was not running before starting the test
+ if not cls.is_sshd_pre_test:
+ cls.cli_set(cls, ['service', 'ssh'])
+ cls.cli_commit(cls)
+
super(TestSystemLogin, cls).tearDownClass()
- # Cleanup podman image
+
+ # Cleanup container images
cmd(f'sudo podman image rm -f {tac_image}')
+ cmd(f'sudo podman image rm -f {radius_image}')
def tearDown(self):
# Delete individual users from configuration
@@ -152,9 +192,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path + ['user', system_user])
def test_system_login_user(self):
- # Check if user can be created and we can SSH to localhost
- self.cli_set(['service', 'ssh', 'port', '22'])
-
for user in users:
name = f'VyOS Roxx {user}'
home_dir = f'/tmp/smoketest/{user}'
@@ -240,71 +277,71 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'{option}=y', kernel_config)
def test_system_login_radius_ipv4(self):
- # Verify generated RADIUS configuration files
-
- radius_key = 'VyOSsecretVyOS'
- radius_server = '172.16.100.10'
- radius_source = '127.0.0.1'
- radius_port = '2000'
- radius_timeout = '1'
-
- self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
- self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
- self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
- self.cli_set(base_path + ['radius', 'source-address', radius_source])
- self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+ radius_servers = ['100.64.0.4', '100.64.0.5']
+ radius_source = '100.64.0.1'
+ self._system_login_radius_test_helper(radius_servers, radius_source)
- # check validate() - Only one IPv4 source-address supported
- with self.assertRaises(ConfigSessionError):
- self.cli_commit()
- self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
-
- self.cli_commit()
+ def test_system_login_radius_ipv6(self):
+ radius_servers = ['2001:db8::4', '2001:db8::5']
+ radius_source = '2001:db8::1'
+ self._system_login_radius_test_helper(radius_servers, radius_source)
- # this file must be read with higher permissions
- pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
- tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server,
- radius_port, radius_key, radius_timeout,
- radius_source), pam_radius_auth_conf)
- self.assertTrue(tmp)
+ def _system_login_radius_test_helper(self, radius_servers: list, radius_source: str):
+ # Verify generated RADIUS configuration files
+ radius_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10))
- # required, static options
- self.assertIn('priv-lvl 15', pam_radius_auth_conf)
- self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf)
+ default_port = default_value(base_path + ['radius', 'server', radius_servers[0], 'port'])
+ default_timeout = default_value(base_path + ['radius', 'server', radius_servers[0], 'timeout'])
- # PAM
- pam_common_account = read_file('/etc/pam.d/common-account')
- self.assertIn('pam_radius_auth.so', pam_common_account)
+ dummy_if = 'dum12760'
- pam_common_auth = read_file('/etc/pam.d/common-auth')
- self.assertIn('pam_radius_auth.so', pam_common_auth)
+ # Load container image for FreeRADIUS server
+ radius_config = '/tmp/smoketest-radius-server'
+ radius_container_path = ['container', 'name', 'radius-1']
- pam_common_session = read_file('/etc/pam.d/common-session')
- self.assertIn('pam_radius_auth.so', pam_common_session)
-
- pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive')
- self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive)
+ # Generate random string with 10 digits
+ username = 'radius-admin'
+ password = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10))
+ radius_source_mask = '32'
+ if is_ipv6(radius_source):
+ radius_source_mask = '128'
+ radius_test_user = {
+ 'username' : username,
+ 'password' : password,
+ 'radius_key' : radius_key,
+ 'source_address' : f'{radius_source}/{radius_source_mask}'
+ }
- # NSS
- nsswitch_conf = read_file('/etc/nsswitch.conf')
- tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf)
- self.assertTrue(tmp)
+ tmpl = jinja2.Template(RADIUS_CLIENTS_TMPL_SRC)
+ write_file(f'{radius_config}/clients.cfg', tmpl.render(radius_test_user))
- tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
- self.assertTrue(tmp)
+ tmpl = jinja2.Template(RADIUS_USERS_TMPL_SRC)
+ write_file(f'{radius_config}/users', tmpl.render(radius_test_user))
- def test_system_login_radius_ipv6(self):
- # Verify generated RADIUS configuration files
+ # Start tac_plus container
+ self.cli_set(radius_container_path + ['allow-host-networks'])
+ self.cli_set(radius_container_path + ['image', radius_image])
+ self.cli_set(radius_container_path + ['volume', 'clients', 'destination', '/etc/raddb/clients.conf'])
+ self.cli_set(radius_container_path + ['volume', 'clients', 'mode', 'ro'])
+ self.cli_set(radius_container_path + ['volume', 'clients', 'source', f'{radius_config}/clients.cfg'])
+ self.cli_set(radius_container_path + ['volume', 'users', 'destination', '/etc/raddb/users'])
+ self.cli_set(radius_container_path + ['volume', 'users', 'mode', 'ro'])
+ self.cli_set(radius_container_path + ['volume', 'users', 'source', f'{radius_config}/users'])
- radius_key = 'VyOS-VyOS'
- radius_server = '2001:db8::1'
- radius_source = '::1'
- radius_port = '4000'
- radius_timeout = '4'
+ # Start container
+ self.cli_commit()
- self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
- self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
- self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
+ # Deinfine RADIUS servers
+ for radius_server in radius_servers:
+ # Use this system as "remote" RADIUS server
+ dummy_address_mask = '32'
+ if is_ipv6(radius_server):
+ dummy_address_mask = '128'
+ self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_server}/{dummy_address_mask}'])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
+
+ # Define RADIUS traffic source address
+ self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_source}/{radius_source_mask}'])
self.cli_set(base_path + ['radius', 'source-address', radius_source])
self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
@@ -317,10 +354,13 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
# this file must be read with higher permissions
pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
- tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server,
- radius_port, radius_key, radius_timeout,
- radius_source), pam_radius_auth_conf)
- self.assertTrue(tmp)
+
+ for radius_server in radius_servers:
+ if is_ipv6(radius_server):
+ # it is essential to escape the [] brackets when searching with a regex
+ radius_server = rf'\[{radius_server}\]'
+ tmp = re.findall(rf'\n?{radius_server}:{default_port}\s+{radius_key}\s+{default_timeout}\s+{radius_source}', pam_radius_auth_conf)
+ self.assertTrue(tmp)
# required, static options
self.assertIn('priv-lvl 15', pam_radius_auth_conf)
@@ -347,6 +387,27 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
self.assertTrue(tmp)
+ # Login with proper credentials
+ out, err = self.ssh_send_cmd(ssh_test_command, username, password)
+ # verify login
+ self.assertFalse(err)
+ self.assertEqual(out, self.ssh_test_command_result)
+
+ # Login with invalid credentials
+ with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
+ _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1')
+
+ # Remove RADIUS configuration
+ self.cli_delete(base_path + ['radius'])
+ # Remove RADIUS container
+ self.cli_delete(radius_container_path)
+ # Remove dummy interface
+ self.cli_delete(['interfaces', 'dummy', dummy_if])
+ self.cli_commit()
+
+ # Remove rendered tac_plus daemon configuration
+ shutil.rmtree(radius_config)
+
def test_system_login_max_login_session(self):
max_logins = '2'
timeout = '600'
@@ -390,12 +451,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
tmpl = jinja2.Template(TAC_PLUS_TMPL_SRC)
write_file(f'{tac_plus_config}/tac_plus.cfg', tmpl.render(tac_test_user))
- # Check if SSH service is running
- ssh_running = process_named_running(SSH_PROCESS_NAME)
- if not ssh_running:
- # Start SSH service
- self.cli_set(['service', 'ssh'])
-
# Start tac_plus container
self.cli_set(tac_container_path + ['allow-host-networks'])
self.cli_set(tac_container_path + ['image', tac_image])
@@ -450,15 +505,14 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
self.assertIn(f'server={server}', nss_tacacs_conf)
# Login with proper credentials
- test_command = 'uname -a'
- out, err = self.ssh_send_cmd(test_command, username, password)
+ out, err = self.ssh_send_cmd(ssh_test_command, username, password)
# verify login
self.assertFalse(err)
- self.assertEqual(out, cmd(test_command))
+ self.assertEqual(out, self.ssh_test_command_result)
# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
- _, _ = self.ssh_send_cmd(test_command, username, f'{password}1')
+ _, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1')
# Remove TACACS configuration
self.cli_delete(base_path + ['tacacs'])
@@ -471,10 +525,6 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
# Remove rendered tac_plus daemon configuration
shutil.rmtree(tac_plus_config)
- # Stop SSH service if it was not running before
- if not ssh_running:
- self.cli_delete(['service', 'ssh'])
-
def test_delete_current_user(self):
current_user = get_current_user()