diff options
| author | Christian Poessinger <christian@poessinger.com> | 2022-05-31 06:40:18 +0200 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-31 06:40:18 +0200 | 
| commit | e771eab37de6841b4c3b9d98e8f974dacde34075 (patch) | |
| tree | 3070cb82df5de7a565db284f6a1c0a16392c6cc3 /src | |
| parent | 5642a1e01fff03a1b3a28a654ce363aab51b2b33 (diff) | |
| parent | 43cd3468868712434625826379e0e6b95b34f474 (diff) | |
| download | vyos-1x-e771eab37de6841b4c3b9d98e8f974dacde34075.tar.gz vyos-1x-e771eab37de6841b4c3b9d98e8f974dacde34075.zip | |
Merge pull request #1343 from sarthurdev/pki_import
pki: T3642: Add ability to import files into PKi configuration
Diffstat (limited to 'src')
| -rwxr-xr-x | src/op_mode/pki.py | 201 | 
1 files changed, 191 insertions, 10 deletions
| diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index bc7813052..1e78c3a03 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -17,6 +17,7 @@  import argparse  import ipaddress  import os +import re  import sys  import tabulate @@ -30,7 +31,8 @@ from vyos.pki import encode_certificate, encode_public_key, encode_private_key,  from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list  from vyos.pki import create_private_key  from vyos.pki import create_dh_parameters -from vyos.pki import load_certificate, load_certificate_request, load_private_key, load_crl +from vyos.pki import load_certificate, load_certificate_request, load_private_key +from vyos.pki import load_crl, load_dh_parameters, load_public_key  from vyos.pki import verify_certificate  from vyos.xml import defaults  from vyos.util import ask_input, ask_yes_no @@ -183,13 +185,13 @@ def install_ssh_key(name, public_key, private_key, passphrase=None):      ])      print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) -def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None): +def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):      # Show/install conf commands for key-pair      config_paths = []      if public_key: -        install_public_key = ask_yes_no('Do you want to install the public key?', default=True) +        install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True)          public_key_pem = encode_public_key(public_key)          if install_public_key: @@ -200,7 +202,7 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras              print(public_key_pem)      if private_key: -        install_private_key = ask_yes_no('Do you want to install the private key?', default=True) +        install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True)          private_key_pem = encode_private_key(private_key, passphrase=passphrase)          if install_private_key: @@ -214,6 +216,13 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras      install_into_config(conf, config_paths) +def install_openvpn_key(name, key_data, key_version='1'): +    config_paths = [ +        f"pki openvpn shared-secret {name} key '{key_data}'", +        f"pki openvpn shared-secret {name} version '{key_version}'" +    ] +    install_into_config(conf, config_paths) +  def install_wireguard_key(interface, private_key, public_key):      # Show conf commands for installing wireguard key pairs      from vyos.ifconfig import Section @@ -640,15 +649,11 @@ def generate_openvpn_key(name, install=False, file=False):          key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings          key_version = '1' -        import re          version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)          if version_search:              key_version = version_search[1] -        base = f"set pki openvpn shared-secret {name}" -        print("Configure mode commands to install OpenVPN key:") -        print(f"{base} key '{key_data}'") -        print(f"{base} version '{key_version}'") +        install_openvpn_key(name, key_data, key_version)      if file:          write_file(f'{name}.key', result) @@ -670,6 +675,167 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):      else:          print(f'Pre-shared key: {psk}') +# Import functions +def import_ca_certificate(name, path=None, key_path=None): +    if path: +        if not os.path.exists(path): +            print(f'File not found: {path}') +            return + +        cert = None + +        with open(path) as f: +            cert_data = f.read() +            cert = load_certificate(cert_data, wrap_tags=False) + +        if not cert: +            print(f'Invalid certificate: {path}') +            return + +        install_certificate(name, cert, is_ca=True) + +    if key_path: +        if not os.path.exists(key_path): +            print(f'File not found: {key_path}') +            return + +        key = None +        passphrase = ask_input('Enter private key passphrase: ') or None + +        with open(key_path) as f: +            key_data = f.read() +            key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + +        if not key: +            print(f'Invalid private key or passphrase: {path}') +            return + +        install_certificate(name, private_key=key, is_ca=True) + +def import_certificate(name, path=None, key_path=None): +    if path: +        if not os.path.exists(path): +            print(f'File not found: {path}') +            return + +        cert = None + +        with open(path) as f: +            cert_data = f.read() +            cert = load_certificate(cert_data, wrap_tags=False) + +        if not cert: +            print(f'Invalid certificate: {path}') +            return + +        install_certificate(name, cert, is_ca=False) + +    if key_path: +        if not os.path.exists(key_path): +            print(f'File not found: {key_path}') +            return + +        key = None +        passphrase = ask_input('Enter private key passphrase: ') or None + +        with open(key_path) as f: +            key_data = f.read() +            key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + +        if not key: +            print(f'Invalid private key or passphrase: {path}') +            return + +        install_certificate(name, private_key=key, is_ca=False) + +def import_crl(name, path): +    if not os.path.exists(path): +        print(f'File not found: {path}') +        return + +    crl = None + +    with open(path) as f: +        crl_data = f.read() +        crl = load_crl(crl_data, wrap_tags=False) + +    if not crl: +        print(f'Invalid certificate: {path}') +        return + +    install_crl(name, crl) + +def import_dh_parameters(name, path): +    if not os.path.exists(path): +        print(f'File not found: {path}') +        return + +    dh = None + +    with open(path) as f: +        dh_data = f.read() +        dh = load_dh_parameters(dh_data, wrap_tags=False) + +    if not dh: +        print(f'Invalid DH parameters: {path}') +        return + +    install_dh_parameters(name, dh) + +def import_keypair(name, path=None, key_path=None): +    if path: +        if not os.path.exists(path): +            print(f'File not found: {path}') +            return + +        key = None + +        with open(path) as f: +            key_data = f.read() +            key = load_public_key(key_data, wrap_tags=False) + +        if not key: +            print(f'Invalid public key: {path}') +            return + +        install_keypair(name, None, public_key=key, prompt=False) + +    if key_path: +        if not os.path.exists(key_path): +            print(f'File not found: {key_path}') +            return + +        key = None +        passphrase = ask_input('Enter private key passphrase: ') or None + +        with open(key_path) as f: +            key_data = f.read() +            key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) + +        if not key: +            print(f'Invalid private key or passphrase: {path}') +            return + +        install_keypair(name, None, private_key=key, prompt=False) + +def import_openvpn_secret(name, path): +    if not os.path.exists(path): +        print(f'File not found: {path}') +        return + +    key_data = None +    key_version = '1' + +    with open(path) as f: +        key_lines = f.read().split("\n") +        key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings + +    version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) +    if version_search: +        key_version = version_search[1] + +    install_openvpn_key(name, key_data, key_version) +  # Show functions  def show_certificate_authority(name=None):      headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] @@ -799,6 +965,9 @@ if __name__ == '__main__':      parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')      parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') +    parser.add_argument('--filename', help='Write certificate into specified filename', action='store') +    parser.add_argument('--key-filename', help='Write key into specified filename', action='store') +      args = parser.parse_args()      try: @@ -840,7 +1009,19 @@ if __name__ == '__main__':                      generate_wireguard_key(args.interface, install=args.install)                  if args.psk:                      generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) - +        elif args.action == 'import': +            if args.ca: +                import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename) +            elif args.certificate: +                import_certificate(args.certificate, path=args.filename, key_path=args.key_filename) +            elif args.crl: +                import_crl(args.crl, args.filename) +            elif args.dh: +                import_dh_parameters(args.dh, args.filename) +            elif args.keypair: +                import_keypair(args.keypair, path=args.filename, key_path=args.key_filename) +            elif args.openvpn: +                import_openvpn_secret(args.openvpn, args.filename)          elif args.action == 'show':              if args.ca:                  ca_name = None if args.ca == 'all' else args.ca | 
