summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/op_mode/pki.py517
1 files changed, 375 insertions, 142 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 56b873bb1..49a461e9e 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -86,13 +86,17 @@ def _verify(target):
def get_default_values():
# Fetch default x509 values
base = ['pki', 'x509', 'default']
- x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'),
- no_tag_node_value_mangle=True,
- get_first_key=True,
- with_recursive_defaults=True)
+ x509_defaults = conf.get_config_dict(
+ base,
+ key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True,
+ with_recursive_defaults=True,
+ )
return x509_defaults
+
def get_config_ca_certificate(name=None):
# Fetch ca certificates from config
base = ['pki', 'ca']
@@ -101,12 +105,15 @@ def get_config_ca_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- return conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ return conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
+
def get_config_certificate(name=None):
# Get certificates from config
@@ -116,18 +123,21 @@ def get_config_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- pki = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ pki = conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
if pki:
for certificate in pki:
pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate])
return pki
+
def get_certificate_ca(cert, ca_certs):
# Find CA certificate for given certificate
if not ca_certs:
@@ -146,6 +156,7 @@ def get_certificate_ca(cert, ca_certs):
return ca_name
return None
+
def get_config_revoked_certificates():
# Fetch revoked certificates from config
ca_base = ['pki', 'ca']
@@ -154,19 +165,26 @@ def get_config_revoked_certificates():
certs = []
if conf.exists(ca_base):
- ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ ca_certificates = conf.get_config_dict(
+ ca_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(ca_certificates.values())
if conf.exists(cert_base):
- certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ certificates = conf.get_config_dict(
+ cert_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(certificates.values())
return [cert_dict for cert_dict in certs if 'revoke' in cert_dict]
+
def get_revoked_by_serial_numbers(serial_numbers=[]):
# Return serial numbers of revoked certificates
certs_out = []
@@ -190,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):
certs_out.append(cert_name)
return certs_out
-def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False):
+
+def install_certificate(
+ name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False
+):
# Show/install conf commands for certificate
prefix = 'ca' if is_ca else 'certificate'
- base = f"pki {prefix} {name}"
+ base = f'pki {prefix} {name}'
config_paths = []
if cert:
- cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1])
+ cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1])
config_paths.append(f"{base} certificate '{cert_pem}'")
if private_key:
- key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1])
+ key_pem = ''.join(
+ encode_private_key(private_key, passphrase=key_passphrase)
+ .strip()
+ .split('\n')[1:-1]
+ )
config_paths.append(f"{base} private key '{key_pem}'")
if key_passphrase:
- config_paths.append(f"{base} private password-protected")
+ config_paths.append(f'{base} private password-protected')
install_into_config(conf, config_paths)
+
def install_crl(ca_name, crl):
# Show/install conf commands for crl
- crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1])
+ crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"])
+
def install_dh_parameters(name, params):
# Show/install conf commands for dh params
- dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1])
+ dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"])
+
def install_ssh_key(name, public_key, private_key, passphrase=None):
# Show/install conf commands for ssh key
- key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')
+ key_openssh = encode_public_key(
+ public_key, encoding='OpenSSH', key_format='OpenSSH'
+ )
username = os.getlogin()
- type_key_split = key_openssh.split(" ")
-
- base = f"system login user {username} authentication public-keys {name}"
- install_into_config(conf, [
- f"{base} key '{type_key_split[1]}'",
- f"{base} type '{type_key_split[0]}'"
- ])
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
-
-def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):
+ type_key_split = key_openssh.split(' ')
+
+ base = f'system login user {username} authentication public-keys {name}'
+ install_into_config(
+ conf,
+ [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"],
+ )
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
+
+
+def install_keypair(
+ name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True
+):
# Show/install conf commands for key-pair
config_paths = []
if public_key:
- install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True)
+ install_public_key = not prompt or ask_yes_no(
+ 'Do you want to install the public key?', default=True
+ )
public_key_pem = encode_public_key(public_key)
if install_public_key:
- install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'")
+ install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} public key '{install_public_pem}'"
+ )
else:
- print("Public key:")
+ print('Public key:')
print(public_key_pem)
if private_key:
- install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True)
+ install_private_key = not prompt or ask_yes_no(
+ 'Do you want to install the private key?', default=True
+ )
private_key_pem = encode_private_key(private_key, passphrase=passphrase)
if install_private_key:
- install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'")
+ install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} private key '{install_private_pem}'"
+ )
if passphrase:
- config_paths.append(f"pki key-pair {name} private password-protected")
+ config_paths.append(f'pki key-pair {name} private password-protected')
else:
- print("Private key:")
+ print('Private key:')
print(private_key_pem)
install_into_config(conf, config_paths)
+
def install_openvpn_key(name, key_data, key_version='1'):
config_paths = [
f"pki openvpn shared-secret {name} key '{key_data}'",
- f"pki openvpn shared-secret {name} version '{key_version}'"
+ f"pki openvpn shared-secret {name} version '{key_version}'",
]
install_into_config(conf, config_paths)
+
def install_wireguard_key(interface, private_key, public_key):
# Show conf commands for installing wireguard key pairs
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]
+ )
print(f"Corresponding public-key to use on peer system is: '{public_key}'")
+
def install_wireguard_psk(interface, peer, psk):
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]
+ )
+
def ask_passphrase():
passphrase = None
- print("Note: If you plan to use the generated key on this router, do not encrypt the private key.")
+ print(
+ 'Note: If you plan to use the generated key on this router, do not encrypt the private key.'
+ )
if ask_yes_no('Do you want to encrypt the private key with a passphrase?'):
passphrase = ask_input('Enter passphrase:')
return passphrase
+
def write_file(filename, contents):
full_path = os.path.join(auth_dir, filename)
directory = os.path.dirname(full_path)
@@ -305,7 +363,9 @@ def write_file(filename, contents):
print('Failed to write file: directory does not exist')
return False
- if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'):
+ if os.path.exists(full_path) and not ask_yes_no(
+ 'Do you want to overwrite the existing file?'
+ ):
return False
with open(full_path, 'w') as f:
@@ -313,10 +373,14 @@ def write_file(filename, contents):
print(f'File written to {full_path}')
-# Generation functions
+# Generation functions
def generate_private_key():
- key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec'])
+ key_type = ask_input(
+ 'Enter private key type: [rsa, dsa, ec]',
+ default='rsa',
+ valid_responses=['rsa', 'dsa', 'ec'],
+ )
size_valid = []
size_default = 0
@@ -328,28 +392,43 @@ def generate_private_key():
size_default = 256
size_valid = [224, 256, 384, 521]
- size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid)
+ size = ask_input(
+ 'Enter private key bits:',
+ default=size_default,
+ numeric_only=True,
+ valid_responses=size_valid,
+ )
return create_private_key(key_type, size), key_type
+
def parse_san_string(san_string):
if not san_string:
return None
output = []
- san_split = san_string.strip().split(",")
+ san_split = san_string.strip().split(',')
for pair_str in san_split:
- tag, value = pair_str.strip().split(":", 1)
+ tag, value = pair_str.strip().split(':', 1)
if tag == 'ipv4':
output.append(ipaddress.IPv4Address(value))
elif tag == 'ipv6':
output.append(ipaddress.IPv6Address(value))
elif tag == 'dns' or tag == 'rfc822':
output.append(value)
- return output
+ return
-def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True):
+
+def generate_certificate_request(
+ private_key=None,
+ key_type=None,
+ return_request=False,
+ name=None,
+ install=False,
+ file=False,
+ ask_san=True,
+):
if not private_key:
private_key, key_type = generate_private_key()
@@ -358,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
while True:
country = ask_input('Enter country code:', default=default_values['country'])
if len(country) != 2:
- print("Country name must be a 2 character country code")
+ print('Country name must be a 2 character country code')
continue
subject['country'] = country
break
subject['state'] = ask_input('Enter state:', default=default_values['state'])
- subject['locality'] = ask_input('Enter locality:', default=default_values['locality'])
- subject['organization'] = ask_input('Enter organization name:', default=default_values['organization'])
+ subject['locality'] = ask_input(
+ 'Enter locality:', default=default_values['locality']
+ )
+ subject['organization'] = ask_input(
+ 'Enter organization name:', default=default_values['organization']
+ )
subject['common_name'] = ask_input('Enter common name:', default='vyos.io')
subject_alt_names = None
if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'):
- print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net")
+ print(
+ 'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net'
+ )
san_string = ask_input('Enter Subject Alternative Names:')
subject_alt_names = parse_san_string(san_string)
@@ -386,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
return None
if install:
- print("Certificate request:")
- print(encode_certificate(cert_req) + "\n")
- install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ print('Certificate request:')
+ print(encode_certificate(cert_req) + '\n')
+ install_certificate(
+ name,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.csr', encode_certificate(cert_req))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
-def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False):
- valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True)
+
+def generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False
+):
+ valid_days = ask_input(
+ 'Enter how many days certificate will be valid:',
+ default='365' if not is_ca else '1825',
+ numeric_only=True,
+ )
cert_type = None
if not is_ca:
- cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server'])
- return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca)
+ cert_type = ask_input(
+ 'Enter certificate type: (client, server)',
+ default='server',
+ valid_responses=['client', 'server'],
+ )
+ return create_certificate(
+ cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca
+ )
+
def generate_ca_certificate(name, install=False, file=False):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True)
passphrase = ask_passphrase()
@@ -413,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -429,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load signing CA certificate, aborting")
+ print('Failed to load signing CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter signing CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load signing CA private key, aborting")
+ print('Failed to load signing CA private key, aborting')
return None
private_key = None
@@ -448,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -460,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
- cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True)
+ cert = generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True
+ )
passphrase = None
if private_key is not None:
@@ -483,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -500,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
private_key = None
@@ -519,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -531,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False)
-
+
passphrase = None
if private_key is not None:
passphrase = ask_passphrase()
@@ -554,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_selfsign(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -573,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name,
+ cert,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -589,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
revoked_certs = get_config_revoked_certificates()
@@ -620,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
continue
if not to_revoke:
- print("No revoked certificates to add to the CRL")
+ print('No revoked certificates to add to the CRL')
return None
crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke)
if not crl:
- print("Failed to create CRL")
+ print('Failed to create CRL')
return None
if not install and not file:
@@ -637,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
install_crl(ca_name, crl)
if file:
- write_file(f'{name}.crl', encode_certificate(crl))
+ write_file(f'{ca_name}.crl', encode_certificate(crl))
+
def generate_ssh_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -646,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- print("")
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ print('')
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
return None
if install:
install_ssh_key(name, public_key, private_key, passphrase)
if file:
- write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ write_file(
+ f'{name}.pem',
+ encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'),
+ )
+ write_file(
+ f'{name}.key',
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ ),
+ )
+
def generate_dh_parameters(name, install=False, file=False):
bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True)
- print("Generating parameters...")
+ print('Generating parameters...')
dh_params = create_dh_parameters(bits)
if not dh_params:
- print("Failed to create DH parameters")
+ print('Failed to create DH parameters')
return None
if not install and not file:
- print("DH Parameters:")
+ print('DH Parameters:')
print(encode_dh_parameters(dh_params))
if install:
@@ -677,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_dh_parameters(dh_params))
+
def generate_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
public_key = private_key.public_key()
@@ -684,7 +849,7 @@ def generate_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key))
- print("")
+ print('')
print(encode_private_key(private_key, passphrase=passphrase))
return None
@@ -693,13 +858,16 @@ def generate_keypair(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_public_key(public_key))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_openvpn_key(name, install=False, file=False):
result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"')
if not result:
- print("Failed to generate OpenVPN key")
+ print('Failed to generate OpenVPN key')
return None
if not install and not file:
@@ -707,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False):
return None
if install:
- key_lines = result.split("\n")
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
+ key_lines = result.split('\n')
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
key_version = '1'
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', result
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
@@ -720,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False):
if file:
write_file(f'{name}.key', result)
+
def generate_wireguard_key(interface=None, install=False):
private_key = cmd('wg genkey')
public_key = cmd('wg pubkey', input=private_key)
@@ -730,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False):
print(f'Private key: {private_key}')
print(f'Public key: {public_key}', end='\n\n')
+
def generate_wireguard_psk(interface=None, peer=None, install=False):
psk = cmd('wg genpsk')
if interface and peer and install:
@@ -737,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):
else:
print(f'Pre-shared key: {psk}')
+
# Import functions
-def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+def import_ca_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -775,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp
install_certificate(name, private_key=key, is_ca=True)
-def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+
+def import_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -812,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra
install_certificate(name, private_key=key, is_ca=False)
+
def import_crl(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -829,6 +1008,7 @@ def import_crl(name, path):
install_crl(name, crl)
+
def import_dh_parameters(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -846,6 +1026,7 @@ def import_dh_parameters(name, path):
install_dh_parameters(name, dh)
+
def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None):
if path:
if not os.path.exists(path):
@@ -883,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N
install_keypair(name, None, private_key=key, prompt=False)
+
def import_openvpn_secret(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -892,11 +1074,15 @@ def import_openvpn_secret(name, path):
key_version = '1'
with open(path) as f:
- key_lines = f.read().strip().split("\n")
- key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
-
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully)
+ key_lines = f.read().strip().split('\n')
+ key_lines = list(
+ filter(lambda line: not line.strip().startswith('#'), key_lines)
+ ) # Remove commented lines
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
+
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
@@ -1007,7 +1193,15 @@ def import_pki(
def show_certificate_authority(
raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
):
- headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
+ headers = [
+ 'Name',
+ 'Subject',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Private Key',
+ 'Parent',
+ ]
data = []
certs = get_config_ca_certificate()
if certs:
@@ -1024,7 +1218,7 @@ def show_certificate_authority(
return
parent_ca_name = get_certificate_ca(cert, certs)
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
if not parent_ca_name or parent_ca_name == cert_name:
parent_ca_name = 'N/A'
@@ -1032,10 +1226,24 @@ def show_certificate_authority(
if not cert:
continue
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
- data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name])
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
+ data.append(
+ [
+ cert_name,
+ cert.subject.rfc4514_string(),
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ have_private,
+ parent_ca_name,
+ ]
+ )
- print("Certificate Authorities:")
+ print('Certificate Authorities:')
print(tabulate.tabulate(data, headers))
@@ -1046,7 +1254,17 @@ def show_certificate(
pem: typing.Optional[bool] = False,
fingerprint: typing.Optional[ArgsFingerprint] = None,
):
- headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']
+ headers = [
+ 'Name',
+ 'Type',
+ 'Subject CN',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Revoked',
+ 'Private Key',
+ 'CA Present',
+ ]
data = []
certs = get_config_certificate()
if certs:
@@ -1071,8 +1289,8 @@ def show_certificate(
return
ca_name = get_certificate_ca(cert, ca_certs)
- cert_subject_cn = cert.subject.rfc4514_string().split(",")[0]
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_subject_cn = cert.subject.rfc4514_string().split(',')[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
cert_type = 'Unknown'
try:
@@ -1081,18 +1299,31 @@ def show_certificate(
cert_type = 'Server'
elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value:
cert_type = 'Client'
- except:
+ except Exception:
pass
revoked = 'Yes' if 'revoke' in cert_dict else 'No'
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
have_ca = f'Yes ({ca_name})' if ca_name else 'No'
- data.append([
- cert_name, cert_type, cert_subject_cn, cert_issuer_cn,
- cert.not_valid_before, cert.not_valid_after,
- revoked, have_private, have_ca])
+ data.append(
+ [
+ cert_name,
+ cert_type,
+ cert_subject_cn,
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ revoked,
+ have_private,
+ have_ca,
+ ]
+ )
- print("Certificates:")
+ print('Certificates:')
print(tabulate.tabulate(data, headers))
@@ -1123,13 +1354,15 @@ def show_crl(
print(encode_certificate(crl))
continue
- certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl])
- data.append([cert_name, crl.last_update, ", ".join(certs)])
+ certs = get_revoked_by_serial_numbers(
+ [revoked.serial_number for revoked in crl]
+ )
+ data.append([cert_name, crl.last_update, ', '.join(certs)])
if name and pem:
return
- print("Certificate Revocation Lists:")
+ print('Certificate Revocation Lists:')
print(tabulate.tabulate(data, headers))