summaryrefslogtreecommitdiff
path: root/src/conf_mode
diff options
context:
space:
mode:
Diffstat (limited to 'src/conf_mode')
-rwxr-xr-xsrc/conf_mode/pki.py167
-rwxr-xr-xsrc/conf_mode/vpn_ipsec.py90
2 files changed, 228 insertions, 29 deletions
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
new file mode 100755
index 000000000..ef1b57650
--- /dev/null
+++ b/src/conf_mode/pki.py
@@ -0,0 +1,167 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2021 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from sys import exit
+
+from vyos.config import Config
+from vyos.configdict import dict_merge
+from vyos.pki import is_ca_certificate
+from vyos.pki import load_certificate
+from vyos.pki import load_certificate_request
+from vyos.pki import load_public_key
+from vyos.pki import load_private_key
+from vyos.pki import load_crl
+from vyos.pki import load_dh_parameters
+from vyos.util import ask_input
+from vyos.xml import defaults
+from vyos import ConfigError
+from vyos import airbag
+airbag.enable()
+
+def get_config(config=None):
+ if config:
+ conf = config
+ else:
+ conf = Config()
+ base = ['pki']
+ if not conf.exists(base):
+ return None
+
+ pki = conf.get_config_dict(base, key_mangling=('-', '_'),
+ get_first_key=True, no_tag_node_value_mangle=True)
+
+ default_values = defaults(base)
+ pki = dict_merge(default_values, pki)
+ return pki
+
+def is_valid_certificate(raw_data):
+ # If it loads correctly we're good, or return False
+ return load_certificate(raw_data, wrap_tags=True)
+
+def is_valid_ca_certificate(raw_data):
+ # Check if this is a valid certificate with CA attributes
+ cert = load_certificate(raw_data, wrap_tags=True)
+ if not cert:
+ return False
+ return is_ca_certificate(cert)
+
+def is_valid_public_key(raw_data):
+ # If it loads correctly we're good, or return False
+ return load_public_key(raw_data, wrap_tags=True)
+
+def is_valid_private_key(raw_data, protected=False):
+ # If it loads correctly we're good, or return False
+ # With encrypted private keys, we always return true as we cannot ask for password to verify
+ if protected:
+ return True
+ return load_private_key(raw_data, passphrase=None, wrap_tags=True)
+
+def is_valid_crl(raw_data):
+ # If it loads correctly we're good, or return False
+ return load_crl(raw_data, wrap_tags=True)
+
+def is_valid_dh_parameters(raw_data):
+ # If it loads correctly we're good, or return False
+ return load_dh_parameters(raw_data, wrap_tags=True)
+
+def verify(pki):
+ if not pki:
+ return None
+
+ if 'ca' in pki:
+ for name, ca_conf in pki['ca'].items():
+ if 'certificate' in ca_conf:
+ if not is_valid_ca_certificate(ca_conf['certificate']):
+ raise ConfigError(f'Invalid certificate on CA certificate "{name}"')
+
+ if 'private' in ca_conf and 'key' in ca_conf['private']:
+ private = ca_conf['private']
+ protected = 'password_protected' in private
+
+ if not is_valid_private_key(private['key'], protected):
+ raise ConfigError(f'Invalid private key on CA certificate "{name}"')
+
+ if 'crl' in ca_conf:
+ ca_crls = ca_conf['crl']
+ if isinstance(ca_crls, str):
+ ca_crls = [ca_crls]
+
+ for crl in ca_crls:
+ if not is_valid_crl(crl):
+ raise ConfigError(f'Invalid CRL on CA certificate "{name}"')
+
+ if 'certificate' in pki:
+ for name, cert_conf in pki['certificate'].items():
+ if 'certificate' in cert_conf:
+ if not is_valid_certificate(cert_conf['certificate']):
+ raise ConfigError(f'Invalid certificate on certificate "{name}"')
+
+ if 'private' in cert_conf and 'key' in cert_conf['private']:
+ private = cert_conf['private']
+ protected = 'password_protected' in private
+
+ if not is_valid_private_key(private['key'], protected):
+ raise ConfigError(f'Invalid private key on certificate "{name}"')
+
+ if 'dh' in pki:
+ for name, dh_conf in pki['dh'].items():
+ if 'parameters' in dh_conf:
+ if not is_valid_dh_parameters(dh_conf['parameters']):
+ raise ConfigError(f'Invalid DH parameters on "{name}"')
+
+ if 'key_pair' in pki:
+ for name, key_conf in pki['key_pair'].items():
+ if 'public' in key_conf and 'key' in key_conf['public']:
+ if not is_valid_public_key(key_conf['public']['key']):
+ raise ConfigError(f'Invalid public key on key-pair "{name}"')
+
+ if 'private' in key_conf and 'key' in key_conf['private']:
+ private = key_conf['private']
+ protected = 'password_protected' in private
+ if not is_valid_private_key(private['key'], protected):
+ raise ConfigError(f'Invalid private key on key-pair "{name}"')
+
+ if 'x509' in pki:
+ if 'default' in pki['x509']:
+ default_values = pki['x509']['default']
+ if 'country' in default_values:
+ country = default_values['country']
+ if len(country) != 2 or not country.isalpha():
+ raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.')
+
+ return None
+
+def generate(pki):
+ if not pki:
+ return None
+
+ return None
+
+def apply(pki):
+ if not pki:
+ return None
+
+ return None
+
+if __name__ == '__main__':
+ try:
+ c = get_config()
+ verify(c)
+ generate(c)
+ apply(c)
+ except ConfigError as e:
+ print(e)
+ exit(1)
diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py
index a141fdddf..70b4c52e6 100755
--- a/src/conf_mode/vpn_ipsec.py
+++ b/src/conf_mode/vpn_ipsec.py
@@ -23,6 +23,10 @@ from vyos.config import Config
from vyos.configdict import leaf_node_changed
from vyos.configverify import verify_interface_exists
from vyos.ifconfig import Interface
+from vyos.pki import wrap_certificate
+from vyos.pki import wrap_crl
+from vyos.pki import wrap_public_key
+from vyos.pki import wrap_private_key
from vyos.template import ip_from_cidr
from vyos.template import render
from vyos.validate import is_ipv6_link_local
@@ -113,6 +117,8 @@ def get_config(config=None):
ipsec['interface_change'] = leaf_node_changed(conf, base + ['ipsec-interfaces', 'interface'])
ipsec['l2tp_exists'] = conf.exists(['vpn', 'l2tp', 'remote-access', 'ipsec-settings'])
ipsec['nhrp_exists'] = conf.exists(['protocols', 'nhrp', 'tunnel'])
+ ipsec['pki'] = conf.get_config_dict(['pki'], key_mangling=('-', '_'),
+ get_first_key=True, no_tag_node_value_mangle=True)
ipsec['rsa_keys'] = conf.get_config_dict(['vpn', 'rsa-keys'], key_mangling=('-', '_'),
get_first_key=True, no_tag_node_value_mangle=True)
@@ -185,6 +191,24 @@ def get_dhcp_address(iface):
return ip_from_cidr(address)
return None
+def verify_pki(pki, x509_conf):
+ if not pki or 'ca' not in pki or 'certificate' not in pki:
+ raise ConfigError(f'PKI is not configured')
+
+ ca_cert_name = x509_conf['ca_certificate']
+ cert_name = x509_conf['certificate']
+
+ if not dict_search(f'ca.{ca_cert_name}.certificate', ipsec['pki']):
+ raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"')
+
+ if not dict_search(f'certificate.{cert_name}.certificate', ipsec['pki']):
+ raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"')
+
+ if not dict_search(f'certificate.{cert_name}.private.key', ipsec['pki']):
+ raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"')
+
+ return True
+
def verify(ipsec):
if not ipsec:
return None
@@ -235,24 +259,12 @@ def verify(ipsec):
if 'x509' not in peer_conf['authentication']:
raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}")
- if 'key' not in peer_conf['authentication']['x509']:
- raise ConfigError(f"Missing x509 key on site-to-site peer {peer}")
-
- if 'ca_cert_file' not in peer_conf['authentication']['x509'] or 'cert_file' not in peer_conf['authentication']['x509']:
- raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}")
+ x509 = peer_conf['authentication']['x509']
- if 'file' not in peer_conf['authentication']['x509']['key']:
- raise ConfigError(f"Missing x509 key file on site-to-site peer {peer}")
+ if 'ca_certificate' not in x509 or 'certificate' not in x509:
+ raise ConfigError(f"Missing x509 certificates on site-to-site peer {peer}")
- for key in ['ca_cert_file', 'cert_file', 'crl_file']:
- if key in peer_conf['authentication']['x509']:
- path = os.path.join(X509_PATH, peer_conf['authentication']['x509'][key])
- if not os.path.exists(path):
- raise ConfigError(f"File not found for {key} on site-to-site peer {peer}")
-
- key_path = os.path.join(X509_PATH, peer_conf['authentication']['x509']['key']['file'])
- if not os.path.exists(key_path):
- raise ConfigError(f"Private key not found on site-to-site peer {peer}")
+ verify_pki(ipsec['pki'], x509)
if peer_conf['authentication']['mode'] == 'rsa':
if not verify_rsa_local_key(ipsec):
@@ -318,6 +330,31 @@ def verify(ipsec):
if ('local' in tunnel_conf and 'prefix' in tunnel_conf['local']) or ('remote' in tunnel_conf and 'prefix' in tunnel_conf['remote']):
raise ConfigError(f"Local/remote prefix cannot be used with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}")
+def generate_pki_files(pki, x509_conf):
+ ca_cert_name = x509_conf['ca_certificate']
+ ca_cert_data = dict_search(f'ca.{ca_cert_name}.certificate', pki)
+ ca_cert_crls = dict_search(f'ca.{ca_cert_name}.crl', pki) or []
+ crl_index = 1
+
+ cert_name = x509_conf['certificate']
+ cert_data = dict_search(f'certificate.{cert_name}.certificate', pki)
+ key_data = dict_search(f'certificate.{cert_name}.private.key', pki)
+ protected = 'passphrase' in x509_conf
+
+ with open(os.path.join(CA_PATH, f'{ca_cert_name}.pem'), 'w') as f:
+ f.write(wrap_certificate(ca_cert_data))
+
+ for crl in ca_cert_crls:
+ with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f:
+ f.write(wrap_crl(crl))
+ crl_index += 1
+
+ with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f:
+ f.write(wrap_certificate(cert_data))
+
+ with open(os.path.join(KEY_PATH, f'{cert_name}.pem'), 'w') as f:
+ f.write(wrap_private_key(key_data, protected))
+
def generate(ipsec):
if not ipsec:
for config_file in [ipsec_conf, ipsec_secrets, interface_conf, swanctl_conf]:
@@ -336,25 +373,20 @@ def generate(ipsec):
data['ciphers'] = {'ike': ike_ciphers, 'esp': esp_ciphers}
data['rsa_local_key'] = verify_rsa_local_key(ipsec)
+ for path in [swanctl_dir, CERT_PATH, CA_PATH, CRL_PATH]:
+ if not os.path.exists(path):
+ os.mkdir(path, mode=0o755)
+
+ if not os.path.exists(KEY_PATH):
+ os.mkdir(KEY_PATH, mode=0o700)
+
if 'site_to_site' in data and 'peer' in data['site_to_site']:
for peer, peer_conf in ipsec['site_to_site']['peer'].items():
if peer in ipsec['dhcp_no_address']:
continue
if peer_conf['authentication']['mode'] == 'x509':
- cert_file = os.path.join(X509_PATH, dict_search('authentication.x509.cert_file', peer_conf))
- copy_file(cert_file, CERT_PATH, True)
-
- key_file = os.path.join(X509_PATH, dict_search('authentication.x509.key.file', peer_conf))
- copy_file(key_file, KEY_PATH, True)
-
- ca_cert_file = os.path.join(X509_PATH, dict_search('authentication.x509.ca_cert_file', peer_conf))
- copy_file(ca_cert_file, CA_PATH, True)
-
- crl = dict_search('authentication.x509.crl_file', peer_conf)
- if crl:
- crl_file = os.path.join(X509_PATH, crl)
- copy_file(crl_file, CRL_PATH, True)
+ generate_pki_files(ipsec['pki'], peer_conf['authentication']['x509'])
local_ip = ''
if 'local_address' in peer_conf: