summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNataliia S. <81954790+natali-rs1985@users.noreply.github.com>2024-10-21 10:30:38 +0300
committerGitHub <noreply@github.com>2024-10-21 10:30:38 +0300
commit18a9cec3deb6cc2dc49020a89208dc70defe9822 (patch)
treebe0bcfa93d52d559da03bc97eb928fc508bacd7d /src
parent5c76607a9faef1fb5dc07459a38d37261ce988c1 (diff)
parentfa272f5297177354714ee7867104f43e4e322df6 (diff)
downloadvyos-1x-18a9cec3deb6cc2dc49020a89208dc70defe9822.tar.gz
vyos-1x-18a9cec3deb6cc2dc49020a89208dc70defe9822.zip
Merge branch 'current' into T6695
Diffstat (limited to 'src')
-rw-r--r--[-rwxr-xr-x]src/conf_mode/load-balancing_haproxy.py (renamed from src/conf_mode/load-balancing_reverse-proxy.py)2
-rwxr-xr-xsrc/conf_mode/pki.py2
-rwxr-xr-xsrc/conf_mode/service_monitoring_frr-exporter.py101
-rwxr-xr-xsrc/conf_mode/system_config-management.py20
-rwxr-xr-xsrc/conf_mode/system_login_banner.py4
-rwxr-xr-xsrc/helpers/commit-confirm-notify.py48
-rwxr-xr-xsrc/migration-scripts/reverse-proxy/1-to-227
-rwxr-xr-xsrc/op_mode/load-balancing_haproxy.py (renamed from src/op_mode/reverseproxy.py)4
-rwxr-xr-xsrc/op_mode/pki.py820
-rwxr-xr-xsrc/op_mode/restart.py10
-rw-r--r--src/services/api/rest/routers.py4
11 files changed, 737 insertions, 305 deletions
diff --git a/src/conf_mode/load-balancing_reverse-proxy.py b/src/conf_mode/load-balancing_haproxy.py
index 17226efe9..45042dd52 100755..100644
--- a/src/conf_mode/load-balancing_reverse-proxy.py
+++ b/src/conf_mode/load-balancing_haproxy.py
@@ -48,7 +48,7 @@ def get_config(config=None):
else:
conf = Config()
- base = ['load-balancing', 'reverse-proxy']
+ base = ['load-balancing', 'haproxy']
if not conf.exists(base):
return None
lb = conf.get_config_dict(base,
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index 233d73ba8..45e0129a3 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -71,7 +71,7 @@ sync_search = [
},
{
'keys': ['certificate', 'ca_certificate'],
- 'path': ['load_balancing', 'reverse_proxy'],
+ 'path': ['load_balancing', 'haproxy'],
},
{
'keys': ['key'],
diff --git a/src/conf_mode/service_monitoring_frr-exporter.py b/src/conf_mode/service_monitoring_frr-exporter.py
new file mode 100755
index 000000000..01527d579
--- /dev/null
+++ b/src/conf_mode/service_monitoring_frr-exporter.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+from sys import exit
+
+from vyos.config import Config
+from vyos.configdict import is_node_changed
+from vyos.configverify import verify_vrf
+from vyos.template import render
+from vyos.utils.process import call
+from vyos import ConfigError
+from vyos import airbag
+
+
+airbag.enable()
+
+service_file = '/etc/systemd/system/frr_exporter.service'
+systemd_service = 'frr_exporter.service'
+
+
+def get_config(config=None):
+ if config:
+ conf = config
+ else:
+ conf = Config()
+ base = ['service', 'monitoring', 'frr-exporter']
+ if not conf.exists(base):
+ return None
+
+ config_data = conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True
+ )
+ config_data = conf.merge_defaults(config_data, recursive=True)
+
+ tmp = is_node_changed(conf, base + ['vrf'])
+ if tmp:
+ config_data.update({'restart_required': {}})
+
+ return config_data
+
+
+def verify(config_data):
+ # bail out early - looks like removal from running config
+ if not config_data:
+ return None
+
+ verify_vrf(config_data)
+ return None
+
+
+def generate(config_data):
+ if not config_data:
+ # Delete systemd files
+ if os.path.isfile(service_file):
+ os.unlink(service_file)
+ return None
+
+ # Render frr_exporter service_file
+ render(service_file, 'frr_exporter/frr_exporter.service.j2', config_data)
+ return None
+
+
+def apply(config_data):
+ # Reload systemd manager configuration
+ call('systemctl daemon-reload')
+ if not config_data:
+ call(f'systemctl stop {systemd_service}')
+ return
+
+ # we need to restart the service if e.g. the VRF name changed
+ systemd_action = 'reload-or-restart'
+ if 'restart_required' in config_data:
+ systemd_action = 'restart'
+
+ call(f'systemctl {systemd_action} {systemd_service}')
+
+
+if __name__ == '__main__':
+ try:
+ c = get_config()
+ verify(c)
+ generate(c)
+ apply(c)
+ except ConfigError as e:
+ print(e)
+ exit(1)
diff --git a/src/conf_mode/system_config-management.py b/src/conf_mode/system_config-management.py
index c681a8405..8de4e5342 100755
--- a/src/conf_mode/system_config-management.py
+++ b/src/conf_mode/system_config-management.py
@@ -22,6 +22,7 @@ from vyos.config import Config
from vyos.config_mgmt import ConfigMgmt
from vyos.config_mgmt import commit_post_hook_dir, commit_hooks
+
def get_config(config=None):
if config:
conf = config
@@ -36,22 +37,29 @@ def get_config(config=None):
return mgmt
-def verify(_mgmt):
+
+def verify(mgmt):
+ d = mgmt.config_dict
+ confirm = d.get('commit_confirm', {})
+ if confirm.get('action', '') == 'reload' and 'commit_revisions' not in d:
+ raise ConfigError('commit-confirm reload requires non-zero commit-revisions')
+
return
+
def generate(mgmt):
if mgmt is None:
return
mgmt.initialize_revision()
+
def apply(mgmt):
if mgmt is None:
return
locations = mgmt.locations
- archive_target = os.path.join(commit_post_hook_dir,
- commit_hooks['commit_archive'])
+ archive_target = os.path.join(commit_post_hook_dir, commit_hooks['commit_archive'])
if locations:
try:
os.symlink('/usr/bin/config-mgmt', archive_target)
@@ -68,8 +76,9 @@ def apply(mgmt):
raise ConfigError from exc
revisions = mgmt.max_revisions
- revision_target = os.path.join(commit_post_hook_dir,
- commit_hooks['commit_revision'])
+ revision_target = os.path.join(
+ commit_post_hook_dir, commit_hooks['commit_revision']
+ )
if revisions > 0:
try:
os.symlink('/usr/bin/config-mgmt', revision_target)
@@ -85,6 +94,7 @@ def apply(mgmt):
except OSError as exc:
raise ConfigError from exc
+
if __name__ == '__main__':
try:
c = get_config()
diff --git a/src/conf_mode/system_login_banner.py b/src/conf_mode/system_login_banner.py
index 923e1bf57..5826d8042 100755
--- a/src/conf_mode/system_login_banner.py
+++ b/src/conf_mode/system_login_banner.py
@@ -28,6 +28,7 @@ airbag.enable()
PRELOGIN_FILE = r'/etc/issue'
PRELOGIN_NET_FILE = r'/etc/issue.net'
POSTLOGIN_FILE = r'/etc/motd'
+POSTLOGIN_VYOS_FILE = r'/run/motd.d/01-vyos-nonproduction'
default_config_data = {
'issue': 'Welcome to VyOS - \\n \\l\n\n',
@@ -94,6 +95,9 @@ def apply(banner):
render(POSTLOGIN_FILE, 'login/default_motd.j2', banner,
permission=0o644, user='root', group='root')
+ render(POSTLOGIN_VYOS_FILE, 'login/motd_vyos_nonproduction.j2', banner,
+ permission=0o644, user='root', group='root')
+
return None
if __name__ == '__main__':
diff --git a/src/helpers/commit-confirm-notify.py b/src/helpers/commit-confirm-notify.py
index 8d7626c78..af6167651 100755
--- a/src/helpers/commit-confirm-notify.py
+++ b/src/helpers/commit-confirm-notify.py
@@ -2,30 +2,56 @@
import os
import sys
import time
+from argparse import ArgumentParser
# Minutes before reboot to trigger notification.
intervals = [1, 5, 15, 60]
-def notify(interval):
- s = "" if interval == 1 else "s"
+parser = ArgumentParser()
+parser.add_argument(
+ 'minutes', type=int, help='minutes before rollback to trigger notification'
+)
+parser.add_argument(
+ '--reboot', action='store_true', help="use 'soft' rollback instead of reboot"
+)
+
+
+def notify(interval, reboot=False):
+ s = '' if interval == 1 else 's'
time.sleep((minutes - interval) * 60)
- message = ('"[commit-confirm] System is going to reboot in '
- f'{interval} minute{s} to rollback the last commit.\n'
- 'Confirm your changes to cancel the reboot."')
- os.system("wall -n " + message)
+ if reboot:
+ message = (
+ '"[commit-confirm] System will reboot in '
+ f'{interval} minute{s}\nto rollback the last commit.\n'
+ 'Confirm your changes to cancel the reboot."'
+ )
+ os.system('wall -n ' + message)
+ else:
+ message = (
+ '"[commit-confirm] System will reload previous config in '
+ f'{interval} minute{s}\nto rollback the last commit.\n'
+ 'Confirm your changes to cancel the reload."'
+ )
+ os.system('wall -n ' + message)
+
-if __name__ == "__main__":
+if __name__ == '__main__':
# Must be run as root to call wall(1) without a banner.
- if len(sys.argv) != 2 or os.getuid() != 0:
+ if os.getuid() != 0:
print('This script requires superuser privileges.', file=sys.stderr)
exit(1)
- minutes = int(sys.argv[1])
+
+ args = parser.parse_args()
+
+ minutes = args.minutes
+ reboot = args.reboot
+
# Drop the argument from the list so that the notification
# doesn't kick in immediately.
if minutes in intervals:
intervals.remove(minutes)
for interval in sorted(intervals, reverse=True):
if minutes >= interval:
- notify(interval)
- minutes -= (minutes - interval)
+ notify(interval, reboot=reboot)
+ minutes -= minutes - interval
exit(0)
diff --git a/src/migration-scripts/reverse-proxy/1-to-2 b/src/migration-scripts/reverse-proxy/1-to-2
new file mode 100755
index 000000000..61612bc36
--- /dev/null
+++ b/src/migration-scripts/reverse-proxy/1-to-2
@@ -0,0 +1,27 @@
+# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+# T6745: Rename base node to haproxy
+
+from vyos.configtree import ConfigTree
+
+base = ['load-balancing', 'reverse-proxy']
+
+def migrate(config: ConfigTree) -> None:
+ if not config.exists(base):
+ # Nothing to do
+ return
+
+ config.rename(base, 'haproxy')
diff --git a/src/op_mode/reverseproxy.py b/src/op_mode/load-balancing_haproxy.py
index 19704182a..ae6734e16 100755
--- a/src/op_mode/reverseproxy.py
+++ b/src/op_mode/load-balancing_haproxy.py
@@ -217,8 +217,8 @@ def _get_formatted_output(data):
def show(raw: bool):
config = ConfigTreeQuery()
- if not config.exists('load-balancing reverse-proxy'):
- raise vyos.opmode.UnconfiguredSubsystem('Reverse-proxy is not configured')
+ if not config.exists('load-balancing haproxy'):
+ raise vyos.opmode.UnconfiguredSubsystem('Haproxy is not configured')
data = _get_raw_data()
if raw:
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 5652a5d74..49a461e9e 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -14,16 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import argparse
import ipaddress
import os
import re
import sys
import tabulate
+import typing
from cryptography import x509
from cryptography.x509.oid import ExtendedKeyUsageOID
+import vyos.opmode
+
from vyos.config import Config
from vyos.config import config_dict_mangle_acme
from vyos.pki import encode_certificate
@@ -51,18 +53,50 @@ from vyos.utils.process import cmd
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
auth_dir = '/config/auth'
+ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl']
+ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']]
+ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512']
+
# Helper Functions
conf = Config()
+
+
+def _verify(target):
+ """Decorator checks if config for PKI exists"""
+ from functools import wraps
+
+ if target not in ['ca', 'certificate']:
+ raise ValueError('Invalid PKI')
+
+ def _verify_target(func):
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ name = kwargs.get('name')
+ unconf_message = f'PKI {target} "{name}" does not exist!'
+ if name:
+ if not conf.exists(['pki', target, name]):
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+
+ return _wrapper
+
+ return _verify_target
+
+
def get_default_values():
# Fetch default x509 values
base = ['pki', 'x509', 'default']
- x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'),
- no_tag_node_value_mangle=True,
- get_first_key=True,
- with_recursive_defaults=True)
+ x509_defaults = conf.get_config_dict(
+ base,
+ key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True,
+ with_recursive_defaults=True,
+ )
return x509_defaults
+
def get_config_ca_certificate(name=None):
# Fetch ca certificates from config
base = ['pki', 'ca']
@@ -71,12 +105,15 @@ def get_config_ca_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- return conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ return conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
+
def get_config_certificate(name=None):
# Get certificates from config
@@ -86,18 +123,21 @@ def get_config_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- pki = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ pki = conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
if pki:
for certificate in pki:
pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate])
return pki
+
def get_certificate_ca(cert, ca_certs):
# Find CA certificate for given certificate
if not ca_certs:
@@ -116,6 +156,7 @@ def get_certificate_ca(cert, ca_certs):
return ca_name
return None
+
def get_config_revoked_certificates():
# Fetch revoked certificates from config
ca_base = ['pki', 'ca']
@@ -124,19 +165,26 @@ def get_config_revoked_certificates():
certs = []
if conf.exists(ca_base):
- ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ ca_certificates = conf.get_config_dict(
+ ca_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(ca_certificates.values())
if conf.exists(cert_base):
- certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ certificates = conf.get_config_dict(
+ cert_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(certificates.values())
return [cert_dict for cert_dict in certs if 'revoke' in cert_dict]
+
def get_revoked_by_serial_numbers(serial_numbers=[]):
# Return serial numbers of revoked certificates
certs_out = []
@@ -160,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):
certs_out.append(cert_name)
return certs_out
-def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False):
+
+def install_certificate(
+ name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False
+):
# Show/install conf commands for certificate
prefix = 'ca' if is_ca else 'certificate'
- base = f"pki {prefix} {name}"
+ base = f'pki {prefix} {name}'
config_paths = []
if cert:
- cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1])
+ cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1])
config_paths.append(f"{base} certificate '{cert_pem}'")
if private_key:
- key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1])
+ key_pem = ''.join(
+ encode_private_key(private_key, passphrase=key_passphrase)
+ .strip()
+ .split('\n')[1:-1]
+ )
config_paths.append(f"{base} private key '{key_pem}'")
if key_passphrase:
- config_paths.append(f"{base} private password-protected")
+ config_paths.append(f'{base} private password-protected')
install_into_config(conf, config_paths)
+
def install_crl(ca_name, crl):
# Show/install conf commands for crl
- crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1])
+ crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"])
+
def install_dh_parameters(name, params):
# Show/install conf commands for dh params
- dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1])
+ dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"])
+
def install_ssh_key(name, public_key, private_key, passphrase=None):
# Show/install conf commands for ssh key
- key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')
+ key_openssh = encode_public_key(
+ public_key, encoding='OpenSSH', key_format='OpenSSH'
+ )
username = os.getlogin()
- type_key_split = key_openssh.split(" ")
-
- base = f"system login user {username} authentication public-keys {name}"
- install_into_config(conf, [
- f"{base} key '{type_key_split[1]}'",
- f"{base} type '{type_key_split[0]}'"
- ])
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
-
-def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):
+ type_key_split = key_openssh.split(' ')
+
+ base = f'system login user {username} authentication public-keys {name}'
+ install_into_config(
+ conf,
+ [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"],
+ )
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
+
+
+def install_keypair(
+ name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True
+):
# Show/install conf commands for key-pair
config_paths = []
if public_key:
- install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True)
+ install_public_key = not prompt or ask_yes_no(
+ 'Do you want to install the public key?', default=True
+ )
public_key_pem = encode_public_key(public_key)
if install_public_key:
- install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'")
+ install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} public key '{install_public_pem}'"
+ )
else:
- print("Public key:")
+ print('Public key:')
print(public_key_pem)
if private_key:
- install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True)
+ install_private_key = not prompt or ask_yes_no(
+ 'Do you want to install the private key?', default=True
+ )
private_key_pem = encode_private_key(private_key, passphrase=passphrase)
if install_private_key:
- install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'")
+ install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} private key '{install_private_pem}'"
+ )
if passphrase:
- config_paths.append(f"pki key-pair {name} private password-protected")
+ config_paths.append(f'pki key-pair {name} private password-protected')
else:
- print("Private key:")
+ print('Private key:')
print(private_key_pem)
install_into_config(conf, config_paths)
+
def install_openvpn_key(name, key_data, key_version='1'):
config_paths = [
f"pki openvpn shared-secret {name} key '{key_data}'",
- f"pki openvpn shared-secret {name} version '{key_version}'"
+ f"pki openvpn shared-secret {name} version '{key_version}'",
]
install_into_config(conf, config_paths)
+
def install_wireguard_key(interface, private_key, public_key):
# Show conf commands for installing wireguard key pairs
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]
+ )
print(f"Corresponding public-key to use on peer system is: '{public_key}'")
+
def install_wireguard_psk(interface, peer, psk):
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]
+ )
+
def ask_passphrase():
passphrase = None
- print("Note: If you plan to use the generated key on this router, do not encrypt the private key.")
+ print(
+ 'Note: If you plan to use the generated key on this router, do not encrypt the private key.'
+ )
if ask_yes_no('Do you want to encrypt the private key with a passphrase?'):
passphrase = ask_input('Enter passphrase:')
return passphrase
+
def write_file(filename, contents):
full_path = os.path.join(auth_dir, filename)
directory = os.path.dirname(full_path)
@@ -275,7 +363,9 @@ def write_file(filename, contents):
print('Failed to write file: directory does not exist')
return False
- if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'):
+ if os.path.exists(full_path) and not ask_yes_no(
+ 'Do you want to overwrite the existing file?'
+ ):
return False
with open(full_path, 'w') as f:
@@ -283,10 +373,14 @@ def write_file(filename, contents):
print(f'File written to {full_path}')
-# Generation functions
+# Generation functions
def generate_private_key():
- key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec'])
+ key_type = ask_input(
+ 'Enter private key type: [rsa, dsa, ec]',
+ default='rsa',
+ valid_responses=['rsa', 'dsa', 'ec'],
+ )
size_valid = []
size_default = 0
@@ -298,28 +392,43 @@ def generate_private_key():
size_default = 256
size_valid = [224, 256, 384, 521]
- size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid)
+ size = ask_input(
+ 'Enter private key bits:',
+ default=size_default,
+ numeric_only=True,
+ valid_responses=size_valid,
+ )
return create_private_key(key_type, size), key_type
+
def parse_san_string(san_string):
if not san_string:
return None
output = []
- san_split = san_string.strip().split(",")
+ san_split = san_string.strip().split(',')
for pair_str in san_split:
- tag, value = pair_str.strip().split(":", 1)
+ tag, value = pair_str.strip().split(':', 1)
if tag == 'ipv4':
output.append(ipaddress.IPv4Address(value))
elif tag == 'ipv6':
output.append(ipaddress.IPv6Address(value))
elif tag == 'dns' or tag == 'rfc822':
output.append(value)
- return output
-
-def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True):
+ return
+
+
+def generate_certificate_request(
+ private_key=None,
+ key_type=None,
+ return_request=False,
+ name=None,
+ install=False,
+ file=False,
+ ask_san=True,
+):
if not private_key:
private_key, key_type = generate_private_key()
@@ -328,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
while True:
country = ask_input('Enter country code:', default=default_values['country'])
if len(country) != 2:
- print("Country name must be a 2 character country code")
+ print('Country name must be a 2 character country code')
continue
subject['country'] = country
break
subject['state'] = ask_input('Enter state:', default=default_values['state'])
- subject['locality'] = ask_input('Enter locality:', default=default_values['locality'])
- subject['organization'] = ask_input('Enter organization name:', default=default_values['organization'])
+ subject['locality'] = ask_input(
+ 'Enter locality:', default=default_values['locality']
+ )
+ subject['organization'] = ask_input(
+ 'Enter organization name:', default=default_values['organization']
+ )
subject['common_name'] = ask_input('Enter common name:', default='vyos.io')
subject_alt_names = None
if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'):
- print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net")
+ print(
+ 'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net'
+ )
san_string = ask_input('Enter Subject Alternative Names:')
subject_alt_names = parse_san_string(san_string)
@@ -356,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
return None
if install:
- print("Certificate request:")
- print(encode_certificate(cert_req) + "\n")
- install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ print('Certificate request:')
+ print(encode_certificate(cert_req) + '\n')
+ install_certificate(
+ name,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.csr', encode_certificate(cert_req))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
-
-def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False):
- valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True)
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
+
+def generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False
+):
+ valid_days = ask_input(
+ 'Enter how many days certificate will be valid:',
+ default='365' if not is_ca else '1825',
+ numeric_only=True,
+ )
cert_type = None
if not is_ca:
- cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server'])
- return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca)
+ cert_type = ask_input(
+ 'Enter certificate type: (client, server)',
+ default='server',
+ valid_responses=['client', 'server'],
+ )
+ return create_certificate(
+ cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca
+ )
+
def generate_ca_certificate(name, install=False, file=False):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True)
passphrase = ask_passphrase()
@@ -383,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -399,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load signing CA certificate, aborting")
+ print('Failed to load signing CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter signing CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load signing CA private key, aborting")
+ print('Failed to load signing CA private key, aborting')
return None
private_key = None
@@ -418,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -430,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
- cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True)
+ cert = generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True
+ )
passphrase = None
if private_key is not None:
@@ -453,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -470,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
private_key = None
@@ -489,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -501,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False)
-
+
passphrase = None
if private_key is not None:
passphrase = ask_passphrase()
@@ -524,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_selfsign(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -543,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name,
+ cert,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -559,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
revoked_certs = get_config_revoked_certificates()
@@ -590,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
continue
if not to_revoke:
- print("No revoked certificates to add to the CRL")
+ print('No revoked certificates to add to the CRL')
return None
crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke)
if not crl:
- print("Failed to create CRL")
+ print('Failed to create CRL')
return None
if not install and not file:
@@ -607,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
install_crl(ca_name, crl)
if file:
- write_file(f'{name}.crl', encode_certificate(crl))
+ write_file(f'{ca_name}.crl', encode_certificate(crl))
+
def generate_ssh_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -616,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- print("")
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ print('')
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
return None
if install:
install_ssh_key(name, public_key, private_key, passphrase)
if file:
- write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ write_file(
+ f'{name}.pem',
+ encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'),
+ )
+ write_file(
+ f'{name}.key',
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ ),
+ )
+
def generate_dh_parameters(name, install=False, file=False):
bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True)
- print("Generating parameters...")
+ print('Generating parameters...')
dh_params = create_dh_parameters(bits)
if not dh_params:
- print("Failed to create DH parameters")
+ print('Failed to create DH parameters')
return None
if not install and not file:
- print("DH Parameters:")
+ print('DH Parameters:')
print(encode_dh_parameters(dh_params))
if install:
@@ -647,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_dh_parameters(dh_params))
+
def generate_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
public_key = private_key.public_key()
@@ -654,7 +849,7 @@ def generate_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key))
- print("")
+ print('')
print(encode_private_key(private_key, passphrase=passphrase))
return None
@@ -663,13 +858,16 @@ def generate_keypair(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_public_key(public_key))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_openvpn_key(name, install=False, file=False):
result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"')
if not result:
- print("Failed to generate OpenVPN key")
+ print('Failed to generate OpenVPN key')
return None
if not install and not file:
@@ -677,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False):
return None
if install:
- key_lines = result.split("\n")
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
+ key_lines = result.split('\n')
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
key_version = '1'
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', result
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
@@ -690,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False):
if file:
write_file(f'{name}.key', result)
+
def generate_wireguard_key(interface=None, install=False):
private_key = cmd('wg genkey')
public_key = cmd('wg pubkey', input=private_key)
@@ -700,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False):
print(f'Private key: {private_key}')
print(f'Public key: {public_key}', end='\n\n')
+
def generate_wireguard_psk(interface=None, peer=None, install=False):
psk = cmd('wg genpsk')
if interface and peer and install:
@@ -707,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):
else:
print(f'Pre-shared key: {psk}')
+
# Import functions
-def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+def import_ca_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -745,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp
install_certificate(name, private_key=key, is_ca=True)
-def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+
+def import_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -782,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra
install_certificate(name, private_key=key, is_ca=False)
+
def import_crl(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -799,6 +1008,7 @@ def import_crl(name, path):
install_crl(name, crl)
+
def import_dh_parameters(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -816,6 +1026,7 @@ def import_dh_parameters(name, path):
install_dh_parameters(name, dh)
+
def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None):
if path:
if not os.path.exists(path):
@@ -853,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N
install_keypair(name, None, private_key=key, prompt=False)
+
def import_openvpn_secret(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -862,19 +1074,134 @@ def import_openvpn_secret(name, path):
key_version = '1'
with open(path) as f:
- key_lines = f.read().strip().split("\n")
- key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
-
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully)
+ key_lines = f.read().strip().split('\n')
+ key_lines = list(
+ filter(lambda line: not line.strip().startswith('#'), key_lines)
+ ) # Remove commented lines
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
+
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
install_openvpn_key(name, key_data, key_version)
-# Show functions
-def show_certificate_authority(name=None, pem=False):
- headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
+
+def generate_pki(
+ raw: bool,
+ pki_type: ArgsPkiTypeGen,
+ name: typing.Optional[str],
+ file: typing.Optional[bool],
+ install: typing.Optional[bool],
+ sign: typing.Optional[str],
+ self_sign: typing.Optional[bool],
+ key: typing.Optional[bool],
+ psk: typing.Optional[bool],
+ interface: typing.Optional[str],
+ peer: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ if sign:
+ generate_ca_certificate_sign(name, sign, install=install, file=file)
+ else:
+ generate_ca_certificate(name, install=install, file=file)
+ elif pki_type == 'certificate':
+ if sign:
+ generate_certificate_sign(name, sign, install=install, file=file)
+ elif self_sign:
+ generate_certificate_selfsign(name, install=install, file=file)
+ else:
+ generate_certificate_request(name=name, install=install, file=file)
+
+ elif pki_type == 'crl':
+ generate_certificate_revocation_list(name, install=install, file=file)
+
+ elif pki_type == 'ssh':
+ generate_ssh_keypair(name, install=install, file=file)
+
+ elif pki_type == 'dh':
+ generate_dh_parameters(name, install=install, file=file)
+
+ elif pki_type == 'key-pair':
+ generate_keypair(name, install=install, file=file)
+
+ elif pki_type == 'openvpn':
+ generate_openvpn_key(name, install=install, file=file)
+
+ elif pki_type == 'wireguard':
+ # WireGuard supports writing key directly into the CLI, but this
+ # requires the vyos_libexec_dir environment variable to be set
+ os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos'
+
+ if key:
+ generate_wireguard_key(interface, install=install)
+ if psk:
+ generate_wireguard_psk(interface, peer=peer, install=install)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+def import_pki(
+ name: str,
+ pki_type: ArgsPkiType,
+ filename: typing.Optional[str],
+ key_filename: typing.Optional[str],
+ no_prompt: typing.Optional[bool],
+ passphrase: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ import_ca_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'certificate':
+ import_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'crl':
+ import_crl(name, filename)
+ elif pki_type == 'dh':
+ import_dh_parameters(name, filename)
+ elif pki_type == 'key-pair':
+ import_keypair(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'openvpn':
+ import_openvpn_secret(name, filename)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+@_verify('ca')
+def show_certificate_authority(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
+ headers = [
+ 'Name',
+ 'Subject',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Private Key',
+ 'Parent',
+ ]
data = []
certs = get_config_ca_certificate()
if certs:
@@ -891,7 +1218,7 @@ def show_certificate_authority(name=None, pem=False):
return
parent_ca_name = get_certificate_ca(cert, certs)
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
if not parent_ca_name or parent_ca_name == cert_name:
parent_ca_name = 'N/A'
@@ -899,14 +1226,45 @@ def show_certificate_authority(name=None, pem=False):
if not cert:
continue
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
- data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name])
-
- print("Certificate Authorities:")
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
+ data.append(
+ [
+ cert_name,
+ cert.subject.rfc4514_string(),
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ have_private,
+ parent_ca_name,
+ ]
+ )
+
+ print('Certificate Authorities:')
print(tabulate.tabulate(data, headers))
-def show_certificate(name=None, pem=False, fingerprint_hash=None):
- headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']
+
+@_verify('certificate')
+def show_certificate(
+ raw: bool,
+ name: typing.Optional[str] = None,
+ pem: typing.Optional[bool] = False,
+ fingerprint: typing.Optional[ArgsFingerprint] = None,
+):
+ headers = [
+ 'Name',
+ 'Type',
+ 'Subject CN',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Revoked',
+ 'Private Key',
+ 'CA Present',
+ ]
data = []
certs = get_config_certificate()
if certs:
@@ -926,13 +1284,13 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
if name and pem:
print(encode_certificate(cert))
return
- elif name and fingerprint_hash:
- print(get_certificate_fingerprint(cert, fingerprint_hash))
+ elif name and fingerprint:
+ print(get_certificate_fingerprint(cert, fingerprint))
return
ca_name = get_certificate_ca(cert, ca_certs)
- cert_subject_cn = cert.subject.rfc4514_string().split(",")[0]
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_subject_cn = cert.subject.rfc4514_string().split(',')[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
cert_type = 'Unknown'
try:
@@ -941,21 +1299,37 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
cert_type = 'Server'
elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value:
cert_type = 'Client'
- except:
+ except Exception:
pass
revoked = 'Yes' if 'revoke' in cert_dict else 'No'
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
have_ca = f'Yes ({ca_name})' if ca_name else 'No'
- data.append([
- cert_name, cert_type, cert_subject_cn, cert_issuer_cn,
- cert.not_valid_before, cert.not_valid_after,
- revoked, have_private, have_ca])
-
- print("Certificates:")
+ data.append(
+ [
+ cert_name,
+ cert_type,
+ cert_subject_cn,
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ revoked,
+ have_private,
+ have_ca,
+ ]
+ )
+
+ print('Certificates:')
print(tabulate.tabulate(data, headers))
-def show_crl(name=None, pem=False):
+
+def show_crl(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
headers = ['CA Name', 'Updated', 'Revokes']
data = []
certs = get_config_ca_certificate()
@@ -980,141 +1354,31 @@ def show_crl(name=None, pem=False):
print(encode_certificate(crl))
continue
- certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl])
- data.append([cert_name, crl.last_update, ", ".join(certs)])
+ certs = get_revoked_by_serial_numbers(
+ [revoked.serial_number for revoked in crl]
+ )
+ data.append([cert_name, crl.last_update, ', '.join(certs)])
if name and pem:
return
- print("Certificate Revocation Lists:")
+ print('Certificate Revocation Lists:')
print(tabulate.tabulate(data, headers))
-if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--action', help='PKI action', required=True)
-
- # X509
- parser.add_argument('--ca', help='Certificate Authority', required=False)
- parser.add_argument('--certificate', help='Certificate', required=False)
- parser.add_argument('--crl', help='Certificate Revocation List', required=False)
- parser.add_argument('--sign', help='Sign certificate with specified CA', required=False)
- parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true')
- parser.add_argument('--pem', help='Output using PEM encoding', action='store_true')
- parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store')
- # SSH
- parser.add_argument('--ssh', help='SSH Key', required=False)
+def show_all(raw: bool):
+ show_certificate_authority(raw)
+ print('\n')
+ show_certificate(raw)
+ print('\n')
+ show_crl(raw)
- # DH
- parser.add_argument('--dh', help='DH Parameters', required=False)
-
- # Key pair
- parser.add_argument('--keypair', help='Key pair', required=False)
-
- # OpenVPN
- parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False)
-
- # WireGuard
- parser.add_argument('--wireguard', help='Wireguard', action='store_true')
- group = parser.add_mutually_exclusive_group()
- group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False)
- group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False)
- parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store')
- parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store')
-
- # Global
- parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
- parser.add_argument('--install', help='Install generated keys into running-config', action='store_true')
-
- parser.add_argument('--filename', help='Write certificate into specified filename', action='store')
- parser.add_argument('--key-filename', help='Write key into specified filename', action='store')
-
- parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively')
- parser.add_argument('--passphrase', help='A passphrase to decrypt the private key')
-
- args = parser.parse_args()
+if __name__ == '__main__':
try:
- if args.action == 'generate':
- if args.ca:
- if args.sign:
- generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file)
- else:
- generate_ca_certificate(args.ca, install=args.install, file=args.file)
- elif args.certificate:
- if args.sign:
- generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file)
- elif args.self_sign:
- generate_certificate_selfsign(args.certificate, install=args.install, file=args.file)
- else:
- generate_certificate_request(name=args.certificate, install=args.install, file=args.file)
-
- elif args.crl:
- generate_certificate_revocation_list(args.crl, install=args.install, file=args.file)
-
- elif args.ssh:
- generate_ssh_keypair(args.ssh, install=args.install, file=args.file)
-
- elif args.dh:
- generate_dh_parameters(args.dh, install=args.install, file=args.file)
-
- elif args.keypair:
- generate_keypair(args.keypair, install=args.install, file=args.file)
-
- elif args.openvpn:
- generate_openvpn_key(args.openvpn, install=args.install, file=args.file)
-
- elif args.wireguard:
- # WireGuard supports writing key directly into the CLI, but this
- # requires the vyos_libexec_dir environment variable to be set
- os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos"
-
- if args.key:
- generate_wireguard_key(args.interface, install=args.install)
- if args.psk:
- generate_wireguard_psk(args.interface, peer=args.peer, install=args.install)
- elif args.action == 'import':
- if args.ca:
- import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.certificate:
- import_certificate(args.certificate, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.crl:
- import_crl(args.crl, args.filename)
- elif args.dh:
- import_dh_parameters(args.dh, args.filename)
- elif args.keypair:
- import_keypair(args.keypair, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.openvpn:
- import_openvpn_secret(args.openvpn, args.filename)
- elif args.action == 'show':
- if args.ca:
- ca_name = None if args.ca == 'all' else args.ca
- if ca_name:
- if not conf.exists(['pki', 'ca', ca_name]):
- print(f'CA "{ca_name}" does not exist!')
- exit(1)
- show_certificate_authority(ca_name, args.pem)
- elif args.certificate:
- cert_name = None if args.certificate == 'all' else args.certificate
- if cert_name:
- if not conf.exists(['pki', 'certificate', cert_name]):
- print(f'Certificate "{cert_name}" does not exist!')
- exit(1)
- if args.fingerprint is None:
- show_certificate(None if args.certificate == 'all' else args.certificate, args.pem)
- else:
- show_certificate(args.certificate, fingerprint_hash=args.fingerprint)
- elif args.crl:
- show_crl(None if args.crl == 'all' else args.crl, args.pem)
- else:
- show_certificate_authority()
- print('\n')
- show_certificate()
- print('\n')
- show_crl()
- except KeyboardInterrupt:
- print("Aborted")
- sys.exit(0)
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py
index a83c8b9d8..3b0031f34 100755
--- a/src/op_mode/restart.py
+++ b/src/op_mode/restart.py
@@ -41,6 +41,10 @@ service_map = {
'systemd_service': 'pdns-recursor',
'path': ['service', 'dns', 'forwarding'],
},
+ 'haproxy': {
+ 'systemd_service': 'haproxy',
+ 'path': ['load-balancing', 'haproxy'],
+ },
'igmp_proxy': {
'systemd_service': 'igmpproxy',
'path': ['protocols', 'igmp-proxy'],
@@ -53,10 +57,6 @@ service_map = {
'systemd_service': 'avahi-daemon',
'path': ['service', 'mdns', 'repeater'],
},
- 'reverse_proxy': {
- 'systemd_service': 'haproxy',
- 'path': ['load-balancing', 'reverse-proxy'],
- },
'router_advert': {
'systemd_service': 'radvd',
'path': ['service', 'router-advert'],
@@ -83,10 +83,10 @@ services = typing.Literal[
'dhcpv6',
'dns_dynamic',
'dns_forwarding',
+ 'haproxy',
'igmp_proxy',
'ipsec',
'mdns_repeater',
- 'reverse_proxy',
'router_advert',
'snmp',
'ssh',
diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py
index 47d06b7e9..e52c77fda 100644
--- a/src/services/api/rest/routers.py
+++ b/src/services/api/rest/routers.py
@@ -425,9 +425,9 @@ def create_path_import_pki_no_prompt(path):
correct_paths = ['ca', 'certificate', 'key-pair']
if path[1] not in correct_paths:
return False
- path[1] = '--' + path[1].replace('-', '')
path[3] = '--key-filename'
- return path[1:]
+ path.insert(2, '--name')
+ return ['--pki-type'] + path[1:]
@router.post('/configure')