From 90a4827284acd3cb072cdfeef323c522802c6449 Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Wed, 9 Oct 2024 14:55:11 +0200 Subject: haproxy: T6745: Rename `reverse-proxy` to `haproxy` --- src/op_mode/load-balancing_haproxy.py | 237 ++++++++++++++++++++++++++++++++++ src/op_mode/restart.py | 10 +- src/op_mode/reverseproxy.py | 237 ---------------------------------- 3 files changed, 242 insertions(+), 242 deletions(-) create mode 100755 src/op_mode/load-balancing_haproxy.py delete mode 100755 src/op_mode/reverseproxy.py (limited to 'src/op_mode') diff --git a/src/op_mode/load-balancing_haproxy.py b/src/op_mode/load-balancing_haproxy.py new file mode 100755 index 000000000..ae6734e16 --- /dev/null +++ b/src/op_mode/load-balancing_haproxy.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023-2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import json +import socket +import sys + +from tabulate import tabulate +from vyos.configquery import ConfigTreeQuery + +import vyos.opmode + +socket_path = '/run/haproxy/admin.sock' +timeout = 5 + + +def _execute_haproxy_command(command): + """Execute a command on the HAProxy UNIX socket and retrieve the response. + + Args: + command (str): The command to be executed. + + Returns: + str: The response received from the HAProxy UNIX socket. + + Raises: + socket.error: If there is an error while connecting or communicating with the socket. + + Finally: + Closes the socket connection. + + Notes: + - HAProxy expects a newline character at the end of the command. + - The socket connection is established using the HAProxy UNIX socket. + - The response from the socket is received and decoded. + + Example: + response = _execute_haproxy_command('show stat') + print(response) + """ + try: + # HAProxy expects new line for command + command = f'{command}\n' + + # Connect to the HAProxy UNIX socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(socket_path) + + # Set the socket timeout + sock.settimeout(timeout) + + # Send the command + sock.sendall(command.encode()) + + # Receive and decode the response + response = b'' + while True: + data = sock.recv(4096) + if not data: + break + response += data + response = response.decode() + + return (response) + + except socket.error as e: + print(f"Error: {e}") + + finally: + # Close the socket + sock.close() + + +def _convert_seconds(seconds): + """Convert seconds to days, hours, minutes, and seconds. + + Args: + seconds (int): The number of seconds to convert. + + Returns: + tuple: A tuple containing the number of days, hours, minutes, and seconds. + """ + minutes = seconds // 60 + hours = minutes // 60 + days = hours // 24 + + return days, hours % 24, minutes % 60, seconds % 60 + + +def _last_change_format(seconds): + """Format the time components into a string representation. + + Args: + seconds (int): The total number of seconds. + + Returns: + str: The formatted time string with days, hours, minutes, and seconds. + + Examples: + >>> _last_change_format(1434) + '23m54s' + >>> _last_change_format(93734) + '1d0h23m54s' + >>> _last_change_format(85434) + '23h23m54s' + """ + days, hours, minutes, seconds = _convert_seconds(seconds) + time_format = "" + + if days: + time_format += f"{days}d" + if hours: + time_format += f"{hours}h" + if minutes: + time_format += f"{minutes}m" + if seconds: + time_format += f"{seconds}s" + + return time_format + + +def _get_json_data(): + """Get haproxy data format JSON""" + return _execute_haproxy_command('show stat json') + + +def _get_raw_data(): + """Retrieve raw data from JSON and organize it into a dictionary. + + Returns: + dict: A dictionary containing the organized data categorized + into frontend, backend, and server. + """ + + data = json.loads(_get_json_data()) + lb_dict = {'frontend': [], 'backend': [], 'server': []} + + for key in data: + frontend = [] + backend = [] + server = [] + for entry in key: + obj_type = entry['objType'].lower() + position = entry['field']['pos'] + name = entry['field']['name'] + value = entry['value']['value'] + + dict_entry = {'pos': position, 'name': {name: value}} + + if obj_type == 'frontend': + frontend.append(dict_entry) + elif obj_type == 'backend': + backend.append(dict_entry) + elif obj_type == 'server': + server.append(dict_entry) + + if len(frontend) > 0: + lb_dict['frontend'].append(frontend) + if len(backend) > 0: + lb_dict['backend'].append(backend) + if len(server) > 0: + lb_dict['server'].append(server) + + return lb_dict + + +def _get_formatted_output(data): + """ + Format the data into a tabulated output. + + Args: + data (dict): The data to be formatted. + + Returns: + str: The tabulated output representing the formatted data. + """ + table = [] + headers = [ + "Proxy name", "Role", "Status", "Req rate", "Resp time", "Last change" + ] + + for key in data: + for item in data[key]: + row = [None] * len(headers) + + for element in item: + if 'pxname' in element['name']: + row[0] = element['name']['pxname'] + elif 'svname' in element['name']: + row[1] = element['name']['svname'] + elif 'status' in element['name']: + row[2] = element['name']['status'] + elif 'req_rate' in element['name']: + row[3] = element['name']['req_rate'] + elif 'rtime' in element['name']: + row[4] = f"{element['name']['rtime']} ms" + elif 'lastchg' in element['name']: + row[5] = _last_change_format(element['name']['lastchg']) + table.append(row) + + out = tabulate(table, headers, numalign="left") + return out + + +def show(raw: bool): + config = ConfigTreeQuery() + if not config.exists('load-balancing haproxy'): + raise vyos.opmode.UnconfiguredSubsystem('Haproxy is not configured') + + data = _get_raw_data() + if raw: + return data + else: + return _get_formatted_output(data) + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py index a83c8b9d8..3b0031f34 100755 --- a/src/op_mode/restart.py +++ b/src/op_mode/restart.py @@ -41,6 +41,10 @@ service_map = { 'systemd_service': 'pdns-recursor', 'path': ['service', 'dns', 'forwarding'], }, + 'haproxy': { + 'systemd_service': 'haproxy', + 'path': ['load-balancing', 'haproxy'], + }, 'igmp_proxy': { 'systemd_service': 'igmpproxy', 'path': ['protocols', 'igmp-proxy'], @@ -53,10 +57,6 @@ service_map = { 'systemd_service': 'avahi-daemon', 'path': ['service', 'mdns', 'repeater'], }, - 'reverse_proxy': { - 'systemd_service': 'haproxy', - 'path': ['load-balancing', 'reverse-proxy'], - }, 'router_advert': { 'systemd_service': 'radvd', 'path': ['service', 'router-advert'], @@ -83,10 +83,10 @@ services = typing.Literal[ 'dhcpv6', 'dns_dynamic', 'dns_forwarding', + 'haproxy', 'igmp_proxy', 'ipsec', 'mdns_repeater', - 'reverse_proxy', 'router_advert', 'snmp', 'ssh', diff --git a/src/op_mode/reverseproxy.py b/src/op_mode/reverseproxy.py deleted file mode 100755 index 19704182a..000000000 --- a/src/op_mode/reverseproxy.py +++ /dev/null @@ -1,237 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2023-2024 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import json -import socket -import sys - -from tabulate import tabulate -from vyos.configquery import ConfigTreeQuery - -import vyos.opmode - -socket_path = '/run/haproxy/admin.sock' -timeout = 5 - - -def _execute_haproxy_command(command): - """Execute a command on the HAProxy UNIX socket and retrieve the response. - - Args: - command (str): The command to be executed. - - Returns: - str: The response received from the HAProxy UNIX socket. - - Raises: - socket.error: If there is an error while connecting or communicating with the socket. - - Finally: - Closes the socket connection. - - Notes: - - HAProxy expects a newline character at the end of the command. - - The socket connection is established using the HAProxy UNIX socket. - - The response from the socket is received and decoded. - - Example: - response = _execute_haproxy_command('show stat') - print(response) - """ - try: - # HAProxy expects new line for command - command = f'{command}\n' - - # Connect to the HAProxy UNIX socket - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.connect(socket_path) - - # Set the socket timeout - sock.settimeout(timeout) - - # Send the command - sock.sendall(command.encode()) - - # Receive and decode the response - response = b'' - while True: - data = sock.recv(4096) - if not data: - break - response += data - response = response.decode() - - return (response) - - except socket.error as e: - print(f"Error: {e}") - - finally: - # Close the socket - sock.close() - - -def _convert_seconds(seconds): - """Convert seconds to days, hours, minutes, and seconds. - - Args: - seconds (int): The number of seconds to convert. - - Returns: - tuple: A tuple containing the number of days, hours, minutes, and seconds. - """ - minutes = seconds // 60 - hours = minutes // 60 - days = hours // 24 - - return days, hours % 24, minutes % 60, seconds % 60 - - -def _last_change_format(seconds): - """Format the time components into a string representation. - - Args: - seconds (int): The total number of seconds. - - Returns: - str: The formatted time string with days, hours, minutes, and seconds. - - Examples: - >>> _last_change_format(1434) - '23m54s' - >>> _last_change_format(93734) - '1d0h23m54s' - >>> _last_change_format(85434) - '23h23m54s' - """ - days, hours, minutes, seconds = _convert_seconds(seconds) - time_format = "" - - if days: - time_format += f"{days}d" - if hours: - time_format += f"{hours}h" - if minutes: - time_format += f"{minutes}m" - if seconds: - time_format += f"{seconds}s" - - return time_format - - -def _get_json_data(): - """Get haproxy data format JSON""" - return _execute_haproxy_command('show stat json') - - -def _get_raw_data(): - """Retrieve raw data from JSON and organize it into a dictionary. - - Returns: - dict: A dictionary containing the organized data categorized - into frontend, backend, and server. - """ - - data = json.loads(_get_json_data()) - lb_dict = {'frontend': [], 'backend': [], 'server': []} - - for key in data: - frontend = [] - backend = [] - server = [] - for entry in key: - obj_type = entry['objType'].lower() - position = entry['field']['pos'] - name = entry['field']['name'] - value = entry['value']['value'] - - dict_entry = {'pos': position, 'name': {name: value}} - - if obj_type == 'frontend': - frontend.append(dict_entry) - elif obj_type == 'backend': - backend.append(dict_entry) - elif obj_type == 'server': - server.append(dict_entry) - - if len(frontend) > 0: - lb_dict['frontend'].append(frontend) - if len(backend) > 0: - lb_dict['backend'].append(backend) - if len(server) > 0: - lb_dict['server'].append(server) - - return lb_dict - - -def _get_formatted_output(data): - """ - Format the data into a tabulated output. - - Args: - data (dict): The data to be formatted. - - Returns: - str: The tabulated output representing the formatted data. - """ - table = [] - headers = [ - "Proxy name", "Role", "Status", "Req rate", "Resp time", "Last change" - ] - - for key in data: - for item in data[key]: - row = [None] * len(headers) - - for element in item: - if 'pxname' in element['name']: - row[0] = element['name']['pxname'] - elif 'svname' in element['name']: - row[1] = element['name']['svname'] - elif 'status' in element['name']: - row[2] = element['name']['status'] - elif 'req_rate' in element['name']: - row[3] = element['name']['req_rate'] - elif 'rtime' in element['name']: - row[4] = f"{element['name']['rtime']} ms" - elif 'lastchg' in element['name']: - row[5] = _last_change_format(element['name']['lastchg']) - table.append(row) - - out = tabulate(table, headers, numalign="left") - return out - - -def show(raw: bool): - config = ConfigTreeQuery() - if not config.exists('load-balancing reverse-proxy'): - raise vyos.opmode.UnconfiguredSubsystem('Reverse-proxy is not configured') - - data = _get_raw_data() - if raw: - return data - else: - return _get_formatted_output(data) - - -if __name__ == '__main__': - try: - res = vyos.opmode.run(sys.modules[__name__]) - if res: - print(res) - except (ValueError, vyos.opmode.Error) as e: - print(e) - sys.exit(1) -- cgit v1.2.3 From 8943446c8c79c6daad2326ddc7a59950123b35d2 Mon Sep 17 00:00:00 2001 From: Nataliia Solomko Date: Fri, 18 Oct 2024 15:42:59 +0300 Subject: pki: T4914: Rewrite the PKI op mode in the new style --- op-mode-definitions/pki.xml.in | 108 +++++++------- python/vyos/configsession.py | 3 +- python/vyos/opmode.py | 5 +- src/op_mode/pki.py | 295 +++++++++++++++++++++------------------ src/services/api/rest/routers.py | 4 +- 5 files changed, 224 insertions(+), 191 deletions(-) (limited to 'src/op_mode') diff --git a/op-mode-definitions/pki.xml.in b/op-mode-definitions/pki.xml.in index 254ef08cc..866f482bf 100644 --- a/op-mode-definitions/pki.xml.in +++ b/op-mode-definitions/pki.xml.in @@ -27,7 +27,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --file @@ -36,10 +36,10 @@ <certificate name> - ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" --sign "$5" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --sign "$5" @@ -48,7 +48,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --file @@ -57,10 +57,10 @@ <CA name> - ${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca @@ -79,7 +79,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --file @@ -88,10 +88,10 @@ <certificate name> - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --install - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --self-sign + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --self-sign @@ -108,7 +108,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --file @@ -117,10 +117,10 @@ <certificate name> - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --sign "$5" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --sign "$5" @@ -129,7 +129,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --file @@ -138,10 +138,10 @@ <certificate name> - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate @@ -158,16 +158,16 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --file Commands for installing generated CRL into running configuration - ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --install - ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" @@ -181,7 +181,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --file @@ -190,10 +190,10 @@ <DH name> - ${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --dh "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh @@ -207,7 +207,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --file @@ -216,10 +216,10 @@ <key name> - ${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --keypair "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair @@ -238,7 +238,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --file @@ -247,10 +247,10 @@ <key name> - ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --install - ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn @@ -266,7 +266,7 @@ <filename> - sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --file + sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --file @@ -275,10 +275,10 @@ <key name> - ${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --install - ${vyos_op_scripts_dir}/pki.py --action generate --ssh "noname" + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh @@ -302,12 +302,12 @@ interfaces wireguard - ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key --interface "$7" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key --interface "$7" --install - ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key @@ -334,14 +334,14 @@ interfaces wireguard ${COMP_WORDS[COMP_CWORD-2]} peer - ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk --interface "$7" --peer "$9" --install + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk --interface "$7" --peer "$9" --install - ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk + ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk @@ -371,13 +371,13 @@ Path to CA certificate file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --filename "$6" Path to private key file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --key-filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --key-filename "$6" @@ -393,13 +393,13 @@ Path to certificate file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --filename "$6" Path to private key file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --key-filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --key-filename "$6" @@ -415,7 +415,7 @@ Path to CRL file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --crl "$4" --filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type crl --name "$4" --filename "$6" @@ -431,7 +431,7 @@ Path to DH parameters file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --dh "$4" --filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type dh --name "$4" --filename "$6" @@ -447,13 +447,13 @@ Path to public key file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --filename "$6" Path to private key file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --key-filename "$6" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --key-filename "$6" @@ -474,7 +474,7 @@ Path to shared secret key file - sudo -E ${vyos_op_scripts_dir}/pki.py --action import --openvpn "$5" --filename "$7" + sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type openvpn --name "$5" --filename "$7" @@ -490,13 +490,13 @@ Show PKI x509 certificates - sudo ${vyos_op_scripts_dir}/pki.py --action show + sudo ${vyos_op_scripts_dir}/pki.py show_all Show x509 CA certificates - sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "all" + sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority @@ -505,13 +505,13 @@ pki ca - sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4" + sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4" Show x509 CA certificate in PEM format - sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4" --pem + sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4" --pem @@ -519,7 +519,7 @@ Show x509 certificates - sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "all" + sudo ${vyos_op_scripts_dir}/pki.py show_certificate @@ -528,13 +528,13 @@ pki certificate - sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" + sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" Show x509 certificate in PEM format - sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --pem + sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --pem @@ -543,7 +543,7 @@ sha256 sha384 sha512 - sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --fingerprint "$6" + sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --fingerprint "$6" @@ -551,7 +551,7 @@ Show x509 certificate revocation lists - ${vyos_op_scripts_dir}/pki.py --action show --crl "all" + ${vyos_op_scripts_dir}/pki.py show_crl @@ -560,13 +560,13 @@ pki ca - ${vyos_op_scripts_dir}/pki.py --action show --crl "$4" + ${vyos_op_scripts_dir}/pki.py show_crl --name "$4" Show x509 certificate revocation lists by CA name in PEM format - ${vyos_op_scripts_dir}/pki.py --action show --crl "$4" --pem + ${vyos_op_scripts_dir}/pki.py show_crl --name "$4" --pem diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 9c56d246a..5876dc5b0 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -42,8 +42,7 @@ INSTALL_IMAGE = [ IMPORT_PKI = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'import'] IMPORT_PKI_NO_PROMPT = [ '/usr/libexec/vyos/op_mode/pki.py', - '--action', - 'import', + 'import_pki', '--no-prompt', ] REMOVE_IMAGE = [ diff --git a/python/vyos/opmode.py b/python/vyos/opmode.py index 066c8058f..136ac35e2 100644 --- a/python/vyos/opmode.py +++ b/python/vyos/opmode.py @@ -89,7 +89,10 @@ class InternalError(Error): def _is_op_mode_function_name(name): - if re.match(r"^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute)", name): + if re.match( + r'^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute|import)', + name, + ): return True else: return False diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 5652a5d74..56b873bb1 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -14,16 +14,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import argparse import ipaddress import os import re import sys import tabulate +import typing from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID +import vyos.opmode + from vyos.config import Config from vyos.config import config_dict_mangle_acme from vyos.pki import encode_certificate @@ -51,8 +53,36 @@ from vyos.utils.process import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' auth_dir = '/config/auth' +ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl'] +ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']] +ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512'] + # Helper Functions conf = Config() + + +def _verify(target): + """Decorator checks if config for PKI exists""" + from functools import wraps + + if target not in ['ca', 'certificate']: + raise ValueError('Invalid PKI') + + def _verify_target(func): + @wraps(func) + def _wrapper(*args, **kwargs): + name = kwargs.get('name') + unconf_message = f'PKI {target} "{name}" does not exist!' + if name: + if not conf.exists(['pki', target, name]): + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + + return _wrapper + + return _verify_target + + def get_default_values(): # Fetch default x509 values base = ['pki', 'x509', 'default'] @@ -872,8 +902,111 @@ def import_openvpn_secret(name, path): install_openvpn_key(name, key_data, key_version) -# Show functions -def show_certificate_authority(name=None, pem=False): + +def generate_pki( + raw: bool, + pki_type: ArgsPkiTypeGen, + name: typing.Optional[str], + file: typing.Optional[bool], + install: typing.Optional[bool], + sign: typing.Optional[str], + self_sign: typing.Optional[bool], + key: typing.Optional[bool], + psk: typing.Optional[bool], + interface: typing.Optional[str], + peer: typing.Optional[str], +): + try: + if pki_type == 'ca': + if sign: + generate_ca_certificate_sign(name, sign, install=install, file=file) + else: + generate_ca_certificate(name, install=install, file=file) + elif pki_type == 'certificate': + if sign: + generate_certificate_sign(name, sign, install=install, file=file) + elif self_sign: + generate_certificate_selfsign(name, install=install, file=file) + else: + generate_certificate_request(name=name, install=install, file=file) + + elif pki_type == 'crl': + generate_certificate_revocation_list(name, install=install, file=file) + + elif pki_type == 'ssh': + generate_ssh_keypair(name, install=install, file=file) + + elif pki_type == 'dh': + generate_dh_parameters(name, install=install, file=file) + + elif pki_type == 'key-pair': + generate_keypair(name, install=install, file=file) + + elif pki_type == 'openvpn': + generate_openvpn_key(name, install=install, file=file) + + elif pki_type == 'wireguard': + # WireGuard supports writing key directly into the CLI, but this + # requires the vyos_libexec_dir environment variable to be set + os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos' + + if key: + generate_wireguard_key(interface, install=install) + if psk: + generate_wireguard_psk(interface, peer=peer, install=install) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +def import_pki( + name: str, + pki_type: ArgsPkiType, + filename: typing.Optional[str], + key_filename: typing.Optional[str], + no_prompt: typing.Optional[bool], + passphrase: typing.Optional[str], +): + try: + if pki_type == 'ca': + import_ca_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'certificate': + import_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'crl': + import_crl(name, filename) + elif pki_type == 'dh': + import_dh_parameters(name, filename) + elif pki_type == 'key-pair': + import_keypair( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'openvpn': + import_openvpn_secret(name, filename) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +@_verify('ca') +def show_certificate_authority( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] certs = get_config_ca_certificate() @@ -905,7 +1038,14 @@ def show_certificate_authority(name=None, pem=False): print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False, fingerprint_hash=None): + +@_verify('certificate') +def show_certificate( + raw: bool, + name: typing.Optional[str] = None, + pem: typing.Optional[bool] = False, + fingerprint: typing.Optional[ArgsFingerprint] = None, +): headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] data = [] certs = get_config_certificate() @@ -926,8 +1066,8 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): if name and pem: print(encode_certificate(cert)) return - elif name and fingerprint_hash: - print(get_certificate_fingerprint(cert, fingerprint_hash)) + elif name and fingerprint: + print(get_certificate_fingerprint(cert, fingerprint)) return ca_name = get_certificate_ca(cert, ca_certs) @@ -955,7 +1095,10 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): print("Certificates:") print(tabulate.tabulate(data, headers)) -def show_crl(name=None, pem=False): + +def show_crl( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): headers = ['CA Name', 'Updated', 'Revokes'] data = [] certs = get_config_ca_certificate() @@ -989,132 +1132,20 @@ def show_crl(name=None, pem=False): print("Certificate Revocation Lists:") print(tabulate.tabulate(data, headers)) -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--action', help='PKI action', required=True) - - # X509 - parser.add_argument('--ca', help='Certificate Authority', required=False) - parser.add_argument('--certificate', help='Certificate', required=False) - parser.add_argument('--crl', help='Certificate Revocation List', required=False) - parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) - parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') - parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') - parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') - # SSH - parser.add_argument('--ssh', help='SSH Key', required=False) +def show_all(raw: bool): + show_certificate_authority(raw) + print('\n') + show_certificate(raw) + print('\n') + show_crl(raw) - # DH - parser.add_argument('--dh', help='DH Parameters', required=False) - - # Key pair - parser.add_argument('--keypair', help='Key pair', required=False) - - # OpenVPN - parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - - # WireGuard - parser.add_argument('--wireguard', help='Wireguard', action='store_true') - group = parser.add_mutually_exclusive_group() - group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) - group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) - parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') - parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') - - # Global - parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') - parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') - - parser.add_argument('--filename', help='Write certificate into specified filename', action='store') - parser.add_argument('--key-filename', help='Write key into specified filename', action='store') - - parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') - parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') - - args = parser.parse_args() +if __name__ == '__main__': try: - if args.action == 'generate': - if args.ca: - if args.sign: - generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) - else: - generate_ca_certificate(args.ca, install=args.install, file=args.file) - elif args.certificate: - if args.sign: - generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) - elif args.self_sign: - generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) - else: - generate_certificate_request(name=args.certificate, install=args.install, file=args.file) - - elif args.crl: - generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) - - elif args.ssh: - generate_ssh_keypair(args.ssh, install=args.install, file=args.file) - - elif args.dh: - generate_dh_parameters(args.dh, install=args.install, file=args.file) - - elif args.keypair: - generate_keypair(args.keypair, install=args.install, file=args.file) - - elif args.openvpn: - generate_openvpn_key(args.openvpn, install=args.install, file=args.file) - - elif args.wireguard: - # WireGuard supports writing key directly into the CLI, but this - # requires the vyos_libexec_dir environment variable to be set - os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" - - if args.key: - generate_wireguard_key(args.interface, install=args.install) - if args.psk: - generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) - elif args.action == 'import': - if args.ca: - import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.certificate: - import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.crl: - import_crl(args.crl, args.filename) - elif args.dh: - import_dh_parameters(args.dh, args.filename) - elif args.keypair: - import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.openvpn: - import_openvpn_secret(args.openvpn, args.filename) - elif args.action == 'show': - if args.ca: - ca_name = None if args.ca == 'all' else args.ca - if ca_name: - if not conf.exists(['pki', 'ca', ca_name]): - print(f'CA "{ca_name}" does not exist!') - exit(1) - show_certificate_authority(ca_name, args.pem) - elif args.certificate: - cert_name = None if args.certificate == 'all' else args.certificate - if cert_name: - if not conf.exists(['pki', 'certificate', cert_name]): - print(f'Certificate "{cert_name}" does not exist!') - exit(1) - if args.fingerprint is None: - show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) - else: - show_certificate(args.certificate, fingerprint_hash=args.fingerprint) - elif args.crl: - show_crl(None if args.crl == 'all' else args.crl, args.pem) - else: - show_certificate_authority() - print('\n') - show_certificate() - print('\n') - show_crl() - except KeyboardInterrupt: - print("Aborted") - sys.exit(0) + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py index da981d5bf..5612e947c 100644 --- a/src/services/api/rest/routers.py +++ b/src/services/api/rest/routers.py @@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path): correct_paths = ['ca', 'certificate', 'key-pair'] if path[1] not in correct_paths: return False - path[1] = '--' + path[1].replace('-', '') path[3] = '--key-filename' - return path[1:] + path.insert(2, '--name') + return ['--pki-type'] + path[1:] @router.post('/configure') -- cgit v1.2.3 From 52e3ba35c09237744340c2cce4cba1783ec3f2be Mon Sep 17 00:00:00 2001 From: Nataliia Solomko Date: Fri, 18 Oct 2024 16:46:39 +0300 Subject: pki: T4914: reformat file by linter rules --- src/op_mode/pki.py | 517 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 375 insertions(+), 142 deletions(-) (limited to 'src/op_mode') diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 56b873bb1..49a461e9e 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -86,13 +86,17 @@ def _verify(target): def get_default_values(): # Fetch default x509 values base = ['pki', 'x509', 'default'] - x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), - no_tag_node_value_mangle=True, - get_first_key=True, - with_recursive_defaults=True) + x509_defaults = conf.get_config_dict( + base, + key_mangling=('-', '_'), + no_tag_node_value_mangle=True, + get_first_key=True, + with_recursive_defaults=True, + ) return x509_defaults + def get_config_ca_certificate(name=None): # Fetch ca certificates from config base = ['pki', 'ca'] @@ -101,12 +105,15 @@ def get_config_ca_certificate(name=None): if name: base = base + [name] - if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): + if not conf.exists(base + ['private', 'key']) or not conf.exists( + base + ['certificate'] + ): return False - return conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + return conf.get_config_dict( + base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True + ) + def get_config_certificate(name=None): # Get certificates from config @@ -116,18 +123,21 @@ def get_config_certificate(name=None): if name: base = base + [name] - if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): + if not conf.exists(base + ['private', 'key']) or not conf.exists( + base + ['certificate'] + ): return False - pki = conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + pki = conf.get_config_dict( + base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True + ) if pki: for certificate in pki: pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate]) return pki + def get_certificate_ca(cert, ca_certs): # Find CA certificate for given certificate if not ca_certs: @@ -146,6 +156,7 @@ def get_certificate_ca(cert, ca_certs): return ca_name return None + def get_config_revoked_certificates(): # Fetch revoked certificates from config ca_base = ['pki', 'ca'] @@ -154,19 +165,26 @@ def get_config_revoked_certificates(): certs = [] if conf.exists(ca_base): - ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + ca_certificates = conf.get_config_dict( + ca_base, + key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True, + ) certs.extend(ca_certificates.values()) if conf.exists(cert_base): - certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + certificates = conf.get_config_dict( + cert_base, + key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True, + ) certs.extend(certificates.values()) return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] + def get_revoked_by_serial_numbers(serial_numbers=[]): # Return serial numbers of revoked certificates certs_out = [] @@ -190,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]): certs_out.append(cert_name) return certs_out -def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): + +def install_certificate( + name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False +): # Show/install conf commands for certificate prefix = 'ca' if is_ca else 'certificate' - base = f"pki {prefix} {name}" + base = f'pki {prefix} {name}' config_paths = [] if cert: - cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) + cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1]) config_paths.append(f"{base} certificate '{cert_pem}'") if private_key: - key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) + key_pem = ''.join( + encode_private_key(private_key, passphrase=key_passphrase) + .strip() + .split('\n')[1:-1] + ) config_paths.append(f"{base} private key '{key_pem}'") if key_passphrase: - config_paths.append(f"{base} private password-protected") + config_paths.append(f'{base} private password-protected') install_into_config(conf, config_paths) + def install_crl(ca_name, crl): # Show/install conf commands for crl - crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) + crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1]) install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"]) + def install_dh_parameters(name, params): # Show/install conf commands for dh params - dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) + dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1]) install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"]) + def install_ssh_key(name, public_key, private_key, passphrase=None): # Show/install conf commands for ssh key - key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') + key_openssh = encode_public_key( + public_key, encoding='OpenSSH', key_format='OpenSSH' + ) username = os.getlogin() - type_key_split = key_openssh.split(" ") - - base = f"system login user {username} authentication public-keys {name}" - install_into_config(conf, [ - f"{base} key '{type_key_split[1]}'", - f"{base} type '{type_key_split[0]}'" - ]) - print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) - -def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True): + type_key_split = key_openssh.split(' ') + + base = f'system login user {username} authentication public-keys {name}' + install_into_config( + conf, + [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"], + ) + print( + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ) + ) + + +def install_keypair( + name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True +): # Show/install conf commands for key-pair config_paths = [] if public_key: - install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True) + install_public_key = not prompt or ask_yes_no( + 'Do you want to install the public key?', default=True + ) public_key_pem = encode_public_key(public_key) if install_public_key: - install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) - config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'") + install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1]) + config_paths.append( + f"pki key-pair {name} public key '{install_public_pem}'" + ) else: - print("Public key:") + print('Public key:') print(public_key_pem) if private_key: - install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True) + install_private_key = not prompt or ask_yes_no( + 'Do you want to install the private key?', default=True + ) private_key_pem = encode_private_key(private_key, passphrase=passphrase) if install_private_key: - install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) - config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'") + install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1]) + config_paths.append( + f"pki key-pair {name} private key '{install_private_pem}'" + ) if passphrase: - config_paths.append(f"pki key-pair {name} private password-protected") + config_paths.append(f'pki key-pair {name} private password-protected') else: - print("Private key:") + print('Private key:') print(private_key_pem) install_into_config(conf, config_paths) + def install_openvpn_key(name, key_data, key_version='1'): config_paths = [ f"pki openvpn shared-secret {name} key '{key_data}'", - f"pki openvpn shared-secret {name} version '{key_version}'" + f"pki openvpn shared-secret {name} version '{key_version}'", ] install_into_config(conf, config_paths) + def install_wireguard_key(interface, private_key, public_key): # Show conf commands for installing wireguard key pairs from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI - install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]) + install_into_config( + conf, [f"interfaces wireguard {interface} private-key '{private_key}'"] + ) print(f"Corresponding public-key to use on peer system is: '{public_key}'") + def install_wireguard_psk(interface, peer, psk): from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI - install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]) + install_into_config( + conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"] + ) + def ask_passphrase(): passphrase = None - print("Note: If you plan to use the generated key on this router, do not encrypt the private key.") + print( + 'Note: If you plan to use the generated key on this router, do not encrypt the private key.' + ) if ask_yes_no('Do you want to encrypt the private key with a passphrase?'): passphrase = ask_input('Enter passphrase:') return passphrase + def write_file(filename, contents): full_path = os.path.join(auth_dir, filename) directory = os.path.dirname(full_path) @@ -305,7 +363,9 @@ def write_file(filename, contents): print('Failed to write file: directory does not exist') return False - if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'): + if os.path.exists(full_path) and not ask_yes_no( + 'Do you want to overwrite the existing file?' + ): return False with open(full_path, 'w') as f: @@ -313,10 +373,14 @@ def write_file(filename, contents): print(f'File written to {full_path}') -# Generation functions +# Generation functions def generate_private_key(): - key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec']) + key_type = ask_input( + 'Enter private key type: [rsa, dsa, ec]', + default='rsa', + valid_responses=['rsa', 'dsa', 'ec'], + ) size_valid = [] size_default = 0 @@ -328,28 +392,43 @@ def generate_private_key(): size_default = 256 size_valid = [224, 256, 384, 521] - size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid) + size = ask_input( + 'Enter private key bits:', + default=size_default, + numeric_only=True, + valid_responses=size_valid, + ) return create_private_key(key_type, size), key_type + def parse_san_string(san_string): if not san_string: return None output = [] - san_split = san_string.strip().split(",") + san_split = san_string.strip().split(',') for pair_str in san_split: - tag, value = pair_str.strip().split(":", 1) + tag, value = pair_str.strip().split(':', 1) if tag == 'ipv4': output.append(ipaddress.IPv4Address(value)) elif tag == 'ipv6': output.append(ipaddress.IPv6Address(value)) elif tag == 'dns' or tag == 'rfc822': output.append(value) - return output + return -def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True): + +def generate_certificate_request( + private_key=None, + key_type=None, + return_request=False, + name=None, + install=False, + file=False, + ask_san=True, +): if not private_key: private_key, key_type = generate_private_key() @@ -358,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request while True: country = ask_input('Enter country code:', default=default_values['country']) if len(country) != 2: - print("Country name must be a 2 character country code") + print('Country name must be a 2 character country code') continue subject['country'] = country break subject['state'] = ask_input('Enter state:', default=default_values['state']) - subject['locality'] = ask_input('Enter locality:', default=default_values['locality']) - subject['organization'] = ask_input('Enter organization name:', default=default_values['organization']) + subject['locality'] = ask_input( + 'Enter locality:', default=default_values['locality'] + ) + subject['organization'] = ask_input( + 'Enter organization name:', default=default_values['organization'] + ) subject['common_name'] = ask_input('Enter common name:', default='vyos.io') subject_alt_names = None if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'): - print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net") + print( + 'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net' + ) san_string = ask_input('Enter Subject Alternative Names:') subject_alt_names = parse_san_string(san_string) @@ -386,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request return None if install: - print("Certificate request:") - print(encode_certificate(cert_req) + "\n") - install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) + print('Certificate request:') + print(encode_certificate(cert_req) + '\n') + install_certificate( + name, + private_key=private_key, + key_type=key_type, + key_passphrase=passphrase, + is_ca=False, + ) if file: write_file(f'{name}.csr', encode_certificate(cert_req)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) -def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False): - valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True) + +def generate_certificate( + cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False +): + valid_days = ask_input( + 'Enter how many days certificate will be valid:', + default='365' if not is_ca else '1825', + numeric_only=True, + ) cert_type = None if not is_ca: - cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server']) - return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca) + cert_type = ask_input( + 'Enter certificate type: (client, server)', + default='server', + valid_responses=['client', 'server'], + ) + return create_certificate( + cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca + ) + def generate_ca_certificate(name, install=False, file=False): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True, ask_san=False + ) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True) passphrase = ask_passphrase() @@ -413,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_ca_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -429,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load signing CA certificate, aborting") + print('Failed to load signing CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load signing CA private key, aborting") + print('Failed to load signing CA private key, aborting') return None private_key = None @@ -448,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True, ask_san=False + ) else: - print("Paste certificate request and press enter:") + print('Paste certificate request and press enter:') lines = [] curr_line = '' while True: @@ -460,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): lines.append(curr_line) if not lines: - print("Aborted") + print('Aborted') return None - wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing - cert_req = load_certificate_request("\n".join(lines), wrap) + wrap = ( + lines[0].find('-----') < 0 + ) # Only base64 pasted, add the CSR tags for parsing + cert_req = load_certificate_request('\n'.join(lines), wrap) if not cert_req: - print("Invalid certificate request") + print('Invalid certificate request') return None - cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) + cert = generate_certificate( + cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True + ) passphrase = None if private_key is not None: @@ -483,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) if private_key is not None: - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -500,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load CA certificate, aborting") + print('Failed to load CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load CA private key, aborting") + print('Failed to load CA private key, aborting') return None private_key = None @@ -519,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True + ) else: - print("Paste certificate request and press enter:") + print('Paste certificate request and press enter:') lines = [] curr_line = '' while True: @@ -531,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): lines.append(curr_line) if not lines: - print("Aborted") + print('Aborted') return None - wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing - cert_req = load_certificate_request("\n".join(lines), wrap) + wrap = ( + lines[0].find('-----') < 0 + ) # Only base64 pasted, add the CSR tags for parsing + cert_req = load_certificate_request('\n'.join(lines), wrap) if not cert_req: - print("Invalid certificate request") + print('Invalid certificate request') return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) - + passphrase = None if private_key is not None: passphrase = ask_passphrase() @@ -554,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) if private_key is not None: - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_selfsign(name, install=False, file=False): private_key, key_type = generate_private_key() @@ -573,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) + install_certificate( + name, + cert, + private_key=private_key, + key_type=key_type, + key_passphrase=passphrase, + is_ca=False, + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_revocation_list(ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -589,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load CA certificate, aborting") + print('Failed to load CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load CA private key, aborting") + print('Failed to load CA private key, aborting') return None revoked_certs = get_config_revoked_certificates() @@ -620,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): continue if not to_revoke: - print("No revoked certificates to add to the CRL") + print('No revoked certificates to add to the CRL') return None crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke) if not crl: - print("Failed to create CRL") + print('Failed to create CRL') return None if not install and not file: @@ -637,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): install_crl(ca_name, crl) if file: - write_file(f'{name}.crl', encode_certificate(crl)) + write_file(f'{ca_name}.crl', encode_certificate(crl)) + def generate_ssh_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() @@ -646,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False): if not install and not file: print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) - print("") - print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) + print('') + print( + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ) + ) return None if install: install_ssh_key(name, public_key, private_key, passphrase) if file: - write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) - write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) + write_file( + f'{name}.pem', + encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'), + ) + write_file( + f'{name}.key', + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ), + ) + def generate_dh_parameters(name, install=False, file=False): bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True) - print("Generating parameters...") + print('Generating parameters...') dh_params = create_dh_parameters(bits) if not dh_params: - print("Failed to create DH parameters") + print('Failed to create DH parameters') return None if not install and not file: - print("DH Parameters:") + print('DH Parameters:') print(encode_dh_parameters(dh_params)) if install: @@ -677,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False): if file: write_file(f'{name}.pem', encode_dh_parameters(dh_params)) + def generate_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() @@ -684,7 +849,7 @@ def generate_keypair(name, install=False, file=False): if not install and not file: print(encode_public_key(public_key)) - print("") + print('') print(encode_private_key(private_key, passphrase=passphrase)) return None @@ -693,13 +858,16 @@ def generate_keypair(name, install=False, file=False): if file: write_file(f'{name}.pem', encode_public_key(public_key)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_openvpn_key(name, install=False, file=False): result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"') if not result: - print("Failed to generate OpenVPN key") + print('Failed to generate OpenVPN key') return None if not install and not file: @@ -707,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False): return None if install: - key_lines = result.split("\n") - key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings + key_lines = result.split('\n') + key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' - version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) + version_search = re.search( + r'BEGIN OpenVPN Static key V(\d+)', result + ) # Future-proofing (hopefully) if version_search: key_version = version_search[1] @@ -720,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False): if file: write_file(f'{name}.key', result) + def generate_wireguard_key(interface=None, install=False): private_key = cmd('wg genkey') public_key = cmd('wg pubkey', input=private_key) @@ -730,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False): print(f'Private key: {private_key}') print(f'Public key: {public_key}', end='\n\n') + def generate_wireguard_psk(interface=None, peer=None, install=False): psk = cmd('wg genpsk') if interface and peer and install: @@ -737,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False): else: print(f'Pre-shared key: {psk}') + # Import functions -def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): +def import_ca_certificate( + name, path=None, key_path=None, no_prompt=False, passphrase=None +): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -775,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp install_certificate(name, private_key=key, is_ca=True) -def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): + +def import_certificate( + name, path=None, key_path=None, no_prompt=False, passphrase=None +): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -812,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra install_certificate(name, private_key=key, is_ca=False) + def import_crl(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -829,6 +1008,7 @@ def import_crl(name, path): install_crl(name, crl) + def import_dh_parameters(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -846,6 +1026,7 @@ def import_dh_parameters(name, path): install_dh_parameters(name, dh) + def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None): if path: if not os.path.exists(path): @@ -883,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N install_keypair(name, None, private_key=key, prompt=False) + def import_openvpn_secret(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -892,11 +1074,15 @@ def import_openvpn_secret(name, path): key_version = '1' with open(path) as f: - key_lines = f.read().strip().split("\n") - key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines - key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings - - version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) + key_lines = f.read().strip().split('\n') + key_lines = list( + filter(lambda line: not line.strip().startswith('#'), key_lines) + ) # Remove commented lines + key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings + + version_search = re.search( + r'BEGIN OpenVPN Static key V(\d+)', key_lines[0] + ) # Future-proofing (hopefully) if version_search: key_version = version_search[1] @@ -1007,7 +1193,15 @@ def import_pki( def show_certificate_authority( raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False ): - headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] + headers = [ + 'Name', + 'Subject', + 'Issuer CN', + 'Issued', + 'Expiry', + 'Private Key', + 'Parent', + ] data = [] certs = get_config_ca_certificate() if certs: @@ -1024,7 +1218,7 @@ def show_certificate_authority( return parent_ca_name = get_certificate_ca(cert, certs) - cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] + cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0] if not parent_ca_name or parent_ca_name == cert_name: parent_ca_name = 'N/A' @@ -1032,10 +1226,24 @@ def show_certificate_authority( if not cert: continue - have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' - data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) + have_private = ( + 'Yes' + if 'private' in cert_dict and 'key' in cert_dict['private'] + else 'No' + ) + data.append( + [ + cert_name, + cert.subject.rfc4514_string(), + cert_issuer_cn, + cert.not_valid_before, + cert.not_valid_after, + have_private, + parent_ca_name, + ] + ) - print("Certificate Authorities:") + print('Certificate Authorities:') print(tabulate.tabulate(data, headers)) @@ -1046,7 +1254,17 @@ def show_certificate( pem: typing.Optional[bool] = False, fingerprint: typing.Optional[ArgsFingerprint] = None, ): - headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] + headers = [ + 'Name', + 'Type', + 'Subject CN', + 'Issuer CN', + 'Issued', + 'Expiry', + 'Revoked', + 'Private Key', + 'CA Present', + ] data = [] certs = get_config_certificate() if certs: @@ -1071,8 +1289,8 @@ def show_certificate( return ca_name = get_certificate_ca(cert, ca_certs) - cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] - cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] + cert_subject_cn = cert.subject.rfc4514_string().split(',')[0] + cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0] cert_type = 'Unknown' try: @@ -1081,18 +1299,31 @@ def show_certificate( cert_type = 'Server' elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value: cert_type = 'Client' - except: + except Exception: pass revoked = 'Yes' if 'revoke' in cert_dict else 'No' - have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' + have_private = ( + 'Yes' + if 'private' in cert_dict and 'key' in cert_dict['private'] + else 'No' + ) have_ca = f'Yes ({ca_name})' if ca_name else 'No' - data.append([ - cert_name, cert_type, cert_subject_cn, cert_issuer_cn, - cert.not_valid_before, cert.not_valid_after, - revoked, have_private, have_ca]) + data.append( + [ + cert_name, + cert_type, + cert_subject_cn, + cert_issuer_cn, + cert.not_valid_before, + cert.not_valid_after, + revoked, + have_private, + have_ca, + ] + ) - print("Certificates:") + print('Certificates:') print(tabulate.tabulate(data, headers)) @@ -1123,13 +1354,15 @@ def show_crl( print(encode_certificate(crl)) continue - certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl]) - data.append([cert_name, crl.last_update, ", ".join(certs)]) + certs = get_revoked_by_serial_numbers( + [revoked.serial_number for revoked in crl] + ) + data.append([cert_name, crl.last_update, ', '.join(certs)]) if name and pem: return - print("Certificate Revocation Lists:") + print('Certificate Revocation Lists:') print(tabulate.tabulate(data, headers)) -- cgit v1.2.3