From 6c3b1ef2fede1e3c2b6e89060d3d645c2ba744cd Mon Sep 17 00:00:00 2001 From: Takeru Hayasaka Date: Sat, 28 Dec 2024 19:58:02 +0000 Subject: ssh: T6013: support SSH AuthorizedPrincipalsFile in use with trusted-user-ca-key Thisc omplements commit e7cab89f9f81 ("T6013: Add support for configuring TrustedUserCAKeys in SSH service with local and remote CA keys"). It introduces a new CLI node per user to support defining the authorized principals used by any given PKI certificate. It is now possible to associate SSH login users with their respective principals. Authored-by: Takeru Hayasaka --- smoketest/scripts/cli/test_service_ssh.py | 89 ++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) (limited to 'smoketest/scripts/cli/test_service_ssh.py') diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index fa08a5b32..db83f14c3 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -39,6 +39,7 @@ key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' +authorized_principals_dir = '/etc/ssh/authorized_principals' def get_config_value(key): @@ -380,18 +381,104 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertIn('none', authorize_principals_file_config) with open(trusted_user_ca_key, 'r') as file: ca_key_contents = file.read() self.assertIn(ca_root_cert_data, ca_key_contents) - self.cli_delete(base_path + ['trusted-user-ca-key']) + self.cli_delete( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) self.cli_delete(['pki', 'ca', ca_cert_name]) self.cli_commit() # Verify the CA key is removed trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertNotIn('none', authorize_principals_file_config) + + def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self): + ca_cert_name = 'test_ca' + bind_user = 'test_user' + principals = ['test_principal_alice', 'test_principal_bob'] + test_user = 'ssh_test' + test_pass = 'v2i57DZs8idUwMN3VC92' + + # Create a test user + self.cli_set( + [ + 'system', + 'login', + 'user', + test_user, + 'authentication', + 'plaintext-password', + test_pass, + ] + ) + + # set pki ca certificate + # set service ssh trusted-user-ca-key ca-certificate + # set service ssh trusted-user-ca-key bind-user principal + self.cli_set( + pki_path + + [ + 'ca', + ca_cert_name, + 'certificate', + ca_root_cert_data.replace('\n', ''), + ] + ) + self.cli_set( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) + for principal in principals: + self.cli_set( + base_path + + [ + 'trusted-user-ca-key', + 'bind-user', + bind_user, + 'principal', + principal, + ] + ) + self.cli_commit() + + trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') + self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + authorized_principals_file = f'{authorized_principals_dir}/{bind_user}' + self.assertTrue(os.path.exists(authorized_principals_file)) + + with open(authorized_principals_file, 'r') as file: + authorized_principals = file.read() + for principal in principals: + self.assertIn(principal, authorized_principals) + + for principal in principals: + self.cli_delete( + base_path + + [ + 'trusted-user-ca-key', + 'bind-user', + bind_user, + 'principal', + principal, + ] + ) + + self.cli_delete( + base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] + ) + self.cli_delete(['pki', 'ca', ca_cert_name]) + self.cli_delete(['system', 'login', 'user', test_user]) + self.cli_commit() + + # Verify the authorized principals file is removed + self.assertFalse(os.path.exists(authorized_principals_file)) if __name__ == '__main__': -- cgit v1.2.3 From 4b4bbd73b84c2c478c7752f58e7f66ec6d90459e Mon Sep 17 00:00:00 2001 From: Christian Breunig Date: Tue, 20 May 2025 19:57:24 +0200 Subject: ssh: T6013: rename trusted-user-ca-key -> truster-user-ca The current implementation for SSH CA based authentication uses "set service ssh trusted-user-ca-key ca-certificate " to define an X.509 certificate from "set pki ca ..." - fun fact, native OpenSSH does not support X.509 certificates and only runs with OpenSSH ssh-keygen generated RSA or EC keys. This commit changes the bahavior to support antive certificates generated using ssh-keygen and loaded to our PKI tree. As the previous implementation did not work at all, no migrations cript is used. --- data/templates/ssh/sshd_config.j2 | 6 +- interface-definitions/service_ssh.xml.in | 16 +- python/vyos/configverify.py | 19 ++ python/vyos/defaults.py | 8 +- python/vyos/template.py | 20 +- python/vyos/utils/file.py | 21 ++- smoketest/scripts/cli/base_vyostest_shim.py | 6 +- smoketest/scripts/cli/test_service_ssh.py | 278 +++++++++++++++------------- src/conf_mode/pki.py | 5 +- src/conf_mode/service_ssh.py | 54 ++---- src/conf_mode/system_login.py | 23 ++- src/tests/test_template.py | 5 + 12 files changed, 269 insertions(+), 192 deletions(-) (limited to 'smoketest/scripts/cli/test_service_ssh.py') diff --git a/data/templates/ssh/sshd_config.j2 b/data/templates/ssh/sshd_config.j2 index dce679936..1315bf2cb 100644 --- a/data/templates/ssh/sshd_config.j2 +++ b/data/templates/ssh/sshd_config.j2 @@ -111,17 +111,17 @@ ClientAliveInterval {{ client_keepalive_interval }} RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }} {% endif %} -{% if trusted_user_ca_key is vyos_defined %} +{% if trusted_user_ca is vyos_defined %} # Specifies a file containing public keys of certificate authorities that are # trusted to sign user certificates for authentication -TrustedUserCAKeys /etc/ssh/trusted_user_ca_key +TrustedUserCAKeys {{ get_default_config_file('sshd_user_ca') }} # The default is "none", i.e. not to use a principals file - in this case, the # username of the user must appear in a certificate's principals list for it # to be accepted. ".ssh/authorized_principals" means a per-user configuration, # relative to $HOME. {% set filename = 'none' %} -{% if trusted_user_ca_key.has_principals is vyos_defined %} +{% if has_principals is vyos_defined %} {% set filename = '.ssh/authorized_principals' %} {% endif %} AuthorizedPrincipalsFile {{ filename }} diff --git a/interface-definitions/service_ssh.xml.in b/interface-definitions/service_ssh.xml.in index 14d358c78..c659a7db7 100644 --- a/interface-definitions/service_ssh.xml.in +++ b/interface-definitions/service_ssh.xml.in @@ -275,14 +275,18 @@ - + - Trusted user CA key + OpenSSH trusted user CA + + pki openssh + + + txt + OpenSSH certificate name from PKI subsystem + - - #include - - + #include diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index d5f443f15..07eb29a68 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -527,6 +527,25 @@ def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0): if dh_bits < min_key_size: raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!') +def verify_pki_openssh_key(config: dict, key_name: str): + """ + Common helper function user by PKI consumers to perform recurring + validation functions on OpenSSH keys + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'openssh' not in config['pki']: + raise ConfigError('PKI does not contain any OpenSSH keys!') + + if key_name not in config['pki']['openssh']: + raise ConfigError(f'OpenSSH key "{key_name}" not found in configuration!') + + if 'public' in config['pki']['openssh'][key_name]: + if not {'key', 'type'} <= set(config['pki']['openssh'][key_name]['public']): + raise ConfigError('Both public key and type must be defined for '\ + f'OpenSSH public key "{key_name}"!') + def verify_eapol(config: dict): """ Common helper function used by interface implementations to perform diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index fbde0298b..e42d92112 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -53,6 +53,10 @@ internal_ports = { 'certbot_haproxy' : 65080, # Certbot running behing haproxy } +config_files = { + 'sshd_user_ca' : '/run/sshd/trusted_user_ca', +} + config_status = '/tmp/vyos-config-status' api_config_state = '/run/http-api-state' frr_debug_enable = '/tmp/vyos.frr.debug' @@ -69,8 +73,8 @@ config_default = os.path.join(directories['data'], 'config.boot.default') rt_symbolic_names = { # Standard routing tables for Linux & reserved IDs for VyOS - 'default': 253, # Confusingly, a final fallthru, not the default. - 'main': 254, # The actual global table used by iproute2 unless told otherwise. + 'default': 253, # Confusingly, a final fallthru, not the default. + 'main': 254, # The actual global table used by iproute2 unless told otherwise. 'local': 255, # Special kernel loopback table. } diff --git a/python/vyos/template.py b/python/vyos/template.py index aa215db95..bf7928914 100755 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -1079,7 +1079,7 @@ def vyos_defined(value, test_value=None, var_type=None): def get_default_port(service): """ Jinja2 plugin to retrieve common service port number from vyos.defaults - class form a Jinja2 template. This removes the need to hardcode, or pass in + class from a Jinja2 template. This removes the need to hardcode, or pass in the data using the general dictionary. Added to remove code complexity and make it easier to read. @@ -1092,3 +1092,21 @@ def get_default_port(service): raise RuntimeError(f'Service "{service}" not found in internal ' \ 'vyos.defaults.internal_ports dict!') return internal_ports[service] + +@register_clever_function('get_default_config_file') +def get_default_config_file(filename): + """ + Jinja2 plugin to retrieve a common configuration file path from + vyos.defaults class from a Jinja2 template. This removes the need to + hardcode, or pass in the data using the general dictionary. + + Added to remove code complexity and make it easier to read. + + Example: + {{ get_default_config_file('certbot_haproxy') }} + """ + from vyos.defaults import config_files + if filename not in config_files: + raise RuntimeError(f'Configuration file "{filename}" not found in '\ + 'internal vyos.defaults.config_files dict!') + return config_files[filename] diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index eaebb57a3..cc46d77d1 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -28,22 +28,28 @@ def file_is_persistent(path): absolute = os.path.abspath(os.path.dirname(path)) return re.match(location,absolute) -def read_file(fname, defaultonfailure=None): +def read_file(fname, defaultonfailure=None, sudo=False): """ read the content of a file, stripping any end characters (space, newlines) should defaultonfailure be not None, it is returned on failure to read """ try: - """ Read a file to string """ - with open(fname, 'r') as f: - data = f.read().strip() - return data + # Some files can only be read by root - emulate sudo cat call + if sudo: + from vyos.utils.process import cmd + data = cmd(['sudo', 'cat', fname]) + else: + # If not sudo, just read the file + with open(fname, 'r') as f: + data = f.read() + return data.strip() except Exception as e: if defaultonfailure is not None: return defaultonfailure raise e -def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False): +def write_file(fname, data, defaultonfailure=None, user=None, group=None, + mode=None, append=False, trailing_newline=False): """ Write content of data to given fname, should defaultonfailure be not None, it is returned on failure to read. @@ -60,6 +66,9 @@ def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=N bytes = 0 with open(fname, 'w' if not append else 'a') as f: bytes = f.write(data) + if trailing_newline and not data.endswith('\n'): + f.write('\n') + bytes += 1 chown(fname, user, group) chmod(fname, mode) return bytes diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index f0674f187..9b64d5c0e 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -152,12 +152,14 @@ class VyOSUnitTestSHIM: return out @staticmethod - def ssh_send_cmd(command, username, password, hostname='localhost'): + def ssh_send_cmd(command, username, password, key_filename=None, + hostname='localhost'): """ SSH command execution helper """ # Try to login via SSH ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_client.connect(hostname=hostname, username=username, password=password) + ssh_client.connect(hostname=hostname, username=username, + password=password, key_filename=key_filename) _, stdout, stderr = ssh_client.exec_command(command) output = stdout.read().decode().strip() error = stderr.read().decode().strip() diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index db83f14c3..551991d69 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -24,10 +24,12 @@ from pwd import getpwall from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError +from vyos.defaults import config_files from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_running from vyos.utils.process import process_named_running from vyos.utils.file import read_file +from vyos.utils.file import write_file from vyos.xml_ref import default_value PROCESS_NAME = 'sshd' @@ -38,27 +40,101 @@ pki_path = ['pki'] key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' -trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' -authorized_principals_dir = '/etc/ssh/authorized_principals' - +trusted_user_ca = config_files['sshd_user_ca'] +test_command = 'uname -a' def get_config_value(key): tmp = read_file(SSHD_CONF) tmp = re.findall(f'\n?{key}\s+(.*)', tmp) return tmp +trusted_user_ca_path = base_path + ['trusted-user-ca'] +# CA and signed user key generated using: +# ssh-keygen -f vyos-ssh-ca.key +# ssh-keygen -f vyos_testca -C "vyos_tesca@vyos.net" +# ssh-keygen -s vyos-ssh-ca.key -I vyos_testca@vyos.net -n vyos,vyos_testca -V +520w vyos_testca.pub +ca_cert_data = """ +AAAAB3NzaC1yc2EAAAADAQABAAABgQCTBa7+TTefsMLTHuuLPUmmm7SGAuoK03oZEIi2/O +sww1uhCdKrm7bFvSUFpWvq3gX8TSS+yO5kNKz3BTMBu7oq01/Ewjyw0jR+fUog76x7mCzd +2iI4QmPj4lNHSUFquaELt2aBwY4f7LtjxRCCgtWgirq/Qk+P27uJKErvndyYc95v9no15z +lQFSdUid6tF8IjYljK8pXP0JshFp3XnFV2Rg80j7O66mRtVFC4tt2vluyIFeIID+5fL03v +LXbT/2zNdoH6QiI9NGWkxhS7zFYziVd/rzG5xlEB1ezs2Sz4zjMPgV3GiMINb6tjEWNJhM +KtDWIt+3UDpx+2T9PrhDBDFMlneiHCD6MxRv2sLbicevSj0PV7/fRnwoHs6hDKCU5eS2Mc +CTxXr4jaboLZ6q3sbGHCHZo/PuA8Sl9iZCM4GCxx5bgvRRmGpgZv4PfFzA2b/wTHkKnf6E +kuthoAJufmNxPaZQRQKF34SdmTKgSJTCY1gqwCH2iNg0PVKU+vN8c= +""" -ca_root_cert_data = """ -MIIBcTCCARagAwIBAgIUDcAf1oIQV+6WRaW7NPcSnECQ/lUwCgYIKoZIzj0EAwIw -HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjBa -Fw0zMjAyMTUxOTQxMjBaMB4xHDAaBgNVBAMME1Z5T1Mgc2VydmVyIHJvb3QgQ0Ew -WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQ0y24GzKQf4aM2Ir12tI9yITOIzAUj -ZXyJeCmYI6uAnyAMqc4Q4NKyfq3nBi4XP87cs1jlC1P2BZ8MsjL5MdGWozIwMDAP -BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRwC/YaieMEnjhYa7K3Flw/o0SFuzAK -BggqhkjOPQQDAgNJADBGAiEAh3qEj8vScsjAdBy5shXzXDVVOKWCPTdGrPKnu8UW -a2cCIQDlDgkzWmn5ujc5ATKz1fj+Se/aeqwh4QyoWCVTFLIxhQ== +cert_user_key = """-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn +NhAAAAAwEAAQAAAYEArnIlFpMwSQax7+qH3+/gbv65mem6Ur+gepNYC8TYaE91xJxMoE5M +Pyh1s8Kr/WYNF6aN43qdDnjvGy38oFng4lEfxG475AqpTIGmP4GvEOlnNLhjCcOHrOFuzg +uRtDDvn0/TPhdqLTlbvgZ326WO7xQkCX11qmdGUUtC9Byd7p+EmnTe0oP8N6MeyYY78qa4 +HnzMd6EPb3vyWdASpPZjQE0OJCeAx6Mne2kOnKxUcW1UlczOa1PPIQMU+Rp1PWDtkdiYAd +nbTbIdxDN8Bn3mC3JXD642EcwXSJ1+kov/8u8bBuYNt3t3nf/krSebx4Ge7ObYnURj31j0 +8L8Vv3fgv+T7pY8iyMh8dYfrZPAWQGN1pe8ZkDaM1QGKJncF+8N0UB4EVFBHNLt7W8+oHt +LPMqYw13djZHg5Q1NxSxc1srOmEBZrWCBZgDGGiqtKo+lF+oVvqvBh/hncOBlDX5RFM8qw +Qt4mem9TEZZrIvC9q1dcVpQUrt8BvBOSnGnBb7yTAAAFkEdBIUlHQSFJAAAAB3NzaC1yc2 +EAAAGBAK5yJRaTMEkGse/qh9/v4G7+uZnpulK/oHqTWAvE2GhPdcScTKBOTD8odbPCq/1m +DRemjeN6nQ547xst/KBZ4OJRH8RuO+QKqUyBpj+BrxDpZzS4YwnDh6zhbs4LkbQw759P0z +4Xai05W74Gd9ulju8UJAl9dapnRlFLQvQcne6fhJp03tKD/DejHsmGO/KmuB58zHehD297 +8lnQEqT2Y0BNDiQngMejJ3tpDpysVHFtVJXMzmtTzyEDFPkadT1g7ZHYmAHZ202yHcQzfA +Z95gtyVw+uNhHMF0idfpKL//LvGwbmDbd7d53/5K0nm8eBnuzm2J1EY99Y9PC/Fb934L/k ++6WPIsjIfHWH62TwFkBjdaXvGZA2jNUBiiZ3BfvDdFAeBFRQRzS7e1vPqB7SzzKmMNd3Y2 +R4OUNTcUsXNbKzphAWa1ggWYAxhoqrSqPpRfqFb6rwYf4Z3DgZQ1+URTPKsELeJnpvUxGW +ayLwvatXXFaUFK7fAbwTkpxpwW+8kwAAAAMBAAEAAAGAEeZQe+0vyoPPWkjRwbQBbszgX9 +9QaRE/TD82N5mZLbWJkK+2WnSY9O9tNGbIncBiSNz5ji/p/FmDCgzr8SAyfRvJ4K6sTTfy +1eYvwtscYDsy2ywDAuDMrnvrPLqJ1tghSP2N4BR9ppT4yZosTkjB+TIzMxjBLB0GEBgNj1 +19rxswe2YmlFSgBVgi3pbRgT0uLfgBmvzXHUoLPL/8ScT7u4Csmh/GN7Xmuo5gcMnArcAu +1Q17g3PJZcpv1Ser2VfKnVAwrURCLW8dlji5xat/3E/PLsrLvszVS6U0hFf3MaOixprxsz +wc0n2Y4lAgkgkCZQ0Ty9TSXI/8TQWL8cPFej1TK15NWXlfElZxI+lhwcsnWmNy3mXD746/ +YZLH+OCs9isvewZWryQEkdVCU42MM/7L4Hoeqh2diGDV9wtKDW5FjHq/VRNOMVt59eCFlv +eujh89/KY6wPxHoDoY3+olhggiKDGw1wUUpEXKNQhhTjx1g0xn7AFYz+Bp2svM9EdhAAAA +wQDBq+zeOhsS/VrrVRkmOYYXnBSe0WcckjcYOly/8FLTPkq19aVY5eOmo6teegqvkWscGP +Wisl7DW+kFNolIvwc6shf/8+PXC1KlADd9S1uoXvSmVoe3wSsIKRCsUuLZiiJkv4nqQ/BK +T6ijvNG2Wu3YGsP8Tj+OcTebqk1vDItaickhKtFxCx6PBcV+RrDeK1TT6uAHd1AsGikTva +V/BDMmtoDz7qFQbj9Vj2np88MakxYfm7u4DzKu082GHDBC44sAAADBAN8ATvmmfxqk5GFg ++2rbIW+qMJ2GwWXiTFLjH7u4HEhsmHbHYsQ0v+cGu2dKfBUVWoq/N2ltDQ0QYTgkmsxKvm +I8AjVhLHhFB1DtPBMHibsF/rtBRgsItR+PveUtRYOmeY1PzJ3ygVNJpPJ87st0T4JVNQiE ++bFEhnJ/RcTHxzAAt8+gTn0PTen3+hn9Jk2YFHWFb51YDw2h00LL9XT9Enz4xkc6gTPL3M +0IKULJWnyYGOLueSsQxJiaAUcsZg8W2QAAAMEAyEJ45HtbUqZ5xd2K5ZfY8cd1dC9uAx6a +cSdENUvMW4yE3QEJ4xdonDUn9OQYR7GpseQWuXBrTO2PSsse7P6eHUsRhaUkFOvLzHSVzO +bI9HDJAq6+KCPhm2eixfBiMs2meEle8MvNiiONwaY3JnPnGdsTpEjcm6oulyC52xRvHhvc +nCuoRTqX7xcIka4jCXInYBS7GhlF5iAmIAAVkvfWjjNwZ3S0mnGUUOYgknidBhK+x0zCWt +IXOeoIfjb/C4NLAAAAE3Z5b3NfdGVzY2FAdnlvcy5uZXQBAgMEBQYH +-----END OPENSSH PRIVATE KEY----- """ +cert_user_signed = """ +ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb2 +0AAAAglE+kjRPqsck/y2ywO+owv1FTeU6QFNPywFqD8aoEcA8AAAADAQABAAABgQCuciUWk +zBJBrHv6off7+Bu/rmZ6bpSv6B6k1gLxNhoT3XEnEygTkw/KHWzwqv9Zg0Xpo3jep0OeO8b +LfygWeDiUR/EbjvkCqlMgaY/ga8Q6Wc0uGMJw4es4W7OC5G0MO+fT9M+F2otOVu+BnfbpY7 +vFCQJfXWqZ0ZRS0L0HJ3un4SadN7Sg/w3ox7JhjvyprgefMx3oQ9ve/JZ0BKk9mNATQ4kJ4 +DHoyd7aQ6crFRxbVSVzM5rU88hAxT5GnU9YO2R2JgB2dtNsh3EM3wGfeYLclcPrjYRzBdIn +X6Si//y7xsG5g23e3ed/+StJ5vHgZ7s5tidRGPfWPTwvxW/d+C/5PuljyLIyHx1h+tk8BZA +Y3Wl7xmQNozVAYomdwX7w3RQHgRUUEc0u3tbz6ge0s8ypjDXd2NkeDlDU3FLFzWys6YQFmt +YIFmAMYaKq0qj6UX6hW+q8GH+Gdw4GUNflEUzyrBC3iZ6b1MRlmsi8L2rV1xWlBSu3wG8E5 +KcacFvvJMAAAAAAAAAAAAAAAEAAAAUdnlvc190ZXN0Y2FAdnlvcy5uZXQAAAAXAAAABHZ5b +3MAAAALdnlvc190ZXN0Y2EAAAAAaDg66AAAAAB69w9WAAAAAAAAAIIAAAAVcGVybWl0LVgx +MS1mb3J3YXJkaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGV +ybWl0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVybWl0LX +VzZXItcmMAAAAAAAAAAAAAAZcAAAAHc3NoLXJzYQAAAAMBAAEAAAGBAJMFrv5NN5+wwtMe6 +4s9SaabtIYC6grTehkQiLb86zDDW6EJ0qubtsW9JQWla+reBfxNJL7I7mQ0rPcFMwG7uirT +X8TCPLDSNH59SiDvrHuYLN3aIjhCY+PiU0dJQWq5oQu3ZoHBjh/su2PFEIKC1aCKur9CT4/ +bu4koSu+d3Jhz3m/2ejXnOVAVJ1SJ3q0XwiNiWMrylc/QmyEWndecVXZGDzSPs7rqZG1UUL +i23a+W7IgV4ggP7l8vTe8tdtP/bM12gfpCIj00ZaTGFLvMVjOJV3+vMbnGUQHV7OzZLPjOM +w+BXcaIwg1vq2MRY0mEwq0NYi37dQOnH7ZP0+uEMEMUyWd6IcIPozFG/awtuJx69KPQ9Xv9 +9GfCgezqEMoJTl5LYxwJPFeviNpugtnqrexsYcIdmj8+4DxKX2JkIzgYLHHluC9FGYamBm/ +g98XMDZv/BMeQqd/oSS62GgAm5+Y3E9plBFAoXfhJ2ZMqBIlMJjWCrAIfaI2DQ9UpT683xw +AAAZQAAAAMcnNhLXNoYTItNTEyAAABgINZAr9M9ZYWDhhf5uWNkUBKq12OlJ3ImvHg5161P +BAAL6crGS3WzyAs9LerxFcdMJ0gzMgUixR59MgGMAzfN+DjoSmgcLVT0eVoI5GMBkdiq8T5 +h3qjeXTc5BfLJiACbu7tOPhuIsIDreDnCVYmGr2z+rAPaqMETJa4L0submx4DqnahSY0ZSH +WjTrjWCSPIdySh9HUXbpq3tYdNlqmpSY5YzvDmMC46kGMF10G5ycc58asWfUMwLMGsTEt2t +R5DKRDw/iJch3r+L0xLMCSmEXnu6/Gl7Yq1XJdWm9cA1SvDyxEuB4yKIDkunXrPiuPn3zyv +z1a/bY0hvuF+fyL+tRCbmrfOLreHuYh9aFg6e22MoKhrez5wP8Eoy1T+rlQrmlgCRDShBgj +wMMhc+2fdrzTR07Ctnmv339p/SY5wBruzNM9R1mzyEuuJDE6OkKBTI8kuQu6ypGv+bLqSSt +wujcNqOI4Vz61HiOsRSTUa7tA5q4hBwFqq7FB8+N0Ylfa5A== vyos_tesca@vyos.net +""" class TestServiceSSH(VyOSUnitTestSHIM.TestCase): @classmethod @@ -208,23 +284,12 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # run natively. # # We also try to login as an invalid user - this is not allowed to work. - test_user = 'ssh_test' test_pass = 'v2i57DZs8idUwMN3VC92' - test_command = 'uname -a' self.cli_set(base_path) - self.cli_set( - [ - 'system', - 'login', - 'user', - test_user, - 'authentication', - 'plaintext-password', - test_pass, - ] - ) + self.cli_set(['system', 'login', 'user', test_user, 'authentication', + 'plaintext-password', test_pass]) # commit changes self.cli_commit() @@ -237,9 +302,8 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # Login with invalid credentials with self.assertRaises(paramiko.ssh_exception.AuthenticationException): - output, error = self.ssh_send_cmd( - test_command, 'invalid_user', 'invalid_password' - ) + output, error = self.ssh_send_cmd(test_command, 'invalid_user', + 'invalid_password') self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() @@ -360,126 +424,74 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): tmp_sshd_conf = read_file(SSHD_CONF) self.assertIn(expected, tmp_sshd_conf) - def test_ssh_trusted_user_ca_key(self): + def test_ssh_trusted_user_ca(self): ca_cert_name = 'test_ca' - - # set pki ca certificate - # set service ssh trusted-user-ca-key ca-certificate - self.cli_set( - pki_path - + [ - 'ca', - ca_cert_name, - 'certificate', - ca_root_cert_data.replace('\n', ''), - ] - ) - self.cli_set( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) + public_key_type = 'ssh-rsa' + public_key_data = ca_cert_data.replace('\n', '') + test_user = 'vyos_testca' + principal = 'vyos' + user_auth_base = ['system', 'login', 'user', test_user] + + # create user account + self.cli_set(user_auth_base) + self.cli_set(pki_path + ['openssh', ca_cert_name, 'public', + 'key', public_key_data]) + self.cli_set(pki_path + ['openssh', ca_cert_name, 'public', + 'type', public_key_type]) + self.cli_set(trusted_user_ca_path, value=ca_cert_name) self.cli_commit() - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + trusted_user_ca_config = get_config_value('TrustedUserCAKeys') + self.assertIn(trusted_user_ca, trusted_user_ca_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') self.assertIn('none', authorize_principals_file_config) - with open(trusted_user_ca_key, 'r') as file: - ca_key_contents = file.read() - self.assertIn(ca_root_cert_data, ca_key_contents) - - self.cli_delete( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) - self.cli_delete(['pki', 'ca', ca_cert_name]) - self.cli_commit() + ca_key_contents = read_file(trusted_user_ca).lstrip().rstrip() + self.assertIn(f'{public_key_type} {public_key_data}', ca_key_contents) - # Verify the CA key is removed - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config) - authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') - self.assertNotIn('none', authorize_principals_file_config) + # Verify functionality by logging into the system using signed user key + key_filename = f'/tmp/{test_user}' + write_file(key_filename, cert_user_key, mode=0o600) + write_file(f'{key_filename}-cert.pub', cert_user_signed.replace('\n', '')) - def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self): - ca_cert_name = 'test_ca' - bind_user = 'test_user' - principals = ['test_principal_alice', 'test_principal_bob'] - test_user = 'ssh_test' - test_pass = 'v2i57DZs8idUwMN3VC92' + # Login with proper credentials + output, error = self.ssh_send_cmd(test_command, test_user, password=None, + key_filename=key_filename) + # Verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) - # Create a test user - self.cli_set( - [ - 'system', - 'login', - 'user', - test_user, - 'authentication', - 'plaintext-password', - test_pass, - ] - ) - - # set pki ca certificate - # set service ssh trusted-user-ca-key ca-certificate - # set service ssh trusted-user-ca-key bind-user principal - self.cli_set( - pki_path - + [ - 'ca', - ca_cert_name, - 'certificate', - ca_root_cert_data.replace('\n', ''), - ] - ) - self.cli_set( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) - for principal in principals: - self.cli_set( - base_path - + [ - 'trusted-user-ca-key', - 'bind-user', - bind_user, - 'principal', - principal, - ] - ) + # Enable user principal name - logins only allowed if certificate contains + # said principal name + self.cli_set(user_auth_base + ['authentication', 'principal', principal]) self.cli_commit() - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) - authorized_principals_file = f'{authorized_principals_dir}/{bind_user}' - self.assertTrue(os.path.exists(authorized_principals_file)) - - with open(authorized_principals_file, 'r') as file: - authorized_principals = file.read() - for principal in principals: - self.assertIn(principal, authorized_principals) - - for principal in principals: - self.cli_delete( - base_path - + [ - 'trusted-user-ca-key', - 'bind-user', - bind_user, - 'principal', - principal, - ] - ) - - self.cli_delete( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) + # Verify generated SSH principals + authorized_principals_file = f'/home/{test_user}/.ssh/authorized_principals' + authorized_principals = read_file(authorized_principals_file, sudo=True) + self.assertIn(principal, authorized_principals) + + # Login with proper credentials + output, error = self.ssh_send_cmd(test_command, test_user, password=None, + key_filename=key_filename) + # Verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) + + self.cli_delete(trusted_user_ca_path) + self.cli_delete(user_auth_base) self.cli_delete(['pki', 'ca', ca_cert_name]) - self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() - # Verify the authorized principals file is removed - self.assertFalse(os.path.exists(authorized_principals_file)) + # Verify the CA key is removed + trusted_user_ca_config = get_config_value('TrustedUserCAKeys') + self.assertNotIn(trusted_user_ca, trusted_user_ca_config) + self.assertFalse(os.path.exists(trusted_user_ca)) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertNotIn('none', authorize_principals_file_config) + self.assertFalse(os.path.exists(f'/home/{test_user}/.ssh/authorized_principals')) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index 14fe86d56..7d01b6642 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -64,7 +64,7 @@ sync_search = [ 'path': ['service', 'https'], }, { - 'keys': ['ca_certificate'], + 'keys': ['key'], 'path': ['service', 'ssh'], }, { @@ -418,7 +418,8 @@ def verify(pki): if 'country' in default_values: country = default_values['country'] if len(country) != 2 or not country.isalpha(): - raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') + raise ConfigError('Invalid default country value. '\ + 'Value must be 2 alpha characters.') if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py index f3c76508b..3d38d940a 100755 --- a/src/conf_mode/service_ssh.py +++ b/src/conf_mode/service_ssh.py @@ -23,14 +23,14 @@ from syslog import LOG_INFO from vyos.config import Config from vyos.configdict import is_node_changed from vyos.configverify import verify_vrf -from vyos.configverify import verify_pki_ca_certificate +from vyos.configverify import verify_pki_openssh_key +from vyos.defaults import config_files from vyos.utils.process import call from vyos.template import render from vyos import ConfigError from vyos import airbag -from vyos.pki import find_chain -from vyos.pki import encode_certificate -from vyos.pki import load_certificate +from vyos.pki import encode_public_key +from vyos.pki import load_openssh_public_key from vyos.utils.dict import dict_search_recursive from vyos.utils.file import write_file @@ -45,7 +45,7 @@ key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' -trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' +trusted_user_ca = config_files['sshd_user_ca'] def get_config(config=None): if config: @@ -79,9 +79,9 @@ def get_config(config=None): get_first_key=True) for value, _ in dict_search_recursive(tmp, 'principal'): - # Only enable principal handling if SSH trusted-user-ca-key is set - if 'trusted_user_ca_key' in ssh: - ssh['trusted_user_ca_key'].update({'has_principals': {}}) + # Only enable principal handling if SSH trusted-user-ca is set + if 'trusted_user_ca' in ssh: + ssh['has_principals'] = {} # We do only need to execute this code path once as we need to know # if any one of the local users has a principal set or not - this # accounts for the entire system. @@ -97,16 +97,8 @@ def verify(ssh): if 'rekey' in ssh and 'data' not in ssh['rekey']: raise ConfigError('Rekey data is required!') - if 'trusted_user_ca_key' in ssh: - if 'ca_certificate' not in ssh['trusted_user_ca_key']: - raise ConfigError('CA certificate is mandatory when using ' \ - 'trusted-user-ca-key') - - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - verify_pki_ca_certificate(ssh, ca_key_name) - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - if 'certificate' not in pki_ca_cert or not pki_ca_cert['certificate']: - raise ConfigError(f"CA certificate '{ca_key_name}' is not valid or missing") + if 'trusted_user_ca' in ssh: + verify_pki_openssh_key(ssh, ssh['trusted_user_ca']) verify_vrf(ssh) return None @@ -131,23 +123,17 @@ def generate(ssh): syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!') call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}') - if 'trusted_user_ca_key' in ssh: - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - - loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) - loaded_ca_certs = { - load_certificate(c['certificate']) - for c in ssh['pki']['ca'].values() - if 'certificate' in c - } - - ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) - write_file(trusted_user_ca_key, - '\n'.join(encode_certificate(c) for c in ca_full_chain)) + if 'trusted_user_ca' in ssh: + key_name = ssh['trusted_user_ca'] + openssh_cert = ssh['pki']['openssh'][key_name] + loaded_ca_cert = load_openssh_public_key(openssh_cert['public']['key'], + openssh_cert['public']['type']) + tmp = encode_public_key(loaded_ca_cert, encoding='OpenSSH', + key_format='OpenSSH') + write_file(trusted_user_ca, tmp, trailing_newline=True) else: - if os.path.exists(trusted_user_ca_key): - os.unlink(trusted_user_ca_key) + if os.path.exists(trusted_user_ca): + os.unlink(trusted_user_ca) render(config_file, 'ssh/sshd_config.j2', ssh) diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 481fdd16e..22b6fcc98 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -375,14 +375,15 @@ def apply(login): chown(home_dir, user=user, recursive=True) # Generate 2FA/MFA One-Time-Pad configuration + google_auth_file = f'{home_dir}/.google_authenticator' if dict_search('authentication.otp.key', user_config): enable_otp = True - render(f'{home_dir}/.google_authenticator', 'login/pam_otp_ga.conf.j2', + render(google_auth_file, 'login/pam_otp_ga.conf.j2', user_config, permission=0o400, user=user, group='users') else: # delete configuration as it's not enabled for the user - if os.path.exists(f'{home_dir}/.google_authenticator'): - os.remove(f'{home_dir}/.google_authenticator') + if os.path.exists(google_auth_file): + os.unlink(google_auth_file) # Lock/Unlock local user account lock_unlock = '--unlock' @@ -396,6 +397,22 @@ def apply(login): # Disable user to prevent re-login call(f'usermod -s /sbin/nologin {user}') + home_dir = getpwnam(user).pw_dir + # Remove SSH authorized keys file + authorized_keys_file = f'{home_dir}/.ssh/authorized_keys' + if os.path.exists(authorized_keys_file): + os.unlink(authorized_keys_file) + + # Remove SSH authorized principals file + principals_file = f'{home_dir}/.ssh/authorized_principals' + if os.path.exists(principals_file): + os.unlink(principals_file) + + # Remove Google Authenticator file + google_auth_file = f'{home_dir}/.google_authenticator' + if os.path.exists(google_auth_file): + os.unlink(google_auth_file) + # Logout user if he is still logged in if user in list(set([tmp[0] for tmp in users()])): print(f'{user} is logged in, forcing logout!') diff --git a/src/tests/test_template.py b/src/tests/test_template.py index 7cae867a0..4660c0038 100644 --- a/src/tests/test_template.py +++ b/src/tests/test_template.py @@ -192,10 +192,15 @@ class TestVyOSTemplate(TestCase): self.assertIn(IKEv2_DEFAULT, ','.join(ciphers)) def test_get_default_port(self): + from vyos.defaults import config_files from vyos.defaults import internal_ports + with self.assertRaises(RuntimeError): + vyos.template.get_default_config_file('UNKNOWN') with self.assertRaises(RuntimeError): vyos.template.get_default_port('UNKNOWN') + self.assertEqual(vyos.template.get_default_config_file('sshd_user_ca'), + config_files['sshd_user_ca']) self.assertEqual(vyos.template.get_default_port('certbot_haproxy'), internal_ports['certbot_haproxy']) -- cgit v1.2.3