diff options
| author | Nataliia Solomko <natalirs1985@gmail.com> | 2024-10-18 15:42:59 +0300 | 
|---|---|---|
| committer | Nataliia Solomko <natalirs1985@gmail.com> | 2024-10-18 15:42:59 +0300 | 
| commit | 8943446c8c79c6daad2326ddc7a59950123b35d2 (patch) | |
| tree | eca41b73cac77d5bc6362f3f13d3b6adf531978c /src | |
| parent | e5d2ac54150922640c08bacab124e96c7bbd1f7f (diff) | |
| download | vyos-1x-8943446c8c79c6daad2326ddc7a59950123b35d2.tar.gz vyos-1x-8943446c8c79c6daad2326ddc7a59950123b35d2.zip | |
pki: T4914: Rewrite the PKI op mode in the new style
Diffstat (limited to 'src')
| -rwxr-xr-x | src/op_mode/pki.py | 295 | ||||
| -rw-r--r-- | src/services/api/rest/routers.py | 4 | 
2 files changed, 165 insertions, 134 deletions
| diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 5652a5d74..56b873bb1 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -14,16 +14,18 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>. -import argparse  import ipaddress  import os  import re  import sys  import tabulate +import typing  from cryptography import x509  from cryptography.x509.oid import ExtendedKeyUsageOID +import vyos.opmode +  from vyos.config import Config  from vyos.config import config_dict_mangle_acme  from vyos.pki import encode_certificate @@ -51,8 +53,36 @@ from vyos.utils.process import cmd  CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'  auth_dir = '/config/auth' +ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl'] +ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']] +ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512'] +  # Helper Functions  conf = Config() + + +def _verify(target): +    """Decorator checks if config for PKI exists""" +    from functools import wraps + +    if target not in ['ca', 'certificate']: +        raise ValueError('Invalid PKI') + +    def _verify_target(func): +        @wraps(func) +        def _wrapper(*args, **kwargs): +            name = kwargs.get('name') +            unconf_message = f'PKI {target} "{name}" does not exist!' +            if name: +                if not conf.exists(['pki', target, name]): +                    raise vyos.opmode.UnconfiguredSubsystem(unconf_message) +            return func(*args, **kwargs) + +        return _wrapper + +    return _verify_target + +  def get_default_values():      # Fetch default x509 values      base = ['pki', 'x509', 'default'] @@ -872,8 +902,111 @@ def import_openvpn_secret(name, path):      install_openvpn_key(name, key_data, key_version) -# Show functions -def show_certificate_authority(name=None, pem=False): + +def generate_pki( +    raw: bool, +    pki_type: ArgsPkiTypeGen, +    name: typing.Optional[str], +    file: typing.Optional[bool], +    install: typing.Optional[bool], +    sign: typing.Optional[str], +    self_sign: typing.Optional[bool], +    key: typing.Optional[bool], +    psk: typing.Optional[bool], +    interface: typing.Optional[str], +    peer: typing.Optional[str], +): +    try: +        if pki_type == 'ca': +            if sign: +                generate_ca_certificate_sign(name, sign, install=install, file=file) +            else: +                generate_ca_certificate(name, install=install, file=file) +        elif pki_type == 'certificate': +            if sign: +                generate_certificate_sign(name, sign, install=install, file=file) +            elif self_sign: +                generate_certificate_selfsign(name, install=install, file=file) +            else: +                generate_certificate_request(name=name, install=install, file=file) + +        elif pki_type == 'crl': +            generate_certificate_revocation_list(name, install=install, file=file) + +        elif pki_type == 'ssh': +            generate_ssh_keypair(name, install=install, file=file) + +        elif pki_type == 'dh': +            generate_dh_parameters(name, install=install, file=file) + +        elif pki_type == 'key-pair': +            generate_keypair(name, install=install, file=file) + +        elif pki_type == 'openvpn': +            generate_openvpn_key(name, install=install, file=file) + +        elif pki_type == 'wireguard': +            # WireGuard supports writing key directly into the CLI, but this +            # requires the vyos_libexec_dir environment variable to be set +            os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos' + +            if key: +                generate_wireguard_key(interface, install=install) +            if psk: +                generate_wireguard_psk(interface, peer=peer, install=install) +    except KeyboardInterrupt: +        print('Aborted') +        sys.exit(0) + + +def import_pki( +    name: str, +    pki_type: ArgsPkiType, +    filename: typing.Optional[str], +    key_filename: typing.Optional[str], +    no_prompt: typing.Optional[bool], +    passphrase: typing.Optional[str], +): +    try: +        if pki_type == 'ca': +            import_ca_certificate( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'certificate': +            import_certificate( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'crl': +            import_crl(name, filename) +        elif pki_type == 'dh': +            import_dh_parameters(name, filename) +        elif pki_type == 'key-pair': +            import_keypair( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'openvpn': +            import_openvpn_secret(name, filename) +    except KeyboardInterrupt: +        print('Aborted') +        sys.exit(0) + + +@_verify('ca') +def show_certificate_authority( +    raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +):      headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']      data = []      certs = get_config_ca_certificate() @@ -905,7 +1038,14 @@ def show_certificate_authority(name=None, pem=False):      print("Certificate Authorities:")      print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False, fingerprint_hash=None): + +@_verify('certificate') +def show_certificate( +    raw: bool, +    name: typing.Optional[str] = None, +    pem: typing.Optional[bool] = False, +    fingerprint: typing.Optional[ArgsFingerprint] = None, +):      headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']      data = []      certs = get_config_certificate() @@ -926,8 +1066,8 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):              if name and pem:                  print(encode_certificate(cert))                  return -            elif name and fingerprint_hash: -                print(get_certificate_fingerprint(cert, fingerprint_hash)) +            elif name and fingerprint: +                print(get_certificate_fingerprint(cert, fingerprint))                  return              ca_name = get_certificate_ca(cert, ca_certs) @@ -955,7 +1095,10 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):      print("Certificates:")      print(tabulate.tabulate(data, headers)) -def show_crl(name=None, pem=False): + +def show_crl( +    raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +):      headers = ['CA Name', 'Updated', 'Revokes']      data = []      certs = get_config_ca_certificate() @@ -989,132 +1132,20 @@ def show_crl(name=None, pem=False):      print("Certificate Revocation Lists:")      print(tabulate.tabulate(data, headers)) -if __name__ == '__main__': -    parser = argparse.ArgumentParser() -    parser.add_argument('--action', help='PKI action', required=True) - -    # X509 -    parser.add_argument('--ca', help='Certificate Authority', required=False) -    parser.add_argument('--certificate', help='Certificate', required=False) -    parser.add_argument('--crl', help='Certificate Revocation List', required=False) -    parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) -    parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') -    parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') -    parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') -    # SSH -    parser.add_argument('--ssh', help='SSH Key', required=False) +def show_all(raw: bool): +    show_certificate_authority(raw) +    print('\n') +    show_certificate(raw) +    print('\n') +    show_crl(raw) -    # DH -    parser.add_argument('--dh', help='DH Parameters', required=False) - -    # Key pair -    parser.add_argument('--keypair', help='Key pair', required=False) - -    # OpenVPN -    parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - -    # WireGuard -    parser.add_argument('--wireguard', help='Wireguard', action='store_true') -    group = parser.add_mutually_exclusive_group() -    group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) -    group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) -    parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') -    parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') - -    # Global -    parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') -    parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') - -    parser.add_argument('--filename', help='Write certificate into specified filename', action='store') -    parser.add_argument('--key-filename', help='Write key into specified filename', action='store') - -    parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') -    parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') - -    args = parser.parse_args() +if __name__ == '__main__':      try: -        if args.action == 'generate': -            if args.ca: -                if args.sign: -                    generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) -                else: -                    generate_ca_certificate(args.ca, install=args.install, file=args.file) -            elif args.certificate: -                if args.sign: -                    generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) -                elif args.self_sign: -                    generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) -                else: -                    generate_certificate_request(name=args.certificate, install=args.install, file=args.file) - -            elif args.crl: -                generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) - -            elif args.ssh: -                generate_ssh_keypair(args.ssh, install=args.install, file=args.file) - -            elif args.dh: -                generate_dh_parameters(args.dh, install=args.install, file=args.file) - -            elif args.keypair: -                generate_keypair(args.keypair, install=args.install, file=args.file) - -            elif args.openvpn: -                generate_openvpn_key(args.openvpn, install=args.install, file=args.file) - -            elif args.wireguard: -                # WireGuard supports writing key directly into the CLI, but this -                # requires the vyos_libexec_dir environment variable to be set -                os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" - -                if args.key: -                    generate_wireguard_key(args.interface, install=args.install) -                if args.psk: -                    generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) -        elif args.action == 'import': -            if args.ca: -                import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, -                                      no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.certificate: -                import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, -                                   no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.crl: -                import_crl(args.crl, args.filename) -            elif args.dh: -                import_dh_parameters(args.dh, args.filename) -            elif args.keypair: -                import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, -                               no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.openvpn: -                import_openvpn_secret(args.openvpn, args.filename) -        elif args.action == 'show': -            if args.ca: -                ca_name = None if args.ca == 'all' else args.ca -                if ca_name: -                    if not conf.exists(['pki', 'ca', ca_name]): -                        print(f'CA "{ca_name}" does not exist!') -                        exit(1) -                show_certificate_authority(ca_name, args.pem) -            elif args.certificate: -                cert_name = None if args.certificate == 'all' else args.certificate -                if cert_name: -                    if not conf.exists(['pki', 'certificate', cert_name]): -                        print(f'Certificate "{cert_name}" does not exist!') -                        exit(1) -                if args.fingerprint is None: -                    show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) -                else: -                    show_certificate(args.certificate, fingerprint_hash=args.fingerprint) -            elif args.crl: -                show_crl(None if args.crl == 'all' else args.crl, args.pem) -            else: -                show_certificate_authority() -                print('\n') -                show_certificate() -                print('\n') -                show_crl() -    except KeyboardInterrupt: -        print("Aborted") -        sys.exit(0) +        res = vyos.opmode.run(sys.modules[__name__]) +        if res: +            print(res) +    except (ValueError, vyos.opmode.Error) as e: +        print(e) +        sys.exit(1) diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py index da981d5bf..5612e947c 100644 --- a/src/services/api/rest/routers.py +++ b/src/services/api/rest/routers.py @@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path):      correct_paths = ['ca', 'certificate', 'key-pair']      if path[1] not in correct_paths:          return False -    path[1] = '--' + path[1].replace('-', '')      path[3] = '--key-filename' -    return path[1:] +    path.insert(2, '--name') +    return ['--pki-type'] + path[1:]  @router.post('/configure') | 
