summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorsarthurdev <965089+sarthurdev@users.noreply.github.com>2021-07-23 13:39:14 +0200
committersarthurdev <965089+sarthurdev@users.noreply.github.com>2021-07-23 14:37:00 +0200
commit77a9473915b46879bae504dfa3c1c4d0d60fa2e9 (patch)
treeb3bd901659bf6124864cd894d4c27bbf4b69064d /src
parentd4b2777c1bffca47d9b3b21d8907818f06591c59 (diff)
downloadvyos-1x-77a9473915b46879bae504dfa3c1c4d0d60fa2e9.tar.gz
vyos-1x-77a9473915b46879bae504dfa3c1c4d0d60fa2e9.zip
pki: T3642: Add ability to write generated certificates/keys to specified filenames
Diffstat (limited to 'src')
-rwxr-xr-xsrc/op_mode/pki.py187
1 files changed, 131 insertions, 56 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index b4a68b31c..297270cf1 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -38,6 +38,8 @@ from vyos.util import cmd
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
+auth_dir = '/config/auth'
+
# Helper Functions
def get_default_values():
@@ -230,6 +232,22 @@ def ask_passphrase():
passphrase = ask_input('Enter passphrase:')
return passphrase
+def write_file(filename, contents):
+ full_path = os.path.join(auth_dir, filename)
+ directory = os.path.dirname(full_path)
+
+ if not os.path.exists(directory):
+ print('Failed to write file: directory does not exist')
+ return False
+
+ if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'):
+ return False
+
+ with open(full_path, 'w') as f:
+ f.write(contents)
+
+ print(f'File written to {full_path}')
+
# Generation functions
def generate_private_key():
@@ -266,7 +284,7 @@ def parse_san_string(san_string):
output.append(value)
return output
-def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, ask_san=True):
+def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True):
if not private_key:
private_key, key_type = generate_private_key()
@@ -291,14 +309,19 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_certificate(cert_req))
print(encode_private_key(private_key, passphrase=passphrase))
return None
- print("Certificate request:")
- print(encode_certificate(cert_req) + "\n")
- install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ if install:
+ print("Certificate request:")
+ print(encode_certificate(cert_req) + "\n")
+ install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+
+ if file:
+ write_file(f'{name}.csr', encode_certificate(cert_req))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False):
valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True)
@@ -307,20 +330,25 @@ def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_
cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server'])
return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca)
-def generate_ca_certificate(name, install=False):
+def generate_ca_certificate(name, install=False, file=False):
private_key, key_type = generate_private_key()
cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True)
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_certificate(cert))
print(encode_private_key(private_key, passphrase=passphrase))
return None
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ if install:
+ install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+
+ if file:
+ write_file(f'{name}.pem', encode_certificate(cert))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-def generate_ca_certificate_sign(name, ca_name, install=False):
+def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
if not ca_dict:
@@ -374,14 +402,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False):
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True)
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_certificate(cert))
print(encode_private_key(private_key, passphrase=passphrase))
return None
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ if install:
+ install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+
+ if file:
+ write_file(f'{name}.pem', encode_certificate(cert))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-def generate_certificate_sign(name, ca_name, install=False):
+def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
if not ca_dict:
@@ -435,27 +468,37 @@ def generate_certificate_sign(name, ca_name, install=False):
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False)
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_certificate(cert))
print(encode_private_key(private_key, passphrase=passphrase))
return None
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False)
+ if install:
+ install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False)
+
+ if file:
+ write_file(f'{name}.pem', encode_certificate(cert))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-def generate_certificate_selfsign(name, install=False):
+def generate_certificate_selfsign(name, install=False, file=False):
private_key, key_type = generate_private_key()
cert_req = generate_certificate_request(private_key, key_type, return_request=True)
cert = generate_certificate(cert_req, cert_req, private_key, is_ca=False)
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_certificate(cert))
print(encode_private_key(private_key, passphrase=passphrase))
return None
- install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ if install:
+ install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+
+ if file:
+ write_file(f'{name}.pem', encode_certificate(cert))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-def generate_certificate_revocation_list(ca_name, install=False):
+def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
if not ca_dict:
@@ -505,26 +548,35 @@ def generate_certificate_revocation_list(ca_name, install=False):
print("Failed to create CRL")
return None
- if not install:
+ if not install and not file:
print(encode_certificate(crl))
return None
- install_crl(ca_name, crl)
+ if install:
+ install_crl(ca_name, crl)
+
+ if file:
+ write_file(f'{name}.crl', encode_certificate(crl))
-def generate_ssh_keypair(name, install=False):
+def generate_ssh_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
public_key = private_key.public_key()
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
print("")
print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
return None
- install_ssh_key(name, public_key, private_key, passphrase)
+ if install:
+ install_ssh_key(name, public_key, private_key, passphrase)
+
+ if file:
+ write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
+ write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
-def generate_dh_parameters(name, install=False):
+def generate_dh_parameters(name, install=False, file=False):
bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True)
print("Generating parameters...")
@@ -534,49 +586,62 @@ def generate_dh_parameters(name, install=False):
print("Failed to create DH parameters")
return None
- if not install:
+ if not install and not file:
print("DH Parameters:")
print(encode_dh_parameters(dh_params))
- install_dh_parameters(name, dh_params)
+ if install:
+ install_dh_parameters(name, dh_params)
+
+ if file:
+ write_file(f'{name}.pem', encode_dh_parameters(dh_params))
-def generate_keypair(name, install=False):
+def generate_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
public_key = private_key.public_key()
passphrase = ask_passphrase()
- if not install:
+ if not install and not file:
print(encode_public_key(public_key))
print("")
print(encode_private_key(private_key, passphrase=passphrase))
return None
- install_keypair(name, key_type, private_key, public_key, passphrase)
+ if install:
+ install_keypair(name, key_type, private_key, public_key, passphrase)
+
+ if file:
+ write_file(f'{name}.pem', encode_public_key(public_key))
+ write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-def generate_openvpn_key(name, install=False):
+def generate_openvpn_key(name, install=False, file=False):
result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"')
if not result:
print("Failed to generate OpenVPN key")
return None
- if not install:
+ if not install and not file:
print(result)
return None
- key_lines = result.split("\n")
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
- key_version = '1'
+ if install:
+ key_lines = result.split("\n")
+ key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
+ key_version = '1'
+
+ version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
+ if version_search:
+ key_version = version_search[1]
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
- if version_search:
- key_version = version_search[1]
+ print("Configure mode commands to install OpenVPN key:")
+ print("set pki openvpn shared-secret %s key '%s'" % (name, key_data))
+ print("set pki openvpn shared-secret %s version '%s'" % (name, key_version))
- print("Configure mode commands to install OpenVPN key:")
- print("set pki openvpn shared-secret %s key '%s'" % (name, key_data))
- print("set pki openvpn shared-secret %s version '%s'" % (name, key_version))
+ if file:
+ write_file(f'{name}.key', result)
-def generate_wireguard_key(name, install=False):
+def generate_wireguard_key(name, install=False, file=False):
private_key = cmd('wg genkey')
public_key = cmd('wg pubkey', input=private_key)
@@ -585,17 +650,26 @@ def generate_wireguard_key(name, install=False):
print("Public key: " + public_key)
return None
- install_wireguard_key(name, private_key, public_key)
+ if install:
+ install_wireguard_key(name, private_key, public_key)
-def generate_wireguard_psk(name, install=False):
+ if file:
+ write_file(f'{name}_public.key', public_key)
+ write_file(f'{name}_private.key', private_key)
+
+def generate_wireguard_psk(name, install=False, file=False):
psk = cmd('wg genpsk')
- if not install:
+ if not install and not file:
print("Pre-shared key:")
print(psk)
return None
- install_wireguard_psk(name, psk)
+ if install:
+ install_wireguard_psk(name, psk)
+
+ if file:
+ write_file(f'{name}.key', psk)
# Show functions
@@ -721,6 +795,7 @@ if __name__ == '__main__':
parser.add_argument('--psk', help='Wireguard pre shared key', required=False)
# Global
+ parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
parser.add_argument('--install', help='Install generated keys into running-config', action='store_true')
args = parser.parse_args()
@@ -729,31 +804,31 @@ if __name__ == '__main__':
if args.action == 'generate':
if args.ca:
if args.sign:
- generate_ca_certificate_sign(args.ca, args.sign, args.install)
+ generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file)
else:
- generate_ca_certificate(args.ca, args.install)
+ generate_ca_certificate(args.ca, install=args.install, file=args.file)
elif args.certificate:
if args.sign:
- generate_certificate_sign(args.certificate, args.sign, args.install)
+ generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file)
elif args.self_sign:
- generate_certificate_selfsign(args.certificate, args.install)
+ generate_certificate_selfsign(args.certificate, install=args.install, file=args.file)
else:
generate_certificate_request(name=args.certificate, install=args.install)
elif args.crl:
- generate_certificate_revocation_list(args.crl, args.install)
+ generate_certificate_revocation_list(args.crl, install=args.install, file=args.file)
elif args.ssh:
- generate_ssh_keypair(args.ssh, args.install)
+ generate_ssh_keypair(args.ssh, install=args.install, file=args.file)
elif args.dh:
- generate_dh_parameters(args.dh, args.install)
+ generate_dh_parameters(args.dh, install=args.install, file=args.file)
elif args.keypair:
- generate_keypair(args.keypair, args.install)
+ generate_keypair(args.keypair, install=args.install, file=args.file)
elif args.openvpn:
- generate_openvpn_key(args.openvpn, args.install)
+ generate_openvpn_key(args.openvpn, install=args.install, file=args.file)
elif args.wireguard:
if args.key:
- generate_wireguard_key(args.key, args.install)
+ generate_wireguard_key(args.key, install=args.install, file=args.file)
elif args.psk:
- generate_wireguard_psk(args.psk, args.install)
+ generate_wireguard_psk(args.psk, install=args.install, file=args.file)
elif args.action == 'show':
if args.ca:
show_certificate_authority(None if args.ca == 'all' else args.ca)