diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/conf_mode/pki.py | 2 | ||||
-rwxr-xr-x | src/conf_mode/vpn_openconnect.py | 43 | ||||
-rwxr-xr-x | src/op_mode/ikev2_profile_generator.py | 57 | ||||
-rwxr-xr-x | src/op_mode/pki.py | 22 |
4 files changed, 79 insertions, 45 deletions
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index f37cac524..4a0e86f32 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -232,7 +232,7 @@ def get_config(config=None): path = search['path'] path_str = ' '.join(path + found_path) - print(f'PKI: Updating config: {path_str} {item_name}') + #print(f'PKI: Updating config: {path_str} {item_name}') if path[0] == 'interfaces': ifname = found_path[0] diff --git a/src/conf_mode/vpn_openconnect.py b/src/conf_mode/vpn_openconnect.py index 8159fedea..42785134f 100755 --- a/src/conf_mode/vpn_openconnect.py +++ b/src/conf_mode/vpn_openconnect.py @@ -21,14 +21,17 @@ from vyos.base import Warning from vyos.config import Config from vyos.configverify import verify_pki_certificate from vyos.configverify import verify_pki_ca_certificate -from vyos.pki import wrap_certificate +from vyos.pki import find_chain +from vyos.pki import encode_certificate +from vyos.pki import load_certificate from vyos.pki import wrap_private_key from vyos.template import render -from vyos.utils.process import call +from vyos.utils.dict import dict_search +from vyos.utils.file import write_file from vyos.utils.network import check_port_availability -from vyos.utils.process import is_systemd_service_running from vyos.utils.network import is_listen_port_bind_service -from vyos.utils.dict import dict_search +from vyos.utils.process import call +from vyos.utils.process import is_systemd_service_running from vyos import ConfigError from passlib.hash import sha512_crypt from time import sleep @@ -142,7 +145,8 @@ def verify(ocserv): verify_pki_certificate(ocserv, ocserv['ssl']['certificate']) if 'ca_certificate' in ocserv['ssl']: - verify_pki_ca_certificate(ocserv, ocserv['ssl']['ca_certificate']) + for ca_cert in ocserv['ssl']['ca_certificate']: + verify_pki_ca_certificate(ocserv, ca_cert) # Check network settings if "network_settings" in ocserv: @@ -219,25 +223,36 @@ def generate(ocserv): if "ssl" in ocserv: cert_file_path = os.path.join(cfg_dir, 'cert.pem') cert_key_path = os.path.join(cfg_dir, 'cert.key') - ca_cert_file_path = os.path.join(cfg_dir, 'ca.pem') + if 'certificate' in ocserv['ssl']: cert_name = ocserv['ssl']['certificate'] pki_cert = ocserv['pki']['certificate'][cert_name] - with open(cert_file_path, 'w') as f: - f.write(wrap_certificate(pki_cert['certificate'])) + loaded_pki_cert = load_certificate(pki_cert['certificate']) + loaded_ca_certs = {load_certificate(c['certificate']) + for c in ocserv['pki']['ca'].values()} if 'ca' in ocserv['pki'] else {} + + cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs) + + write_file(cert_file_path, + '\n'.join(encode_certificate(c) for c in cert_full_chain)) if 'private' in pki_cert and 'key' in pki_cert['private']: - with open(cert_key_path, 'w') as f: - f.write(wrap_private_key(pki_cert['private']['key'])) + write_file(cert_key_path, wrap_private_key(pki_cert['private']['key'])) if 'ca_certificate' in ocserv['ssl']: - ca_name = ocserv['ssl']['ca_certificate'] - pki_ca_cert = ocserv['pki']['ca'][ca_name] + ca_cert_file_path = os.path.join(cfg_dir, 'ca.pem') + ca_chains = [] + + for ca_name in ocserv['ssl']['ca_certificate']: + pki_ca_cert = ocserv['pki']['ca'][ca_name] + loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + ca_chains.append( + '\n'.join(encode_certificate(c) for c in ca_full_chain)) - with open(ca_cert_file_path, 'w') as f: - f.write(wrap_certificate(pki_ca_cert['certificate'])) + write_file(ca_cert_file_path, '\n'.join(ca_chains)) # Render config render(ocserv_conf, 'ocserv/ocserv_config.j2', ocserv) diff --git a/src/op_mode/ikev2_profile_generator.py b/src/op_mode/ikev2_profile_generator.py index 169a15840..b193d8109 100755 --- a/src/op_mode/ikev2_profile_generator.py +++ b/src/op_mode/ikev2_profile_generator.py @@ -21,6 +21,7 @@ from socket import getfqdn from cryptography.x509.oid import NameOID from vyos.configquery import ConfigTreeQuery +from vyos.config import config_dict_mangle_acme from vyos.pki import CERT_BEGIN from vyos.pki import CERT_END from vyos.pki import find_chain @@ -123,6 +124,8 @@ pki_base = ['pki'] conf = ConfigTreeQuery() if not conf.exists(config_base): exit('IPsec remote-access is not configured!') +if not conf.exists(pki_base): + exit('PKI is not configured!') profile_name = 'VyOS IKEv2 Profile' if args.profile: @@ -147,30 +150,36 @@ tmp = getfqdn().split('.') tmp = reversed(tmp) data['rfqdn'] = '.'.join(tmp) -pki = conf.get_config_dict(pki_base, get_first_key=True) -cert_name = data['authentication']['x509']['certificate'] - -cert_data = load_certificate(pki['certificate'][cert_name]['certificate']) -data['cert_common_name'] = cert_data.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value -data['ca_common_name'] = cert_data.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value -data['ca_certificates'] = [] - -loaded_ca_certs = {load_certificate(c['certificate']) - for c in pki['ca'].values()} if 'ca' in pki else {} - -for ca_name in data['authentication']['x509']['ca_certificate']: - loaded_ca_cert = load_certificate(pki['ca'][ca_name]['certificate']) - ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) - for ca in ca_full_chain: - tmp = { - 'ca_name' : ca.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - 'ca_chain' : encode_certificate(ca).replace(CERT_BEGIN, '').replace(CERT_END, '').replace('\n', ''), - } - data['ca_certificates'].append(tmp) - -# Remove duplicate list entries for CA certificates, as they are added by their common name -# https://stackoverflow.com/a/9427216 -data['ca_certificates'] = [dict(t) for t in {tuple(d.items()) for d in data['ca_certificates']}] +if args.os == 'ios': + pki = conf.get_config_dict(pki_base, get_first_key=True) + if 'certificate' in pki: + for certificate in pki['certificate']: + pki['certificate'][certificate] = config_dict_mangle_acme(certificate, pki['certificate'][certificate]) + + cert_name = data['authentication']['x509']['certificate'] + + + cert_data = load_certificate(pki['certificate'][cert_name]['certificate']) + data['cert_common_name'] = cert_data.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + data['ca_common_name'] = cert_data.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + data['ca_certificates'] = [] + + loaded_ca_certs = {load_certificate(c['certificate']) + for c in pki['ca'].values()} if 'ca' in pki else {} + + for ca_name in data['authentication']['x509']['ca_certificate']: + loaded_ca_cert = load_certificate(pki['ca'][ca_name]['certificate']) + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + for ca in ca_full_chain: + tmp = { + 'ca_name' : ca.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + 'ca_chain' : encode_certificate(ca).replace(CERT_BEGIN, '').replace(CERT_END, '').replace('\n', ''), + } + data['ca_certificates'].append(tmp) + + # Remove duplicate list entries for CA certificates, as they are added by their common name + # https://stackoverflow.com/a/9427216 + data['ca_certificates'] = [dict(t) for t in {tuple(d.items()) for d in data['ca_certificates']}] esp_proposals = conf.get_config_dict(ipsec_base + ['esp-group', data['esp_group'], 'proposal'], key_mangling=('-', '_'), get_first_key=True) diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 4490e609c..57b97a47d 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -426,11 +426,15 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) - passphrase = ask_passphrase() + + passphrase = None + if private_key is not None: + passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) - print(encode_private_key(private_key, passphrase=passphrase)) + if private_key is not None: + print(encode_private_key(private_key, passphrase=passphrase)) return None if install: @@ -438,7 +442,8 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + if private_key is not None: + write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -492,11 +497,15 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) - passphrase = ask_passphrase() + + passphrase = None + if private_key is not None: + passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) - print(encode_private_key(private_key, passphrase=passphrase)) + if private_key is not None: + print(encode_private_key(private_key, passphrase=passphrase)) return None if install: @@ -504,7 +513,8 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + if private_key is not None: + write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate_selfsign(name, install=False, file=False): private_key, key_type = generate_private_key() |