From 8943446c8c79c6daad2326ddc7a59950123b35d2 Mon Sep 17 00:00:00 2001 From: Nataliia Solomko Date: Fri, 18 Oct 2024 15:42:59 +0300 Subject: pki: T4914: Rewrite the PKI op mode in the new style --- src/op_mode/pki.py | 295 +++++++++++++++++++++------------------ src/services/api/rest/routers.py | 4 +- 2 files changed, 165 insertions(+), 134 deletions(-) (limited to 'src') diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 5652a5d74..56b873bb1 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -14,16 +14,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import argparse import ipaddress import os import re import sys import tabulate +import typing from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID +import vyos.opmode + from vyos.config import Config from vyos.config import config_dict_mangle_acme from vyos.pki import encode_certificate @@ -51,8 +53,36 @@ from vyos.utils.process import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' auth_dir = '/config/auth' +ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl'] +ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']] +ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512'] + # Helper Functions conf = Config() + + +def _verify(target): + """Decorator checks if config for PKI exists""" + from functools import wraps + + if target not in ['ca', 'certificate']: + raise ValueError('Invalid PKI') + + def _verify_target(func): + @wraps(func) + def _wrapper(*args, **kwargs): + name = kwargs.get('name') + unconf_message = f'PKI {target} "{name}" does not exist!' + if name: + if not conf.exists(['pki', target, name]): + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + + return _wrapper + + return _verify_target + + def get_default_values(): # Fetch default x509 values base = ['pki', 'x509', 'default'] @@ -872,8 +902,111 @@ def import_openvpn_secret(name, path): install_openvpn_key(name, key_data, key_version) -# Show functions -def show_certificate_authority(name=None, pem=False): + +def generate_pki( + raw: bool, + pki_type: ArgsPkiTypeGen, + name: typing.Optional[str], + file: typing.Optional[bool], + install: typing.Optional[bool], + sign: typing.Optional[str], + self_sign: typing.Optional[bool], + key: typing.Optional[bool], + psk: typing.Optional[bool], + interface: typing.Optional[str], + peer: typing.Optional[str], +): + try: + if pki_type == 'ca': + if sign: + generate_ca_certificate_sign(name, sign, install=install, file=file) + else: + generate_ca_certificate(name, install=install, file=file) + elif pki_type == 'certificate': + if sign: + generate_certificate_sign(name, sign, install=install, file=file) + elif self_sign: + generate_certificate_selfsign(name, install=install, file=file) + else: + generate_certificate_request(name=name, install=install, file=file) + + elif pki_type == 'crl': + generate_certificate_revocation_list(name, install=install, file=file) + + elif pki_type == 'ssh': + generate_ssh_keypair(name, install=install, file=file) + + elif pki_type == 'dh': + generate_dh_parameters(name, install=install, file=file) + + elif pki_type == 'key-pair': + generate_keypair(name, install=install, file=file) + + elif pki_type == 'openvpn': + generate_openvpn_key(name, install=install, file=file) + + elif pki_type == 'wireguard': + # WireGuard supports writing key directly into the CLI, but this + # requires the vyos_libexec_dir environment variable to be set + os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos' + + if key: + generate_wireguard_key(interface, install=install) + if psk: + generate_wireguard_psk(interface, peer=peer, install=install) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +def import_pki( + name: str, + pki_type: ArgsPkiType, + filename: typing.Optional[str], + key_filename: typing.Optional[str], + no_prompt: typing.Optional[bool], + passphrase: typing.Optional[str], +): + try: + if pki_type == 'ca': + import_ca_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'certificate': + import_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'crl': + import_crl(name, filename) + elif pki_type == 'dh': + import_dh_parameters(name, filename) + elif pki_type == 'key-pair': + import_keypair( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'openvpn': + import_openvpn_secret(name, filename) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +@_verify('ca') +def show_certificate_authority( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] certs = get_config_ca_certificate() @@ -905,7 +1038,14 @@ def show_certificate_authority(name=None, pem=False): print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False, fingerprint_hash=None): + +@_verify('certificate') +def show_certificate( + raw: bool, + name: typing.Optional[str] = None, + pem: typing.Optional[bool] = False, + fingerprint: typing.Optional[ArgsFingerprint] = None, +): headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] data = [] certs = get_config_certificate() @@ -926,8 +1066,8 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): if name and pem: print(encode_certificate(cert)) return - elif name and fingerprint_hash: - print(get_certificate_fingerprint(cert, fingerprint_hash)) + elif name and fingerprint: + print(get_certificate_fingerprint(cert, fingerprint)) return ca_name = get_certificate_ca(cert, ca_certs) @@ -955,7 +1095,10 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): print("Certificates:") print(tabulate.tabulate(data, headers)) -def show_crl(name=None, pem=False): + +def show_crl( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): headers = ['CA Name', 'Updated', 'Revokes'] data = [] certs = get_config_ca_certificate() @@ -989,132 +1132,20 @@ def show_crl(name=None, pem=False): print("Certificate Revocation Lists:") print(tabulate.tabulate(data, headers)) -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--action', help='PKI action', required=True) - - # X509 - parser.add_argument('--ca', help='Certificate Authority', required=False) - parser.add_argument('--certificate', help='Certificate', required=False) - parser.add_argument('--crl', help='Certificate Revocation List', required=False) - parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) - parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') - parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') - parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') - # SSH - parser.add_argument('--ssh', help='SSH Key', required=False) +def show_all(raw: bool): + show_certificate_authority(raw) + print('\n') + show_certificate(raw) + print('\n') + show_crl(raw) - # DH - parser.add_argument('--dh', help='DH Parameters', required=False) - - # Key pair - parser.add_argument('--keypair', help='Key pair', required=False) - - # OpenVPN - parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - - # WireGuard - parser.add_argument('--wireguard', help='Wireguard', action='store_true') - group = parser.add_mutually_exclusive_group() - group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) - group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) - parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') - parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') - - # Global - parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') - parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') - - parser.add_argument('--filename', help='Write certificate into specified filename', action='store') - parser.add_argument('--key-filename', help='Write key into specified filename', action='store') - - parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') - parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') - - args = parser.parse_args() +if __name__ == '__main__': try: - if args.action == 'generate': - if args.ca: - if args.sign: - generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) - else: - generate_ca_certificate(args.ca, install=args.install, file=args.file) - elif args.certificate: - if args.sign: - generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) - elif args.self_sign: - generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) - else: - generate_certificate_request(name=args.certificate, install=args.install, file=args.file) - - elif args.crl: - generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) - - elif args.ssh: - generate_ssh_keypair(args.ssh, install=args.install, file=args.file) - - elif args.dh: - generate_dh_parameters(args.dh, install=args.install, file=args.file) - - elif args.keypair: - generate_keypair(args.keypair, install=args.install, file=args.file) - - elif args.openvpn: - generate_openvpn_key(args.openvpn, install=args.install, file=args.file) - - elif args.wireguard: - # WireGuard supports writing key directly into the CLI, but this - # requires the vyos_libexec_dir environment variable to be set - os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" - - if args.key: - generate_wireguard_key(args.interface, install=args.install) - if args.psk: - generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) - elif args.action == 'import': - if args.ca: - import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.certificate: - import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.crl: - import_crl(args.crl, args.filename) - elif args.dh: - import_dh_parameters(args.dh, args.filename) - elif args.keypair: - import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.openvpn: - import_openvpn_secret(args.openvpn, args.filename) - elif args.action == 'show': - if args.ca: - ca_name = None if args.ca == 'all' else args.ca - if ca_name: - if not conf.exists(['pki', 'ca', ca_name]): - print(f'CA "{ca_name}" does not exist!') - exit(1) - show_certificate_authority(ca_name, args.pem) - elif args.certificate: - cert_name = None if args.certificate == 'all' else args.certificate - if cert_name: - if not conf.exists(['pki', 'certificate', cert_name]): - print(f'Certificate "{cert_name}" does not exist!') - exit(1) - if args.fingerprint is None: - show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) - else: - show_certificate(args.certificate, fingerprint_hash=args.fingerprint) - elif args.crl: - show_crl(None if args.crl == 'all' else args.crl, args.pem) - else: - show_certificate_authority() - print('\n') - show_certificate() - print('\n') - show_crl() - except KeyboardInterrupt: - print("Aborted") - sys.exit(0) + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py index da981d5bf..5612e947c 100644 --- a/src/services/api/rest/routers.py +++ b/src/services/api/rest/routers.py @@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path): correct_paths = ['ca', 'certificate', 'key-pair'] if path[1] not in correct_paths: return False - path[1] = '--' + path[1].replace('-', '') path[3] = '--key-filename' - return path[1:] + path.insert(2, '--name') + return ['--pki-type'] + path[1:] @router.post('/configure') -- cgit v1.2.3