diff options
Diffstat (limited to 'src/conf_mode')
-rwxr-xr-x | src/conf_mode/pki.py | 167 | ||||
-rwxr-xr-x | src/conf_mode/vpn_ipsec.py | 90 |
2 files changed, 228 insertions, 29 deletions
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py new file mode 100755 index 000000000..ef1b57650 --- /dev/null +++ b/src/conf_mode/pki.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sys import exit + +from vyos.config import Config +from vyos.configdict import dict_merge +from vyos.pki import is_ca_certificate +from vyos.pki import load_certificate +from vyos.pki import load_certificate_request +from vyos.pki import load_public_key +from vyos.pki import load_private_key +from vyos.pki import load_crl +from vyos.pki import load_dh_parameters +from vyos.util import ask_input +from vyos.xml import defaults +from vyos import ConfigError +from vyos import airbag +airbag.enable() + +def get_config(config=None): + if config: + conf = config + else: + conf = Config() + base = ['pki'] + if not conf.exists(base): + return None + + pki = conf.get_config_dict(base, key_mangling=('-', '_'), + get_first_key=True, no_tag_node_value_mangle=True) + + default_values = defaults(base) + pki = dict_merge(default_values, pki) + return pki + +def is_valid_certificate(raw_data): + # If it loads correctly we're good, or return False + return load_certificate(raw_data, wrap_tags=True) + +def is_valid_ca_certificate(raw_data): + # Check if this is a valid certificate with CA attributes + cert = load_certificate(raw_data, wrap_tags=True) + if not cert: + return False + return is_ca_certificate(cert) + +def is_valid_public_key(raw_data): + # If it loads correctly we're good, or return False + return load_public_key(raw_data, wrap_tags=True) + +def is_valid_private_key(raw_data, protected=False): + # If it loads correctly we're good, or return False + # With encrypted private keys, we always return true as we cannot ask for password to verify + if protected: + return True + return load_private_key(raw_data, passphrase=None, wrap_tags=True) + +def is_valid_crl(raw_data): + # If it loads correctly we're good, or return False + return load_crl(raw_data, wrap_tags=True) + +def is_valid_dh_parameters(raw_data): + # If it loads correctly we're good, or return False + return load_dh_parameters(raw_data, wrap_tags=True) + +def verify(pki): + if not pki: + return None + + if 'ca' in pki: + for name, ca_conf in pki['ca'].items(): + if 'certificate' in ca_conf: + if not is_valid_ca_certificate(ca_conf['certificate']): + raise ConfigError(f'Invalid certificate on CA certificate "{name}"') + + if 'private' in ca_conf and 'key' in ca_conf['private']: + private = ca_conf['private'] + protected = 'password_protected' in private + + if not is_valid_private_key(private['key'], protected): + raise ConfigError(f'Invalid private key on CA certificate "{name}"') + + if 'crl' in ca_conf: + ca_crls = ca_conf['crl'] + if isinstance(ca_crls, str): + ca_crls = [ca_crls] + + for crl in ca_crls: + if not is_valid_crl(crl): + raise ConfigError(f'Invalid CRL on CA certificate "{name}"') + + if 'certificate' in pki: + for name, cert_conf in pki['certificate'].items(): + if 'certificate' in cert_conf: + if not is_valid_certificate(cert_conf['certificate']): + raise ConfigError(f'Invalid certificate on certificate "{name}"') + + if 'private' in cert_conf and 'key' in cert_conf['private']: + private = cert_conf['private'] + protected = 'password_protected' in private + + if not is_valid_private_key(private['key'], protected): + raise ConfigError(f'Invalid private key on certificate "{name}"') + + if 'dh' in pki: + for name, dh_conf in pki['dh'].items(): + if 'parameters' in dh_conf: + if not is_valid_dh_parameters(dh_conf['parameters']): + raise ConfigError(f'Invalid DH parameters on "{name}"') + + if 'key_pair' in pki: + for name, key_conf in pki['key_pair'].items(): + if 'public' in key_conf and 'key' in key_conf['public']: + if not is_valid_public_key(key_conf['public']['key']): + raise ConfigError(f'Invalid public key on key-pair "{name}"') + + if 'private' in key_conf and 'key' in key_conf['private']: + private = key_conf['private'] + protected = 'password_protected' in private + if not is_valid_private_key(private['key'], protected): + raise ConfigError(f'Invalid private key on key-pair "{name}"') + + if 'x509' in pki: + if 'default' in pki['x509']: + default_values = pki['x509']['default'] + if 'country' in default_values: + country = default_values['country'] + if len(country) != 2 or not country.isalpha(): + raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') + + return None + +def generate(pki): + if not pki: + return None + + return None + +def apply(pki): + if not pki: + return None + + return None + +if __name__ == '__main__': + try: + c = get_config() + verify(c) + generate(c) + apply(c) + except ConfigError as e: + print(e) + exit(1) diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index a141fdddf..70b4c52e6 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -23,6 +23,10 @@ from vyos.config import Config from vyos.configdict import leaf_node_changed from vyos.configverify import verify_interface_exists from vyos.ifconfig import Interface +from vyos.pki import wrap_certificate +from vyos.pki import wrap_crl +from vyos.pki import wrap_public_key +from vyos.pki import wrap_private_key from vyos.template import ip_from_cidr from vyos.template import render from vyos.validate import is_ipv6_link_local @@ -113,6 +117,8 @@ def get_config(config=None): ipsec['interface_change'] = leaf_node_changed(conf, base + ['ipsec-interfaces', 'interface']) ipsec['l2tp_exists'] = conf.exists(['vpn', 'l2tp', 'remote-access', 'ipsec-settings']) ipsec['nhrp_exists'] = conf.exists(['protocols', 'nhrp', 'tunnel']) + ipsec['pki'] = conf.get_config_dict(['pki'], key_mangling=('-', '_'), + get_first_key=True, no_tag_node_value_mangle=True) ipsec['rsa_keys'] = conf.get_config_dict(['vpn', 'rsa-keys'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) @@ -185,6 +191,24 @@ def get_dhcp_address(iface): return ip_from_cidr(address) return None +def verify_pki(pki, x509_conf): + if not pki or 'ca' not in pki or 'certificate' not in pki: + raise ConfigError(f'PKI is not configured') + + ca_cert_name = x509_conf['ca_certificate'] + cert_name = x509_conf['certificate'] + + if not dict_search(f'ca.{ca_cert_name}.certificate', ipsec['pki']): + raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') + + if not dict_search(f'certificate.{cert_name}.certificate', ipsec['pki']): + raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"') + + if not dict_search(f'certificate.{cert_name}.private.key', ipsec['pki']): + raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"') + + return True + def verify(ipsec): if not ipsec: return None @@ -235,24 +259,12 @@ def verify(ipsec): if 'x509' not in peer_conf['authentication']: raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") - if 'key' not in peer_conf['authentication']['x509']: - raise ConfigError(f"Missing x509 key on site-to-site peer {peer}") - - if 'ca_cert_file' not in peer_conf['authentication']['x509'] or 'cert_file' not in peer_conf['authentication']['x509']: - raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") + x509 = peer_conf['authentication']['x509'] - if 'file' not in peer_conf['authentication']['x509']['key']: - raise ConfigError(f"Missing x509 key file on site-to-site peer {peer}") + if 'ca_certificate' not in x509 or 'certificate' not in x509: + raise ConfigError(f"Missing x509 certificates on site-to-site peer {peer}") - for key in ['ca_cert_file', 'cert_file', 'crl_file']: - if key in peer_conf['authentication']['x509']: - path = os.path.join(X509_PATH, peer_conf['authentication']['x509'][key]) - if not os.path.exists(path): - raise ConfigError(f"File not found for {key} on site-to-site peer {peer}") - - key_path = os.path.join(X509_PATH, peer_conf['authentication']['x509']['key']['file']) - if not os.path.exists(key_path): - raise ConfigError(f"Private key not found on site-to-site peer {peer}") + verify_pki(ipsec['pki'], x509) if peer_conf['authentication']['mode'] == 'rsa': if not verify_rsa_local_key(ipsec): @@ -318,6 +330,31 @@ def verify(ipsec): if ('local' in tunnel_conf and 'prefix' in tunnel_conf['local']) or ('remote' in tunnel_conf and 'prefix' in tunnel_conf['remote']): raise ConfigError(f"Local/remote prefix cannot be used with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") +def generate_pki_files(pki, x509_conf): + ca_cert_name = x509_conf['ca_certificate'] + ca_cert_data = dict_search(f'ca.{ca_cert_name}.certificate', pki) + ca_cert_crls = dict_search(f'ca.{ca_cert_name}.crl', pki) or [] + crl_index = 1 + + cert_name = x509_conf['certificate'] + cert_data = dict_search(f'certificate.{cert_name}.certificate', pki) + key_data = dict_search(f'certificate.{cert_name}.private.key', pki) + protected = 'passphrase' in x509_conf + + with open(os.path.join(CA_PATH, f'{ca_cert_name}.pem'), 'w') as f: + f.write(wrap_certificate(ca_cert_data)) + + for crl in ca_cert_crls: + with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: + f.write(wrap_crl(crl)) + crl_index += 1 + + with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f: + f.write(wrap_certificate(cert_data)) + + with open(os.path.join(KEY_PATH, f'{cert_name}.pem'), 'w') as f: + f.write(wrap_private_key(key_data, protected)) + def generate(ipsec): if not ipsec: for config_file in [ipsec_conf, ipsec_secrets, interface_conf, swanctl_conf]: @@ -336,25 +373,20 @@ def generate(ipsec): data['ciphers'] = {'ike': ike_ciphers, 'esp': esp_ciphers} data['rsa_local_key'] = verify_rsa_local_key(ipsec) + for path in [swanctl_dir, CERT_PATH, CA_PATH, CRL_PATH]: + if not os.path.exists(path): + os.mkdir(path, mode=0o755) + + if not os.path.exists(KEY_PATH): + os.mkdir(KEY_PATH, mode=0o700) + if 'site_to_site' in data and 'peer' in data['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): if peer in ipsec['dhcp_no_address']: continue if peer_conf['authentication']['mode'] == 'x509': - cert_file = os.path.join(X509_PATH, dict_search('authentication.x509.cert_file', peer_conf)) - copy_file(cert_file, CERT_PATH, True) - - key_file = os.path.join(X509_PATH, dict_search('authentication.x509.key.file', peer_conf)) - copy_file(key_file, KEY_PATH, True) - - ca_cert_file = os.path.join(X509_PATH, dict_search('authentication.x509.ca_cert_file', peer_conf)) - copy_file(ca_cert_file, CA_PATH, True) - - crl = dict_search('authentication.x509.crl_file', peer_conf) - if crl: - crl_file = os.path.join(X509_PATH, crl) - copy_file(crl_file, CRL_PATH, True) + generate_pki_files(ipsec['pki'], peer_conf['authentication']['x509']) local_ip = '' if 'local_address' in peer_conf: |