diff options
author | Kim <kim.sidney@gmail.com> | 2021-10-07 16:52:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-07 16:52:56 +0200 |
commit | 2274dbf9047493a00a6f30346b38dacd8cfcf965 (patch) | |
tree | f431f5f6f1b2770c98ed9047e1cec9209e536366 /src/op_mode/pki.py | |
parent | 2acfffab8b98238e7d869673a858a4ae21651f0b (diff) | |
parent | adc7ef387d40e92bd7163ee6b401e99e554394a3 (diff) | |
download | vyos-1x-2274dbf9047493a00a6f30346b38dacd8cfcf965.tar.gz vyos-1x-2274dbf9047493a00a6f30346b38dacd8cfcf965.zip |
Merge branch 'current' into 2fa
Diffstat (limited to 'src/op_mode/pki.py')
-rwxr-xr-x | src/op_mode/pki.py | 189 |
1 files changed, 113 insertions, 76 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 297270cf1..2283cd820 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -17,7 +17,6 @@ import argparse import ipaddress import os -import re import sys import tabulate @@ -25,6 +24,7 @@ from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID from vyos.config import Config +from vyos.configquery import ConfigTreeQuery from vyos.configdict import dict_merge from vyos.pki import encode_certificate, encode_public_key, encode_private_key, encode_dh_parameters from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list @@ -37,25 +37,24 @@ from vyos.util import ask_input, ask_yes_no from vyos.util import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' - auth_dir = '/config/auth' # Helper Functions - +conf = ConfigTreeQuery() def get_default_values(): # Fetch default x509 values - conf = Config() base = ['pki', 'x509', 'default'] x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + get_first_key=True, + no_tag_node_value_mangle=True) default_values = defaults(base) - return dict_merge(default_values, x509_defaults) + x509_defaults = dict_merge(default_values, x509_defaults) + + return x509_defaults def get_config_ca_certificate(name=None): # Fetch ca certificates from config - conf = Config() base = ['pki', 'ca'] - if not conf.exists(base): return False @@ -65,13 +64,12 @@ def get_config_ca_certificate(name=None): return False return conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + get_first_key=True, + no_tag_node_value_mangle=True) def get_config_certificate(name=None): # Get certificates from config - conf = Config() base = ['pki', 'certificate'] - if not conf.exists(base): return False @@ -81,7 +79,8 @@ def get_config_certificate(name=None): return False return conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + get_first_key=True, + no_tag_node_value_mangle=True) def get_certificate_ca(cert, ca_certs): # Find CA certificate for given certificate @@ -100,7 +99,6 @@ def get_certificate_ca(cert, ca_certs): def get_config_revoked_certificates(): # Fetch revoked certificates from config - conf = Config() ca_base = ['pki', 'ca'] cert_base = ['pki', 'certificate'] @@ -108,12 +106,14 @@ def get_config_revoked_certificates(): if conf.exists(ca_base): ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + get_first_key=True, + no_tag_node_value_mangle=True) certs.extend(ca_certificates.values()) if conf.exists(cert_base): certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + get_first_key=True, + no_tag_node_value_mangle=True) certs.extend(certificates.values()) return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] @@ -144,39 +144,41 @@ def get_revoked_by_serial_numbers(serial_numbers=[]): def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): # Show conf commands for installing certificate prefix = 'ca' if is_ca else 'certificate' - print("Configure mode commands to install:") + print('Configure mode commands to install:') + base = f"set pki {prefix} {name}" if cert: cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) - print("set pki %s %s certificate '%s'" % (prefix, name, cert_pem)) + print(f"{base} certificate '{cert_pem}'") if private_key: key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) - print("set pki %s %s private key '%s'" % (prefix, name, key_pem)) + print(f"{base} private key '{key_pem}'") if key_passphrase: - print("set pki %s %s private password-protected" % (prefix, name)) + print(f"{base} private password-protected") def install_crl(ca_name, crl): # Show conf commands for installing crl print("Configure mode commands to install CRL:") crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) - print("set pki ca %s crl '%s'" % (ca_name, crl_pem)) + print(f"set pki ca {ca_name} crl '{crl_pem}'") def install_dh_parameters(name, params): # Show conf commands for installing dh params print("Configure mode commands to install DH parameters:") dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) - print("set pki dh %s parameters '%s'" % (name, dh_pem)) + print(f"set pki dh {name} parameters '{dh_pem}'") def install_ssh_key(name, public_key, private_key, passphrase=None): # Show conf commands for installing ssh key key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') username = os.getlogin() type_key_split = key_openssh.split(" ") + + base = f"set system login user {username} authentication public-keys {name}" print("Configure mode commands to install SSH key:") - print("set system login user %s authentication public-keys %s key '%s'" % (username, name, type_key_split[1])) - print("set system login user %s authentication public-keys %s type '%s'" % (username, name, type_key_split[0])) - print("") + print(f"{base} key '{type_key_split[1]}'") + print(f"{base} type '{type_key_split[0]}'", end="\n\n") print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None): @@ -189,7 +191,7 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras if install_public_key: install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) - print("set pki key-pair %s public key '%s'" % (name, install_public_pem)) + print(f"set pki key-pair {name} public key '{install_public_pem}'") else: print("Public key:") print(public_key_pem) @@ -200,30 +202,53 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras if install_private_key: install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) - print("set pki key-pair %s private key '%s'" % (name, install_private_pem)) + print(f"set pki key-pair {name} private key '{install_private_pem}'") if passphrase: - print("set pki key-pair %s private password-protected" % (name,)) + print(f"set pki key-pair {name} private password-protected") else: print("Private key:") print(private_key_pem) -def install_wireguard_key(name, private_key, public_key): +def install_wireguard_key(interface, private_key, public_key): # Show conf commands for installing wireguard key pairs - is_interface = re.match(r'^wg[\d]+$', name) + from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': + print(f'"{interface}" is not a WireGuard interface name!') + exit(1) + + # Check if we are running in a config session - if yes, we can directly write to the CLI + cli_string = f"interfaces wireguard {interface} private-key '{private_key}'" + if Config().in_session(): + cmd(f"/opt/vyatta/sbin/my_set {cli_string}") + + print('"generate" CLI command executed from config session.\nGenerated private-key was imported to CLI!',end='\n\n') + print(f'Use the following command to verify: show interfaces wireguard {interface}') + else: + print('"generate" CLI command executed from operational level.\n' + 'Generated private-key is not stored to CLI, use configure mode commands to install key:', end='\n\n') + print(f"set {cli_string}", end="\n\n") - print("Configure mode commands to install key:") - if is_interface: - print("set interfaces wireguard %s private-key '%s'" % (name, private_key)) - print("") - print("Public key for use on peer configuration: " + public_key) + print(f"Corresponding public-key to use on peer system is: '{public_key}'") + + +def install_wireguard_psk(interface, peer, psk): + from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': + print(f'"{interface}" is not a WireGuard interface name!') + exit(1) + + # Check if we are running in a config session - if yes, we can directly write to the CLI + cli_string = f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'" + if Config().in_session(): + cmd(f"/opt/vyatta/sbin/my_set {cli_string}") + + print('"generate" CLI command executed from config session.\nGenerated preshared-key was imported to CLI!',end='\n\n') + print(f'Use the following command to verify: show interfaces wireguard {interface}') else: - print("set interfaces wireguard [INTERFACE] peer %s public-key '%s'" % (name, public_key)) - print("") - print("Private key for use on peer configuration: " + private_key) + print('"generate" CLI command executed from operational level.\n' + 'Generated preshared-key is not stored to CLI, use configure mode commands to install key:', end='\n\n') + print(f"set {cli_string}", end="\n\n") -def install_wireguard_psk(name, psk): - # Show conf commands for installing wireguard psk - print("set interfaces wireguard [INTERFACE] peer %s preshared-key '%s'" % (name, psk)) def ask_passphrase(): passphrase = None @@ -464,7 +489,7 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): if not cert_req: print("Invalid certificate request") return None - + cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) passphrase = ask_passphrase() @@ -630,49 +655,37 @@ def generate_openvpn_key(name, install=False, file=False): key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' + import re version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) if version_search: key_version = version_search[1] + base = f"set pki openvpn shared-secret {name}" print("Configure mode commands to install OpenVPN key:") - print("set pki openvpn shared-secret %s key '%s'" % (name, key_data)) - print("set pki openvpn shared-secret %s version '%s'" % (name, key_version)) + print(f"{base} key '{key_data}'") + print(f"{base} version '{key_version}'") if file: write_file(f'{name}.key', result) -def generate_wireguard_key(name, install=False, file=False): +def generate_wireguard_key(interface=None, install=False): private_key = cmd('wg genkey') public_key = cmd('wg pubkey', input=private_key) - if not install: - print("Private key: " + private_key) - print("Public key: " + public_key) - return None - - if install: - install_wireguard_key(name, private_key, public_key) - - if file: - write_file(f'{name}_public.key', public_key) - write_file(f'{name}_private.key', private_key) + if interface and install: + install_wireguard_key(interface, private_key, public_key) + else: + print(f'Private key: {private_key}') + print(f'Public key: {public_key}', end='\n\n') -def generate_wireguard_psk(name, install=False, file=False): +def generate_wireguard_psk(interface=None, peer=None, install=False): psk = cmd('wg genpsk') - - if not install and not file: - print("Pre-shared key:") - print(psk) - return None - - if install: - install_wireguard_psk(name, psk) - - if file: - write_file(f'{name}.key', psk) + if interface and peer and install: + install_wireguard_psk(interface, peer, psk) + else: + print(f'Pre-shared key: {psk}') # Show functions - def show_certificate_authority(name=None): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] @@ -789,10 +802,13 @@ if __name__ == '__main__': # OpenVPN parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - # Wireguard + # WireGuard parser.add_argument('--wireguard', help='Wireguard', action='store_true') - parser.add_argument('--key', help='Wireguard key pair', required=False) - parser.add_argument('--psk', help='Wireguard pre shared key', required=False) + group = parser.add_mutually_exclusive_group() + group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) + group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) + parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') + parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') # Global parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') @@ -813,26 +829,47 @@ if __name__ == '__main__': elif args.self_sign: generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) else: - generate_certificate_request(name=args.certificate, install=args.install) + generate_certificate_request(name=args.certificate, install=args.install, file=args.file) + elif args.crl: generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) + elif args.ssh: generate_ssh_keypair(args.ssh, install=args.install, file=args.file) + elif args.dh: generate_dh_parameters(args.dh, install=args.install, file=args.file) + elif args.keypair: generate_keypair(args.keypair, install=args.install, file=args.file) + elif args.openvpn: generate_openvpn_key(args.openvpn, install=args.install, file=args.file) + elif args.wireguard: + # WireGuard supports writing key directly into the CLI, but this + # requires the vyos_libexec_dir environment variable to be set + os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" + if args.key: - generate_wireguard_key(args.key, install=args.install, file=args.file) - elif args.psk: - generate_wireguard_psk(args.psk, install=args.install, file=args.file) + generate_wireguard_key(args.interface, install=args.install) + if args.psk: + generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) + elif args.action == 'show': if args.ca: - show_certificate_authority(None if args.ca == 'all' else args.ca) + ca_name = None if args.ca == 'all' else args.ca + if ca_name: + if not conf.exists(['pki', 'ca', ca_name]): + print(f'CA "{ca_name}" does not exist!') + exit(1) + show_certificate_authority(ca_name) elif args.certificate: + cert_name = None if args.certificate == 'all' else args.certificate + if cert_name: + if not conf.exists(['pki', 'certificate', cert_name]): + print(f'Certificate "{cert_name}" does not exist!') + exit(1) show_certificate(None if args.certificate == 'all' else args.certificate) elif args.crl: show_crl(None if args.crl == 'all' else args.crl) |