From 3b758d870449e92fece9e29c791b950b332e6e65 Mon Sep 17 00:00:00 2001
From: Christian Breunig <christian@breunig.cc>
Date: Tue, 2 Apr 2024 18:52:29 +0200
Subject: configverify: T6198: add common helper for PKI certificate validation

The next evolutional step after adding get_config_dict(..., with_pki=True) is
to add a common verification function for the recurring task of validating SSL
certificate existance in e.g. EAPoL, OpenConnect, SSTP or HTTPS.
---
 src/conf_mode/interfaces_ethernet.py          | 20 ++++++++-
 src/conf_mode/load-balancing_reverse-proxy.py | 38 ++++------------
 src/conf_mode/service_https.py                | 39 +++++-----------
 src/conf_mode/vpn_openconnect.py              | 52 +++++++--------------
 src/conf_mode/vpn_sstp.py                     | 65 +++++++--------------------
 5 files changed, 70 insertions(+), 144 deletions(-)

(limited to 'src')

diff --git a/src/conf_mode/interfaces_ethernet.py b/src/conf_mode/interfaces_ethernet.py
index 2c0f846c3..504d48f89 100755
--- a/src/conf_mode/interfaces_ethernet.py
+++ b/src/conf_mode/interfaces_ethernet.py
@@ -15,7 +15,6 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
-import pprint
 
 from glob import glob
 from sys import exit
@@ -26,7 +25,6 @@ from vyos.configdict import get_interface_dict
 from vyos.configdict import is_node_changed
 from vyos.configverify import verify_address
 from vyos.configverify import verify_dhcpv6
-from vyos.configverify import verify_eapol
 from vyos.configverify import verify_interface_exists
 from vyos.configverify import verify_mirror_redirect
 from vyos.configverify import verify_mtu
@@ -34,6 +32,8 @@ from vyos.configverify import verify_mtu_ipv6
 from vyos.configverify import verify_vlan_config
 from vyos.configverify import verify_vrf
 from vyos.configverify import verify_bond_bridge_member
+from vyos.configverify import verify_pki_certificate
+from vyos.configverify import verify_pki_ca_certificate
 from vyos.ethtool import Ethtool
 from vyos.ifconfig import EthernetIf
 from vyos.ifconfig import BondIf
@@ -263,6 +263,22 @@ def verify_allowedbond_changes(ethernet: dict):
                               f' on interface "{ethernet["ifname"]}".' \
                               f' Interface is a bond member')
 
+def verify_eapol(ethernet: dict):
+    """
+    Common helper function used by interface implementations to perform
+    recurring validation of EAPoL configuration.
+    """
+    if 'eapol' not in ethernet:
+        return
+
+    if 'certificate' not in ethernet['eapol']:
+        raise ConfigError('Certificate must be specified when using EAPoL!')
+
+    verify_pki_certificate(ethernet, ethernet['eapol']['certificate'], no_password_protected=True)
+
+    if 'ca_certificate' in ethernet['eapol']:
+        for ca_cert in ethernet['eapol']['ca_certificate']:
+            verify_pki_ca_certificate(ethernet, ca_cert)
 
 def verify(ethernet):
     if 'deleted' in ethernet:
diff --git a/src/conf_mode/load-balancing_reverse-proxy.py b/src/conf_mode/load-balancing_reverse-proxy.py
index 2a0acd84a..694a4e1ea 100755
--- a/src/conf_mode/load-balancing_reverse-proxy.py
+++ b/src/conf_mode/load-balancing_reverse-proxy.py
@@ -20,6 +20,9 @@ from sys import exit
 from shutil import rmtree
 
 from vyos.config import Config
+from vyos.configverify import verify_pki_certificate
+from vyos.configverify import verify_pki_ca_certificate
+from vyos.utils.dict import dict_search
 from vyos.utils.process import call
 from vyos.utils.network import check_port_availability
 from vyos.utils.network import is_listen_port_bind_service
@@ -33,8 +36,7 @@ airbag.enable()
 load_balancing_dir = '/run/haproxy'
 load_balancing_conf_file = f'{load_balancing_dir}/haproxy.cfg'
 systemd_service = 'haproxy.service'
-systemd_override = r'/run/systemd/system/haproxy.service.d/10-override.conf'
-
+systemd_override = '/run/systemd/system/haproxy.service.d/10-override.conf'
 
 def get_config(config=None):
     if config:
@@ -54,30 +56,6 @@ def get_config(config=None):
 
     return lb
 
-
-def _verify_cert(lb: dict, config: dict) -> None:
-    if 'ca_certificate' in config['ssl']:
-        ca_name = config['ssl']['ca_certificate']
-        pki_ca = lb['pki'].get('ca')
-        if pki_ca is None:
-            raise ConfigError(f'CA certificates does not exist in PKI')
-        else:
-            ca = pki_ca.get(ca_name)
-            if ca is None:
-                raise ConfigError(f'CA certificate "{ca_name}" does not exist')
-
-    elif 'certificate' in config['ssl']:
-        cert_names = config['ssl']['certificate']
-        pki_certs = lb['pki'].get('certificate')
-        if pki_certs is None:
-            raise ConfigError(f'Certificates does not exist in PKI')
-
-        for cert_name in cert_names:
-            pki_cert = pki_certs.get(cert_name)
-            if pki_cert is None:
-                raise ConfigError(f'Certificate "{cert_name}" does not exist')
-
-
 def verify(lb):
     if not lb:
         return None
@@ -107,12 +85,12 @@ def verify(lb):
                 raise ConfigError(f'Cannot use both "send-proxy" and "send-proxy-v2" for server "{bk_server}"')
 
     for front, front_config in lb['service'].items():
-        if 'ssl' in front_config:
-            _verify_cert(lb, front_config)
+        for cert in dict_search('ssl.certificate', front_config) or []:
+            verify_pki_certificate(lb, cert)
 
     for back, back_config in lb['backend'].items():
-        if 'ssl' in back_config:
-            _verify_cert(lb, back_config)
+        tmp = dict_search('ssl.ca_certificate', front_config)
+        if tmp: verify_pki_ca_certificate(lb, tmp)
 
 
 def generate(lb):
diff --git a/src/conf_mode/service_https.py b/src/conf_mode/service_https.py
index 46efc3c93..9e58b4c72 100755
--- a/src/conf_mode/service_https.py
+++ b/src/conf_mode/service_https.py
@@ -24,13 +24,14 @@ from time import sleep
 from vyos.base import Warning
 from vyos.config import Config
 from vyos.config import config_dict_merge
-from vyos.configdiff import get_config_diff
 from vyos.configverify import verify_vrf
+from vyos.configverify import verify_pki_certificate
+from vyos.configverify import verify_pki_ca_certificate
+from vyos.configverify import verify_pki_dh_parameters
 from vyos.defaults import api_config_state
 from vyos.pki import wrap_certificate
 from vyos.pki import wrap_private_key
 from vyos.pki import wrap_dh_parameters
-from vyos.pki import load_dh_parameters
 from vyos.template import render
 from vyos.utils.dict import dict_search
 from vyos.utils.process import call
@@ -84,33 +85,14 @@ def verify(https):
     if https is None:
         return None
 
-    if 'certificates' in https and 'certificate' in https['certificates']:
-        cert_name = https['certificates']['certificate']
-        if 'pki' not in https:
-            raise ConfigError('PKI is not configured!')
-
-        if cert_name not in https['pki']['certificate']:
-            raise ConfigError('Invalid certificate in configuration!')
+    if dict_search('certificates.certificate', https) != None:
+        verify_pki_certificate(https, https['certificates']['certificate'])
 
-        pki_cert = https['pki']['certificate'][cert_name]
-
-        if 'certificate' not in pki_cert:
-            raise ConfigError('Missing certificate in configuration!')
+        tmp = dict_search('certificates.ca_certificate', https)
+        if tmp != None: verify_pki_ca_certificate(https, tmp)
 
-        if 'private' not in pki_cert or 'key' not in pki_cert['private']:
-            raise ConfigError('Missing certificate private key in configuration!')
-
-        if 'dh_params' in https['certificates']:
-            dh_name = https['certificates']['dh_params']
-            if dh_name not in https['pki']['dh']:
-                raise ConfigError('Invalid DH parameter in configuration!')
-
-            pki_dh = https['pki']['dh'][dh_name]
-            dh_params = load_dh_parameters(pki_dh['parameters'])
-            dh_numbers = dh_params.parameter_numbers()
-            dh_bits = dh_numbers.p.bit_length()
-            if dh_bits < 2048:
-                raise ConfigError(f'Minimum DH key-size is 2048 bits')
+        tmp = dict_search('certificates.dh_params', https)
+        if tmp != None: verify_pki_dh_parameters(https, tmp, 2048)
 
     else:
         Warning('No certificate specified, using build-in self-signed certificates. '\
@@ -214,7 +196,8 @@ def apply(https):
     https_service_name = 'nginx.service'
 
     if https is None:
-        call(f'systemctl stop {http_api_service_name}')
+        if is_systemd_service_active(http_api_service_name):
+            call(f'systemctl stop {http_api_service_name}')
         call(f'systemctl stop {https_service_name}')
         return
 
diff --git a/src/conf_mode/vpn_openconnect.py b/src/conf_mode/vpn_openconnect.py
index 08e4fc6db..8159fedea 100755
--- a/src/conf_mode/vpn_openconnect.py
+++ b/src/conf_mode/vpn_openconnect.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 #
-# Copyright (C) 2018-2023 VyOS maintainers and contributors
+# Copyright (C) 2018-2024 VyOS maintainers and contributors
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 or later as
@@ -19,6 +19,8 @@ from sys import exit
 
 from vyos.base import Warning
 from vyos.config import Config
+from vyos.configverify import verify_pki_certificate
+from vyos.configverify import verify_pki_ca_certificate
 from vyos.pki import wrap_certificate
 from vyos.pki import wrap_private_key
 from vyos.template import render
@@ -75,7 +77,7 @@ def verify(ocserv):
     if "accounting" in ocserv:
         if "mode" in ocserv["accounting"] and "radius" in ocserv["accounting"]["mode"]:
             if not origin["accounting"]['radius']['server']:
-                raise ConfigError('Openconnect accounting mode radius requires at least one RADIUS server')
+                raise ConfigError('OpenConnect accounting mode radius requires at least one RADIUS server')
             if "authentication" not in ocserv or "mode" not in ocserv["authentication"]:
                 raise ConfigError('Accounting depends on OpenConnect authentication configuration')
             elif "radius" not in ocserv["authentication"]["mode"]:
@@ -89,12 +91,12 @@ def verify(ocserv):
                     raise ConfigError('OpenConnect authentication modes are mutually-exclusive, remove either local or radius from your configuration')
             if "radius" in ocserv["authentication"]["mode"]:
                 if not ocserv["authentication"]['radius']['server']:
-                    raise ConfigError('Openconnect authentication mode radius requires at least one RADIUS server')
+                    raise ConfigError('OpenConnect authentication mode radius requires at least one RADIUS server')
             if "local" in ocserv["authentication"]["mode"]:
                 if not ocserv.get("authentication", {}).get("local_users"):
-                    raise ConfigError('openconnect mode local required at least one user')
+                    raise ConfigError('OpenConnect mode local required at least one user')
                 if not ocserv["authentication"]["local_users"]["username"]:
-                    raise ConfigError('openconnect mode local required at least one user')
+                    raise ConfigError('OpenConnect mode local required at least one user')
                 else:
                     # For OTP mode: verify that each local user has an OTP key
                     if "otp" in ocserv["authentication"]["mode"]["local"]:
@@ -127,40 +129,20 @@ def verify(ocserv):
                     if 'default_config' not in ocserv["authentication"]["identity_based_config"]:
                         raise ConfigError('OpenConnect identity-based-config enabled but default-config not set')
         else:
-            raise ConfigError('openconnect authentication mode required')
+            raise ConfigError('OpenConnect authentication mode required')
     else:
-        raise ConfigError('openconnect authentication credentials required')
+        raise ConfigError('OpenConnect authentication credentials required')
 
     # Check ssl
     if 'ssl' not in ocserv:
-        raise ConfigError('openconnect ssl required')
+        raise ConfigError('SSL missing on OpenConnect config!')
 
-    if not ocserv['pki'] or 'certificate' not in ocserv['pki']:
-        raise ConfigError('PKI not configured')
+    if 'certificate' not in ocserv['ssl']:
+        raise ConfigError('SSL certificate missing on OpenConnect config!')
+    verify_pki_certificate(ocserv, ocserv['ssl']['certificate'])
 
-    ssl = ocserv['ssl']
-    if 'certificate' not in ssl:
-        raise ConfigError('openconnect ssl certificate required')
-
-    cert_name = ssl['certificate']
-
-    if cert_name not in ocserv['pki']['certificate']:
-        raise ConfigError('Invalid openconnect ssl certificate')
-
-    cert = ocserv['pki']['certificate'][cert_name]
-
-    if 'certificate' not in cert:
-        raise ConfigError('Missing certificate in PKI')
-
-    if 'private' not in cert or 'key' not in cert['private']:
-        raise ConfigError('Missing private key in PKI')
-
-    if 'ca_certificate' in ssl:
-        if 'ca' not in ocserv['pki']:
-            raise ConfigError('PKI not configured')
-
-        if ssl['ca_certificate'] not in ocserv['pki']['ca']:
-            raise ConfigError('Invalid openconnect ssl CA certificate')
+    if 'ca_certificate' in ocserv['ssl']:
+        verify_pki_ca_certificate(ocserv, ocserv['ssl']['ca_certificate'])
 
     # Check network settings
     if "network_settings" in ocserv:
@@ -172,7 +154,7 @@ def verify(ocserv):
         else:
             ocserv["network_settings"]["push_route"] = ["default"]
     else:
-        raise ConfigError('openconnect network settings required')
+        raise ConfigError('OpenConnect network settings required!')
 
 def generate(ocserv):
     if not ocserv:
@@ -276,7 +258,7 @@ def apply(ocserv):
                 break
             sleep(0.250)
             if counter > 5:
-                raise ConfigError('openconnect failed to start, check the logs for details')
+                raise ConfigError('OpenConnect failed to start, check the logs for details')
                 break
             counter += 1
 
diff --git a/src/conf_mode/vpn_sstp.py b/src/conf_mode/vpn_sstp.py
index 8661a8aff..7490fd0e0 100755
--- a/src/conf_mode/vpn_sstp.py
+++ b/src/conf_mode/vpn_sstp.py
@@ -20,6 +20,8 @@ from sys import exit
 
 from vyos.config import Config
 from vyos.configdict import get_accel_dict
+from vyos.configverify import verify_pki_certificate
+from vyos.configverify import verify_pki_ca_certificate
 from vyos.pki import wrap_certificate
 from vyos.pki import wrap_private_key
 from vyos.template import render
@@ -46,51 +48,6 @@ cert_key_path = os.path.join(cfg_dir, 'sstp-cert.key')
 ca_cert_file_path = os.path.join(cfg_dir, 'sstp-ca.pem')
 
 
-def verify_certificate(config):
-    #
-    # SSL certificate checks
-    #
-    if not config['pki']:
-        raise ConfigError('PKI is not configured')
-
-    if 'ssl' not in config:
-        raise ConfigError('SSL missing on SSTP config')
-
-    ssl = config['ssl']
-
-    # CA
-    if 'ca_certificate' not in ssl:
-        raise ConfigError('SSL CA certificate missing on SSTP config')
-
-    ca_name = ssl['ca_certificate']
-
-    if ca_name not in config['pki']['ca']:
-        raise ConfigError('Invalid CA certificate on SSTP config')
-
-    if 'certificate' not in config['pki']['ca'][ca_name]:
-        raise ConfigError('Missing certificate data for CA certificate on SSTP config')
-
-    # Certificate
-    if 'certificate' not in ssl:
-        raise ConfigError('SSL certificate missing on SSTP config')
-
-    cert_name = ssl['certificate']
-
-    if cert_name not in config['pki']['certificate']:
-        raise ConfigError('Invalid certificate on SSTP config')
-
-    pki_cert = config['pki']['certificate'][cert_name]
-
-    if 'certificate' not in pki_cert:
-        raise ConfigError('Missing certificate data for certificate on SSTP config')
-
-    if 'private' not in pki_cert or 'key' not in pki_cert['private']:
-        raise ConfigError('Missing private key for certificate on SSTP config')
-
-    if 'password_protected' in pki_cert['private']:
-        raise ConfigError('Encrypted private key is not supported on SSTP config')
-
-
 def get_config(config=None):
     if config:
         conf = config
@@ -124,7 +81,17 @@ def verify(sstp):
     verify_accel_ppp_ip_pool(sstp)
     verify_accel_ppp_name_servers(sstp)
     verify_accel_ppp_wins_servers(sstp)
-    verify_certificate(sstp)
+
+    if 'ssl' not in sstp:
+        raise ConfigError('SSL missing on SSTP config!')
+
+    if 'certificate' not in sstp['ssl']:
+        raise ConfigError('SSL certificate missing on SSTP config!')
+    verify_pki_certificate(sstp, sstp['ssl']['certificate'])
+
+    if 'ca_certificate' not in sstp['ssl']:
+        raise ConfigError('SSL CA certificate missing on SSTP config!')
+    verify_pki_ca_certificate(sstp, sstp['ssl']['ca_certificate'])
 
 
 def generate(sstp):
@@ -154,15 +121,15 @@ def generate(sstp):
 
 
 def apply(sstp):
+    systemd_service = 'accel-ppp@sstp.service'
     if not sstp:
-        call('systemctl stop accel-ppp@sstp.service')
+        call(f'systemctl stop {systemd_service}')
         for file in [sstp_chap_secrets, sstp_conf]:
             if os.path.exists(file):
                 os.unlink(file)
-
         return None
 
-    call('systemctl restart accel-ppp@sstp.service')
+    call(f'systemctl reload-or-restart {systemd_service}')
 
 
 if __name__ == '__main__':
-- 
cgit v1.2.3