diff options
author | Christian Breunig <christian@breunig.cc> | 2025-05-20 19:57:24 +0200 |
---|---|---|
committer | Christian Breunig <christian@breunig.cc> | 2025-05-29 14:01:32 +0200 |
commit | 4b4bbd73b84c2c478c7752f58e7f66ec6d90459e (patch) | |
tree | 872749218a0efba4375cad579d617db02b6dac97 | |
parent | d2745a7b60a7fef88958bd52b3876c105da87e77 (diff) | |
download | vyos-1x-4b4bbd73b84c2c478c7752f58e7f66ec6d90459e.tar.gz vyos-1x-4b4bbd73b84c2c478c7752f58e7f66ec6d90459e.zip |
ssh: T6013: rename trusted-user-ca-key -> truster-user-ca
The current implementation for SSH CA based authentication uses "set service
ssh trusted-user-ca-key ca-certificate <foo>" to define an X.509 certificate
from "set pki ca <foo> ..." - fun fact, native OpenSSH does not support X.509
certificates and only runs with OpenSSH ssh-keygen generated RSA or EC keys.
This commit changes the bahavior to support antive certificates generated using
ssh-keygen and loaded to our PKI tree. As the previous implementation
did not work at all, no migrations cript is used.
-rw-r--r-- | data/templates/ssh/sshd_config.j2 | 6 | ||||
-rw-r--r-- | interface-definitions/service_ssh.xml.in | 16 | ||||
-rw-r--r-- | python/vyos/configverify.py | 19 | ||||
-rw-r--r-- | python/vyos/defaults.py | 8 | ||||
-rwxr-xr-x | python/vyos/template.py | 20 | ||||
-rw-r--r-- | python/vyos/utils/file.py | 21 | ||||
-rw-r--r-- | smoketest/scripts/cli/base_vyostest_shim.py | 6 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 278 | ||||
-rwxr-xr-x | src/conf_mode/pki.py | 5 | ||||
-rwxr-xr-x | src/conf_mode/service_ssh.py | 54 | ||||
-rwxr-xr-x | src/conf_mode/system_login.py | 23 | ||||
-rw-r--r-- | src/tests/test_template.py | 5 |
12 files changed, 269 insertions, 192 deletions
diff --git a/data/templates/ssh/sshd_config.j2 b/data/templates/ssh/sshd_config.j2 index dce679936..1315bf2cb 100644 --- a/data/templates/ssh/sshd_config.j2 +++ b/data/templates/ssh/sshd_config.j2 @@ -111,17 +111,17 @@ ClientAliveInterval {{ client_keepalive_interval }} RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }} {% endif %} -{% if trusted_user_ca_key is vyos_defined %} +{% if trusted_user_ca is vyos_defined %} # Specifies a file containing public keys of certificate authorities that are # trusted to sign user certificates for authentication -TrustedUserCAKeys /etc/ssh/trusted_user_ca_key +TrustedUserCAKeys {{ get_default_config_file('sshd_user_ca') }} # The default is "none", i.e. not to use a principals file - in this case, the # username of the user must appear in a certificate's principals list for it # to be accepted. ".ssh/authorized_principals" means a per-user configuration, # relative to $HOME. {% set filename = 'none' %} -{% if trusted_user_ca_key.has_principals is vyos_defined %} +{% if has_principals is vyos_defined %} {% set filename = '.ssh/authorized_principals' %} {% endif %} AuthorizedPrincipalsFile {{ filename }} diff --git a/interface-definitions/service_ssh.xml.in b/interface-definitions/service_ssh.xml.in index 14d358c78..c659a7db7 100644 --- a/interface-definitions/service_ssh.xml.in +++ b/interface-definitions/service_ssh.xml.in @@ -275,14 +275,18 @@ </constraint> </properties> </leafNode> - <node name="trusted-user-ca-key"> + <leafNode name="trusted-user-ca"> <properties> - <help>Trusted user CA key</help> + <help>OpenSSH trusted user CA</help> + <completionHelp> + <path>pki openssh</path> + </completionHelp> + <valueHelp> + <format>txt</format> + <description>OpenSSH certificate name from PKI subsystem</description> + </valueHelp> </properties> - <children> - #include <include/pki/ca-certificate.xml.i> - </children> - </node> + </leafNode> #include <include/vrf-multi.xml.i> </children> </node> diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index d5f443f15..07eb29a68 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -527,6 +527,25 @@ def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0): if dh_bits < min_key_size: raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!') +def verify_pki_openssh_key(config: dict, key_name: str): + """ + Common helper function user by PKI consumers to perform recurring + validation functions on OpenSSH keys + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'openssh' not in config['pki']: + raise ConfigError('PKI does not contain any OpenSSH keys!') + + if key_name not in config['pki']['openssh']: + raise ConfigError(f'OpenSSH key "{key_name}" not found in configuration!') + + if 'public' in config['pki']['openssh'][key_name]: + if not {'key', 'type'} <= set(config['pki']['openssh'][key_name]['public']): + raise ConfigError('Both public key and type must be defined for '\ + f'OpenSSH public key "{key_name}"!') + def verify_eapol(config: dict): """ Common helper function used by interface implementations to perform diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index fbde0298b..e42d92112 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -53,6 +53,10 @@ internal_ports = { 'certbot_haproxy' : 65080, # Certbot running behing haproxy } +config_files = { + 'sshd_user_ca' : '/run/sshd/trusted_user_ca', +} + config_status = '/tmp/vyos-config-status' api_config_state = '/run/http-api-state' frr_debug_enable = '/tmp/vyos.frr.debug' @@ -69,8 +73,8 @@ config_default = os.path.join(directories['data'], 'config.boot.default') rt_symbolic_names = { # Standard routing tables for Linux & reserved IDs for VyOS - 'default': 253, # Confusingly, a final fallthru, not the default. - 'main': 254, # The actual global table used by iproute2 unless told otherwise. + 'default': 253, # Confusingly, a final fallthru, not the default. + 'main': 254, # The actual global table used by iproute2 unless told otherwise. 'local': 255, # Special kernel loopback table. } diff --git a/python/vyos/template.py b/python/vyos/template.py index aa215db95..bf7928914 100755 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -1079,7 +1079,7 @@ def vyos_defined(value, test_value=None, var_type=None): def get_default_port(service): """ Jinja2 plugin to retrieve common service port number from vyos.defaults - class form a Jinja2 template. This removes the need to hardcode, or pass in + class from a Jinja2 template. This removes the need to hardcode, or pass in the data using the general dictionary. Added to remove code complexity and make it easier to read. @@ -1092,3 +1092,21 @@ def get_default_port(service): raise RuntimeError(f'Service "{service}" not found in internal ' \ 'vyos.defaults.internal_ports dict!') return internal_ports[service] + +@register_clever_function('get_default_config_file') +def get_default_config_file(filename): + """ + Jinja2 plugin to retrieve a common configuration file path from + vyos.defaults class from a Jinja2 template. This removes the need to + hardcode, or pass in the data using the general dictionary. + + Added to remove code complexity and make it easier to read. + + Example: + {{ get_default_config_file('certbot_haproxy') }} + """ + from vyos.defaults import config_files + if filename not in config_files: + raise RuntimeError(f'Configuration file "{filename}" not found in '\ + 'internal vyos.defaults.config_files dict!') + return config_files[filename] diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index eaebb57a3..cc46d77d1 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -28,22 +28,28 @@ def file_is_persistent(path): absolute = os.path.abspath(os.path.dirname(path)) return re.match(location,absolute) -def read_file(fname, defaultonfailure=None): +def read_file(fname, defaultonfailure=None, sudo=False): """ read the content of a file, stripping any end characters (space, newlines) should defaultonfailure be not None, it is returned on failure to read """ try: - """ Read a file to string """ - with open(fname, 'r') as f: - data = f.read().strip() - return data + # Some files can only be read by root - emulate sudo cat call + if sudo: + from vyos.utils.process import cmd + data = cmd(['sudo', 'cat', fname]) + else: + # If not sudo, just read the file + with open(fname, 'r') as f: + data = f.read() + return data.strip() except Exception as e: if defaultonfailure is not None: return defaultonfailure raise e -def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False): +def write_file(fname, data, defaultonfailure=None, user=None, group=None, + mode=None, append=False, trailing_newline=False): """ Write content of data to given fname, should defaultonfailure be not None, it is returned on failure to read. @@ -60,6 +66,9 @@ def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=N bytes = 0 with open(fname, 'w' if not append else 'a') as f: bytes = f.write(data) + if trailing_newline and not data.endswith('\n'): + f.write('\n') + bytes += 1 chown(fname, user, group) chmod(fname, mode) return bytes diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index f0674f187..9b64d5c0e 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -152,12 +152,14 @@ class VyOSUnitTestSHIM: return out @staticmethod - def ssh_send_cmd(command, username, password, hostname='localhost'): + def ssh_send_cmd(command, username, password, key_filename=None, + hostname='localhost'): """ SSH command execution helper """ # Try to login via SSH ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_client.connect(hostname=hostname, username=username, password=password) + ssh_client.connect(hostname=hostname, username=username, + password=password, key_filename=key_filename) _, stdout, stderr = ssh_client.exec_command(command) output = stdout.read().decode().strip() error = stderr.read().decode().strip() diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index db83f14c3..551991d69 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -24,10 +24,12 @@ from pwd import getpwall from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError +from vyos.defaults import config_files from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_running from vyos.utils.process import process_named_running from vyos.utils.file import read_file +from vyos.utils.file import write_file from vyos.xml_ref import default_value PROCESS_NAME = 'sshd' @@ -38,27 +40,101 @@ pki_path = ['pki'] key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' -trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' -authorized_principals_dir = '/etc/ssh/authorized_principals' - +trusted_user_ca = config_files['sshd_user_ca'] +test_command = 'uname -a' def get_config_value(key): tmp = read_file(SSHD_CONF) tmp = re.findall(f'\n?{key}\s+(.*)', tmp) return tmp +trusted_user_ca_path = base_path + ['trusted-user-ca'] +# CA and signed user key generated using: +# ssh-keygen -f vyos-ssh-ca.key +# ssh-keygen -f vyos_testca -C "vyos_tesca@vyos.net" +# ssh-keygen -s vyos-ssh-ca.key -I vyos_testca@vyos.net -n vyos,vyos_testca -V +520w vyos_testca.pub +ca_cert_data = """ +AAAAB3NzaC1yc2EAAAADAQABAAABgQCTBa7+TTefsMLTHuuLPUmmm7SGAuoK03oZEIi2/O +sww1uhCdKrm7bFvSUFpWvq3gX8TSS+yO5kNKz3BTMBu7oq01/Ewjyw0jR+fUog76x7mCzd +2iI4QmPj4lNHSUFquaELt2aBwY4f7LtjxRCCgtWgirq/Qk+P27uJKErvndyYc95v9no15z +lQFSdUid6tF8IjYljK8pXP0JshFp3XnFV2Rg80j7O66mRtVFC4tt2vluyIFeIID+5fL03v +LXbT/2zNdoH6QiI9NGWkxhS7zFYziVd/rzG5xlEB1ezs2Sz4zjMPgV3GiMINb6tjEWNJhM +KtDWIt+3UDpx+2T9PrhDBDFMlneiHCD6MxRv2sLbicevSj0PV7/fRnwoHs6hDKCU5eS2Mc +CTxXr4jaboLZ6q3sbGHCHZo/PuA8Sl9iZCM4GCxx5bgvRRmGpgZv4PfFzA2b/wTHkKnf6E +kuthoAJufmNxPaZQRQKF34SdmTKgSJTCY1gqwCH2iNg0PVKU+vN8c= +""" -ca_root_cert_data = """ -MIIBcTCCARagAwIBAgIUDcAf1oIQV+6WRaW7NPcSnECQ/lUwCgYIKoZIzj0EAwIw -HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjBa -Fw0zMjAyMTUxOTQxMjBaMB4xHDAaBgNVBAMME1Z5T1Mgc2VydmVyIHJvb3QgQ0Ew -WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQ0y24GzKQf4aM2Ir12tI9yITOIzAUj -ZXyJeCmYI6uAnyAMqc4Q4NKyfq3nBi4XP87cs1jlC1P2BZ8MsjL5MdGWozIwMDAP -BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRwC/YaieMEnjhYa7K3Flw/o0SFuzAK -BggqhkjOPQQDAgNJADBGAiEAh3qEj8vScsjAdBy5shXzXDVVOKWCPTdGrPKnu8UW -a2cCIQDlDgkzWmn5ujc5ATKz1fj+Se/aeqwh4QyoWCVTFLIxhQ== +cert_user_key = """-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn +NhAAAAAwEAAQAAAYEArnIlFpMwSQax7+qH3+/gbv65mem6Ur+gepNYC8TYaE91xJxMoE5M +Pyh1s8Kr/WYNF6aN43qdDnjvGy38oFng4lEfxG475AqpTIGmP4GvEOlnNLhjCcOHrOFuzg +uRtDDvn0/TPhdqLTlbvgZ326WO7xQkCX11qmdGUUtC9Byd7p+EmnTe0oP8N6MeyYY78qa4 +HnzMd6EPb3vyWdASpPZjQE0OJCeAx6Mne2kOnKxUcW1UlczOa1PPIQMU+Rp1PWDtkdiYAd +nbTbIdxDN8Bn3mC3JXD642EcwXSJ1+kov/8u8bBuYNt3t3nf/krSebx4Ge7ObYnURj31j0 +8L8Vv3fgv+T7pY8iyMh8dYfrZPAWQGN1pe8ZkDaM1QGKJncF+8N0UB4EVFBHNLt7W8+oHt +LPMqYw13djZHg5Q1NxSxc1srOmEBZrWCBZgDGGiqtKo+lF+oVvqvBh/hncOBlDX5RFM8qw +Qt4mem9TEZZrIvC9q1dcVpQUrt8BvBOSnGnBb7yTAAAFkEdBIUlHQSFJAAAAB3NzaC1yc2 +EAAAGBAK5yJRaTMEkGse/qh9/v4G7+uZnpulK/oHqTWAvE2GhPdcScTKBOTD8odbPCq/1m +DRemjeN6nQ547xst/KBZ4OJRH8RuO+QKqUyBpj+BrxDpZzS4YwnDh6zhbs4LkbQw759P0z +4Xai05W74Gd9ulju8UJAl9dapnRlFLQvQcne6fhJp03tKD/DejHsmGO/KmuB58zHehD297 +8lnQEqT2Y0BNDiQngMejJ3tpDpysVHFtVJXMzmtTzyEDFPkadT1g7ZHYmAHZ202yHcQzfA +Z95gtyVw+uNhHMF0idfpKL//LvGwbmDbd7d53/5K0nm8eBnuzm2J1EY99Y9PC/Fb934L/k ++6WPIsjIfHWH62TwFkBjdaXvGZA2jNUBiiZ3BfvDdFAeBFRQRzS7e1vPqB7SzzKmMNd3Y2 +R4OUNTcUsXNbKzphAWa1ggWYAxhoqrSqPpRfqFb6rwYf4Z3DgZQ1+URTPKsELeJnpvUxGW +ayLwvatXXFaUFK7fAbwTkpxpwW+8kwAAAAMBAAEAAAGAEeZQe+0vyoPPWkjRwbQBbszgX9 +9QaRE/TD82N5mZLbWJkK+2WnSY9O9tNGbIncBiSNz5ji/p/FmDCgzr8SAyfRvJ4K6sTTfy +1eYvwtscYDsy2ywDAuDMrnvrPLqJ1tghSP2N4BR9ppT4yZosTkjB+TIzMxjBLB0GEBgNj1 +19rxswe2YmlFSgBVgi3pbRgT0uLfgBmvzXHUoLPL/8ScT7u4Csmh/GN7Xmuo5gcMnArcAu +1Q17g3PJZcpv1Ser2VfKnVAwrURCLW8dlji5xat/3E/PLsrLvszVS6U0hFf3MaOixprxsz +wc0n2Y4lAgkgkCZQ0Ty9TSXI/8TQWL8cPFej1TK15NWXlfElZxI+lhwcsnWmNy3mXD746/ +YZLH+OCs9isvewZWryQEkdVCU42MM/7L4Hoeqh2diGDV9wtKDW5FjHq/VRNOMVt59eCFlv +eujh89/KY6wPxHoDoY3+olhggiKDGw1wUUpEXKNQhhTjx1g0xn7AFYz+Bp2svM9EdhAAAA +wQDBq+zeOhsS/VrrVRkmOYYXnBSe0WcckjcYOly/8FLTPkq19aVY5eOmo6teegqvkWscGP +Wisl7DW+kFNolIvwc6shf/8+PXC1KlADd9S1uoXvSmVoe3wSsIKRCsUuLZiiJkv4nqQ/BK +T6ijvNG2Wu3YGsP8Tj+OcTebqk1vDItaickhKtFxCx6PBcV+RrDeK1TT6uAHd1AsGikTva +V/BDMmtoDz7qFQbj9Vj2np88MakxYfm7u4DzKu082GHDBC44sAAADBAN8ATvmmfxqk5GFg ++2rbIW+qMJ2GwWXiTFLjH7u4HEhsmHbHYsQ0v+cGu2dKfBUVWoq/N2ltDQ0QYTgkmsxKvm +I8AjVhLHhFB1DtPBMHibsF/rtBRgsItR+PveUtRYOmeY1PzJ3ygVNJpPJ87st0T4JVNQiE ++bFEhnJ/RcTHxzAAt8+gTn0PTen3+hn9Jk2YFHWFb51YDw2h00LL9XT9Enz4xkc6gTPL3M +0IKULJWnyYGOLueSsQxJiaAUcsZg8W2QAAAMEAyEJ45HtbUqZ5xd2K5ZfY8cd1dC9uAx6a +cSdENUvMW4yE3QEJ4xdonDUn9OQYR7GpseQWuXBrTO2PSsse7P6eHUsRhaUkFOvLzHSVzO +bI9HDJAq6+KCPhm2eixfBiMs2meEle8MvNiiONwaY3JnPnGdsTpEjcm6oulyC52xRvHhvc +nCuoRTqX7xcIka4jCXInYBS7GhlF5iAmIAAVkvfWjjNwZ3S0mnGUUOYgknidBhK+x0zCWt +IXOeoIfjb/C4NLAAAAE3Z5b3NfdGVzY2FAdnlvcy5uZXQBAgMEBQYH +-----END OPENSSH PRIVATE KEY----- """ +cert_user_signed = """ +ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb2 +0AAAAglE+kjRPqsck/y2ywO+owv1FTeU6QFNPywFqD8aoEcA8AAAADAQABAAABgQCuciUWk +zBJBrHv6off7+Bu/rmZ6bpSv6B6k1gLxNhoT3XEnEygTkw/KHWzwqv9Zg0Xpo3jep0OeO8b +LfygWeDiUR/EbjvkCqlMgaY/ga8Q6Wc0uGMJw4es4W7OC5G0MO+fT9M+F2otOVu+BnfbpY7 +vFCQJfXWqZ0ZRS0L0HJ3un4SadN7Sg/w3ox7JhjvyprgefMx3oQ9ve/JZ0BKk9mNATQ4kJ4 +DHoyd7aQ6crFRxbVSVzM5rU88hAxT5GnU9YO2R2JgB2dtNsh3EM3wGfeYLclcPrjYRzBdIn +X6Si//y7xsG5g23e3ed/+StJ5vHgZ7s5tidRGPfWPTwvxW/d+C/5PuljyLIyHx1h+tk8BZA +Y3Wl7xmQNozVAYomdwX7w3RQHgRUUEc0u3tbz6ge0s8ypjDXd2NkeDlDU3FLFzWys6YQFmt +YIFmAMYaKq0qj6UX6hW+q8GH+Gdw4GUNflEUzyrBC3iZ6b1MRlmsi8L2rV1xWlBSu3wG8E5 +KcacFvvJMAAAAAAAAAAAAAAAEAAAAUdnlvc190ZXN0Y2FAdnlvcy5uZXQAAAAXAAAABHZ5b +3MAAAALdnlvc190ZXN0Y2EAAAAAaDg66AAAAAB69w9WAAAAAAAAAIIAAAAVcGVybWl0LVgx +MS1mb3J3YXJkaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGV +ybWl0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVybWl0LX +VzZXItcmMAAAAAAAAAAAAAAZcAAAAHc3NoLXJzYQAAAAMBAAEAAAGBAJMFrv5NN5+wwtMe6 +4s9SaabtIYC6grTehkQiLb86zDDW6EJ0qubtsW9JQWla+reBfxNJL7I7mQ0rPcFMwG7uirT +X8TCPLDSNH59SiDvrHuYLN3aIjhCY+PiU0dJQWq5oQu3ZoHBjh/su2PFEIKC1aCKur9CT4/ +bu4koSu+d3Jhz3m/2ejXnOVAVJ1SJ3q0XwiNiWMrylc/QmyEWndecVXZGDzSPs7rqZG1UUL +i23a+W7IgV4ggP7l8vTe8tdtP/bM12gfpCIj00ZaTGFLvMVjOJV3+vMbnGUQHV7OzZLPjOM +w+BXcaIwg1vq2MRY0mEwq0NYi37dQOnH7ZP0+uEMEMUyWd6IcIPozFG/awtuJx69KPQ9Xv9 +9GfCgezqEMoJTl5LYxwJPFeviNpugtnqrexsYcIdmj8+4DxKX2JkIzgYLHHluC9FGYamBm/ +g98XMDZv/BMeQqd/oSS62GgAm5+Y3E9plBFAoXfhJ2ZMqBIlMJjWCrAIfaI2DQ9UpT683xw +AAAZQAAAAMcnNhLXNoYTItNTEyAAABgINZAr9M9ZYWDhhf5uWNkUBKq12OlJ3ImvHg5161P +BAAL6crGS3WzyAs9LerxFcdMJ0gzMgUixR59MgGMAzfN+DjoSmgcLVT0eVoI5GMBkdiq8T5 +h3qjeXTc5BfLJiACbu7tOPhuIsIDreDnCVYmGr2z+rAPaqMETJa4L0submx4DqnahSY0ZSH +WjTrjWCSPIdySh9HUXbpq3tYdNlqmpSY5YzvDmMC46kGMF10G5ycc58asWfUMwLMGsTEt2t +R5DKRDw/iJch3r+L0xLMCSmEXnu6/Gl7Yq1XJdWm9cA1SvDyxEuB4yKIDkunXrPiuPn3zyv +z1a/bY0hvuF+fyL+tRCbmrfOLreHuYh9aFg6e22MoKhrez5wP8Eoy1T+rlQrmlgCRDShBgj +wMMhc+2fdrzTR07Ctnmv339p/SY5wBruzNM9R1mzyEuuJDE6OkKBTI8kuQu6ypGv+bLqSSt +wujcNqOI4Vz61HiOsRSTUa7tA5q4hBwFqq7FB8+N0Ylfa5A== vyos_tesca@vyos.net +""" class TestServiceSSH(VyOSUnitTestSHIM.TestCase): @classmethod @@ -208,23 +284,12 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # run natively. # # We also try to login as an invalid user - this is not allowed to work. - test_user = 'ssh_test' test_pass = 'v2i57DZs8idUwMN3VC92' - test_command = 'uname -a' self.cli_set(base_path) - self.cli_set( - [ - 'system', - 'login', - 'user', - test_user, - 'authentication', - 'plaintext-password', - test_pass, - ] - ) + self.cli_set(['system', 'login', 'user', test_user, 'authentication', + 'plaintext-password', test_pass]) # commit changes self.cli_commit() @@ -237,9 +302,8 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): # Login with invalid credentials with self.assertRaises(paramiko.ssh_exception.AuthenticationException): - output, error = self.ssh_send_cmd( - test_command, 'invalid_user', 'invalid_password' - ) + output, error = self.ssh_send_cmd(test_command, 'invalid_user', + 'invalid_password') self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() @@ -360,126 +424,74 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): tmp_sshd_conf = read_file(SSHD_CONF) self.assertIn(expected, tmp_sshd_conf) - def test_ssh_trusted_user_ca_key(self): + def test_ssh_trusted_user_ca(self): ca_cert_name = 'test_ca' - - # set pki ca <ca_cert_name> certificate <ca_key_data> - # set service ssh trusted-user-ca-key ca-certificate <ca_cert_name> - self.cli_set( - pki_path - + [ - 'ca', - ca_cert_name, - 'certificate', - ca_root_cert_data.replace('\n', ''), - ] - ) - self.cli_set( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) + public_key_type = 'ssh-rsa' + public_key_data = ca_cert_data.replace('\n', '') + test_user = 'vyos_testca' + principal = 'vyos' + user_auth_base = ['system', 'login', 'user', test_user] + + # create user account + self.cli_set(user_auth_base) + self.cli_set(pki_path + ['openssh', ca_cert_name, 'public', + 'key', public_key_data]) + self.cli_set(pki_path + ['openssh', ca_cert_name, 'public', + 'type', public_key_type]) + self.cli_set(trusted_user_ca_path, value=ca_cert_name) self.cli_commit() - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) + trusted_user_ca_config = get_config_value('TrustedUserCAKeys') + self.assertIn(trusted_user_ca, trusted_user_ca_config) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') self.assertIn('none', authorize_principals_file_config) - with open(trusted_user_ca_key, 'r') as file: - ca_key_contents = file.read() - self.assertIn(ca_root_cert_data, ca_key_contents) - - self.cli_delete( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) - self.cli_delete(['pki', 'ca', ca_cert_name]) - self.cli_commit() + ca_key_contents = read_file(trusted_user_ca).lstrip().rstrip() + self.assertIn(f'{public_key_type} {public_key_data}', ca_key_contents) - # Verify the CA key is removed - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config) - authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') - self.assertNotIn('none', authorize_principals_file_config) + # Verify functionality by logging into the system using signed user key + key_filename = f'/tmp/{test_user}' + write_file(key_filename, cert_user_key, mode=0o600) + write_file(f'{key_filename}-cert.pub', cert_user_signed.replace('\n', '')) - def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self): - ca_cert_name = 'test_ca' - bind_user = 'test_user' - principals = ['test_principal_alice', 'test_principal_bob'] - test_user = 'ssh_test' - test_pass = 'v2i57DZs8idUwMN3VC92' + # Login with proper credentials + output, error = self.ssh_send_cmd(test_command, test_user, password=None, + key_filename=key_filename) + # Verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) - # Create a test user - self.cli_set( - [ - 'system', - 'login', - 'user', - test_user, - 'authentication', - 'plaintext-password', - test_pass, - ] - ) - - # set pki ca <ca_cert_name> certificate <ca_key_data> - # set service ssh trusted-user-ca-key ca-certificate <ca_cert_name> - # set service ssh trusted-user-ca-key bind-user <bind_user> principal <principals> - self.cli_set( - pki_path - + [ - 'ca', - ca_cert_name, - 'certificate', - ca_root_cert_data.replace('\n', ''), - ] - ) - self.cli_set( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) - for principal in principals: - self.cli_set( - base_path - + [ - 'trusted-user-ca-key', - 'bind-user', - bind_user, - 'principal', - principal, - ] - ) + # Enable user principal name - logins only allowed if certificate contains + # said principal name + self.cli_set(user_auth_base + ['authentication', 'principal', principal]) self.cli_commit() - trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys') - self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config) - authorized_principals_file = f'{authorized_principals_dir}/{bind_user}' - self.assertTrue(os.path.exists(authorized_principals_file)) - - with open(authorized_principals_file, 'r') as file: - authorized_principals = file.read() - for principal in principals: - self.assertIn(principal, authorized_principals) - - for principal in principals: - self.cli_delete( - base_path - + [ - 'trusted-user-ca-key', - 'bind-user', - bind_user, - 'principal', - principal, - ] - ) - - self.cli_delete( - base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name] - ) + # Verify generated SSH principals + authorized_principals_file = f'/home/{test_user}/.ssh/authorized_principals' + authorized_principals = read_file(authorized_principals_file, sudo=True) + self.assertIn(principal, authorized_principals) + + # Login with proper credentials + output, error = self.ssh_send_cmd(test_command, test_user, password=None, + key_filename=key_filename) + # Verify login + self.assertFalse(error) + self.assertEqual(output, cmd(test_command)) + + self.cli_delete(trusted_user_ca_path) + self.cli_delete(user_auth_base) self.cli_delete(['pki', 'ca', ca_cert_name]) - self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() - # Verify the authorized principals file is removed - self.assertFalse(os.path.exists(authorized_principals_file)) + # Verify the CA key is removed + trusted_user_ca_config = get_config_value('TrustedUserCAKeys') + self.assertNotIn(trusted_user_ca, trusted_user_ca_config) + self.assertFalse(os.path.exists(trusted_user_ca)) + authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile') + self.assertNotIn('none', authorize_principals_file_config) + self.assertFalse(os.path.exists(f'/home/{test_user}/.ssh/authorized_principals')) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index 14fe86d56..7d01b6642 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -64,7 +64,7 @@ sync_search = [ 'path': ['service', 'https'], }, { - 'keys': ['ca_certificate'], + 'keys': ['key'], 'path': ['service', 'ssh'], }, { @@ -418,7 +418,8 @@ def verify(pki): if 'country' in default_values: country = default_values['country'] if len(country) != 2 or not country.isalpha(): - raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') + raise ConfigError('Invalid default country value. '\ + 'Value must be 2 alpha characters.') if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py index f3c76508b..3d38d940a 100755 --- a/src/conf_mode/service_ssh.py +++ b/src/conf_mode/service_ssh.py @@ -23,14 +23,14 @@ from syslog import LOG_INFO from vyos.config import Config from vyos.configdict import is_node_changed from vyos.configverify import verify_vrf -from vyos.configverify import verify_pki_ca_certificate +from vyos.configverify import verify_pki_openssh_key +from vyos.defaults import config_files from vyos.utils.process import call from vyos.template import render from vyos import ConfigError from vyos import airbag -from vyos.pki import find_chain -from vyos.pki import encode_certificate -from vyos.pki import load_certificate +from vyos.pki import encode_public_key +from vyos.pki import load_openssh_public_key from vyos.utils.dict import dict_search_recursive from vyos.utils.file import write_file @@ -45,7 +45,7 @@ key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' -trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' +trusted_user_ca = config_files['sshd_user_ca'] def get_config(config=None): if config: @@ -79,9 +79,9 @@ def get_config(config=None): get_first_key=True) for value, _ in dict_search_recursive(tmp, 'principal'): - # Only enable principal handling if SSH trusted-user-ca-key is set - if 'trusted_user_ca_key' in ssh: - ssh['trusted_user_ca_key'].update({'has_principals': {}}) + # Only enable principal handling if SSH trusted-user-ca is set + if 'trusted_user_ca' in ssh: + ssh['has_principals'] = {} # We do only need to execute this code path once as we need to know # if any one of the local users has a principal set or not - this # accounts for the entire system. @@ -97,16 +97,8 @@ def verify(ssh): if 'rekey' in ssh and 'data' not in ssh['rekey']: raise ConfigError('Rekey data is required!') - if 'trusted_user_ca_key' in ssh: - if 'ca_certificate' not in ssh['trusted_user_ca_key']: - raise ConfigError('CA certificate is mandatory when using ' \ - 'trusted-user-ca-key') - - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - verify_pki_ca_certificate(ssh, ca_key_name) - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - if 'certificate' not in pki_ca_cert or not pki_ca_cert['certificate']: - raise ConfigError(f"CA certificate '{ca_key_name}' is not valid or missing") + if 'trusted_user_ca' in ssh: + verify_pki_openssh_key(ssh, ssh['trusted_user_ca']) verify_vrf(ssh) return None @@ -131,23 +123,17 @@ def generate(ssh): syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!') call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}') - if 'trusted_user_ca_key' in ssh: - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - - loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) - loaded_ca_certs = { - load_certificate(c['certificate']) - for c in ssh['pki']['ca'].values() - if 'certificate' in c - } - - ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) - write_file(trusted_user_ca_key, - '\n'.join(encode_certificate(c) for c in ca_full_chain)) + if 'trusted_user_ca' in ssh: + key_name = ssh['trusted_user_ca'] + openssh_cert = ssh['pki']['openssh'][key_name] + loaded_ca_cert = load_openssh_public_key(openssh_cert['public']['key'], + openssh_cert['public']['type']) + tmp = encode_public_key(loaded_ca_cert, encoding='OpenSSH', + key_format='OpenSSH') + write_file(trusted_user_ca, tmp, trailing_newline=True) else: - if os.path.exists(trusted_user_ca_key): - os.unlink(trusted_user_ca_key) + if os.path.exists(trusted_user_ca): + os.unlink(trusted_user_ca) render(config_file, 'ssh/sshd_config.j2', ssh) diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 481fdd16e..22b6fcc98 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -375,14 +375,15 @@ def apply(login): chown(home_dir, user=user, recursive=True) # Generate 2FA/MFA One-Time-Pad configuration + google_auth_file = f'{home_dir}/.google_authenticator' if dict_search('authentication.otp.key', user_config): enable_otp = True - render(f'{home_dir}/.google_authenticator', 'login/pam_otp_ga.conf.j2', + render(google_auth_file, 'login/pam_otp_ga.conf.j2', user_config, permission=0o400, user=user, group='users') else: # delete configuration as it's not enabled for the user - if os.path.exists(f'{home_dir}/.google_authenticator'): - os.remove(f'{home_dir}/.google_authenticator') + if os.path.exists(google_auth_file): + os.unlink(google_auth_file) # Lock/Unlock local user account lock_unlock = '--unlock' @@ -396,6 +397,22 @@ def apply(login): # Disable user to prevent re-login call(f'usermod -s /sbin/nologin {user}') + home_dir = getpwnam(user).pw_dir + # Remove SSH authorized keys file + authorized_keys_file = f'{home_dir}/.ssh/authorized_keys' + if os.path.exists(authorized_keys_file): + os.unlink(authorized_keys_file) + + # Remove SSH authorized principals file + principals_file = f'{home_dir}/.ssh/authorized_principals' + if os.path.exists(principals_file): + os.unlink(principals_file) + + # Remove Google Authenticator file + google_auth_file = f'{home_dir}/.google_authenticator' + if os.path.exists(google_auth_file): + os.unlink(google_auth_file) + # Logout user if he is still logged in if user in list(set([tmp[0] for tmp in users()])): print(f'{user} is logged in, forcing logout!') diff --git a/src/tests/test_template.py b/src/tests/test_template.py index 7cae867a0..4660c0038 100644 --- a/src/tests/test_template.py +++ b/src/tests/test_template.py @@ -192,10 +192,15 @@ class TestVyOSTemplate(TestCase): self.assertIn(IKEv2_DEFAULT, ','.join(ciphers)) def test_get_default_port(self): + from vyos.defaults import config_files from vyos.defaults import internal_ports with self.assertRaises(RuntimeError): + vyos.template.get_default_config_file('UNKNOWN') + with self.assertRaises(RuntimeError): vyos.template.get_default_port('UNKNOWN') + self.assertEqual(vyos.template.get_default_config_file('sshd_user_ca'), + config_files['sshd_user_ca']) self.assertEqual(vyos.template.get_default_port('certbot_haproxy'), internal_ports['certbot_haproxy']) |