diff options
-rw-r--r-- | interface-definitions/include/ipsec/authentication-x509.xml.i | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vpn_ipsec.py | 11 | ||||
-rwxr-xr-x | src/conf_mode/vpn_ipsec.py | 37 |
3 files changed, 20 insertions, 30 deletions
diff --git a/interface-definitions/include/ipsec/authentication-x509.xml.i b/interface-definitions/include/ipsec/authentication-x509.xml.i index db675c0bf..1d04c94ba 100644 --- a/interface-definitions/include/ipsec/authentication-x509.xml.i +++ b/interface-definitions/include/ipsec/authentication-x509.xml.i @@ -5,7 +5,7 @@ </properties> <children> #include <include/pki/certificate-key.xml.i> - #include <include/pki/ca-certificate.xml.i> + #include <include/pki/ca-certificate-multi.xml.i> </children> </node> <!-- include end --> diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py index 09e10a2c4..884394bac 100755 --- a/smoketest/scripts/cli/test_vpn_ipsec.py +++ b/smoketest/scripts/cli/test_vpn_ipsec.py @@ -413,6 +413,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(peer_base_path + ['authentication', 'local-id', peer_name]) self.cli_set(peer_base_path + ['authentication', 'mode', 'x509']) self.cli_set(peer_base_path + ['authentication', 'remote-id', 'peer2']) + self.cli_set(peer_base_path + ['authentication', 'x509', 'ca-certificate', ca_name]) self.cli_set(peer_base_path + ['authentication', 'x509', 'ca-certificate', int_ca_name]) self.cli_set(peer_base_path + ['authentication', 'x509', 'certificate', peer_name]) self.cli_set(peer_base_path + ['connection-type', 'initiate']) @@ -465,8 +466,8 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present - self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}_1.pem'))) - self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}_2.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) # There is only one VTI test so no need to delete this globally in tearDown() @@ -666,7 +667,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present - self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() @@ -778,7 +779,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present - self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() @@ -893,7 +894,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present - self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) + self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index 388f2a709..46f041cce 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -32,10 +32,7 @@ from vyos.configverify import verify_interface_exists from vyos.configverify import dynamic_interface_pattern from vyos.defaults import directories from vyos.ifconfig import Interface -from vyos.pki import encode_certificate from vyos.pki import encode_public_key -from vyos.pki import find_chain -from vyos.pki import load_certificate from vyos.pki import load_private_key from vyos.pki import wrap_certificate from vyos.pki import wrap_crl @@ -126,11 +123,11 @@ def verify_pki_x509(pki, x509_conf): if not pki or 'ca' not in pki or 'certificate' not in pki: raise ConfigError(f'PKI is not configured') - ca_cert_name = x509_conf['ca_certificate'] cert_name = x509_conf['certificate'] - if not dict_search_args(pki, 'ca', ca_cert_name, 'certificate'): - raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') + for ca_cert_name in x509_conf['ca_certificate']: + if not dict_search_args(pki, 'ca', ca_cert_name, 'certificate'): + raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') if not dict_search_args(pki, 'certificate', cert_name, 'certificate'): raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"') @@ -443,32 +440,24 @@ def cleanup_pki_files(): os.unlink(file_path) def generate_pki_files_x509(pki, x509_conf): - ca_cert_name = x509_conf['ca_certificate'] - ca_cert_data = dict_search_args(pki, 'ca', ca_cert_name, 'certificate') - ca_cert_crls = dict_search_args(pki, 'ca', ca_cert_name, 'crl') or [] - ca_index = 1 - crl_index = 1 + for ca_cert_name in x509_conf['ca_certificate']: + ca_cert_data = dict_search_args(pki, 'ca', ca_cert_name, 'certificate') + ca_cert_crls = dict_search_args(pki, 'ca', ca_cert_name, 'crl') or [] + crl_index = 1 - ca_cert = load_certificate(ca_cert_data) - pki_ca_certs = [load_certificate(ca['certificate']) for ca in pki['ca'].values()] + with open(os.path.join(CA_PATH, f'{ca_cert_name}.pem'), 'w') as f: + f.write(wrap_certificate(ca_cert_data)) - ca_cert_chain = find_chain(ca_cert, pki_ca_certs) + for crl in ca_cert_crls: + with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: + f.write(wrap_crl(crl)) + crl_index += 1 cert_name = x509_conf['certificate'] cert_data = dict_search_args(pki, 'certificate', cert_name, 'certificate') key_data = dict_search_args(pki, 'certificate', cert_name, 'private', 'key') protected = 'passphrase' in x509_conf - for ca_cert_obj in ca_cert_chain: - with open(os.path.join(CA_PATH, f'{ca_cert_name}_{ca_index}.pem'), 'w') as f: - f.write(encode_certificate(ca_cert_obj)) - ca_index += 1 - - for crl in ca_cert_crls: - with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: - f.write(wrap_crl(crl)) - crl_index += 1 - with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f: f.write(wrap_certificate(cert_data)) |