diff options
| author | Christian Breunig <christian@breunig.cc> | 2024-10-19 15:49:25 +0200 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-19 15:49:25 +0200 | 
| commit | 1bf2cf737e9e7f09ee1d5c62691cd5bfa6eb9f05 (patch) | |
| tree | aa9690bd054958a2819aca16ddc15b121d28d1e4 | |
| parent | 3f933f1642debfca5c9e3873790a5742ef242c75 (diff) | |
| parent | 52e3ba35c09237744340c2cce4cba1783ec3f2be (diff) | |
| download | vyos-1x-1bf2cf737e9e7f09ee1d5c62691cd5bfa6eb9f05.tar.gz vyos-1x-1bf2cf737e9e7f09ee1d5c62691cd5bfa6eb9f05.zip | |
Merge pull request #4165 from natali-rs1985/T4914-current
pki: T4914: Rewrite the PKI op mode in the new style
| -rw-r--r-- | op-mode-definitions/pki.xml.in | 108 | ||||
| -rw-r--r-- | python/vyos/configsession.py | 3 | ||||
| -rw-r--r-- | python/vyos/opmode.py | 5 | ||||
| -rwxr-xr-x | src/op_mode/pki.py | 820 | ||||
| -rw-r--r-- | src/services/api/rest/routers.py | 4 | 
5 files changed, 603 insertions, 337 deletions
| diff --git a/op-mode-definitions/pki.xml.in b/op-mode-definitions/pki.xml.in index 254ef08cc..866f482bf 100644 --- a/op-mode-definitions/pki.xml.in +++ b/op-mode-definitions/pki.xml.in @@ -27,7 +27,7 @@                          <list><filename></list>                        </completionHelp>                      </properties> -                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --file</command> +                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --file</command>                    </tagNode>                    <tagNode name="install">                      <properties> @@ -36,10 +36,10 @@                          <list><certificate name></list>                        </completionHelp>                      </properties> -                    <command>${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --install</command> +                    <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --install</command>                    </tagNode>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" --sign "$5"</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --sign "$5"</command>                </tagNode>                <tagNode name="file">                  <properties> @@ -48,7 +48,7 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --file</command>                </tagNode>                <tagNode name="install">                  <properties> @@ -57,10 +57,10 @@                      <list><CA name></list>                    </completionHelp>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --install</command>                </tagNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --ca "noname"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca</command>            </node>            <node name="certificate">              <properties> @@ -79,7 +79,7 @@                          <list><filename></list>                        </completionHelp>                      </properties> -                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --file</command> +                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --file</command>                    </tagNode>                    <tagNode name="install">                      <properties> @@ -88,10 +88,10 @@                          <list><certificate name></list>                        </completionHelp>                      </properties> -                    <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --install</command> +                    <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --install</command>                    </tagNode>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --self-sign</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --self-sign</command>                </node>                <tagNode name="sign">                  <properties> @@ -108,7 +108,7 @@                          <list><filename></list>                        </completionHelp>                      </properties> -                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --file</command> +                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --file</command>                    </tagNode>                    <tagNode name="install">                      <properties> @@ -117,10 +117,10 @@                          <list><certificate name></list>                        </completionHelp>                      </properties> -                    <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --install</command> +                    <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --install</command>                    </tagNode>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --sign "$5"</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --sign "$5"</command>                </tagNode>                <tagNode name="file">                  <properties> @@ -129,7 +129,7 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --file</command>                </tagNode>                <tagNode name="install">                  <properties> @@ -138,10 +138,10 @@                      <list><certificate name></list>                    </completionHelp>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --install</command>                </tagNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate</command>            </node>            <tagNode name="crl">              <properties> @@ -158,16 +158,16 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --file</command>                </tagNode>                <leafNode name="install">                  <properties>                    <help>Commands for installing generated CRL into running configuration</help>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --install</command>                </leafNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --crl "$4"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4"</command>            </tagNode>            <node name="dh">              <properties> @@ -181,7 +181,7 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --file</command>                </tagNode>                <tagNode name="install">                  <properties> @@ -190,10 +190,10 @@                      <list><DH name></list>                    </completionHelp>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --install</command>                </tagNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --dh "noname"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh</command>            </node>            <node name="key-pair">              <properties> @@ -207,7 +207,7 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --file</command>                </tagNode>                <tagNode name="install">                  <properties> @@ -216,10 +216,10 @@                      <list><key name></list>                    </completionHelp>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --install</command>                </tagNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --keypair "noname"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair</command>            </node>            <node name="openvpn">              <properties> @@ -238,7 +238,7 @@                          <list><filename></list>                        </completionHelp>                      </properties> -                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --file</command> +                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --file</command>                    </tagNode>                    <tagNode name="install">                      <properties> @@ -247,10 +247,10 @@                          <list><key name></list>                        </completionHelp>                      </properties> -                    <command>${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --install</command> +                    <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --install</command>                    </tagNode>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --openvpn "noname"</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn</command>                </node>              </children>            </node> @@ -266,7 +266,7 @@                      <list><filename></list>                    </completionHelp>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --file</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --file</command>                </tagNode>                <tagNode name="install">                  <properties> @@ -275,10 +275,10 @@                      <list><key name></list>                    </completionHelp>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --install</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --install</command>                </tagNode>              </children> -            <command>${vyos_op_scripts_dir}/pki.py --action generate --ssh "noname"</command> +            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh</command>            </node>            <node name="wireguard">              <properties> @@ -302,12 +302,12 @@                              <path>interfaces wireguard</path>                            </completionHelp>                          </properties> -                        <command>${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key --interface "$7" --install</command> +                        <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key --interface "$7" --install</command>                        </tagNode>                      </children>                    </node>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key</command>                </node>                <node name="preshared-key">                  <properties> @@ -334,14 +334,14 @@                                  <path>interfaces wireguard ${COMP_WORDS[COMP_CWORD-2]} peer</path>                                </completionHelp>                              </properties> -                            <command>${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk --interface "$7" --peer "$9" --install</command> +                            <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk --interface "$7" --peer "$9" --install</command>                            </tagNode>                          </children>                        </tagNode>                      </children>                    </node>                  </children> -                <command>${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk</command> +                <command>${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk</command>                </node>              </children>            </node> @@ -371,13 +371,13 @@                  <properties>                    <help>Path to CA certificate file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --filename "$6"</command>                </tagNode>                <tagNode name="key-file">                  <properties>                    <help>Path to private key file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --key-filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --key-filename "$6"</command>                </tagNode>              </children>            </tagNode> @@ -393,13 +393,13 @@                  <properties>                    <help>Path to certificate file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --filename "$6"</command>                </tagNode>                <tagNode name="key-file">                  <properties>                    <help>Path to private key file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --key-filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --key-filename "$6"</command>                </tagNode>              </children>            </tagNode> @@ -415,7 +415,7 @@                  <properties>                    <help>Path to CRL file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --crl "$4" --filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type crl --name "$4" --filename "$6"</command>                </tagNode>              </children>            </tagNode> @@ -431,7 +431,7 @@                  <properties>                    <help>Path to DH parameters file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --dh "$4" --filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type dh --name "$4" --filename "$6"</command>                </tagNode>              </children>            </tagNode> @@ -447,13 +447,13 @@                  <properties>                    <help>Path to public key file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --filename "$6"</command>                </tagNode>                <tagNode name="private-file">                  <properties>                    <help>Path to private key file</help>                  </properties> -                <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --key-filename "$6"</command> +                <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --key-filename "$6"</command>                </tagNode>              </children>            </tagNode> @@ -474,7 +474,7 @@                      <properties>                        <help>Path to shared secret key file</help>                      </properties> -                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py --action import --openvpn "$5" --filename "$7"</command> +                    <command>sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type openvpn --name "$5" --filename "$7"</command>                    </tagNode>                  </children>                </tagNode> @@ -490,13 +490,13 @@          <properties>            <help>Show PKI x509 certificates</help>          </properties> -        <command>sudo ${vyos_op_scripts_dir}/pki.py --action show</command> +        <command>sudo ${vyos_op_scripts_dir}/pki.py show_all</command>          <children>            <leafNode name="ca">              <properties>                <help>Show x509 CA certificates</help>              </properties> -            <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "all"</command> +            <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority</command>            </leafNode>            <tagNode name="ca">              <properties> @@ -505,13 +505,13 @@                  <path>pki ca</path>                </completionHelp>              </properties> -            <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4"</command> +            <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4"</command>              <children>                <leafNode name="pem">                  <properties>                    <help>Show x509 CA certificate in PEM format</help>                  </properties> -                <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4" --pem</command> +                <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4" --pem</command>                </leafNode>              </children>            </tagNode> @@ -519,7 +519,7 @@              <properties>                <help>Show x509 certificates</help>              </properties> -            <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "all"</command> +            <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate</command>            </leafNode>            <tagNode name="certificate">              <properties> @@ -528,13 +528,13 @@                  <path>pki certificate</path>                </completionHelp>              </properties> -            <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4"</command> +            <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4"</command>              <children>                <leafNode name="pem">                  <properties>                    <help>Show x509 certificate in PEM format</help>                  </properties> -                <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --pem</command> +                <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --pem</command>                </leafNode>                <tagNode name="fingerprint">                  <properties> @@ -543,7 +543,7 @@                      <list>sha256 sha384 sha512</list>                    </completionHelp>                  </properties> -                <command>sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --fingerprint "$6"</command> +                <command>sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --fingerprint "$6"</command>                </tagNode>              </children>            </tagNode> @@ -551,7 +551,7 @@              <properties>                <help>Show x509 certificate revocation lists</help>              </properties> -            <command>${vyos_op_scripts_dir}/pki.py --action show --crl "all"</command> +            <command>${vyos_op_scripts_dir}/pki.py show_crl</command>            </leafNode>            <tagNode name="crl">              <properties> @@ -560,13 +560,13 @@                  <path>pki ca</path>                </completionHelp>              </properties> -            <command>${vyos_op_scripts_dir}/pki.py --action show --crl "$4"</command> +            <command>${vyos_op_scripts_dir}/pki.py show_crl --name "$4"</command>              <children>                <leafNode name="pem">                  <properties>                    <help>Show x509 certificate revocation lists by CA name in PEM format</help>                  </properties> -                <command>${vyos_op_scripts_dir}/pki.py --action show --crl "$4" --pem</command> +                <command>${vyos_op_scripts_dir}/pki.py show_crl --name "$4" --pem</command>                </leafNode>              </children>            </tagNode> diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 9c56d246a..5876dc5b0 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -42,8 +42,7 @@ INSTALL_IMAGE = [  IMPORT_PKI = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'import']  IMPORT_PKI_NO_PROMPT = [      '/usr/libexec/vyos/op_mode/pki.py', -    '--action', -    'import', +    'import_pki',      '--no-prompt',  ]  REMOVE_IMAGE = [ diff --git a/python/vyos/opmode.py b/python/vyos/opmode.py index 066c8058f..136ac35e2 100644 --- a/python/vyos/opmode.py +++ b/python/vyos/opmode.py @@ -89,7 +89,10 @@ class InternalError(Error):  def _is_op_mode_function_name(name): -    if re.match(r"^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute)", name): +    if re.match( +        r'^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute|import)', +        name, +    ):          return True      else:          return False diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 5652a5d74..49a461e9e 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -14,16 +14,18 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>. -import argparse  import ipaddress  import os  import re  import sys  import tabulate +import typing  from cryptography import x509  from cryptography.x509.oid import ExtendedKeyUsageOID +import vyos.opmode +  from vyos.config import Config  from vyos.config import config_dict_mangle_acme  from vyos.pki import encode_certificate @@ -51,18 +53,50 @@ from vyos.utils.process import cmd  CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'  auth_dir = '/config/auth' +ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl'] +ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']] +ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512'] +  # Helper Functions  conf = Config() + + +def _verify(target): +    """Decorator checks if config for PKI exists""" +    from functools import wraps + +    if target not in ['ca', 'certificate']: +        raise ValueError('Invalid PKI') + +    def _verify_target(func): +        @wraps(func) +        def _wrapper(*args, **kwargs): +            name = kwargs.get('name') +            unconf_message = f'PKI {target} "{name}" does not exist!' +            if name: +                if not conf.exists(['pki', target, name]): +                    raise vyos.opmode.UnconfiguredSubsystem(unconf_message) +            return func(*args, **kwargs) + +        return _wrapper + +    return _verify_target + +  def get_default_values():      # Fetch default x509 values      base = ['pki', 'x509', 'default'] -    x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), -                                     no_tag_node_value_mangle=True, -                                     get_first_key=True, -                                     with_recursive_defaults=True) +    x509_defaults = conf.get_config_dict( +        base, +        key_mangling=('-', '_'), +        no_tag_node_value_mangle=True, +        get_first_key=True, +        with_recursive_defaults=True, +    )      return x509_defaults +  def get_config_ca_certificate(name=None):      # Fetch ca certificates from config      base = ['pki', 'ca'] @@ -71,12 +105,15 @@ def get_config_ca_certificate(name=None):      if name:          base = base + [name] -        if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): +        if not conf.exists(base + ['private', 'key']) or not conf.exists( +            base + ['certificate'] +        ):              return False -    return conf.get_config_dict(base, key_mangling=('-', '_'), -                                get_first_key=True, -                                no_tag_node_value_mangle=True) +    return conf.get_config_dict( +        base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True +    ) +  def get_config_certificate(name=None):      # Get certificates from config @@ -86,18 +123,21 @@ def get_config_certificate(name=None):      if name:          base = base + [name] -        if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): +        if not conf.exists(base + ['private', 'key']) or not conf.exists( +            base + ['certificate'] +        ):              return False -    pki = conf.get_config_dict(base, key_mangling=('-', '_'), -                                get_first_key=True, -                                no_tag_node_value_mangle=True) +    pki = conf.get_config_dict( +        base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True +    )      if pki:          for certificate in pki:              pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate])      return pki +  def get_certificate_ca(cert, ca_certs):      # Find CA certificate for given certificate      if not ca_certs: @@ -116,6 +156,7 @@ def get_certificate_ca(cert, ca_certs):              return ca_name      return None +  def get_config_revoked_certificates():      # Fetch revoked certificates from config      ca_base = ['pki', 'ca'] @@ -124,19 +165,26 @@ def get_config_revoked_certificates():      certs = []      if conf.exists(ca_base): -        ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), -                                               get_first_key=True, -                                               no_tag_node_value_mangle=True) +        ca_certificates = conf.get_config_dict( +            ca_base, +            key_mangling=('-', '_'), +            get_first_key=True, +            no_tag_node_value_mangle=True, +        )          certs.extend(ca_certificates.values())      if conf.exists(cert_base): -        certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), -                                            get_first_key=True, -                                            no_tag_node_value_mangle=True) +        certificates = conf.get_config_dict( +            cert_base, +            key_mangling=('-', '_'), +            get_first_key=True, +            no_tag_node_value_mangle=True, +        )          certs.extend(certificates.values())      return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] +  def get_revoked_by_serial_numbers(serial_numbers=[]):      # Return serial numbers of revoked certificates      certs_out = [] @@ -160,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):                  certs_out.append(cert_name)      return certs_out -def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): + +def install_certificate( +    name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False +):      # Show/install conf commands for certificate      prefix = 'ca' if is_ca else 'certificate' -    base = f"pki {prefix} {name}" +    base = f'pki {prefix} {name}'      config_paths = []      if cert: -        cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) +        cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1])          config_paths.append(f"{base} certificate '{cert_pem}'")      if private_key: -        key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) +        key_pem = ''.join( +            encode_private_key(private_key, passphrase=key_passphrase) +            .strip() +            .split('\n')[1:-1] +        )          config_paths.append(f"{base} private key '{key_pem}'")          if key_passphrase: -            config_paths.append(f"{base} private password-protected") +            config_paths.append(f'{base} private password-protected')      install_into_config(conf, config_paths) +  def install_crl(ca_name, crl):      # Show/install conf commands for crl -    crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) +    crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1])      install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"]) +  def install_dh_parameters(name, params):      # Show/install conf commands for dh params -    dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) +    dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1])      install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"]) +  def install_ssh_key(name, public_key, private_key, passphrase=None):      # Show/install conf commands for ssh key -    key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') +    key_openssh = encode_public_key( +        public_key, encoding='OpenSSH', key_format='OpenSSH' +    )      username = os.getlogin() -    type_key_split = key_openssh.split(" ") - -    base = f"system login user {username} authentication public-keys {name}" -    install_into_config(conf, [ -        f"{base} key '{type_key_split[1]}'", -        f"{base} type '{type_key_split[0]}'" -    ]) -    print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) - -def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True): +    type_key_split = key_openssh.split(' ') + +    base = f'system login user {username} authentication public-keys {name}' +    install_into_config( +        conf, +        [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"], +    ) +    print( +        encode_private_key( +            private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase +        ) +    ) + + +def install_keypair( +    name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True +):      # Show/install conf commands for key-pair      config_paths = []      if public_key: -        install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True) +        install_public_key = not prompt or ask_yes_no( +            'Do you want to install the public key?', default=True +        )          public_key_pem = encode_public_key(public_key)          if install_public_key: -            install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) -            config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'") +            install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1]) +            config_paths.append( +                f"pki key-pair {name} public key '{install_public_pem}'" +            )          else: -            print("Public key:") +            print('Public key:')              print(public_key_pem)      if private_key: -        install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True) +        install_private_key = not prompt or ask_yes_no( +            'Do you want to install the private key?', default=True +        )          private_key_pem = encode_private_key(private_key, passphrase=passphrase)          if install_private_key: -            install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) -            config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'") +            install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1]) +            config_paths.append( +                f"pki key-pair {name} private key '{install_private_pem}'" +            )              if passphrase: -                config_paths.append(f"pki key-pair {name} private password-protected") +                config_paths.append(f'pki key-pair {name} private password-protected')          else: -            print("Private key:") +            print('Private key:')              print(private_key_pem)      install_into_config(conf, config_paths) +  def install_openvpn_key(name, key_data, key_version='1'):      config_paths = [          f"pki openvpn shared-secret {name} key '{key_data}'", -        f"pki openvpn shared-secret {name} version '{key_version}'" +        f"pki openvpn shared-secret {name} version '{key_version}'",      ]      install_into_config(conf, config_paths) +  def install_wireguard_key(interface, private_key, public_key):      # Show conf commands for installing wireguard key pairs      from vyos.ifconfig import Section +      if Section.section(interface) != 'wireguard':          print(f'"{interface}" is not a WireGuard interface name!')          exit(1)      # Check if we are running in a config session - if yes, we can directly write to the CLI -    install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]) +    install_into_config( +        conf, [f"interfaces wireguard {interface} private-key '{private_key}'"] +    )      print(f"Corresponding public-key to use on peer system is: '{public_key}'") +  def install_wireguard_psk(interface, peer, psk):      from vyos.ifconfig import Section +      if Section.section(interface) != 'wireguard':          print(f'"{interface}" is not a WireGuard interface name!')          exit(1)      # Check if we are running in a config session - if yes, we can directly write to the CLI -    install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]) +    install_into_config( +        conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"] +    ) +  def ask_passphrase():      passphrase = None -    print("Note: If you plan to use the generated key on this router, do not encrypt the private key.") +    print( +        'Note: If you plan to use the generated key on this router, do not encrypt the private key.' +    )      if ask_yes_no('Do you want to encrypt the private key with a passphrase?'):          passphrase = ask_input('Enter passphrase:')      return passphrase +  def write_file(filename, contents):      full_path = os.path.join(auth_dir, filename)      directory = os.path.dirname(full_path) @@ -275,7 +363,9 @@ def write_file(filename, contents):          print('Failed to write file: directory does not exist')          return False -    if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'): +    if os.path.exists(full_path) and not ask_yes_no( +        'Do you want to overwrite the existing file?' +    ):          return False      with open(full_path, 'w') as f: @@ -283,10 +373,14 @@ def write_file(filename, contents):      print(f'File written to {full_path}') -# Generation functions +# Generation functions  def generate_private_key(): -    key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec']) +    key_type = ask_input( +        'Enter private key type: [rsa, dsa, ec]', +        default='rsa', +        valid_responses=['rsa', 'dsa', 'ec'], +    )      size_valid = []      size_default = 0 @@ -298,28 +392,43 @@ def generate_private_key():          size_default = 256          size_valid = [224, 256, 384, 521] -    size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid) +    size = ask_input( +        'Enter private key bits:', +        default=size_default, +        numeric_only=True, +        valid_responses=size_valid, +    )      return create_private_key(key_type, size), key_type +  def parse_san_string(san_string):      if not san_string:          return None      output = [] -    san_split = san_string.strip().split(",") +    san_split = san_string.strip().split(',')      for pair_str in san_split: -        tag, value = pair_str.strip().split(":", 1) +        tag, value = pair_str.strip().split(':', 1)          if tag == 'ipv4':              output.append(ipaddress.IPv4Address(value))          elif tag == 'ipv6':              output.append(ipaddress.IPv6Address(value))          elif tag == 'dns' or tag == 'rfc822':              output.append(value) -    return output - -def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True): +    return + + +def generate_certificate_request( +    private_key=None, +    key_type=None, +    return_request=False, +    name=None, +    install=False, +    file=False, +    ask_san=True, +):      if not private_key:          private_key, key_type = generate_private_key() @@ -328,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request      while True:          country = ask_input('Enter country code:', default=default_values['country'])          if len(country) != 2: -            print("Country name must be a 2 character country code") +            print('Country name must be a 2 character country code')              continue          subject['country'] = country          break      subject['state'] = ask_input('Enter state:', default=default_values['state']) -    subject['locality'] = ask_input('Enter locality:', default=default_values['locality']) -    subject['organization'] = ask_input('Enter organization name:', default=default_values['organization']) +    subject['locality'] = ask_input( +        'Enter locality:', default=default_values['locality'] +    ) +    subject['organization'] = ask_input( +        'Enter organization name:', default=default_values['organization'] +    )      subject['common_name'] = ask_input('Enter common name:', default='vyos.io')      subject_alt_names = None      if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'): -        print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net") +        print( +            'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net' +        )          san_string = ask_input('Enter Subject Alternative Names:')          subject_alt_names = parse_san_string(san_string) @@ -356,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request          return None      if install: -        print("Certificate request:") -        print(encode_certificate(cert_req) + "\n") -        install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) +        print('Certificate request:') +        print(encode_certificate(cert_req) + '\n') +        install_certificate( +            name, +            private_key=private_key, +            key_type=key_type, +            key_passphrase=passphrase, +            is_ca=False, +        )      if file:          write_file(f'{name}.csr', encode_certificate(cert_req)) -        write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) - -def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False): -    valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True) +        write_file( +            f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +        ) + + +def generate_certificate( +    cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False +): +    valid_days = ask_input( +        'Enter how many days certificate will be valid:', +        default='365' if not is_ca else '1825', +        numeric_only=True, +    )      cert_type = None      if not is_ca: -        cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server']) -    return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca) +        cert_type = ask_input( +            'Enter certificate type: (client, server)', +            default='server', +            valid_responses=['client', 'server'], +        ) +    return create_certificate( +        cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca +    ) +  def generate_ca_certificate(name, install=False, file=False):      private_key, key_type = generate_private_key() -    cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) +    cert_req = generate_certificate_request( +        private_key, key_type, return_request=True, ask_san=False +    )      cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True)      passphrase = ask_passphrase() @@ -383,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False):          return None      if install: -        install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) +        install_certificate( +            name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True +        )      if file:          write_file(f'{name}.pem', encode_certificate(cert)) -        write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) +        write_file( +            f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +        ) +  def generate_ca_certificate_sign(name, ca_name, install=False, file=False):      ca_dict = get_config_ca_certificate(ca_name) @@ -399,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):      ca_cert = load_certificate(ca_dict['certificate'])      if not ca_cert: -        print("Failed to load signing CA certificate, aborting") +        print('Failed to load signing CA certificate, aborting')          return None      ca_private = ca_dict['private']      ca_private_passphrase = None      if 'password_protected' in ca_private:          ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') -    ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) +    ca_private_key = load_private_key( +        ca_private['key'], passphrase=ca_private_passphrase +    )      if not ca_private_key: -        print("Failed to load signing CA private key, aborting") +        print('Failed to load signing CA private key, aborting')          return None      private_key = None @@ -418,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):      cert_req = None      if not ask_yes_no('Do you already have a certificate request?'):          private_key, key_type = generate_private_key() -        cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) +        cert_req = generate_certificate_request( +            private_key, key_type, return_request=True, ask_san=False +        )      else: -        print("Paste certificate request and press enter:") +        print('Paste certificate request and press enter:')          lines = []          curr_line = ''          while True: @@ -430,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):              lines.append(curr_line)          if not lines: -            print("Aborted") +            print('Aborted')              return None -        wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing -        cert_req = load_certificate_request("\n".join(lines), wrap) +        wrap = ( +            lines[0].find('-----') < 0 +        )  # Only base64 pasted, add the CSR tags for parsing +        cert_req = load_certificate_request('\n'.join(lines), wrap)      if not cert_req: -        print("Invalid certificate request") +        print('Invalid certificate request')          return None -    cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) +    cert = generate_certificate( +        cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True +    )      passphrase = None      if private_key is not None: @@ -453,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):          return None      if install: -        install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) +        install_certificate( +            name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True +        )      if file:          write_file(f'{name}.pem', encode_certificate(cert))          if private_key is not None: -            write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) +            write_file( +                f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +            ) +  def generate_certificate_sign(name, ca_name, install=False, file=False):      ca_dict = get_config_ca_certificate(ca_name) @@ -470,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):      ca_cert = load_certificate(ca_dict['certificate'])      if not ca_cert: -        print("Failed to load CA certificate, aborting") +        print('Failed to load CA certificate, aborting')          return None      ca_private = ca_dict['private']      ca_private_passphrase = None      if 'password_protected' in ca_private:          ca_private_passphrase = ask_input('Enter CA private key passphrase:') -    ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) +    ca_private_key = load_private_key( +        ca_private['key'], passphrase=ca_private_passphrase +    )      if not ca_private_key: -        print("Failed to load CA private key, aborting") +        print('Failed to load CA private key, aborting')          return None      private_key = None @@ -489,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):      cert_req = None      if not ask_yes_no('Do you already have a certificate request?'):          private_key, key_type = generate_private_key() -        cert_req = generate_certificate_request(private_key, key_type, return_request=True) +        cert_req = generate_certificate_request( +            private_key, key_type, return_request=True +        )      else: -        print("Paste certificate request and press enter:") +        print('Paste certificate request and press enter:')          lines = []          curr_line = ''          while True: @@ -501,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):              lines.append(curr_line)          if not lines: -            print("Aborted") +            print('Aborted')              return None -        wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing -        cert_req = load_certificate_request("\n".join(lines), wrap) +        wrap = ( +            lines[0].find('-----') < 0 +        )  # Only base64 pasted, add the CSR tags for parsing +        cert_req = load_certificate_request('\n'.join(lines), wrap)      if not cert_req: -        print("Invalid certificate request") +        print('Invalid certificate request')          return None      cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) -     +      passphrase = None      if private_key is not None:          passphrase = ask_passphrase() @@ -524,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):          return None      if install: -        install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False) +        install_certificate( +            name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False +        )      if file:          write_file(f'{name}.pem', encode_certificate(cert))          if private_key is not None: -            write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) +            write_file( +                f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +            ) +  def generate_certificate_selfsign(name, install=False, file=False):      private_key, key_type = generate_private_key() @@ -543,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False):          return None      if install: -        install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) +        install_certificate( +            name, +            cert, +            private_key=private_key, +            key_type=key_type, +            key_passphrase=passphrase, +            is_ca=False, +        )      if file:          write_file(f'{name}.pem', encode_certificate(cert)) -        write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) +        write_file( +            f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +        ) +  def generate_certificate_revocation_list(ca_name, install=False, file=False):      ca_dict = get_config_ca_certificate(ca_name) @@ -559,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):      ca_cert = load_certificate(ca_dict['certificate'])      if not ca_cert: -        print("Failed to load CA certificate, aborting") +        print('Failed to load CA certificate, aborting')          return None      ca_private = ca_dict['private']      ca_private_passphrase = None      if 'password_protected' in ca_private:          ca_private_passphrase = ask_input('Enter CA private key passphrase:') -    ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) +    ca_private_key = load_private_key( +        ca_private['key'], passphrase=ca_private_passphrase +    )      if not ca_private_key: -        print("Failed to load CA private key, aborting") +        print('Failed to load CA private key, aborting')          return None      revoked_certs = get_config_revoked_certificates() @@ -590,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):              continue      if not to_revoke: -        print("No revoked certificates to add to the CRL") +        print('No revoked certificates to add to the CRL')          return None      crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke)      if not crl: -        print("Failed to create CRL") +        print('Failed to create CRL')          return None      if not install and not file: @@ -607,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):          install_crl(ca_name, crl)      if file: -        write_file(f'{name}.crl', encode_certificate(crl)) +        write_file(f'{ca_name}.crl', encode_certificate(crl)) +  def generate_ssh_keypair(name, install=False, file=False):      private_key, key_type = generate_private_key() @@ -616,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False):      if not install and not file:          print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) -        print("") -        print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) +        print('') +        print( +            encode_private_key( +                private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase +            ) +        )          return None      if install:          install_ssh_key(name, public_key, private_key, passphrase)      if file: -        write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) -        write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) +        write_file( +            f'{name}.pem', +            encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'), +        ) +        write_file( +            f'{name}.key', +            encode_private_key( +                private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase +            ), +        ) +  def generate_dh_parameters(name, install=False, file=False):      bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True) -    print("Generating parameters...") +    print('Generating parameters...')      dh_params = create_dh_parameters(bits)      if not dh_params: -        print("Failed to create DH parameters") +        print('Failed to create DH parameters')          return None      if not install and not file: -        print("DH Parameters:") +        print('DH Parameters:')          print(encode_dh_parameters(dh_params))      if install: @@ -647,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False):      if file:          write_file(f'{name}.pem', encode_dh_parameters(dh_params)) +  def generate_keypair(name, install=False, file=False):      private_key, key_type = generate_private_key()      public_key = private_key.public_key() @@ -654,7 +849,7 @@ def generate_keypair(name, install=False, file=False):      if not install and not file:          print(encode_public_key(public_key)) -        print("") +        print('')          print(encode_private_key(private_key, passphrase=passphrase))          return None @@ -663,13 +858,16 @@ def generate_keypair(name, install=False, file=False):      if file:          write_file(f'{name}.pem', encode_public_key(public_key)) -        write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) +        write_file( +            f'{name}.key', encode_private_key(private_key, passphrase=passphrase) +        ) +  def generate_openvpn_key(name, install=False, file=False):      result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"')      if not result: -        print("Failed to generate OpenVPN key") +        print('Failed to generate OpenVPN key')          return None      if not install and not file: @@ -677,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False):          return None      if install: -        key_lines = result.split("\n") -        key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings +        key_lines = result.split('\n') +        key_data = ''.join(key_lines[1:-1])  # Remove wrapper tags and line endings          key_version = '1' -        version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) +        version_search = re.search( +            r'BEGIN OpenVPN Static key V(\d+)', result +        )  # Future-proofing (hopefully)          if version_search:              key_version = version_search[1] @@ -690,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False):      if file:          write_file(f'{name}.key', result) +  def generate_wireguard_key(interface=None, install=False):      private_key = cmd('wg genkey')      public_key = cmd('wg pubkey', input=private_key) @@ -700,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False):          print(f'Private key: {private_key}')          print(f'Public key: {public_key}', end='\n\n') +  def generate_wireguard_psk(interface=None, peer=None, install=False):      psk = cmd('wg genpsk')      if interface and peer and install: @@ -707,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):      else:          print(f'Pre-shared key: {psk}') +  # Import functions -def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): +def import_ca_certificate( +    name, path=None, key_path=None, no_prompt=False, passphrase=None +):      if path:          if not os.path.exists(path):              print(f'File not found: {path}') @@ -745,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp          install_certificate(name, private_key=key, is_ca=True) -def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): + +def import_certificate( +    name, path=None, key_path=None, no_prompt=False, passphrase=None +):      if path:          if not os.path.exists(path):              print(f'File not found: {path}') @@ -782,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra          install_certificate(name, private_key=key, is_ca=False) +  def import_crl(name, path):      if not os.path.exists(path):          print(f'File not found: {path}') @@ -799,6 +1008,7 @@ def import_crl(name, path):      install_crl(name, crl) +  def import_dh_parameters(name, path):      if not os.path.exists(path):          print(f'File not found: {path}') @@ -816,6 +1026,7 @@ def import_dh_parameters(name, path):      install_dh_parameters(name, dh) +  def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None):      if path:          if not os.path.exists(path): @@ -853,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N          install_keypair(name, None, private_key=key, prompt=False) +  def import_openvpn_secret(name, path):      if not os.path.exists(path):          print(f'File not found: {path}') @@ -862,19 +1074,134 @@ def import_openvpn_secret(name, path):      key_version = '1'      with open(path) as f: -        key_lines = f.read().strip().split("\n") -        key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines -        key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings - -    version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) +        key_lines = f.read().strip().split('\n') +        key_lines = list( +            filter(lambda line: not line.strip().startswith('#'), key_lines) +        )  # Remove commented lines +        key_data = ''.join(key_lines[1:-1])  # Remove wrapper tags and line endings + +    version_search = re.search( +        r'BEGIN OpenVPN Static key V(\d+)', key_lines[0] +    )  # Future-proofing (hopefully)      if version_search:          key_version = version_search[1]      install_openvpn_key(name, key_data, key_version) -# Show functions -def show_certificate_authority(name=None, pem=False): -    headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] + +def generate_pki( +    raw: bool, +    pki_type: ArgsPkiTypeGen, +    name: typing.Optional[str], +    file: typing.Optional[bool], +    install: typing.Optional[bool], +    sign: typing.Optional[str], +    self_sign: typing.Optional[bool], +    key: typing.Optional[bool], +    psk: typing.Optional[bool], +    interface: typing.Optional[str], +    peer: typing.Optional[str], +): +    try: +        if pki_type == 'ca': +            if sign: +                generate_ca_certificate_sign(name, sign, install=install, file=file) +            else: +                generate_ca_certificate(name, install=install, file=file) +        elif pki_type == 'certificate': +            if sign: +                generate_certificate_sign(name, sign, install=install, file=file) +            elif self_sign: +                generate_certificate_selfsign(name, install=install, file=file) +            else: +                generate_certificate_request(name=name, install=install, file=file) + +        elif pki_type == 'crl': +            generate_certificate_revocation_list(name, install=install, file=file) + +        elif pki_type == 'ssh': +            generate_ssh_keypair(name, install=install, file=file) + +        elif pki_type == 'dh': +            generate_dh_parameters(name, install=install, file=file) + +        elif pki_type == 'key-pair': +            generate_keypair(name, install=install, file=file) + +        elif pki_type == 'openvpn': +            generate_openvpn_key(name, install=install, file=file) + +        elif pki_type == 'wireguard': +            # WireGuard supports writing key directly into the CLI, but this +            # requires the vyos_libexec_dir environment variable to be set +            os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos' + +            if key: +                generate_wireguard_key(interface, install=install) +            if psk: +                generate_wireguard_psk(interface, peer=peer, install=install) +    except KeyboardInterrupt: +        print('Aborted') +        sys.exit(0) + + +def import_pki( +    name: str, +    pki_type: ArgsPkiType, +    filename: typing.Optional[str], +    key_filename: typing.Optional[str], +    no_prompt: typing.Optional[bool], +    passphrase: typing.Optional[str], +): +    try: +        if pki_type == 'ca': +            import_ca_certificate( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'certificate': +            import_certificate( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'crl': +            import_crl(name, filename) +        elif pki_type == 'dh': +            import_dh_parameters(name, filename) +        elif pki_type == 'key-pair': +            import_keypair( +                name, +                path=filename, +                key_path=key_filename, +                no_prompt=no_prompt, +                passphrase=passphrase, +            ) +        elif pki_type == 'openvpn': +            import_openvpn_secret(name, filename) +    except KeyboardInterrupt: +        print('Aborted') +        sys.exit(0) + + +@_verify('ca') +def show_certificate_authority( +    raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): +    headers = [ +        'Name', +        'Subject', +        'Issuer CN', +        'Issued', +        'Expiry', +        'Private Key', +        'Parent', +    ]      data = []      certs = get_config_ca_certificate()      if certs: @@ -891,7 +1218,7 @@ def show_certificate_authority(name=None, pem=False):                  return              parent_ca_name = get_certificate_ca(cert, certs) -            cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] +            cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]              if not parent_ca_name or parent_ca_name == cert_name:                  parent_ca_name = 'N/A' @@ -899,14 +1226,45 @@ def show_certificate_authority(name=None, pem=False):              if not cert:                  continue -            have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' -            data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) - -    print("Certificate Authorities:") +            have_private = ( +                'Yes' +                if 'private' in cert_dict and 'key' in cert_dict['private'] +                else 'No' +            ) +            data.append( +                [ +                    cert_name, +                    cert.subject.rfc4514_string(), +                    cert_issuer_cn, +                    cert.not_valid_before, +                    cert.not_valid_after, +                    have_private, +                    parent_ca_name, +                ] +            ) + +    print('Certificate Authorities:')      print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False, fingerprint_hash=None): -    headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] + +@_verify('certificate') +def show_certificate( +    raw: bool, +    name: typing.Optional[str] = None, +    pem: typing.Optional[bool] = False, +    fingerprint: typing.Optional[ArgsFingerprint] = None, +): +    headers = [ +        'Name', +        'Type', +        'Subject CN', +        'Issuer CN', +        'Issued', +        'Expiry', +        'Revoked', +        'Private Key', +        'CA Present', +    ]      data = []      certs = get_config_certificate()      if certs: @@ -926,13 +1284,13 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):              if name and pem:                  print(encode_certificate(cert))                  return -            elif name and fingerprint_hash: -                print(get_certificate_fingerprint(cert, fingerprint_hash)) +            elif name and fingerprint: +                print(get_certificate_fingerprint(cert, fingerprint))                  return              ca_name = get_certificate_ca(cert, ca_certs) -            cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] -            cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] +            cert_subject_cn = cert.subject.rfc4514_string().split(',')[0] +            cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]              cert_type = 'Unknown'              try: @@ -941,21 +1299,37 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):                      cert_type = 'Server'                  elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value:                      cert_type = 'Client' -            except: +            except Exception:                  pass              revoked = 'Yes' if 'revoke' in cert_dict else 'No' -            have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' +            have_private = ( +                'Yes' +                if 'private' in cert_dict and 'key' in cert_dict['private'] +                else 'No' +            )              have_ca = f'Yes ({ca_name})' if ca_name else 'No' -            data.append([ -                cert_name, cert_type, cert_subject_cn, cert_issuer_cn, -                cert.not_valid_before, cert.not_valid_after, -                revoked, have_private, have_ca]) - -    print("Certificates:") +            data.append( +                [ +                    cert_name, +                    cert_type, +                    cert_subject_cn, +                    cert_issuer_cn, +                    cert.not_valid_before, +                    cert.not_valid_after, +                    revoked, +                    have_private, +                    have_ca, +                ] +            ) + +    print('Certificates:')      print(tabulate.tabulate(data, headers)) -def show_crl(name=None, pem=False): + +def show_crl( +    raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +):      headers = ['CA Name', 'Updated', 'Revokes']      data = []      certs = get_config_ca_certificate() @@ -980,141 +1354,31 @@ def show_crl(name=None, pem=False):                      print(encode_certificate(crl))                      continue -                certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl]) -                data.append([cert_name, crl.last_update, ", ".join(certs)]) +                certs = get_revoked_by_serial_numbers( +                    [revoked.serial_number for revoked in crl] +                ) +                data.append([cert_name, crl.last_update, ', '.join(certs)])      if name and pem:          return -    print("Certificate Revocation Lists:") +    print('Certificate Revocation Lists:')      print(tabulate.tabulate(data, headers)) -if __name__ == '__main__': -    parser = argparse.ArgumentParser() -    parser.add_argument('--action', help='PKI action', required=True) - -    # X509 -    parser.add_argument('--ca', help='Certificate Authority', required=False) -    parser.add_argument('--certificate', help='Certificate', required=False) -    parser.add_argument('--crl', help='Certificate Revocation List', required=False) -    parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) -    parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') -    parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') -    parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') -    # SSH -    parser.add_argument('--ssh', help='SSH Key', required=False) +def show_all(raw: bool): +    show_certificate_authority(raw) +    print('\n') +    show_certificate(raw) +    print('\n') +    show_crl(raw) -    # DH -    parser.add_argument('--dh', help='DH Parameters', required=False) - -    # Key pair -    parser.add_argument('--keypair', help='Key pair', required=False) - -    # OpenVPN -    parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - -    # WireGuard -    parser.add_argument('--wireguard', help='Wireguard', action='store_true') -    group = parser.add_mutually_exclusive_group() -    group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) -    group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) -    parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') -    parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') - -    # Global -    parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') -    parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') - -    parser.add_argument('--filename', help='Write certificate into specified filename', action='store') -    parser.add_argument('--key-filename', help='Write key into specified filename', action='store') - -    parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') -    parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') - -    args = parser.parse_args() +if __name__ == '__main__':      try: -        if args.action == 'generate': -            if args.ca: -                if args.sign: -                    generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) -                else: -                    generate_ca_certificate(args.ca, install=args.install, file=args.file) -            elif args.certificate: -                if args.sign: -                    generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) -                elif args.self_sign: -                    generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) -                else: -                    generate_certificate_request(name=args.certificate, install=args.install, file=args.file) - -            elif args.crl: -                generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) - -            elif args.ssh: -                generate_ssh_keypair(args.ssh, install=args.install, file=args.file) - -            elif args.dh: -                generate_dh_parameters(args.dh, install=args.install, file=args.file) - -            elif args.keypair: -                generate_keypair(args.keypair, install=args.install, file=args.file) - -            elif args.openvpn: -                generate_openvpn_key(args.openvpn, install=args.install, file=args.file) - -            elif args.wireguard: -                # WireGuard supports writing key directly into the CLI, but this -                # requires the vyos_libexec_dir environment variable to be set -                os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" - -                if args.key: -                    generate_wireguard_key(args.interface, install=args.install) -                if args.psk: -                    generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) -        elif args.action == 'import': -            if args.ca: -                import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, -                                      no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.certificate: -                import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, -                                   no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.crl: -                import_crl(args.crl, args.filename) -            elif args.dh: -                import_dh_parameters(args.dh, args.filename) -            elif args.keypair: -                import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, -                               no_prompt=args.no_prompt, passphrase=args.passphrase) -            elif args.openvpn: -                import_openvpn_secret(args.openvpn, args.filename) -        elif args.action == 'show': -            if args.ca: -                ca_name = None if args.ca == 'all' else args.ca -                if ca_name: -                    if not conf.exists(['pki', 'ca', ca_name]): -                        print(f'CA "{ca_name}" does not exist!') -                        exit(1) -                show_certificate_authority(ca_name, args.pem) -            elif args.certificate: -                cert_name = None if args.certificate == 'all' else args.certificate -                if cert_name: -                    if not conf.exists(['pki', 'certificate', cert_name]): -                        print(f'Certificate "{cert_name}" does not exist!') -                        exit(1) -                if args.fingerprint is None: -                    show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) -                else: -                    show_certificate(args.certificate, fingerprint_hash=args.fingerprint) -            elif args.crl: -                show_crl(None if args.crl == 'all' else args.crl, args.pem) -            else: -                show_certificate_authority() -                print('\n') -                show_certificate() -                print('\n') -                show_crl() -    except KeyboardInterrupt: -        print("Aborted") -        sys.exit(0) +        res = vyos.opmode.run(sys.modules[__name__]) +        if res: +            print(res) +    except (ValueError, vyos.opmode.Error) as e: +        print(e) +        sys.exit(1) diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py index da981d5bf..5612e947c 100644 --- a/src/services/api/rest/routers.py +++ b/src/services/api/rest/routers.py @@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path):      correct_paths = ['ca', 'certificate', 'key-pair']      if path[1] not in correct_paths:          return False -    path[1] = '--' + path[1].replace('-', '')      path[3] = '--key-filename' -    return path[1:] +    path.insert(2, '--name') +    return ['--pki-type'] + path[1:]  @router.post('/configure') | 
