summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2025-05-20 19:57:24 +0200
committerChristian Breunig <christian@breunig.cc>2025-05-29 14:01:32 +0200
commit4b4bbd73b84c2c478c7752f58e7f66ec6d90459e (patch)
tree872749218a0efba4375cad579d617db02b6dac97
parentd2745a7b60a7fef88958bd52b3876c105da87e77 (diff)
downloadvyos-1x-4b4bbd73b84c2c478c7752f58e7f66ec6d90459e.tar.gz
vyos-1x-4b4bbd73b84c2c478c7752f58e7f66ec6d90459e.zip
ssh: T6013: rename trusted-user-ca-key -> truster-user-ca
The current implementation for SSH CA based authentication uses "set service ssh trusted-user-ca-key ca-certificate <foo>" to define an X.509 certificate from "set pki ca <foo> ..." - fun fact, native OpenSSH does not support X.509 certificates and only runs with OpenSSH ssh-keygen generated RSA or EC keys. This commit changes the bahavior to support antive certificates generated using ssh-keygen and loaded to our PKI tree. As the previous implementation did not work at all, no migrations cript is used.
-rw-r--r--data/templates/ssh/sshd_config.j26
-rw-r--r--interface-definitions/service_ssh.xml.in16
-rw-r--r--python/vyos/configverify.py19
-rw-r--r--python/vyos/defaults.py8
-rwxr-xr-xpython/vyos/template.py20
-rw-r--r--python/vyos/utils/file.py21
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py6
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py278
-rwxr-xr-xsrc/conf_mode/pki.py5
-rwxr-xr-xsrc/conf_mode/service_ssh.py54
-rwxr-xr-xsrc/conf_mode/system_login.py23
-rw-r--r--src/tests/test_template.py5
12 files changed, 269 insertions, 192 deletions
diff --git a/data/templates/ssh/sshd_config.j2 b/data/templates/ssh/sshd_config.j2
index dce679936..1315bf2cb 100644
--- a/data/templates/ssh/sshd_config.j2
+++ b/data/templates/ssh/sshd_config.j2
@@ -111,17 +111,17 @@ ClientAliveInterval {{ client_keepalive_interval }}
RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }}
{% endif %}
-{% if trusted_user_ca_key is vyos_defined %}
+{% if trusted_user_ca is vyos_defined %}
# Specifies a file containing public keys of certificate authorities that are
# trusted to sign user certificates for authentication
-TrustedUserCAKeys /etc/ssh/trusted_user_ca_key
+TrustedUserCAKeys {{ get_default_config_file('sshd_user_ca') }}
# The default is "none", i.e. not to use a principals file - in this case, the
# username of the user must appear in a certificate's principals list for it
# to be accepted. ".ssh/authorized_principals" means a per-user configuration,
# relative to $HOME.
{% set filename = 'none' %}
-{% if trusted_user_ca_key.has_principals is vyos_defined %}
+{% if has_principals is vyos_defined %}
{% set filename = '.ssh/authorized_principals' %}
{% endif %}
AuthorizedPrincipalsFile {{ filename }}
diff --git a/interface-definitions/service_ssh.xml.in b/interface-definitions/service_ssh.xml.in
index 14d358c78..c659a7db7 100644
--- a/interface-definitions/service_ssh.xml.in
+++ b/interface-definitions/service_ssh.xml.in
@@ -275,14 +275,18 @@
</constraint>
</properties>
</leafNode>
- <node name="trusted-user-ca-key">
+ <leafNode name="trusted-user-ca">
<properties>
- <help>Trusted user CA key</help>
+ <help>OpenSSH trusted user CA</help>
+ <completionHelp>
+ <path>pki openssh</path>
+ </completionHelp>
+ <valueHelp>
+ <format>txt</format>
+ <description>OpenSSH certificate name from PKI subsystem</description>
+ </valueHelp>
</properties>
- <children>
- #include <include/pki/ca-certificate.xml.i>
- </children>
- </node>
+ </leafNode>
#include <include/vrf-multi.xml.i>
</children>
</node>
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index d5f443f15..07eb29a68 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -527,6 +527,25 @@ def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0):
if dh_bits < min_key_size:
raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!')
+def verify_pki_openssh_key(config: dict, key_name: str):
+ """
+ Common helper function user by PKI consumers to perform recurring
+ validation functions on OpenSSH keys
+ """
+ if 'pki' not in config:
+ raise ConfigError('PKI is not configured!')
+
+ if 'openssh' not in config['pki']:
+ raise ConfigError('PKI does not contain any OpenSSH keys!')
+
+ if key_name not in config['pki']['openssh']:
+ raise ConfigError(f'OpenSSH key "{key_name}" not found in configuration!')
+
+ if 'public' in config['pki']['openssh'][key_name]:
+ if not {'key', 'type'} <= set(config['pki']['openssh'][key_name]['public']):
+ raise ConfigError('Both public key and type must be defined for '\
+ f'OpenSSH public key "{key_name}"!')
+
def verify_eapol(config: dict):
"""
Common helper function used by interface implementations to perform
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index fbde0298b..e42d92112 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -53,6 +53,10 @@ internal_ports = {
'certbot_haproxy' : 65080, # Certbot running behing haproxy
}
+config_files = {
+ 'sshd_user_ca' : '/run/sshd/trusted_user_ca',
+}
+
config_status = '/tmp/vyos-config-status'
api_config_state = '/run/http-api-state'
frr_debug_enable = '/tmp/vyos.frr.debug'
@@ -69,8 +73,8 @@ config_default = os.path.join(directories['data'], 'config.boot.default')
rt_symbolic_names = {
# Standard routing tables for Linux & reserved IDs for VyOS
- 'default': 253, # Confusingly, a final fallthru, not the default.
- 'main': 254, # The actual global table used by iproute2 unless told otherwise.
+ 'default': 253, # Confusingly, a final fallthru, not the default.
+ 'main': 254, # The actual global table used by iproute2 unless told otherwise.
'local': 255, # Special kernel loopback table.
}
diff --git a/python/vyos/template.py b/python/vyos/template.py
index aa215db95..bf7928914 100755
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -1079,7 +1079,7 @@ def vyos_defined(value, test_value=None, var_type=None):
def get_default_port(service):
"""
Jinja2 plugin to retrieve common service port number from vyos.defaults
- class form a Jinja2 template. This removes the need to hardcode, or pass in
+ class from a Jinja2 template. This removes the need to hardcode, or pass in
the data using the general dictionary.
Added to remove code complexity and make it easier to read.
@@ -1092,3 +1092,21 @@ def get_default_port(service):
raise RuntimeError(f'Service "{service}" not found in internal ' \
'vyos.defaults.internal_ports dict!')
return internal_ports[service]
+
+@register_clever_function('get_default_config_file')
+def get_default_config_file(filename):
+ """
+ Jinja2 plugin to retrieve a common configuration file path from
+ vyos.defaults class from a Jinja2 template. This removes the need to
+ hardcode, or pass in the data using the general dictionary.
+
+ Added to remove code complexity and make it easier to read.
+
+ Example:
+ {{ get_default_config_file('certbot_haproxy') }}
+ """
+ from vyos.defaults import config_files
+ if filename not in config_files:
+ raise RuntimeError(f'Configuration file "{filename}" not found in '\
+ 'internal vyos.defaults.config_files dict!')
+ return config_files[filename]
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index eaebb57a3..cc46d77d1 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -28,22 +28,28 @@ def file_is_persistent(path):
absolute = os.path.abspath(os.path.dirname(path))
return re.match(location,absolute)
-def read_file(fname, defaultonfailure=None):
+def read_file(fname, defaultonfailure=None, sudo=False):
"""
read the content of a file, stripping any end characters (space, newlines)
should defaultonfailure be not None, it is returned on failure to read
"""
try:
- """ Read a file to string """
- with open(fname, 'r') as f:
- data = f.read().strip()
- return data
+ # Some files can only be read by root - emulate sudo cat call
+ if sudo:
+ from vyos.utils.process import cmd
+ data = cmd(['sudo', 'cat', fname])
+ else:
+ # If not sudo, just read the file
+ with open(fname, 'r') as f:
+ data = f.read()
+ return data.strip()
except Exception as e:
if defaultonfailure is not None:
return defaultonfailure
raise e
-def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False):
+def write_file(fname, data, defaultonfailure=None, user=None, group=None,
+ mode=None, append=False, trailing_newline=False):
"""
Write content of data to given fname, should defaultonfailure be not None,
it is returned on failure to read.
@@ -60,6 +66,9 @@ def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=N
bytes = 0
with open(fname, 'w' if not append else 'a') as f:
bytes = f.write(data)
+ if trailing_newline and not data.endswith('\n'):
+ f.write('\n')
+ bytes += 1
chown(fname, user, group)
chmod(fname, mode)
return bytes
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index f0674f187..9b64d5c0e 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -152,12 +152,14 @@ class VyOSUnitTestSHIM:
return out
@staticmethod
- def ssh_send_cmd(command, username, password, hostname='localhost'):
+ def ssh_send_cmd(command, username, password, key_filename=None,
+ hostname='localhost'):
""" SSH command execution helper """
# Try to login via SSH
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh_client.connect(hostname=hostname, username=username, password=password)
+ ssh_client.connect(hostname=hostname, username=username,
+ password=password, key_filename=key_filename)
_, stdout, stderr = ssh_client.exec_command(command)
output = stdout.read().decode().strip()
error = stderr.read().decode().strip()
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index db83f14c3..551991d69 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -24,10 +24,12 @@ from pwd import getpwall
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
+from vyos.defaults import config_files
from vyos.utils.process import cmd
from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
+from vyos.utils.file import write_file
from vyos.xml_ref import default_value
PROCESS_NAME = 'sshd'
@@ -38,27 +40,101 @@ pki_path = ['pki']
key_rsa = '/etc/ssh/ssh_host_rsa_key'
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
-trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
-authorized_principals_dir = '/etc/ssh/authorized_principals'
-
+trusted_user_ca = config_files['sshd_user_ca']
+test_command = 'uname -a'
def get_config_value(key):
tmp = read_file(SSHD_CONF)
tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
return tmp
+trusted_user_ca_path = base_path + ['trusted-user-ca']
+# CA and signed user key generated using:
+# ssh-keygen -f vyos-ssh-ca.key
+# ssh-keygen -f vyos_testca -C "vyos_tesca@vyos.net"
+# ssh-keygen -s vyos-ssh-ca.key -I vyos_testca@vyos.net -n vyos,vyos_testca -V +520w vyos_testca.pub
+ca_cert_data = """
+AAAAB3NzaC1yc2EAAAADAQABAAABgQCTBa7+TTefsMLTHuuLPUmmm7SGAuoK03oZEIi2/O
+sww1uhCdKrm7bFvSUFpWvq3gX8TSS+yO5kNKz3BTMBu7oq01/Ewjyw0jR+fUog76x7mCzd
+2iI4QmPj4lNHSUFquaELt2aBwY4f7LtjxRCCgtWgirq/Qk+P27uJKErvndyYc95v9no15z
+lQFSdUid6tF8IjYljK8pXP0JshFp3XnFV2Rg80j7O66mRtVFC4tt2vluyIFeIID+5fL03v
+LXbT/2zNdoH6QiI9NGWkxhS7zFYziVd/rzG5xlEB1ezs2Sz4zjMPgV3GiMINb6tjEWNJhM
+KtDWIt+3UDpx+2T9PrhDBDFMlneiHCD6MxRv2sLbicevSj0PV7/fRnwoHs6hDKCU5eS2Mc
+CTxXr4jaboLZ6q3sbGHCHZo/PuA8Sl9iZCM4GCxx5bgvRRmGpgZv4PfFzA2b/wTHkKnf6E
+kuthoAJufmNxPaZQRQKF34SdmTKgSJTCY1gqwCH2iNg0PVKU+vN8c=
+"""
-ca_root_cert_data = """
-MIIBcTCCARagAwIBAgIUDcAf1oIQV+6WRaW7NPcSnECQ/lUwCgYIKoZIzj0EAwIw
-HjEcMBoGA1UEAwwTVnlPUyBzZXJ2ZXIgcm9vdCBDQTAeFw0yMjAyMTcxOTQxMjBa
-Fw0zMjAyMTUxOTQxMjBaMB4xHDAaBgNVBAMME1Z5T1Mgc2VydmVyIHJvb3QgQ0Ew
-WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQ0y24GzKQf4aM2Ir12tI9yITOIzAUj
-ZXyJeCmYI6uAnyAMqc4Q4NKyfq3nBi4XP87cs1jlC1P2BZ8MsjL5MdGWozIwMDAP
-BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRwC/YaieMEnjhYa7K3Flw/o0SFuzAK
-BggqhkjOPQQDAgNJADBGAiEAh3qEj8vScsjAdBy5shXzXDVVOKWCPTdGrPKnu8UW
-a2cCIQDlDgkzWmn5ujc5ATKz1fj+Se/aeqwh4QyoWCVTFLIxhQ==
+cert_user_key = """-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn
+NhAAAAAwEAAQAAAYEArnIlFpMwSQax7+qH3+/gbv65mem6Ur+gepNYC8TYaE91xJxMoE5M
+Pyh1s8Kr/WYNF6aN43qdDnjvGy38oFng4lEfxG475AqpTIGmP4GvEOlnNLhjCcOHrOFuzg
+uRtDDvn0/TPhdqLTlbvgZ326WO7xQkCX11qmdGUUtC9Byd7p+EmnTe0oP8N6MeyYY78qa4
+HnzMd6EPb3vyWdASpPZjQE0OJCeAx6Mne2kOnKxUcW1UlczOa1PPIQMU+Rp1PWDtkdiYAd
+nbTbIdxDN8Bn3mC3JXD642EcwXSJ1+kov/8u8bBuYNt3t3nf/krSebx4Ge7ObYnURj31j0
+8L8Vv3fgv+T7pY8iyMh8dYfrZPAWQGN1pe8ZkDaM1QGKJncF+8N0UB4EVFBHNLt7W8+oHt
+LPMqYw13djZHg5Q1NxSxc1srOmEBZrWCBZgDGGiqtKo+lF+oVvqvBh/hncOBlDX5RFM8qw
+Qt4mem9TEZZrIvC9q1dcVpQUrt8BvBOSnGnBb7yTAAAFkEdBIUlHQSFJAAAAB3NzaC1yc2
+EAAAGBAK5yJRaTMEkGse/qh9/v4G7+uZnpulK/oHqTWAvE2GhPdcScTKBOTD8odbPCq/1m
+DRemjeN6nQ547xst/KBZ4OJRH8RuO+QKqUyBpj+BrxDpZzS4YwnDh6zhbs4LkbQw759P0z
+4Xai05W74Gd9ulju8UJAl9dapnRlFLQvQcne6fhJp03tKD/DejHsmGO/KmuB58zHehD297
+8lnQEqT2Y0BNDiQngMejJ3tpDpysVHFtVJXMzmtTzyEDFPkadT1g7ZHYmAHZ202yHcQzfA
+Z95gtyVw+uNhHMF0idfpKL//LvGwbmDbd7d53/5K0nm8eBnuzm2J1EY99Y9PC/Fb934L/k
++6WPIsjIfHWH62TwFkBjdaXvGZA2jNUBiiZ3BfvDdFAeBFRQRzS7e1vPqB7SzzKmMNd3Y2
+R4OUNTcUsXNbKzphAWa1ggWYAxhoqrSqPpRfqFb6rwYf4Z3DgZQ1+URTPKsELeJnpvUxGW
+ayLwvatXXFaUFK7fAbwTkpxpwW+8kwAAAAMBAAEAAAGAEeZQe+0vyoPPWkjRwbQBbszgX9
+9QaRE/TD82N5mZLbWJkK+2WnSY9O9tNGbIncBiSNz5ji/p/FmDCgzr8SAyfRvJ4K6sTTfy
+1eYvwtscYDsy2ywDAuDMrnvrPLqJ1tghSP2N4BR9ppT4yZosTkjB+TIzMxjBLB0GEBgNj1
+19rxswe2YmlFSgBVgi3pbRgT0uLfgBmvzXHUoLPL/8ScT7u4Csmh/GN7Xmuo5gcMnArcAu
+1Q17g3PJZcpv1Ser2VfKnVAwrURCLW8dlji5xat/3E/PLsrLvszVS6U0hFf3MaOixprxsz
+wc0n2Y4lAgkgkCZQ0Ty9TSXI/8TQWL8cPFej1TK15NWXlfElZxI+lhwcsnWmNy3mXD746/
+YZLH+OCs9isvewZWryQEkdVCU42MM/7L4Hoeqh2diGDV9wtKDW5FjHq/VRNOMVt59eCFlv
+eujh89/KY6wPxHoDoY3+olhggiKDGw1wUUpEXKNQhhTjx1g0xn7AFYz+Bp2svM9EdhAAAA
+wQDBq+zeOhsS/VrrVRkmOYYXnBSe0WcckjcYOly/8FLTPkq19aVY5eOmo6teegqvkWscGP
+Wisl7DW+kFNolIvwc6shf/8+PXC1KlADd9S1uoXvSmVoe3wSsIKRCsUuLZiiJkv4nqQ/BK
+T6ijvNG2Wu3YGsP8Tj+OcTebqk1vDItaickhKtFxCx6PBcV+RrDeK1TT6uAHd1AsGikTva
+V/BDMmtoDz7qFQbj9Vj2np88MakxYfm7u4DzKu082GHDBC44sAAADBAN8ATvmmfxqk5GFg
++2rbIW+qMJ2GwWXiTFLjH7u4HEhsmHbHYsQ0v+cGu2dKfBUVWoq/N2ltDQ0QYTgkmsxKvm
+I8AjVhLHhFB1DtPBMHibsF/rtBRgsItR+PveUtRYOmeY1PzJ3ygVNJpPJ87st0T4JVNQiE
++bFEhnJ/RcTHxzAAt8+gTn0PTen3+hn9Jk2YFHWFb51YDw2h00LL9XT9Enz4xkc6gTPL3M
+0IKULJWnyYGOLueSsQxJiaAUcsZg8W2QAAAMEAyEJ45HtbUqZ5xd2K5ZfY8cd1dC9uAx6a
+cSdENUvMW4yE3QEJ4xdonDUn9OQYR7GpseQWuXBrTO2PSsse7P6eHUsRhaUkFOvLzHSVzO
+bI9HDJAq6+KCPhm2eixfBiMs2meEle8MvNiiONwaY3JnPnGdsTpEjcm6oulyC52xRvHhvc
+nCuoRTqX7xcIka4jCXInYBS7GhlF5iAmIAAVkvfWjjNwZ3S0mnGUUOYgknidBhK+x0zCWt
+IXOeoIfjb/C4NLAAAAE3Z5b3NfdGVzY2FAdnlvcy5uZXQBAgMEBQYH
+-----END OPENSSH PRIVATE KEY-----
"""
+cert_user_signed = """
+ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb2
+0AAAAglE+kjRPqsck/y2ywO+owv1FTeU6QFNPywFqD8aoEcA8AAAADAQABAAABgQCuciUWk
+zBJBrHv6off7+Bu/rmZ6bpSv6B6k1gLxNhoT3XEnEygTkw/KHWzwqv9Zg0Xpo3jep0OeO8b
+LfygWeDiUR/EbjvkCqlMgaY/ga8Q6Wc0uGMJw4es4W7OC5G0MO+fT9M+F2otOVu+BnfbpY7
+vFCQJfXWqZ0ZRS0L0HJ3un4SadN7Sg/w3ox7JhjvyprgefMx3oQ9ve/JZ0BKk9mNATQ4kJ4
+DHoyd7aQ6crFRxbVSVzM5rU88hAxT5GnU9YO2R2JgB2dtNsh3EM3wGfeYLclcPrjYRzBdIn
+X6Si//y7xsG5g23e3ed/+StJ5vHgZ7s5tidRGPfWPTwvxW/d+C/5PuljyLIyHx1h+tk8BZA
+Y3Wl7xmQNozVAYomdwX7w3RQHgRUUEc0u3tbz6ge0s8ypjDXd2NkeDlDU3FLFzWys6YQFmt
+YIFmAMYaKq0qj6UX6hW+q8GH+Gdw4GUNflEUzyrBC3iZ6b1MRlmsi8L2rV1xWlBSu3wG8E5
+KcacFvvJMAAAAAAAAAAAAAAAEAAAAUdnlvc190ZXN0Y2FAdnlvcy5uZXQAAAAXAAAABHZ5b
+3MAAAALdnlvc190ZXN0Y2EAAAAAaDg66AAAAAB69w9WAAAAAAAAAIIAAAAVcGVybWl0LVgx
+MS1mb3J3YXJkaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGV
+ybWl0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVybWl0LX
+VzZXItcmMAAAAAAAAAAAAAAZcAAAAHc3NoLXJzYQAAAAMBAAEAAAGBAJMFrv5NN5+wwtMe6
+4s9SaabtIYC6grTehkQiLb86zDDW6EJ0qubtsW9JQWla+reBfxNJL7I7mQ0rPcFMwG7uirT
+X8TCPLDSNH59SiDvrHuYLN3aIjhCY+PiU0dJQWq5oQu3ZoHBjh/su2PFEIKC1aCKur9CT4/
+bu4koSu+d3Jhz3m/2ejXnOVAVJ1SJ3q0XwiNiWMrylc/QmyEWndecVXZGDzSPs7rqZG1UUL
+i23a+W7IgV4ggP7l8vTe8tdtP/bM12gfpCIj00ZaTGFLvMVjOJV3+vMbnGUQHV7OzZLPjOM
+w+BXcaIwg1vq2MRY0mEwq0NYi37dQOnH7ZP0+uEMEMUyWd6IcIPozFG/awtuJx69KPQ9Xv9
+9GfCgezqEMoJTl5LYxwJPFeviNpugtnqrexsYcIdmj8+4DxKX2JkIzgYLHHluC9FGYamBm/
+g98XMDZv/BMeQqd/oSS62GgAm5+Y3E9plBFAoXfhJ2ZMqBIlMJjWCrAIfaI2DQ9UpT683xw
+AAAZQAAAAMcnNhLXNoYTItNTEyAAABgINZAr9M9ZYWDhhf5uWNkUBKq12OlJ3ImvHg5161P
+BAAL6crGS3WzyAs9LerxFcdMJ0gzMgUixR59MgGMAzfN+DjoSmgcLVT0eVoI5GMBkdiq8T5
+h3qjeXTc5BfLJiACbu7tOPhuIsIDreDnCVYmGr2z+rAPaqMETJa4L0submx4DqnahSY0ZSH
+WjTrjWCSPIdySh9HUXbpq3tYdNlqmpSY5YzvDmMC46kGMF10G5ycc58asWfUMwLMGsTEt2t
+R5DKRDw/iJch3r+L0xLMCSmEXnu6/Gl7Yq1XJdWm9cA1SvDyxEuB4yKIDkunXrPiuPn3zyv
+z1a/bY0hvuF+fyL+tRCbmrfOLreHuYh9aFg6e22MoKhrez5wP8Eoy1T+rlQrmlgCRDShBgj
+wMMhc+2fdrzTR07Ctnmv339p/SY5wBruzNM9R1mzyEuuJDE6OkKBTI8kuQu6ypGv+bLqSSt
+wujcNqOI4Vz61HiOsRSTUa7tA5q4hBwFqq7FB8+N0Ylfa5A== vyos_tesca@vyos.net
+"""
class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -208,23 +284,12 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
# run natively.
#
# We also try to login as an invalid user - this is not allowed to work.
-
test_user = 'ssh_test'
test_pass = 'v2i57DZs8idUwMN3VC92'
- test_command = 'uname -a'
self.cli_set(base_path)
- self.cli_set(
- [
- 'system',
- 'login',
- 'user',
- test_user,
- 'authentication',
- 'plaintext-password',
- test_pass,
- ]
- )
+ self.cli_set(['system', 'login', 'user', test_user, 'authentication',
+ 'plaintext-password', test_pass])
# commit changes
self.cli_commit()
@@ -237,9 +302,8 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
- output, error = self.ssh_send_cmd(
- test_command, 'invalid_user', 'invalid_password'
- )
+ output, error = self.ssh_send_cmd(test_command, 'invalid_user',
+ 'invalid_password')
self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()
@@ -360,126 +424,74 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
tmp_sshd_conf = read_file(SSHD_CONF)
self.assertIn(expected, tmp_sshd_conf)
- def test_ssh_trusted_user_ca_key(self):
+ def test_ssh_trusted_user_ca(self):
ca_cert_name = 'test_ca'
-
- # set pki ca <ca_cert_name> certificate <ca_key_data>
- # set service ssh trusted-user-ca-key ca-certificate <ca_cert_name>
- self.cli_set(
- pki_path
- + [
- 'ca',
- ca_cert_name,
- 'certificate',
- ca_root_cert_data.replace('\n', ''),
- ]
- )
- self.cli_set(
- base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
- )
+ public_key_type = 'ssh-rsa'
+ public_key_data = ca_cert_data.replace('\n', '')
+ test_user = 'vyos_testca'
+ principal = 'vyos'
+ user_auth_base = ['system', 'login', 'user', test_user]
+
+ # create user account
+ self.cli_set(user_auth_base)
+ self.cli_set(pki_path + ['openssh', ca_cert_name, 'public',
+ 'key', public_key_data])
+ self.cli_set(pki_path + ['openssh', ca_cert_name, 'public',
+ 'type', public_key_type])
+ self.cli_set(trusted_user_ca_path, value=ca_cert_name)
self.cli_commit()
- trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
- self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
+ trusted_user_ca_config = get_config_value('TrustedUserCAKeys')
+ self.assertIn(trusted_user_ca, trusted_user_ca_config)
+
authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
self.assertIn('none', authorize_principals_file_config)
- with open(trusted_user_ca_key, 'r') as file:
- ca_key_contents = file.read()
- self.assertIn(ca_root_cert_data, ca_key_contents)
-
- self.cli_delete(
- base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
- )
- self.cli_delete(['pki', 'ca', ca_cert_name])
- self.cli_commit()
+ ca_key_contents = read_file(trusted_user_ca).lstrip().rstrip()
+ self.assertIn(f'{public_key_type} {public_key_data}', ca_key_contents)
- # Verify the CA key is removed
- trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
- self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config)
- authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
- self.assertNotIn('none', authorize_principals_file_config)
+ # Verify functionality by logging into the system using signed user key
+ key_filename = f'/tmp/{test_user}'
+ write_file(key_filename, cert_user_key, mode=0o600)
+ write_file(f'{key_filename}-cert.pub', cert_user_signed.replace('\n', ''))
- def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self):
- ca_cert_name = 'test_ca'
- bind_user = 'test_user'
- principals = ['test_principal_alice', 'test_principal_bob']
- test_user = 'ssh_test'
- test_pass = 'v2i57DZs8idUwMN3VC92'
+ # Login with proper credentials
+ output, error = self.ssh_send_cmd(test_command, test_user, password=None,
+ key_filename=key_filename)
+ # Verify login
+ self.assertFalse(error)
+ self.assertEqual(output, cmd(test_command))
- # Create a test user
- self.cli_set(
- [
- 'system',
- 'login',
- 'user',
- test_user,
- 'authentication',
- 'plaintext-password',
- test_pass,
- ]
- )
-
- # set pki ca <ca_cert_name> certificate <ca_key_data>
- # set service ssh trusted-user-ca-key ca-certificate <ca_cert_name>
- # set service ssh trusted-user-ca-key bind-user <bind_user> principal <principals>
- self.cli_set(
- pki_path
- + [
- 'ca',
- ca_cert_name,
- 'certificate',
- ca_root_cert_data.replace('\n', ''),
- ]
- )
- self.cli_set(
- base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
- )
- for principal in principals:
- self.cli_set(
- base_path
- + [
- 'trusted-user-ca-key',
- 'bind-user',
- bind_user,
- 'principal',
- principal,
- ]
- )
+ # Enable user principal name - logins only allowed if certificate contains
+ # said principal name
+ self.cli_set(user_auth_base + ['authentication', 'principal', principal])
self.cli_commit()
- trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
- self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
- authorized_principals_file = f'{authorized_principals_dir}/{bind_user}'
- self.assertTrue(os.path.exists(authorized_principals_file))
-
- with open(authorized_principals_file, 'r') as file:
- authorized_principals = file.read()
- for principal in principals:
- self.assertIn(principal, authorized_principals)
-
- for principal in principals:
- self.cli_delete(
- base_path
- + [
- 'trusted-user-ca-key',
- 'bind-user',
- bind_user,
- 'principal',
- principal,
- ]
- )
-
- self.cli_delete(
- base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
- )
+ # Verify generated SSH principals
+ authorized_principals_file = f'/home/{test_user}/.ssh/authorized_principals'
+ authorized_principals = read_file(authorized_principals_file, sudo=True)
+ self.assertIn(principal, authorized_principals)
+
+ # Login with proper credentials
+ output, error = self.ssh_send_cmd(test_command, test_user, password=None,
+ key_filename=key_filename)
+ # Verify login
+ self.assertFalse(error)
+ self.assertEqual(output, cmd(test_command))
+
+ self.cli_delete(trusted_user_ca_path)
+ self.cli_delete(user_auth_base)
self.cli_delete(['pki', 'ca', ca_cert_name])
- self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()
- # Verify the authorized principals file is removed
- self.assertFalse(os.path.exists(authorized_principals_file))
+ # Verify the CA key is removed
+ trusted_user_ca_config = get_config_value('TrustedUserCAKeys')
+ self.assertNotIn(trusted_user_ca, trusted_user_ca_config)
+ self.assertFalse(os.path.exists(trusted_user_ca))
+ authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
+ self.assertNotIn('none', authorize_principals_file_config)
+ self.assertFalse(os.path.exists(f'/home/{test_user}/.ssh/authorized_principals'))
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index 14fe86d56..7d01b6642 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -64,7 +64,7 @@ sync_search = [
'path': ['service', 'https'],
},
{
- 'keys': ['ca_certificate'],
+ 'keys': ['key'],
'path': ['service', 'ssh'],
},
{
@@ -418,7 +418,8 @@ def verify(pki):
if 'country' in default_values:
country = default_values['country']
if len(country) != 2 or not country.isalpha():
- raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.')
+ raise ConfigError('Invalid default country value. '\
+ 'Value must be 2 alpha characters.')
if 'changed' in pki:
# if the list is getting longer, we can move to a dict() and also embed the
diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py
index f3c76508b..3d38d940a 100755
--- a/src/conf_mode/service_ssh.py
+++ b/src/conf_mode/service_ssh.py
@@ -23,14 +23,14 @@ from syslog import LOG_INFO
from vyos.config import Config
from vyos.configdict import is_node_changed
from vyos.configverify import verify_vrf
-from vyos.configverify import verify_pki_ca_certificate
+from vyos.configverify import verify_pki_openssh_key
+from vyos.defaults import config_files
from vyos.utils.process import call
from vyos.template import render
from vyos import ConfigError
from vyos import airbag
-from vyos.pki import find_chain
-from vyos.pki import encode_certificate
-from vyos.pki import load_certificate
+from vyos.pki import encode_public_key
+from vyos.pki import load_openssh_public_key
from vyos.utils.dict import dict_search_recursive
from vyos.utils.file import write_file
@@ -45,7 +45,7 @@ key_rsa = '/etc/ssh/ssh_host_rsa_key'
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
-trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
+trusted_user_ca = config_files['sshd_user_ca']
def get_config(config=None):
if config:
@@ -79,9 +79,9 @@ def get_config(config=None):
get_first_key=True)
for value, _ in dict_search_recursive(tmp, 'principal'):
- # Only enable principal handling if SSH trusted-user-ca-key is set
- if 'trusted_user_ca_key' in ssh:
- ssh['trusted_user_ca_key'].update({'has_principals': {}})
+ # Only enable principal handling if SSH trusted-user-ca is set
+ if 'trusted_user_ca' in ssh:
+ ssh['has_principals'] = {}
# We do only need to execute this code path once as we need to know
# if any one of the local users has a principal set or not - this
# accounts for the entire system.
@@ -97,16 +97,8 @@ def verify(ssh):
if 'rekey' in ssh and 'data' not in ssh['rekey']:
raise ConfigError('Rekey data is required!')
- if 'trusted_user_ca_key' in ssh:
- if 'ca_certificate' not in ssh['trusted_user_ca_key']:
- raise ConfigError('CA certificate is mandatory when using ' \
- 'trusted-user-ca-key')
-
- ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
- verify_pki_ca_certificate(ssh, ca_key_name)
- pki_ca_cert = ssh['pki']['ca'][ca_key_name]
- if 'certificate' not in pki_ca_cert or not pki_ca_cert['certificate']:
- raise ConfigError(f"CA certificate '{ca_key_name}' is not valid or missing")
+ if 'trusted_user_ca' in ssh:
+ verify_pki_openssh_key(ssh, ssh['trusted_user_ca'])
verify_vrf(ssh)
return None
@@ -131,23 +123,17 @@ def generate(ssh):
syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')
call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}')
- if 'trusted_user_ca_key' in ssh:
- ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
- pki_ca_cert = ssh['pki']['ca'][ca_key_name]
-
- loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
- loaded_ca_certs = {
- load_certificate(c['certificate'])
- for c in ssh['pki']['ca'].values()
- if 'certificate' in c
- }
-
- ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
- write_file(trusted_user_ca_key,
- '\n'.join(encode_certificate(c) for c in ca_full_chain))
+ if 'trusted_user_ca' in ssh:
+ key_name = ssh['trusted_user_ca']
+ openssh_cert = ssh['pki']['openssh'][key_name]
+ loaded_ca_cert = load_openssh_public_key(openssh_cert['public']['key'],
+ openssh_cert['public']['type'])
+ tmp = encode_public_key(loaded_ca_cert, encoding='OpenSSH',
+ key_format='OpenSSH')
+ write_file(trusted_user_ca, tmp, trailing_newline=True)
else:
- if os.path.exists(trusted_user_ca_key):
- os.unlink(trusted_user_ca_key)
+ if os.path.exists(trusted_user_ca):
+ os.unlink(trusted_user_ca)
render(config_file, 'ssh/sshd_config.j2', ssh)
diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py
index 481fdd16e..22b6fcc98 100755
--- a/src/conf_mode/system_login.py
+++ b/src/conf_mode/system_login.py
@@ -375,14 +375,15 @@ def apply(login):
chown(home_dir, user=user, recursive=True)
# Generate 2FA/MFA One-Time-Pad configuration
+ google_auth_file = f'{home_dir}/.google_authenticator'
if dict_search('authentication.otp.key', user_config):
enable_otp = True
- render(f'{home_dir}/.google_authenticator', 'login/pam_otp_ga.conf.j2',
+ render(google_auth_file, 'login/pam_otp_ga.conf.j2',
user_config, permission=0o400, user=user, group='users')
else:
# delete configuration as it's not enabled for the user
- if os.path.exists(f'{home_dir}/.google_authenticator'):
- os.remove(f'{home_dir}/.google_authenticator')
+ if os.path.exists(google_auth_file):
+ os.unlink(google_auth_file)
# Lock/Unlock local user account
lock_unlock = '--unlock'
@@ -396,6 +397,22 @@ def apply(login):
# Disable user to prevent re-login
call(f'usermod -s /sbin/nologin {user}')
+ home_dir = getpwnam(user).pw_dir
+ # Remove SSH authorized keys file
+ authorized_keys_file = f'{home_dir}/.ssh/authorized_keys'
+ if os.path.exists(authorized_keys_file):
+ os.unlink(authorized_keys_file)
+
+ # Remove SSH authorized principals file
+ principals_file = f'{home_dir}/.ssh/authorized_principals'
+ if os.path.exists(principals_file):
+ os.unlink(principals_file)
+
+ # Remove Google Authenticator file
+ google_auth_file = f'{home_dir}/.google_authenticator'
+ if os.path.exists(google_auth_file):
+ os.unlink(google_auth_file)
+
# Logout user if he is still logged in
if user in list(set([tmp[0] for tmp in users()])):
print(f'{user} is logged in, forcing logout!')
diff --git a/src/tests/test_template.py b/src/tests/test_template.py
index 7cae867a0..4660c0038 100644
--- a/src/tests/test_template.py
+++ b/src/tests/test_template.py
@@ -192,10 +192,15 @@ class TestVyOSTemplate(TestCase):
self.assertIn(IKEv2_DEFAULT, ','.join(ciphers))
def test_get_default_port(self):
+ from vyos.defaults import config_files
from vyos.defaults import internal_ports
with self.assertRaises(RuntimeError):
+ vyos.template.get_default_config_file('UNKNOWN')
+ with self.assertRaises(RuntimeError):
vyos.template.get_default_port('UNKNOWN')
+ self.assertEqual(vyos.template.get_default_config_file('sshd_user_ca'),
+ config_files['sshd_user_ca'])
self.assertEqual(vyos.template.get_default_port('certbot_haproxy'),
internal_ports['certbot_haproxy'])