From 50f71042d7a00692ade8caaa32db1102157ad87a Mon Sep 17 00:00:00 2001
From: sarthurdev <965089+sarthurdev@users.noreply.github.com>
Date: Sat, 3 Jul 2021 13:56:42 +0200
Subject: pki: T3642: Add support for signing and revoking subordinate CAs
---
interface-definitions/pki.xml.in | 6 +++
op-mode-definitions/pki.xml.in | 20 +++++++
src/op_mode/pki.py | 112 ++++++++++++++++++++++++++++++++++-----
3 files changed, 124 insertions(+), 14 deletions(-)
diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in
index 4b082cbc4..6d137c2ce 100644
--- a/interface-definitions/pki.xml.in
+++ b/interface-definitions/pki.xml.in
@@ -44,6 +44,12 @@
+
+
+ If parent CA is present, this CA certificate will be included in generated CRLs
+
+
+
diff --git a/op-mode-definitions/pki.xml.in b/op-mode-definitions/pki.xml.in
index 06b15eed4..9c6b56a68 100644
--- a/op-mode-definitions/pki.xml.in
+++ b/op-mode-definitions/pki.xml.in
@@ -12,6 +12,26 @@
Generate CA certificate
+
+
+ Sign generated CA certificate with another specified CA certificate
+
+ pki ca
+
+
+
+
+
+ Commands for installing generated certificate into running configuration
+
+ <certificate name>
+
+
+ sudo ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --install
+
+
+ sudo ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" --sign "$5"
+
Commands for installing generated certificate into running configuration
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index d99a432aa..b2576fbb9 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -98,19 +98,28 @@ def get_certificate_ca(cert, ca_certs):
def get_config_revoked_certificates():
# Fetch revoked certificates from config
conf = Config()
- base = ['pki', 'certificate']
- if not conf.exists(base):
- return {}
+ ca_base = ['pki', 'ca']
+ cert_base = ['pki', 'certificate']
- certificates = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ certs = []
+
+ if conf.exists(ca_base):
+ ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'),
+ get_first_key=True, no_tag_node_value_mangle=True)
+ certs.extend(ca_certificates.values())
+
+ if conf.exists(cert_base):
+ certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'),
+ get_first_key=True, no_tag_node_value_mangle=True)
+ certs.extend(certificates.values())
- return {cert: cert_dict for cert, cert_dict in certificates.items() if 'revoke' in cert_dict}
+ return [cert_dict for cert_dict in certs if 'revoke' in cert_dict]
def get_revoked_by_serial_numbers(serial_numbers=[]):
# Return serial numbers of revoked certificates
certs_out = []
certs = get_config_certificate()
+ ca_certs = get_config_ca_certificate()
if certs:
for cert_name, cert_dict in certs.items():
if 'certificate' not in cert_dict:
@@ -119,8 +128,14 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):
cert = load_certificate(cert_dict['certificate'])
if cert.serial_number in serial_numbers:
certs_out.append(cert_name)
- else:
- certs_out.append(str(cert.serial_number)[0:10] + '...')
+ if ca_certs:
+ for cert_name, cert_dict in ca_certs.items():
+ if 'certificate' not in cert_dict:
+ continue
+
+ cert = load_certificate(cert_dict['certificate'])
+ if cert.serial_number in serial_numbers:
+ certs_out.append(cert_name)
return certs_out
def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False):
@@ -141,7 +156,7 @@ def install_certificate(name, cert='', private_key=None, key_type=None, key_pass
def install_crl(ca_name, crl):
# Show conf commands for installing crl
print("Configure mode commands to install CRL:")
- crl_pem = "".join(encode_public_key(crl).strip().split("\n")[1:-1])
+ crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1])
print("set pki ca %s crl '%s'" % (ca_name, crl_pem))
def install_dh_parameters(name, params):
@@ -281,6 +296,67 @@ def generate_ca_certificate(name, install=False):
install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+def generate_ca_certificate_sign(name, ca_name, install=False):
+ ca_dict = get_config_ca_certificate(ca_name)
+
+ if not ca_dict:
+ print(f"CA certificate or private key for '{ca_name}' not found")
+ return None
+
+ ca_cert = load_certificate(ca_dict['certificate'])
+
+ if not ca_cert:
+ print("Failed to load signing CA certificate, aborting")
+ return None
+
+ ca_private = ca_dict['private']
+ ca_private_passphrase = None
+ if 'password_protected' in ca_private:
+ ca_private_passphrase = ask_input('Enter signing CA private key passphrase:')
+ ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+
+ if not ca_private_key:
+ print("Failed to load signing CA private key, aborting")
+ return None
+
+ private_key = None
+ key_type = None
+
+ cert_req = None
+ if not ask_yes_no('Do you already have a certificate request?'):
+ private_key, key_type = generate_private_key()
+ cert_req = generate_certificate_request(private_key, key_type, return_request=True)
+ else:
+ print("Paste certificate request and press enter:")
+ lines = []
+ curr_line = ''
+ while True:
+ curr_line = input().strip()
+ if not curr_line or curr_line == CERT_REQ_END:
+ break
+ lines.append(curr_line)
+
+ if not lines:
+ print("Aborted")
+ return None
+
+ wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request("\n".join(lines), wrap)
+
+ if not cert_req:
+ print("Invalid certificate request")
+ return None
+
+ cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True)
+ passphrase = ask_passphrase()
+
+ if not install:
+ print(encode_certificate(cert))
+ print(encode_private_key(private_key, passphrase=passphrase))
+ return None
+
+ install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+
def generate_certificate_sign(name, ca_name, install=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -378,10 +454,10 @@ def generate_certificate_revocation_list(ca_name, install=False):
print("Failed to load CA private key, aborting")
return None
- revoked_certs = get_config_revoked_Certificates()
+ revoked_certs = get_config_revoked_certificates()
to_revoke = []
- for cert_name, cert_dict in revoked_certs.items():
+ for cert_dict in revoked_certs:
if 'certificate' not in cert_dict:
continue
@@ -500,7 +576,7 @@ def generate_wireguard_psk(name, install=False):
# Show functions
def show_certificate_authority(name=None):
- headers = ['Name', 'Subject', 'Issued', 'Expiry', 'Private Key']
+ headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
data = []
certs = get_config_ca_certificate()
if certs:
@@ -511,12 +587,17 @@ def show_certificate_authority(name=None):
continue
cert = load_certificate(cert_dict['certificate'])
+ parent_ca_name = get_certificate_ca(cert, certs)
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+
+ if not parent_ca_name or parent_ca_name == cert_name:
+ parent_ca_name = 'N/A'
if not cert:
continue
have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
- data.append([cert_name, cert.subject.rfc4514_string(), cert.not_valid_before, cert.not_valid_after, have_private])
+ data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name])
print("Certificate Authorities:")
print(tabulate.tabulate(data, headers))
@@ -623,7 +704,10 @@ if __name__ == '__main__':
try:
if args.action == 'generate':
if args.ca:
- generate_ca_certificate(args.ca, args.install)
+ if args.sign:
+ generate_ca_certificate_sign(args.ca, args.sign, args.install)
+ else:
+ generate_ca_certificate(args.ca, args.install)
elif args.certificate:
if args.sign:
generate_certificate_sign(args.certificate, args.sign, args.install)
--
cgit v1.2.3