diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/conf_mode/service_ssh.py | 130 | ||||
-rwxr-xr-x | src/conf_mode/system_login.py | 13 |
2 files changed, 50 insertions, 93 deletions
diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py index 5e6a6f771..f3c76508b 100755 --- a/src/conf_mode/service_ssh.py +++ b/src/conf_mode/service_ssh.py @@ -31,6 +31,7 @@ from vyos import airbag from vyos.pki import find_chain from vyos.pki import encode_certificate from vyos.pki import load_certificate +from vyos.utils.dict import dict_search_recursive from vyos.utils.file import write_file airbag.enable() @@ -45,78 +46,6 @@ key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' -authorized_principals = '/etc/ssh/authorized_principals' - - -def cleanup_authorized_principals_dir(valid_users: list[str]): - if not os.path.isdir(authorized_principals): - return - - # Check the files (user name) under the directory and delete unnecessary ones. - for filename in os.listdir(authorized_principals): - file_path = os.path.join(authorized_principals, filename) - if os.path.isfile(file_path) and filename not in valid_users: - os.remove(file_path) - - # If the directory is empty, delete it too - if not os.listdir(authorized_principals): - os.rmdir(authorized_principals) - - -def handle_trusted_user_ca_key(ssh: dict): - if 'trusted_user_ca_key' not in ssh: - if os.path.exists(trusted_user_ca_key): - os.unlink(trusted_user_ca_key) - - # remove authorized_principals directory if it exists - cleanup_authorized_principals_dir([]) - return - - # trusted_user_ca_key is present - ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] - pki_ca_cert = ssh['pki']['ca'][ca_key_name] - - loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) - loaded_ca_certs = { - load_certificate(c['certificate']) - for c in ssh['pki']['ca'].values() - if 'certificate' in c - } - - ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) - write_file( - trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain) - ) - - if 'bind-user' not in ssh['trusted_user_ca_key']: - # remove authorized_principals directory if it exists - cleanup_authorized_principals_dir([]) - return - - # bind-user is present - configured_users = [] - for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items(): - if bind_user not in ssh['login_users']: - raise ConfigError(f"User '{bind_user}' not found in system login users") - - if 'principal' not in bind_user_config: - raise ConfigError(f"Principal not found for user '{bind_user}'") - - principals = bind_user_config['principal'] - if isinstance(principals, str): - principals = [principals] - - if not os.path.isdir(authorized_principals): - os.makedirs(authorized_principals, exist_ok=True) - - principal_file = os.path.join(authorized_principals, bind_user) - contents = '\n'.join(principals) + '\n' - write_file(principal_file, contents) - configured_users.append(bind_user) - - # remove unnecessary files under authorized_principals directory - cleanup_authorized_principals_dir(configured_users) - def get_config(config=None): if config: @@ -126,19 +55,9 @@ def get_config(config=None): base = ['service', 'ssh'] if not conf.exists(base): return None + ssh = conf.get_config_dict(base, key_mangling=('-', '_'), + get_first_key=True, with_pki=True) - ssh = conf.get_config_dict( - base, key_mangling=('-', '_'), get_first_key=True, with_pki=True - ) - login_users_base = ['system', 'login', 'user'] - login_users = conf.get_config_dict( - login_users_base, - key_mangling=('-', '_'), - no_tag_node_value_mangle=True, - get_first_key=True, - ) - - # create a list of all users, cli and users tmp = is_node_changed(conf, base + ['vrf']) if tmp: ssh.update({'restart_required': {}}) @@ -147,17 +66,27 @@ def get_config(config=None): # options which we need to update into the dictionary retrived. ssh = conf.merge_defaults(ssh, recursive=True) - # pass config file path - used in override template - ssh['config_file'] = config_file - - # use for trusted ca - ssh['login_users'] = login_users - # Ignore default XML values if config doesn't exists # Delete key from dict if not conf.exists(base + ['dynamic-protection']): del ssh['dynamic_protection'] + # See if any user has specified a list of principal names that are accepted + # for certificate authentication. + tmp = conf.get_config_dict(['system', 'login', 'user'], + key_mangling=('-', '_'), + no_tag_node_value_mangle=True, + get_first_key=True) + + for value, _ in dict_search_recursive(tmp, 'principal'): + # Only enable principal handling if SSH trusted-user-ca-key is set + if 'trusted_user_ca_key' in ssh: + ssh['trusted_user_ca_key'].update({'has_principals': {}}) + # We do only need to execute this code path once as we need to know + # if any one of the local users has a principal set or not - this + # accounts for the entire system. + break + return ssh @@ -170,7 +99,8 @@ def verify(ssh): if 'trusted_user_ca_key' in ssh: if 'ca_certificate' not in ssh['trusted_user_ca_key']: - raise ConfigError('CA certificate is required for TrustedUserCAKey') + raise ConfigError('CA certificate is mandatory when using ' \ + 'trusted-user-ca-key') ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] verify_pki_ca_certificate(ssh, ca_key_name) @@ -201,7 +131,23 @@ def generate(ssh): syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!') call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}') - handle_trusted_user_ca_key(ssh) + if 'trusted_user_ca_key' in ssh: + ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] + pki_ca_cert = ssh['pki']['ca'][ca_key_name] + + loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) + loaded_ca_certs = { + load_certificate(c['certificate']) + for c in ssh['pki']['ca'].values() + if 'certificate' in c + } + + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + write_file(trusted_user_ca_key, + '\n'.join(encode_certificate(c) for c in ca_full_chain)) + else: + if os.path.exists(trusted_user_ca_key): + os.unlink(trusted_user_ca_key) render(config_file, 'ssh/sshd_config.j2', ssh) diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 4febb6494..fa866c0ce 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -345,6 +345,17 @@ def apply(login): user_config, permission=0o600, formater=lambda _: _.replace(""", '"'), user=user, group='users') + + principals_file = f'{home_dir}/.ssh/authorized_principals' + if dict_search('authentication.principal', user_config): + render(principals_file, 'login/authorized_principals.j2', + user_config, permission=0o600, + formater=lambda _: _.replace(""", '"'), + user=user, group='users') + else: + if os.path.exists(principals_file): + os.unlink(principals_file) + except Exception as e: raise ConfigError(f'Adding user "{user}" raised exception: "{e}"') @@ -420,7 +431,7 @@ def apply(login): # Enable/disable Google authenticator cmd('pam-auth-update --disable mfa-google-authenticator') if enable_otp: - cmd(f'pam-auth-update --enable mfa-google-authenticator') + cmd('pam-auth-update --enable mfa-google-authenticator') return None |