diff options
Diffstat (limited to 'src')
| -rwxr-xr-x | src/conf_mode/service_ssh.py | 130 | ||||
| -rwxr-xr-x | src/conf_mode/system_login.py | 13 | 
2 files changed, 50 insertions, 93 deletions
| diff --git a/src/conf_mode/service_ssh.py b/src/conf_mode/service_ssh.py index 5e6a6f771..f3c76508b 100755 --- a/src/conf_mode/service_ssh.py +++ b/src/conf_mode/service_ssh.py @@ -31,6 +31,7 @@ from vyos import airbag  from vyos.pki import find_chain  from vyos.pki import encode_certificate  from vyos.pki import load_certificate +from vyos.utils.dict import dict_search_recursive  from vyos.utils.file import write_file  airbag.enable() @@ -45,78 +46,6 @@ key_dsa = '/etc/ssh/ssh_host_dsa_key'  key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'  trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key' -authorized_principals = '/etc/ssh/authorized_principals' - - -def cleanup_authorized_principals_dir(valid_users: list[str]): -    if not os.path.isdir(authorized_principals): -        return - -    # Check the files (user name) under the directory and delete unnecessary ones. -    for filename in os.listdir(authorized_principals): -        file_path = os.path.join(authorized_principals, filename) -        if os.path.isfile(file_path) and filename not in valid_users: -            os.remove(file_path) - -    # If the directory is empty, delete it too -    if not os.listdir(authorized_principals): -        os.rmdir(authorized_principals) - - -def handle_trusted_user_ca_key(ssh: dict): -    if 'trusted_user_ca_key' not in ssh: -        if os.path.exists(trusted_user_ca_key): -            os.unlink(trusted_user_ca_key) - -        # remove authorized_principals directory if it exists -        cleanup_authorized_principals_dir([]) -        return - -    # trusted_user_ca_key is present -    ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] -    pki_ca_cert = ssh['pki']['ca'][ca_key_name] - -    loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) -    loaded_ca_certs = { -        load_certificate(c['certificate']) -        for c in ssh['pki']['ca'].values() -        if 'certificate' in c -    } - -    ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) -    write_file( -        trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain) -    ) - -    if 'bind-user' not in ssh['trusted_user_ca_key']: -        # remove authorized_principals directory if it exists -        cleanup_authorized_principals_dir([]) -        return - -    # bind-user is present -    configured_users = [] -    for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items(): -        if bind_user not in ssh['login_users']: -            raise ConfigError(f"User '{bind_user}' not found in system login users") - -        if 'principal' not in bind_user_config: -            raise ConfigError(f"Principal not found for user '{bind_user}'") - -        principals = bind_user_config['principal'] -        if isinstance(principals, str): -            principals = [principals] - -        if not os.path.isdir(authorized_principals): -            os.makedirs(authorized_principals, exist_ok=True) - -        principal_file = os.path.join(authorized_principals, bind_user) -        contents = '\n'.join(principals) + '\n' -        write_file(principal_file, contents) -        configured_users.append(bind_user) - -    # remove unnecessary files under authorized_principals directory -    cleanup_authorized_principals_dir(configured_users) -  def get_config(config=None):      if config: @@ -126,19 +55,9 @@ def get_config(config=None):      base = ['service', 'ssh']      if not conf.exists(base):          return None +    ssh = conf.get_config_dict(base, key_mangling=('-', '_'), +                               get_first_key=True, with_pki=True) -    ssh = conf.get_config_dict( -        base, key_mangling=('-', '_'), get_first_key=True, with_pki=True -    ) -    login_users_base = ['system', 'login', 'user'] -    login_users = conf.get_config_dict( -        login_users_base, -        key_mangling=('-', '_'), -        no_tag_node_value_mangle=True, -        get_first_key=True, -    ) - -    # create a list of all users, cli and users      tmp = is_node_changed(conf, base + ['vrf'])      if tmp:          ssh.update({'restart_required': {}}) @@ -147,17 +66,27 @@ def get_config(config=None):      # options which we need to update into the dictionary retrived.      ssh = conf.merge_defaults(ssh, recursive=True) -    # pass config file path - used in override template -    ssh['config_file'] = config_file - -    # use for trusted ca -    ssh['login_users'] = login_users -      # Ignore default XML values if config doesn't exists      # Delete key from dict      if not conf.exists(base + ['dynamic-protection']):          del ssh['dynamic_protection'] +    # See if any user has specified a list of principal names that are accepted +    # for certificate authentication. +    tmp = conf.get_config_dict(['system', 'login', 'user'], +                                key_mangling=('-', '_'), +                                no_tag_node_value_mangle=True, +                                get_first_key=True) + +    for value, _ in dict_search_recursive(tmp, 'principal'): +        # Only enable principal handling if SSH trusted-user-ca-key is set +        if 'trusted_user_ca_key' in ssh: +            ssh['trusted_user_ca_key'].update({'has_principals': {}}) +        # We do only need to execute this code path once as we need to know +        # if any one of the local users has a principal set or not - this +        # accounts for the entire system. +        break +      return ssh @@ -170,7 +99,8 @@ def verify(ssh):      if 'trusted_user_ca_key' in ssh:          if 'ca_certificate' not in ssh['trusted_user_ca_key']: -            raise ConfigError('CA certificate is required for TrustedUserCAKey') +            raise ConfigError('CA certificate is mandatory when using ' \ +                              'trusted-user-ca-key')          ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']          verify_pki_ca_certificate(ssh, ca_key_name) @@ -201,7 +131,23 @@ def generate(ssh):          syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')          call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}') -    handle_trusted_user_ca_key(ssh) +    if 'trusted_user_ca_key' in ssh: +        ca_key_name = ssh['trusted_user_ca_key']['ca_certificate'] +        pki_ca_cert = ssh['pki']['ca'][ca_key_name] + +        loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) +        loaded_ca_certs = { +            load_certificate(c['certificate']) +            for c in ssh['pki']['ca'].values() +            if 'certificate' in c +        } + +        ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) +        write_file(trusted_user_ca_key, +                   '\n'.join(encode_certificate(c) for c in ca_full_chain)) +    else: +        if os.path.exists(trusted_user_ca_key): +            os.unlink(trusted_user_ca_key)      render(config_file, 'ssh/sshd_config.j2', ssh) diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 4febb6494..fa866c0ce 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -345,6 +345,17 @@ def apply(login):                         user_config, permission=0o600,                         formater=lambda _: _.replace(""", '"'),                         user=user, group='users') + +                principals_file = f'{home_dir}/.ssh/authorized_principals' +                if dict_search('authentication.principal', user_config): +                    render(principals_file, 'login/authorized_principals.j2', +                           user_config, permission=0o600, +                           formater=lambda _: _.replace(""", '"'), +                           user=user, group='users') +                else: +                    if os.path.exists(principals_file): +                        os.unlink(principals_file) +              except Exception as e:                  raise ConfigError(f'Adding user "{user}" raised exception: "{e}"') @@ -420,7 +431,7 @@ def apply(login):      # Enable/disable Google authenticator      cmd('pam-auth-update --disable mfa-google-authenticator')      if enable_otp: -        cmd(f'pam-auth-update --enable mfa-google-authenticator') +        cmd('pam-auth-update --enable mfa-google-authenticator')      return None | 
