From d9434e4b6e64e00eddb735a265f3fd86610d7004 Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Mon, 30 May 2022 20:52:22 +0200 Subject: pki: T3642: Add ability to import files into PKi configuration --- src/op_mode/pki.py | 196 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 190 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index bc7813052..193d97744 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -17,6 +17,7 @@ import argparse import ipaddress import os +import re import sys import tabulate @@ -30,7 +31,8 @@ from vyos.pki import encode_certificate, encode_public_key, encode_private_key, from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list from vyos.pki import create_private_key from vyos.pki import create_dh_parameters -from vyos.pki import load_certificate, load_certificate_request, load_private_key, load_crl +from vyos.pki import load_certificate, load_certificate_request, load_private_key +from vyos.pki import load_crl, load_dh_parameters, load_public_key from vyos.pki import verify_certificate from vyos.xml import defaults from vyos.util import ask_input, ask_yes_no @@ -183,13 +185,13 @@ def install_ssh_key(name, public_key, private_key, passphrase=None): ]) print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) -def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None): +def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True): # Show/install conf commands for key-pair config_paths = [] if public_key: - install_public_key = ask_yes_no('Do you want to install the public key?', default=True) + install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True) public_key_pem = encode_public_key(public_key) if install_public_key: @@ -200,7 +202,7 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras print(public_key_pem) if private_key: - install_private_key = ask_yes_no('Do you want to install the private key?', default=True) + install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True) private_key_pem = encode_private_key(private_key, passphrase=passphrase) if install_private_key: @@ -214,6 +216,13 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras install_into_config(conf, config_paths) +def install_openvpn_key(name, key_data, key_version='1'): + config_paths = [ + f"pki openvpn shared-secret {name} key '{key_data}'", + f"pki openvpn shared-secret {name} version '{key_version}'" + ] + install_into_config(conf, config_paths) + def install_wireguard_key(interface, private_key, public_key): # Show conf commands for installing wireguard key pairs from vyos.ifconfig import Section @@ -640,7 +649,6 @@ def generate_openvpn_key(name, install=False, file=False): key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' - import re version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) if version_search: key_version = version_search[1] @@ -670,6 +678,167 @@ def generate_wireguard_psk(interface=None, peer=None, install=False): else: print(f'Pre-shared key: {psk}') +# Import functions +def import_ca_certificate(name, path=None, key_path=None): + if path: + if not os.path.exists(path): + print(f'File not found: {path}') + return + + cert = None + + with open(path) as f: + cert_data = f.read() + cert = load_certificate(cert_data, wrap_tags=False) + + if not cert: + print(f'Invalid certificate: {path}') + return + + install_certificate(name, cert, is_ca=True) + + if key_path: + if not os.path.exists(key_path): + print(f'File not found: {key_path}') + return + + key = None + passphrase = ask_input('Enter private key passphrase: ') or None + + with open(key_path) as f: + key_data = f.read() + key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + + if not key: + print(f'Invalid private key or passphrase: {path}') + return + + install_certificate(name, private_key=key, is_ca=True) + +def import_certificate(name, path=None, key_path=None): + if path: + if not os.path.exists(path): + print(f'File not found: {path}') + return + + cert = None + + with open(path) as f: + cert_data = f.read() + cert = load_certificate(cert_data, wrap_tags=False) + + if not cert: + print(f'Invalid certificate: {path}') + return + + install_certificate(name, cert, is_ca=False) + + if key_path: + if not os.path.exists(key_path): + print(f'File not found: {key_path}') + return + + key = None + passphrase = ask_input('Enter private key passphrase: ') or None + + with open(key_path) as f: + key_data = f.read() + key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + + if not key: + print(f'Invalid private key or passphrase: {path}') + return + + install_certificate(name, private_key=key, is_ca=False) + +def import_crl(name, path): + if not os.path.exists(path): + print(f'File not found: {path}') + return + + crl = None + + with open(path) as f: + crl_data = f.read() + crl = load_crl(crl_data, wrap_tags=False) + + if not crl: + print(f'Invalid certificate: {path}') + return + + install_crl(name, crl) + +def import_dh_parameters(name, path): + if not os.path.exists(path): + print(f'File not found: {path}') + return + + dh = None + + with open(path) as f: + dh_data = f.read() + dh = load_dh_parameters(dh_data, wrap_tags=False) + + if not dh: + print(f'Invalid DH parameters: {path}') + return + + install_dh_parameters(name, dh) + +def import_keypair(name, path=None, key_path=None): + if path: + if not os.path.exists(path): + print(f'File not found: {path}') + return + + key = None + + with open(path) as f: + key_data = f.read() + key = load_public_key(key_data, wrap_tags=False) + + if not key: + print(f'Invalid public key: {path}') + return + + install_keypair(name, None, public_key=key, prompt=False) + + if key_path: + if not os.path.exists(key_path): + print(f'File not found: {key_path}') + return + + key = None + passphrase = ask_input('Enter private key passphrase: ') or None + + with open(key_path) as f: + key_data = f.read() + key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + + if not key: + print(f'Invalid private key or passphrase: {path}') + return + + install_keypair(name, None, private_key=key, prompt=False) + +def import_openvpn_secret(name, path): + if not os.path.exists(path): + print(f'File not found: {path}') + return + + key_data = None + key_version = '1' + + with open(path) as f: + key_lines = f.read().split("\n") + key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings + + version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) + if version_search: + key_version = version_search[1] + + install_openvpn_key(name, key_data, key_version) + # Show functions def show_certificate_authority(name=None): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] @@ -799,6 +968,9 @@ if __name__ == '__main__': parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') + parser.add_argument('--filename', help='Write certificate into specified filename', action='store') + parser.add_argument('--key-filename', help='Write key into specified filename', action='store') + args = parser.parse_args() try: @@ -840,7 +1012,19 @@ if __name__ == '__main__': generate_wireguard_key(args.interface, install=args.install) if args.psk: generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) - + elif args.action == 'import': + if args.ca: + import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename) + elif args.certificate: + import_certificate(args.certificate, path=args.filename, key_path=args.key_filename) + elif args.crl: + import_crl(args.crl, args.filename) + elif args.dh: + import_dh_parameters(args.dh, args.filename) + elif args.keypair: + import_keypair(args.keypair, path=args.filename, key_path=args.key_filename) + elif args.openvpn: + import_openvpn_secret(args.openvpn, args.filename) elif args.action == 'show': if args.ca: ca_name = None if args.ca == 'all' else args.ca -- cgit v1.2.3