From f5a8a9cdfe52c331177c8bc7b8fb84fc08d4f60a Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Tue, 29 Jun 2021 11:06:44 +0200 Subject: pki: ipsec: T3642: Migrate IPSec to use PKI configuration --- src/conf_mode/vpn_ipsec.py | 89 +++++++++++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 28 deletions(-) (limited to 'src/conf_mode') diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index d598ff6da..e8e8b453a 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -23,6 +23,10 @@ from vyos.config import Config from vyos.configdict import leaf_node_changed from vyos.configverify import verify_interface_exists from vyos.ifconfig import Interface +from vyos.pki import wrap_certificate +from vyos.pki import wrap_crl +from vyos.pki import wrap_public_key +from vyos.pki import wrap_private_key from vyos.template import ip_from_cidr from vyos.template import render from vyos.validate import is_ipv6_link_local @@ -115,6 +119,8 @@ def get_config(config=None): ipsec['interface_change'] = leaf_node_changed(conf, base + ['ipsec-interfaces', 'interface']) ipsec['l2tp_exists'] = conf.exists(['vpn', 'l2tp', 'remote-access', 'ipsec-settings']) ipsec['nhrp_exists'] = conf.exists(['protocols', 'nhrp', 'tunnel']) + ipsec['pki'] = conf.get_config_dict(['pki'], key_mangling=('-', '_'), + get_first_key=True, no_tag_node_value_mangle=True) ipsec['rsa_keys'] = conf.get_config_dict(['vpn', 'rsa-keys'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) @@ -187,6 +193,24 @@ def get_dhcp_address(iface): return ip_from_cidr(address) return None +def verify_pki(pki, x509_conf): + if not pki or 'ca' not in pki or 'certificate' not in pki: + raise ConfigError(f'PKI is not configured') + + ca_cert_name = x509_conf['ca_certificate'] + cert_name = x509_conf['certificate'] + + if not dict_search(f'ca.{ca_cert_name}.certificate', ipsec['pki']): + raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') + + if not dict_search(f'certificate.{cert_name}.certificate', ipsec['pki']): + raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"') + + if not dict_search(f'certificate.{cert_name}.private.key', ipsec['pki']): + raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"') + + return True + def verify(ipsec): if not ipsec: return None @@ -237,24 +261,12 @@ def verify(ipsec): if 'x509' not in peer_conf['authentication']: raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") - if 'key' not in peer_conf['authentication']['x509']: - raise ConfigError(f"Missing x509 key on site-to-site peer {peer}") - - if 'ca_cert_file' not in peer_conf['authentication']['x509'] or 'cert_file' not in peer_conf['authentication']['x509']: - raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") + x509 = peer_conf['authentication']['x509'] - if 'file' not in peer_conf['authentication']['x509']['key']: - raise ConfigError(f"Missing x509 key file on site-to-site peer {peer}") + if 'ca_certificate' not in x509 or 'certificate' not in x509: + raise ConfigError(f"Missing x509 certificates on site-to-site peer {peer}") - for key in ['ca_cert_file', 'cert_file', 'crl_file']: - if key in peer_conf['authentication']['x509']: - path = os.path.join(X509_PATH, peer_conf['authentication']['x509'][key]) - if not os.path.exists(path): - raise ConfigError(f"File not found for {key} on site-to-site peer {peer}") - - key_path = os.path.join(X509_PATH, peer_conf['authentication']['x509']['key']['file']) - if not os.path.exists(key_path): - raise ConfigError(f"Private key not found on site-to-site peer {peer}") + verify_pki(ipsec['pki'], x509) if peer_conf['authentication']['mode'] == 'rsa': if not verify_rsa_local_key(ipsec): @@ -320,6 +332,31 @@ def verify(ipsec): if ('local' in tunnel_conf and 'prefix' in tunnel_conf['local']) or ('remote' in tunnel_conf and 'prefix' in tunnel_conf['remote']): raise ConfigError(f"Local/remote prefix cannot be used with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") +def generate_pki_files(pki, x509_conf): + ca_cert_name = x509_conf['ca_certificate'] + ca_cert_data = dict_search(f'ca.{ca_cert_name}.certificate', pki) + ca_cert_crls = dict_search(f'ca.{ca_cert_name}.crl', pki) or [] + crl_index = 1 + + cert_name = x509_conf['certificate'] + cert_data = dict_search(f'certificate.{cert_name}.certificate', pki) + key_data = dict_search(f'certificate.{cert_name}.private.key', pki) + protected = 'passphrase' in x509_conf + + with open(os.path.join(CA_PATH, f'{ca_cert_name}.pem'), 'w') as f: + f.write(wrap_certificate(ca_cert_data)) + + for crl in ca_cert_crls: + with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: + f.write(wrap_crl(crl)) + crl_index += 1 + + with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f: + f.write(wrap_certificate(cert_data)) + + with open(os.path.join(KEY_PATH, f'{cert_name}.pem'), 'w') as f: + f.write(wrap_private_key(key_data, protected)) + def generate(ipsec): data = {} @@ -334,24 +371,20 @@ def generate(ipsec): data['marks'] = {} data['rsa_local_key'] = verify_rsa_local_key(ipsec) + for path in [swanctl_dir, CERT_PATH, CA_PATH, CRL_PATH]: + if not os.path.exists(path): + os.mkdir(path, mode=0o755) + + if not os.path.exists(KEY_PATH): + os.mkdir(KEY_PATH, mode=0o700) + if 'site_to_site' in data and 'peer' in data['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): if peer in ipsec['dhcp_no_address']: continue if peer_conf['authentication']['mode'] == 'x509': - cert_file = os.path.join(X509_PATH, peer_conf['authentication']['x509']['cert_file']) - copy_file(cert_file, CERT_PATH, True) - - key_file = os.path.join(X509_PATH, peer_conf['authentication']['x509']['key']['file']) - copy_file(key_file, X509_PATH, True) - - ca_cert_file = os.path.join(X509_PATH, peer_conf['authentication']['x509']['ca_cert_file']) - copy_file(ca_cert_file, CA_PATH, True) - - if 'crl_file' in peer_conf['authentication']['x509']: - crl_file = os.path.join(X509_PATH, peer_conf['authentication']['x509']['crl_file']) - copy_file(crl_file, CRL_PATH, True) + generate_pki_files(ipsec['pki'], peer_conf['authentication']['x509']) local_ip = '' if 'local_address' in peer_conf: -- cgit v1.2.3