# Copyright 2021-2024 VyOS maintainers and contributors
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.

# Migrate OpenVPN key/certificate files to PKI configuration
# Migrate Wireguard to store keys in CLI
# Migrate EAPoL to PKI configuration

import os
import shlex

from vyos.configtree import ConfigTree
from vyos.pki import CERT_BEGIN
from vyos.pki import load_certificate
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
from vyos.pki import load_private_key
from vyos.pki import encode_certificate
from vyos.pki import encode_dh_parameters
from vyos.pki import encode_private_key
from vyos.pki import verify_crl
from vyos.utils.process import run

# Directory that relative key/certificate file names in the old
# configuration syntax are resolved against.
AUTH_DIR = '/config/auth'

# Root of the PKI subtree every migrated object is written to.
pki_base = ['pki']


def wrapped_pem_to_config_value(pem):
    """Flatten PEM-style text into a single-line configuration value.

    Blank lines, ``-----BEGIN/END ...-----`` marker lines and ``#`` comment
    lines are dropped; the remaining lines are concatenated without any
    separator.

    Lines are split with ``str.splitlines()`` and stripped so that files
    using CRLF line endings do not leak ``\\r`` characters into the value.
    """
    stripped = (line.strip() for line in pem.splitlines())
    return ''.join(
        line for line in stripped
        if line and not line.startswith(('-----', '#'))
    )


def read_file_for_pki(config_auth_path):
    """Return the text content of a file below AUTH_DIR, or None.

    ``config_auth_path`` is joined onto AUTH_DIR (an absolute path therefore
    replaces AUTH_DIR, as per ``os.path.join``). ``None`` is returned when the
    resulting path is not a regular file.

    If the file is not readable by the current process its mode is changed to
    644 via sudo before reading.
    """
    full_path = os.path.join(AUTH_DIR, config_auth_path)

    if not os.path.isfile(full_path):
        return None

    if not os.access(full_path, os.R_OK):
        # The path originates from the configuration; quote it so that
        # whitespace or shell metacharacters cannot alter the command.
        run(f'sudo chmod 644 {shlex.quote(full_path)}')

    with open(full_path, 'r') as f:
        return f.read()


def _ensure_tag_node(config: ConfigTree, path: list) -> None:
    """Create ``path`` and mark it as a tag node unless it already exists."""
    if not config.exists(path):
        config.set(path)
        config.set_tag(path)


def _migrate_shared_secret(config: ConfigTree, file_path: list,
                           reference_path: list, key_pki_name: str,
                           failure_msg: str) -> None:
    """Move an OpenVPN shared-secret key file into ``pki openvpn shared-secret``.

    On success the key is stored under ``key_pki_name`` (version '1') and
    ``reference_path`` is set to that name; otherwise ``failure_msg`` is
    printed. The old ``file_path`` node is deleted in both cases. Nothing
    happens when ``file_path`` is not configured.
    """
    if not config.exists(file_path):
        return

    secret_base = pki_base + ['openvpn', 'shared-secret']
    _ensure_tag_node(config, secret_base)

    key = read_file_for_pki(config.return_value(file_path))

    if key:
        config.set(secret_base + [key_pki_name, 'key'],
                   value=wrapped_pem_to_config_value(key))
        config.set(secret_base + [key_pki_name, 'version'], value='1')
        config.set(reference_path, value=key_pki_name)
    else:
        print(failure_msg)

    config.delete(file_path)


def _migrate_pem_file(config: ConfigTree, file_path: list, tag_path: list,
                      value_path: list, load, encode, failure_msg: str,
                      reference=None) -> None:
    """Migrate one file-based PKI object into the PKI subtree.

    Args:
        config: configuration tree being migrated.
        file_path: old node holding the file name; deleted afterwards.
        tag_path: PKI tag node (e.g. ``pki certificate``) that must exist.
        value_path: PKI leaf node receiving the flattened PEM data.
        load: callable turning the file text into an object; a falsy result
            is treated as a failed load.
        encode: callable turning the loaded object back into PEM text.
        failure_msg: message printed when the file is missing or unparsable.
        reference: optional ``(path, value)`` pair set on success so that the
            interface configuration points at the new PKI entry.
    """
    if not config.exists(file_path):
        return

    _ensure_tag_node(config, tag_path)

    raw = read_file_for_pki(config.return_value(file_path))
    obj = load(raw) if raw is not None else None

    if obj:
        config.set(value_path, value=wrapped_pem_to_config_value(encode(obj)))
        if reference is not None:
            ref_path, ref_value = reference
            config.set(ref_path, value=ref_value)
    else:
        print(failure_msg)

    config.delete(file_path)


def _migrate_certificate_file(config: ConfigTree, x509_base: list,
                              file_node: str, pki_section: str,
                              reference_node: str, pki_name: str,
                              failure_msg: str) -> None:
    """Migrate a single-certificate file to ``pki <pki_section> <pki_name>``."""
    _migrate_pem_file(
        config,
        file_path=x509_base + [file_node],
        tag_path=pki_base + [pki_section],
        value_path=pki_base + [pki_section, pki_name, 'certificate'],
        load=lambda data: load_certificate(data, wrap_tags=False),
        encode=encode_certificate,
        failure_msg=failure_msg,
        reference=(x509_base + [reference_node], pki_name),
    )


def _migrate_private_key_file(config: ConfigTree, x509_base: list,
                              pki_name: str, failure_msg: str) -> None:
    """Migrate ``key-file`` to ``pki certificate <pki_name> private key``.

    ``pki certificate`` is ensured to be a tag node here as well, so the
    result is well-formed even when no ``cert-file`` was configured.
    """
    _migrate_pem_file(
        config,
        file_path=x509_base + ['key-file'],
        tag_path=pki_base + ['certificate'],
        value_path=pki_base + ['certificate', pki_name, 'private', 'key'],
        load=lambda data: load_private_key(data, passphrase=None,
                                           wrap_tags=False),
        encode=lambda key: encode_private_key(key, passphrase=None),
        failure_msg=failure_msg,
    )


def _migrate_openvpn_ca_bundle(config: ConfigTree, x509_base: list,
                               pki_name: str, interface: str) -> dict:
    """Migrate an OpenVPN ``ca-cert-file`` that may hold several certificates.

    Each certificate found in the file becomes ``pki ca <pki_name>_<index>``
    (index starting at 1) and is appended to the interface's
    ``ca-certificate`` values.

    Returns:
        Mapping of PKI CA name to the loaded certificate object, used later to
        find the CA that issued the CRL. Empty when nothing was migrated.
    """
    ca_certs = {}
    file_path = x509_base + ['ca-cert-file']

    if not config.exists(file_path):
        return ca_certs

    failure_msg = f'Failed to migrate CA certificate on openvpn interface {interface}'
    _ensure_tag_node(config, pki_base + ['ca'])

    bundle = read_file_for_pki(config.return_value(file_path))

    if bundle is None:
        print(failure_msg)
    else:
        # Everything before the first BEGIN marker is not a certificate.
        chunks = bundle.split(CERT_BEGIN)[1:]
        for index, cert_data in enumerate(chunks, start=1):
            ca_name = f'{pki_name}_{index}'
            cert = load_certificate(CERT_BEGIN + cert_data, wrap_tags=False)

            if not cert:
                print(failure_msg)
                continue

            ca_certs[ca_name] = cert
            cert_pem = encode_certificate(cert)
            config.set(pki_base + ['ca', ca_name, 'certificate'],
                       value=wrapped_pem_to_config_value(cert_pem))
            config.set(x509_base + ['ca-certificate'], value=ca_name,
                       replace=False)

    config.delete(file_path)
    return ca_certs


def _migrate_openvpn_crl(config: ConfigTree, x509_base: list, ca_certs: dict,
                         interface: str) -> None:
    """Attach an OpenVPN ``crl-file`` to the migrated CA that signed it.

    The CRL is stored under the first CA in ``ca_certs`` for which
    ``verify_crl`` succeeds. If the CRL cannot be loaded or no CA matches, a
    failure message is printed. The ``crl-file`` node is deleted either way.
    """
    file_path = x509_base + ['crl-file']

    if not config.exists(file_path):
        return

    _ensure_tag_node(config, pki_base + ['ca'])

    crl_data = read_file_for_pki(config.return_value(file_path))
    crl = load_crl(crl_data, wrap_tags=False) if crl_data is not None else None

    crl_ca_name = None
    # Only verify a CRL that was actually loaded; passing a failed load
    # result to verify_crl would raise instead of reporting the failure.
    if crl:
        for ca_name, ca_cert in ca_certs.items():
            if verify_crl(crl, ca_cert):
                crl_ca_name = ca_name
                break

    if crl_ca_name is not None:
        crl_pem = encode_certificate(crl)
        config.set(pki_base + ['ca', crl_ca_name, 'crl'],
                   value=wrapped_pem_to_config_value(crl_pem))
    else:
        print(f'Failed to migrate CRL on openvpn interface {interface}')

    config.delete(file_path)


def _migrate_openvpn(config: ConfigTree) -> None:
    """Migrate file references of all OpenVPN interfaces to PKI entries."""
    base = ['interfaces', 'openvpn']

    if not config.exists(base):
        return

    for interface in config.list_nodes(base):
        x509_base = base + [interface, 'tls']
        pki_name = f'openvpn_{interface}'

        _migrate_shared_secret(
            config,
            file_path=base + [interface, 'shared-secret-key-file'],
            reference_path=base + [interface, 'shared-secret-key'],
            key_pki_name=f'{pki_name}_shared',
            failure_msg=f'Failed to migrate shared-secret-key on openvpn interface {interface}',
        )

        if not config.exists(x509_base):
            continue

        _migrate_shared_secret(
            config,
            file_path=x509_base + ['auth-file'],
            reference_path=x509_base + ['auth-key'],
            key_pki_name=f'{pki_name}_auth',
            failure_msg=f'Failed to migrate auth-key on openvpn interface {interface}',
        )

        _migrate_shared_secret(
            config,
            file_path=x509_base + ['crypt-file'],
            reference_path=x509_base + ['crypt-key'],
            key_pki_name=f'{pki_name}_crypt',
            failure_msg=f'Failed to migrate crypt-key on openvpn interface {interface}',
        )

        # The CA bundle must be handled before the CRL: the CRL is attached
        # to whichever of the migrated CAs signed it.
        ca_certs = _migrate_openvpn_ca_bundle(config, x509_base, pki_name,
                                              interface)
        _migrate_openvpn_crl(config, x509_base, ca_certs, interface)

        _migrate_certificate_file(
            config, x509_base,
            file_node='cert-file',
            pki_section='certificate',
            reference_node='certificate',
            pki_name=pki_name,
            failure_msg=f'Failed to migrate certificate on openvpn interface {interface}',
        )

        _migrate_private_key_file(
            config, x509_base, pki_name,
            failure_msg=f'Failed to migrate private key on openvpn interface {interface}',
        )

        _migrate_pem_file(
            config,
            file_path=x509_base + ['dh-file'],
            tag_path=pki_base + ['dh'],
            value_path=pki_base + ['dh', pki_name, 'parameters'],
            load=lambda data: load_dh_parameters(data, wrap_tags=False),
            encode=encode_dh_parameters,
            failure_msg=f'Failed to migrate DH parameters on openvpn interface {interface}',
            reference=(x509_base + ['dh-params'], pki_name),
        )


def _migrate_wireguard(config: ConfigTree) -> None:
    """Inline WireGuard private keys and rename peer ``pubkey`` nodes."""
    base = ['interfaces', 'wireguard']

    if not config.exists(base):
        return

    for interface in config.list_nodes(base):
        private_key_path = base + [interface, 'private-key']

        # Without an explicit key name the key pair called 'default' is used.
        key_file = 'default'
        if config.exists(private_key_path):
            key_file = config.return_value(private_key_path)

        full_key_path = f'/config/auth/wireguard/{key_file}/private.key'

        if os.path.exists(full_key_path):
            with open(full_key_path, 'r') as f:
                key_data = f.read().strip()
            config.set(private_key_path, value=key_data)
        else:
            print(f'Could not find wireguard private key for migration on interface "{interface}"')

        # The peer rename is independent of the private key, so it is done
        # even when the key file is missing. Guard both lookups so that an
        # interface without peers (or a peer without 'pubkey') is skipped
        # rather than aborting the migration.
        peer_base = base + [interface, 'peer']
        if not config.exists(peer_base):
            continue

        for peer in config.list_nodes(peer_base):
            if config.exists(peer_base + [peer, 'pubkey']):
                config.rename(peer_base + [peer, 'pubkey'], 'public-key')


def _migrate_eapol(config: ConfigTree) -> None:
    """Migrate EAPoL file references of all ethernet interfaces to PKI."""
    base = ['interfaces', 'ethernet']

    if not config.exists(base):
        return

    for interface in config.list_nodes(base):
        x509_base = base + [interface, 'eapol']

        if not config.exists(x509_base):
            continue

        pki_name = f'eapol_{interface}'

        _migrate_certificate_file(
            config, x509_base,
            file_node='ca-cert-file',
            pki_section='ca',
            reference_node='ca-certificate',
            pki_name=pki_name,
            failure_msg=f'Failed to migrate CA certificate on eapol config for interface {interface}',
        )

        _migrate_certificate_file(
            config, x509_base,
            file_node='cert-file',
            pki_section='certificate',
            reference_node='certificate',
            pki_name=pki_name,
            failure_msg=f'Failed to migrate certificate on eapol config for interface {interface}',
        )

        _migrate_private_key_file(
            config, x509_base, pki_name,
            failure_msg=f'Failed to migrate private key on eapol config for interface {interface}',
        )


def migrate(config: ConfigTree) -> None:
    """Rewrite file-based key material in ``config`` to the new syntax.

    The tree is modified in place, in this order:

    * OpenVPN: shared secrets, CA certificates, CRL, certificate, private key
      and DH parameters are read from files and stored below ``pki``; the
      interface nodes are pointed at the new PKI entries and the old
      ``*-file`` nodes are removed.
    * WireGuard: the private key is read from
      ``/config/auth/wireguard/<name>/private.key`` and stored directly in
      ``private-key``; peer ``pubkey`` nodes are renamed to ``public-key``.
    * Ethernet EAPoL: CA certificate, certificate and private key files are
      stored below ``pki`` in the same way as for OpenVPN.

    A file that is missing or cannot be parsed is reported on stdout and its
    old node is still removed; the migration then continues.
    """
    _migrate_openvpn(config)
    _migrate_wireguard(config)
    _migrate_eapol(config)