From 50f71042d7a00692ade8caaa32db1102157ad87a Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Sat, 3 Jul 2021 13:56:42 +0200 Subject: pki: T3642: Add support for signing and revoking subordinate CAs --- interface-definitions/pki.xml.in | 6 +++ op-mode-definitions/pki.xml.in | 20 +++++++ src/op_mode/pki.py | 112 ++++++++++++++++++++++++++++++++++----- 3 files changed, 124 insertions(+), 14 deletions(-) diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in index 4b082cbc4..6d137c2ce 100644 --- a/interface-definitions/pki.xml.in +++ b/interface-definitions/pki.xml.in @@ -44,6 +44,12 @@ + + + If parent CA is present, this CA certificate will be included in generated CRLs + + + diff --git a/op-mode-definitions/pki.xml.in b/op-mode-definitions/pki.xml.in index 06b15eed4..9c6b56a68 100644 --- a/op-mode-definitions/pki.xml.in +++ b/op-mode-definitions/pki.xml.in @@ -12,6 +12,26 @@ Generate CA certificate + + + Sign generated CA certificate with another specified CA certificate + + pki ca + + + + + + Commands for installing generated certificate into running configuration + + <certificate name> + + + sudo ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --install + + + sudo ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" --sign "$5" + Commands for installing generated certificate into running configuration diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index d99a432aa..b2576fbb9 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -98,19 +98,28 @@ def get_certificate_ca(cert, ca_certs): def get_config_revoked_certificates(): # Fetch revoked certificates from config conf = Config() - base = ['pki', 'certificate'] - if not conf.exists(base): - return {} + ca_base = ['pki', 'ca'] + cert_base = ['pki', 'certificate'] - certificates = conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, no_tag_node_value_mangle=True) + certs = [] + + if conf.exists(ca_base): + ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), + get_first_key=True, no_tag_node_value_mangle=True) + certs.extend(ca_certificates.values()) + + if conf.exists(cert_base): + certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), + get_first_key=True, no_tag_node_value_mangle=True) + certs.extend(certificates.values()) - return {cert: cert_dict for cert, cert_dict in certificates.items() if 'revoke' in cert_dict} + return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] def get_revoked_by_serial_numbers(serial_numbers=[]): # Return serial numbers of revoked certificates certs_out = [] certs = get_config_certificate() + ca_certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if 'certificate' not in cert_dict: @@ -119,8 +128,14 @@ def get_revoked_by_serial_numbers(serial_numbers=[]): cert = load_certificate(cert_dict['certificate']) if cert.serial_number in serial_numbers: certs_out.append(cert_name) - else: - certs_out.append(str(cert.serial_number)[0:10] + '...') + if ca_certs: + for cert_name, cert_dict in ca_certs.items(): + if 'certificate' not in cert_dict: + continue + + cert = load_certificate(cert_dict['certificate']) + if cert.serial_number in serial_numbers: + certs_out.append(cert_name) return certs_out def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): @@ -141,7 +156,7 @@ def install_certificate(name, cert='', private_key=None, key_type=None, key_pass def install_crl(ca_name, crl): # Show conf commands for installing crl print("Configure mode commands to install CRL:") - crl_pem = "".join(encode_public_key(crl).strip().split("\n")[1:-1]) + crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) print("set pki ca %s crl '%s'" % (ca_name, crl_pem)) def install_dh_parameters(name, params): @@ -281,6 +296,67 @@ def generate_ca_certificate(name, install=False): install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) +def generate_ca_certificate_sign(name, ca_name, install=False): + ca_dict = get_config_ca_certificate(ca_name) + + if not ca_dict: + print(f"CA certificate or private key for '{ca_name}' not found") + return None + + ca_cert = load_certificate(ca_dict['certificate']) + + if not ca_cert: + print("Failed to load signing CA certificate, aborting") + return None + + ca_private = ca_dict['private'] + ca_private_passphrase = None + if 'password_protected' in ca_private: + ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') + ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + + if not ca_private_key: + print("Failed to load signing CA private key, aborting") + return None + + private_key = None + key_type = None + + cert_req = None + if not ask_yes_no('Do you already have a certificate request?'): + private_key, key_type = generate_private_key() + cert_req = generate_certificate_request(private_key, key_type, return_request=True) + else: + print("Paste certificate request and press enter:") + lines = [] + curr_line = '' + while True: + curr_line = input().strip() + if not curr_line or curr_line == CERT_REQ_END: + break + lines.append(curr_line) + + if not lines: + print("Aborted") + return None + + wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing + cert_req = load_certificate_request("\n".join(lines), wrap) + + if not cert_req: + print("Invalid certificate request") + return None + + cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True) + passphrase = ask_passphrase() + + if not install: + print(encode_certificate(cert)) + print(encode_private_key(private_key, passphrase=passphrase)) + return None + + install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) + def generate_certificate_sign(name, ca_name, install=False): ca_dict = get_config_ca_certificate(ca_name) @@ -378,10 +454,10 @@ def generate_certificate_revocation_list(ca_name, install=False): print("Failed to load CA private key, aborting") return None - revoked_certs = get_config_revoked_Certificates() + revoked_certs = get_config_revoked_certificates() to_revoke = [] - for cert_name, cert_dict in revoked_certs.items(): + for cert_dict in revoked_certs: if 'certificate' not in cert_dict: continue @@ -500,7 +576,7 @@ def generate_wireguard_psk(name, install=False): # Show functions def show_certificate_authority(name=None): - headers = ['Name', 'Subject', 'Issued', 'Expiry', 'Private Key'] + headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] certs = get_config_ca_certificate() if certs: @@ -511,12 +587,17 @@ def show_certificate_authority(name=None): continue cert = load_certificate(cert_dict['certificate']) + parent_ca_name = get_certificate_ca(cert, certs) + cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] + + if not parent_ca_name or parent_ca_name == cert_name: + parent_ca_name = 'N/A' if not cert: continue have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' - data.append([cert_name, cert.subject.rfc4514_string(), cert.not_valid_before, cert.not_valid_after, have_private]) + data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) @@ -623,7 +704,10 @@ if __name__ == '__main__': try: if args.action == 'generate': if args.ca: - generate_ca_certificate(args.ca, args.install) + if args.sign: + generate_ca_certificate_sign(args.ca, args.sign, args.install) + else: + generate_ca_certificate(args.ca, args.install) elif args.certificate: if args.sign: generate_certificate_sign(args.certificate, args.sign, args.install) -- cgit v1.2.3