diff options
author | Takeru Hayasaka <hayatake396@gmail.com> | 2024-12-28 19:58:02 +0000 |
---|---|---|
committer | Christian Breunig <christian@breunig.cc> | 2025-05-29 13:57:48 +0200 |
commit | 6c3b1ef2fede1e3c2b6e89060d3d645c2ba744cd (patch) | |
tree | 6c3c060d70a3d48f5b10709f9be067f6a9d49f33 | |
parent | e604e68a5a77718a25b60737dcb9699b84c8e34b (diff) | |
download | vyos-1x-6c3b1ef2fede1e3c2b6e89060d3d645c2ba744cd.tar.gz vyos-1x-6c3b1ef2fede1e3c2b6e89060d3d645c2ba744cd.zip |
ssh: T6013: support SSH AuthorizedPrincipalsFile in use with trusted-user-ca-key
Thisc omplements commit e7cab89f9f81 ("T6013: Add support for configuring
TrustedUserCAKeys in SSH service with local and remote CA keys"). It introduces
a new CLI node per user to support defining the authorized principals used by
any given PKI certificate. It is now possible to associate SSH login users with
their respective principals.
Authored-by: Takeru Hayasaka <hayatake396@gmail.com>
-rw-r--r-- | data/templates/ssh/sshd_config.j2 | 6 | ||||
-rw-r--r-- | interface-definitions/service_ssh.xml.in | 19 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 89 | ||||
-rwxr-xr-x | src/conf_mode/service_ssh.py | 100 |
4 files changed, 196 insertions, 18 deletions
diff --git a/data/templates/ssh/sshd_config.j2 b/data/templates/ssh/sshd_config.j2 index 7e44efae8..d6e31b0f4 100644 --- a/data/templates/ssh/sshd_config.j2 +++ b/data/templates/ssh/sshd_config.j2 @@ -114,3 +114,9 @@ RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined } {% if trusted_user_ca_key is vyos_defined %} TrustedUserCAKeys /etc/ssh/trusted_user_ca_key {% endif %} + +{% if trusted_user_ca_key is vyos_defined and trusted_user_ca_key.bind_user is vyos_defined %} +AuthorizedPrincipalsFile /etc/ssh/authorized_principals/%u +{% elif trusted_user_ca_key is vyos_defined %} +AuthorizedPrincipalsFile none +{% endif %} diff --git a/interface-definitions/service_ssh.xml.in b/interface-definitions/service_ssh.xml.in index 14d358c78..2ab9db48b 100644 --- a/interface-definitions/service_ssh.xml.in +++ b/interface-definitions/service_ssh.xml.in @@ -281,6 +281,25 @@ </properties> <children> #include <include/pki/ca-certificate.xml.i> + <tagNode name="bind-user"> + <properties> + <help>user-name</help> + <constraint> + #include <include/constraint/login-username.xml.i> + </constraint> + </properties> + <children> + <leafNode name="principal"> + <properties> + <help>principal-name</help> + <constraint> + #include <include/constraint/login-username.xml.i> + </constraint> + <multi/> + </properties> + </leafNode> + </children> + </tagNode> </children> </node> #include <include/vrf-multi.xml.i> diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index fa08a5b32..db83f14c3 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -39,6 +39,7 @@ key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' +authorized_principals_dir = '/etc/ssh/authorized_principals' def get_config_value(key): @@ -380,18 +381,104 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertIn('none', authorize_principals_file_config) with open(trusted_user_ca_key, 'r') as file: ca_key_contents = file.read() self.assertIn(ca_root_cert_data, ca_key_contents) - self.cli_delete(base_path + ['trusted-user-ca-key']) + self.cli_delete( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) self.cli_delete(['pki', 'ca', ca_cert_name]) self.cli_commit() # Verify the CA key is removed trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertNotIn('none', authorize_principals_file_config) + + def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self): + ca_cert_name = 'test_ca' + bind_user = 'test_user' + principals = ['test_principal_alice', 'test_principal_bob'] + test_user = 'ssh_test' + test_pass = 'v2i57DZs8idUwMN3VC92' + + # Create a test user + self.cli_set( + [ + 'system', + 'login', + 'user', + test_user, + 'authentication', + 'plaintext-password', + test_pass, + ] + ) + + # set pki ca <ca_cert_name> certificate <ca_key_data> + # set service ssh trusted-user-ca-key ca-certificate <ca_cert_name> + # set service ssh trusted-user-ca-key bind-user <bind_user> principal <principals> + self.cli_set( + pki_path + + [ + 'ca', + ca_cert_name, + 'certificate', + ca_root_cert_data.replace('\n', ''), + ] + ) + self.cli_set( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) + for principal in principals: + self.cli_set( + base_path + + [ + 'trusted-user-ca-key', + 'bind-user', + bind_user, + 'principal', + principal, + ] + ) + self.cli_commit() + + trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') + self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorized_principals_file = f'{authorized_principals_dir}/{bind_user}' + self.assertTrue(os.path.exists(authorized_principals_file)) + + with open(authorized_principals_file, 'r') as file: + authorized_principals = file.read() + for principal in principals: + self.assertIn(principal, authorized_principals) + + for principal in principals: + self.cli_delete( + base_path + + [ + 'trusted-user-ca-key', + 'bind-user', + bind_user, + 'principal', + principal, + ] + ) + + self.cli_delete( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) + self.cli_delete(['pki', 'ca', ca_cert_name]) + self.cli_delete(['system', 'login', 'user', test_user]) + self.cli_commit() + + # Verify the authorized principals file is removed + self.assertFalse(os.path.exists(authorized_principals_file)) if __name__ == '__main__': diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py index 759f87bb2..5e6a6f771 100755 --- a/src/conf_mode/service_ssh.py +++ b/src/conf_mode/service_ssh.py @@ -45,6 +45,77 @@ key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' +authorized_principals = '/etc/ssh/authorized_principals' + + +def cleanup_authorized_principals_dir(valid_users: list[str]): + if not os.path.isdir(authorized_principals): + return + + # Check the files (user name) under the directory and delete unnecessary ones. + for filename in os.listdir(authorized_principals): + file_path = os.path.join(authorized_principals, filename) + if os.path.isfile(file_path) and filename not in valid_users: + os.remove(file_path) + + # If the directory is empty, delete it too + if not os.listdir(authorized_principals): + os.rmdir(authorized_principals) + + +def handle_trusted_user_ca_key(ssh: dict): + if 'trusted_user_ca_key' not in ssh: + if os.path.exists(trusted_user_ca_key): + os.unlink(trusted_user_ca_key) + + # remove authorized_principals directory if it exists + cleanup_authorized_principals_dir([]) + return + + # trusted_user_ca_key is present + ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] + pki_ca_cert = ssh['pki']['ca'][ca_key_name] + + loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) + loaded_ca_certs = { + load_certificate(c['certificate']) + for c in ssh['pki']['ca'].values() + if 'certificate' in c + } + + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + write_file( + trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain) + ) + + if 'bind-user' not in ssh['trusted_user_ca_key']: + # remove authorized_principals directory if it exists + cleanup_authorized_principals_dir([]) + return + + # bind-user is present + configured_users = [] + for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items(): + if bind_user not in ssh['login_users']: + raise ConfigError(f"User '{bind_user}' not found in system login users") + + if 'principal' not in bind_user_config: + raise ConfigError(f"Principal not found for user '{bind_user}'") + + principals = bind_user_config['principal'] + if isinstance(principals, str): + principals = [principals] + + if not os.path.isdir(authorized_principals): + os.makedirs(authorized_principals, exist_ok=True) + + principal_file = os.path.join(authorized_principals, bind_user) + contents = '\n'.join(principals) + '\n' + write_file(principal_file, contents) + configured_users.append(bind_user) + + # remove unnecessary files under authorized_principals directory + cleanup_authorized_principals_dir(configured_users) def get_config(config=None): @@ -59,7 +130,15 @@ def get_config(config=None): ssh = conf.get_config_dict( base, key_mangling=('-', '_'), get_first_key=True, with_pki=True ) + login_users_base = ['system', 'login', 'user'] + login_users = conf.get_config_dict( + login_users_base, + key_mangling=('-', '_'), + no_tag_node_value_mangle=True, + get_first_key=True, + ) + # create a list of all users, cli and users tmp = is_node_changed(conf, base + ['vrf']) if tmp: ssh.update({'restart_required': {}}) @@ -71,6 +150,9 @@ def get_config(config=None): # pass config file path - used in override template ssh['config_file'] = config_file + # use for trusted ca + ssh['login_users'] = login_users + # Ignore default XML values if config doesn't exists # Delete key from dict if not conf.exists(base + ['dynamic-protection']): @@ -119,23 +201,7 @@ def generate(ssh): syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!') call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}') - if 'trusted_user_ca_key' in ssh: - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - - loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) - loaded_ca_certs = { - load_certificate(c['certificate']) - for c in ssh['pki']['ca'].values() - if 'certificate' in c - } - - ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) - write_file( - trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain) - ) - elif os.path.exists(trusted_user_ca_key): - os.unlink(trusted_user_ca_key) + handle_trusted_user_ca_key(ssh) render(config_file, 'ssh/sshd_config.j2', ssh) |