summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2025-05-20 19:49:39 +0200
committerChristian Breunig <christian@breunig.cc>2025-05-29 13:57:48 +0200
commit81dfb64ebb3ea3c58c92e8f26e8610a46e4c50d2 (patch)
tree90ff9aeae2bb90e7fd75ac5b31e08deabce9d8cd /src
parent6c3b1ef2fede1e3c2b6e89060d3d645c2ba744cd (diff)
downloadvyos-1x-81dfb64ebb3ea3c58c92e8f26e8610a46e4c50d2.tar.gz
vyos-1x-81dfb64ebb3ea3c58c92e8f26e8610a46e4c50d2.zip
ssh: T6013: move principal name to "system login user <name> authentication"
We already support using per-user SSH public keys for system authentication. Instead of introducing a new CLI path to configure per-user principal names, we should continue using the existing CLI location and store the principal names alongside the corresponding SSH public keys. set system login user <name> principal <principal> The certificate used for SSH authentication contains an embedded principal name, which is defined under this CLI node. Only users with matching principal names are permitted to log in.
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/service_ssh.py130
-rwxr-xr-xsrc/conf_mode/system_login.py13
2 files changed, 50 insertions, 93 deletions
diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py
index 5e6a6f771..f3c76508b 100755
--- a/src/conf_mode/service_ssh.py
+++ b/src/conf_mode/service_ssh.py
@@ -31,6 +31,7 @@ from vyos import airbag
from vyos.pki import find_chain
from vyos.pki import encode_certificate
from vyos.pki import load_certificate
+from vyos.utils.dict import dict_search_recursive
from vyos.utils.file import write_file
airbag.enable()
@@ -45,78 +46,6 @@ key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
-authorized_principals = '/etc/ssh/authorized_principals'
-
-
-def cleanup_authorized_principals_dir(valid_users: list[str]):
- if not os.path.isdir(authorized_principals):
- return
-
- # Check the files (user name) under the directory and delete unnecessary ones.
- for filename in os.listdir(authorized_principals):
- file_path = os.path.join(authorized_principals, filename)
- if os.path.isfile(file_path) and filename not in valid_users:
- os.remove(file_path)
-
- # If the directory is empty, delete it too
- if not os.listdir(authorized_principals):
- os.rmdir(authorized_principals)
-
-
-def handle_trusted_user_ca_key(ssh: dict):
- if 'trusted_user_ca_key' not in ssh:
- if os.path.exists(trusted_user_ca_key):
- os.unlink(trusted_user_ca_key)
-
- # remove authorized_principals directory if it exists
- cleanup_authorized_principals_dir([])
- return
-
- # trusted_user_ca_key is present
- ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
- pki_ca_cert = ssh['pki']['ca'][ca_key_name]
-
- loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
- loaded_ca_certs = {
- load_certificate(c['certificate'])
- for c in ssh['pki']['ca'].values()
- if 'certificate' in c
- }
-
- ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
- write_file(
- trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
- )
-
- if 'bind-user' not in ssh['trusted_user_ca_key']:
- # remove authorized_principals directory if it exists
- cleanup_authorized_principals_dir([])
- return
-
- # bind-user is present
- configured_users = []
- for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items():
- if bind_user not in ssh['login_users']:
- raise ConfigError(f"User '{bind_user}' not found in system login users")
-
- if 'principal' not in bind_user_config:
- raise ConfigError(f"Principal not found for user '{bind_user}'")
-
- principals = bind_user_config['principal']
- if isinstance(principals, str):
- principals = [principals]
-
- if not os.path.isdir(authorized_principals):
- os.makedirs(authorized_principals, exist_ok=True)
-
- principal_file = os.path.join(authorized_principals, bind_user)
- contents = '\n'.join(principals) + '\n'
- write_file(principal_file, contents)
- configured_users.append(bind_user)
-
- # remove unnecessary files under authorized_principals directory
- cleanup_authorized_principals_dir(configured_users)
-
def get_config(config=None):
if config:
@@ -126,19 +55,9 @@ def get_config(config=None):
base = ['service', 'ssh']
if not conf.exists(base):
return None
+ ssh = conf.get_config_dict(base, key_mangling=('-', '_'),
+ get_first_key=True, with_pki=True)
- ssh = conf.get_config_dict(
- base, key_mangling=('-', '_'), get_first_key=True, with_pki=True
- )
- login_users_base = ['system', 'login', 'user']
- login_users = conf.get_config_dict(
- login_users_base,
- key_mangling=('-', '_'),
- no_tag_node_value_mangle=True,
- get_first_key=True,
- )
-
- # create a list of all users, cli and users
tmp = is_node_changed(conf, base + ['vrf'])
if tmp:
ssh.update({'restart_required': {}})
@@ -147,17 +66,27 @@ def get_config(config=None):
# options which we need to update into the dictionary retrived.
ssh = conf.merge_defaults(ssh, recursive=True)
- # pass config file path - used in override template
- ssh['config_file'] = config_file
-
- # use for trusted ca
- ssh['login_users'] = login_users
-
# Ignore default XML values if config doesn't exists
# Delete key from dict
if not conf.exists(base + ['dynamic-protection']):
del ssh['dynamic_protection']
+ # See if any user has specified a list of principal names that are accepted
+ # for certificate authentication.
+ tmp = conf.get_config_dict(['system', 'login', 'user'],
+ key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True)
+
+ for value, _ in dict_search_recursive(tmp, 'principal'):
+ # Only enable principal handling if SSH trusted-user-ca-key is set
+ if 'trusted_user_ca_key' in ssh:
+ ssh['trusted_user_ca_key'].update({'has_principals': {}})
+ # We do only need to execute this code path once as we need to know
+ # if any one of the local users has a principal set or not - this
+ # accounts for the entire system.
+ break
+
return ssh
@@ -170,7 +99,8 @@ def verify(ssh):
if 'trusted_user_ca_key' in ssh:
if 'ca_certificate' not in ssh['trusted_user_ca_key']:
- raise ConfigError('CA certificate is required for TrustedUserCAKey')
+ raise ConfigError('CA certificate is mandatory when using ' \
+ 'trusted-user-ca-key')
ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
verify_pki_ca_certificate(ssh, ca_key_name)
@@ -201,7 +131,23 @@ def generate(ssh):
syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')
call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}')
- handle_trusted_user_ca_key(ssh)
+ if 'trusted_user_ca_key' in ssh:
+ ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
+ pki_ca_cert = ssh['pki']['ca'][ca_key_name]
+
+ loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
+ loaded_ca_certs = {
+ load_certificate(c['certificate'])
+ for c in ssh['pki']['ca'].values()
+ if 'certificate' in c
+ }
+
+ ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
+ write_file(trusted_user_ca_key,
+ '\n'.join(encode_certificate(c) for c in ca_full_chain))
+ else:
+ if os.path.exists(trusted_user_ca_key):
+ os.unlink(trusted_user_ca_key)
render(config_file, 'ssh/sshd_config.j2', ssh)
diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py
index 4febb6494..fa866c0ce 100755
--- a/src/conf_mode/system_login.py
+++ b/src/conf_mode/system_login.py
@@ -345,6 +345,17 @@ def apply(login):
user_config, permission=0o600,
formater=lambda _: _.replace("&quot;", '"'),
user=user, group='users')
+
+ principals_file = f'{home_dir}/.ssh/authorized_principals'
+ if dict_search('authentication.principal', user_config):
+ render(principals_file, 'login/authorized_principals.j2',
+ user_config, permission=0o600,
+ formater=lambda _: _.replace("&quot;", '"'),
+ user=user, group='users')
+ else:
+ if os.path.exists(principals_file):
+ os.unlink(principals_file)
+
except Exception as e:
raise ConfigError(f'Adding user "{user}" raised exception: "{e}"')
@@ -420,7 +431,7 @@ def apply(login):
# Enable/disable Google authenticator
cmd('pam-auth-update --disable mfa-google-authenticator')
if enable_otp:
- cmd(f'pam-auth-update --enable mfa-google-authenticator')
+ cmd('pam-auth-update --enable mfa-google-authenticator')
return None