From 90a4827284acd3cb072cdfeef323c522802c6449 Mon Sep 17 00:00:00 2001
From: sarthurdev <965089+sarthurdev@users.noreply.github.com>
Date: Wed, 9 Oct 2024 14:55:11 +0200
Subject: haproxy: T6745: Rename `reverse-proxy` to `haproxy`
---
src/op_mode/load-balancing_haproxy.py | 237 ++++++++++++++++++++++++++++++++++
src/op_mode/restart.py | 10 +-
src/op_mode/reverseproxy.py | 237 ----------------------------------
3 files changed, 242 insertions(+), 242 deletions(-)
create mode 100755 src/op_mode/load-balancing_haproxy.py
delete mode 100755 src/op_mode/reverseproxy.py
(limited to 'src/op_mode')
diff --git a/src/op_mode/load-balancing_haproxy.py b/src/op_mode/load-balancing_haproxy.py
new file mode 100755
index 000000000..ae6734e16
--- /dev/null
+++ b/src/op_mode/load-balancing_haproxy.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023-2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import json
+import socket
+import sys
+
+from tabulate import tabulate
+from vyos.configquery import ConfigTreeQuery
+
+import vyos.opmode
+
+socket_path = '/run/haproxy/admin.sock'
+timeout = 5
+
+
+def _execute_haproxy_command(command):
+ """Execute a command on the HAProxy UNIX socket and retrieve the response.
+
+ Args:
+ command (str): The command to be executed.
+
+ Returns:
+ str: The response received from the HAProxy UNIX socket.
+
+ Raises:
+ socket.error: If there is an error while connecting or communicating with the socket.
+
+ Finally:
+ Closes the socket connection.
+
+ Notes:
+ - HAProxy expects a newline character at the end of the command.
+ - The socket connection is established using the HAProxy UNIX socket.
+ - The response from the socket is received and decoded.
+
+ Example:
+ response = _execute_haproxy_command('show stat')
+ print(response)
+ """
+ try:
+ # HAProxy expects new line for command
+ command = f'{command}\n'
+
+ # Connect to the HAProxy UNIX socket
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(socket_path)
+
+ # Set the socket timeout
+ sock.settimeout(timeout)
+
+ # Send the command
+ sock.sendall(command.encode())
+
+ # Receive and decode the response
+ response = b''
+ while True:
+ data = sock.recv(4096)
+ if not data:
+ break
+ response += data
+ response = response.decode()
+
+ return (response)
+
+ except socket.error as e:
+ print(f"Error: {e}")
+
+ finally:
+ # Close the socket
+ sock.close()
+
+
+def _convert_seconds(seconds):
+ """Convert seconds to days, hours, minutes, and seconds.
+
+ Args:
+ seconds (int): The number of seconds to convert.
+
+ Returns:
+ tuple: A tuple containing the number of days, hours, minutes, and seconds.
+ """
+ minutes = seconds // 60
+ hours = minutes // 60
+ days = hours // 24
+
+ return days, hours % 24, minutes % 60, seconds % 60
+
+
+def _last_change_format(seconds):
+ """Format the time components into a string representation.
+
+ Args:
+ seconds (int): The total number of seconds.
+
+ Returns:
+ str: The formatted time string with days, hours, minutes, and seconds.
+
+ Examples:
+ >>> _last_change_format(1434)
+ '23m54s'
+ >>> _last_change_format(93734)
+ '1d0h23m54s'
+ >>> _last_change_format(85434)
+ '23h23m54s'
+ """
+ days, hours, minutes, seconds = _convert_seconds(seconds)
+ time_format = ""
+
+ if days:
+ time_format += f"{days}d"
+ if hours:
+ time_format += f"{hours}h"
+ if minutes:
+ time_format += f"{minutes}m"
+ if seconds:
+ time_format += f"{seconds}s"
+
+ return time_format
+
+
+def _get_json_data():
+ """Get haproxy data format JSON"""
+ return _execute_haproxy_command('show stat json')
+
+
+def _get_raw_data():
+ """Retrieve raw data from JSON and organize it into a dictionary.
+
+ Returns:
+ dict: A dictionary containing the organized data categorized
+ into frontend, backend, and server.
+ """
+
+ data = json.loads(_get_json_data())
+ lb_dict = {'frontend': [], 'backend': [], 'server': []}
+
+ for key in data:
+ frontend = []
+ backend = []
+ server = []
+ for entry in key:
+ obj_type = entry['objType'].lower()
+ position = entry['field']['pos']
+ name = entry['field']['name']
+ value = entry['value']['value']
+
+ dict_entry = {'pos': position, 'name': {name: value}}
+
+ if obj_type == 'frontend':
+ frontend.append(dict_entry)
+ elif obj_type == 'backend':
+ backend.append(dict_entry)
+ elif obj_type == 'server':
+ server.append(dict_entry)
+
+ if len(frontend) > 0:
+ lb_dict['frontend'].append(frontend)
+ if len(backend) > 0:
+ lb_dict['backend'].append(backend)
+ if len(server) > 0:
+ lb_dict['server'].append(server)
+
+ return lb_dict
+
+
+def _get_formatted_output(data):
+ """
+ Format the data into a tabulated output.
+
+ Args:
+ data (dict): The data to be formatted.
+
+ Returns:
+ str: The tabulated output representing the formatted data.
+ """
+ table = []
+ headers = [
+ "Proxy name", "Role", "Status", "Req rate", "Resp time", "Last change"
+ ]
+
+ for key in data:
+ for item in data[key]:
+ row = [None] * len(headers)
+
+ for element in item:
+ if 'pxname' in element['name']:
+ row[0] = element['name']['pxname']
+ elif 'svname' in element['name']:
+ row[1] = element['name']['svname']
+ elif 'status' in element['name']:
+ row[2] = element['name']['status']
+ elif 'req_rate' in element['name']:
+ row[3] = element['name']['req_rate']
+ elif 'rtime' in element['name']:
+ row[4] = f"{element['name']['rtime']} ms"
+ elif 'lastchg' in element['name']:
+ row[5] = _last_change_format(element['name']['lastchg'])
+ table.append(row)
+
+ out = tabulate(table, headers, numalign="left")
+ return out
+
+
+def show(raw: bool):
+ config = ConfigTreeQuery()
+ if not config.exists('load-balancing haproxy'):
+ raise vyos.opmode.UnconfiguredSubsystem('Haproxy is not configured')
+
+ data = _get_raw_data()
+ if raw:
+ return data
+ else:
+ return _get_formatted_output(data)
+
+
+if __name__ == '__main__':
+ try:
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py
index a83c8b9d8..3b0031f34 100755
--- a/src/op_mode/restart.py
+++ b/src/op_mode/restart.py
@@ -41,6 +41,10 @@ service_map = {
'systemd_service': 'pdns-recursor',
'path': ['service', 'dns', 'forwarding'],
},
+ 'haproxy': {
+ 'systemd_service': 'haproxy',
+ 'path': ['load-balancing', 'haproxy'],
+ },
'igmp_proxy': {
'systemd_service': 'igmpproxy',
'path': ['protocols', 'igmp-proxy'],
@@ -53,10 +57,6 @@ service_map = {
'systemd_service': 'avahi-daemon',
'path': ['service', 'mdns', 'repeater'],
},
- 'reverse_proxy': {
- 'systemd_service': 'haproxy',
- 'path': ['load-balancing', 'reverse-proxy'],
- },
'router_advert': {
'systemd_service': 'radvd',
'path': ['service', 'router-advert'],
@@ -83,10 +83,10 @@ services = typing.Literal[
'dhcpv6',
'dns_dynamic',
'dns_forwarding',
+ 'haproxy',
'igmp_proxy',
'ipsec',
'mdns_repeater',
- 'reverse_proxy',
'router_advert',
'snmp',
'ssh',
diff --git a/src/op_mode/reverseproxy.py b/src/op_mode/reverseproxy.py
deleted file mode 100755
index 19704182a..000000000
--- a/src/op_mode/reverseproxy.py
+++ /dev/null
@@ -1,237 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2023-2024 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-
-import json
-import socket
-import sys
-
-from tabulate import tabulate
-from vyos.configquery import ConfigTreeQuery
-
-import vyos.opmode
-
-socket_path = '/run/haproxy/admin.sock'
-timeout = 5
-
-
-def _execute_haproxy_command(command):
- """Execute a command on the HAProxy UNIX socket and retrieve the response.
-
- Args:
- command (str): The command to be executed.
-
- Returns:
- str: The response received from the HAProxy UNIX socket.
-
- Raises:
- socket.error: If there is an error while connecting or communicating with the socket.
-
- Finally:
- Closes the socket connection.
-
- Notes:
- - HAProxy expects a newline character at the end of the command.
- - The socket connection is established using the HAProxy UNIX socket.
- - The response from the socket is received and decoded.
-
- Example:
- response = _execute_haproxy_command('show stat')
- print(response)
- """
- try:
- # HAProxy expects new line for command
- command = f'{command}\n'
-
- # Connect to the HAProxy UNIX socket
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.connect(socket_path)
-
- # Set the socket timeout
- sock.settimeout(timeout)
-
- # Send the command
- sock.sendall(command.encode())
-
- # Receive and decode the response
- response = b''
- while True:
- data = sock.recv(4096)
- if not data:
- break
- response += data
- response = response.decode()
-
- return (response)
-
- except socket.error as e:
- print(f"Error: {e}")
-
- finally:
- # Close the socket
- sock.close()
-
-
-def _convert_seconds(seconds):
- """Convert seconds to days, hours, minutes, and seconds.
-
- Args:
- seconds (int): The number of seconds to convert.
-
- Returns:
- tuple: A tuple containing the number of days, hours, minutes, and seconds.
- """
- minutes = seconds // 60
- hours = minutes // 60
- days = hours // 24
-
- return days, hours % 24, minutes % 60, seconds % 60
-
-
-def _last_change_format(seconds):
- """Format the time components into a string representation.
-
- Args:
- seconds (int): The total number of seconds.
-
- Returns:
- str: The formatted time string with days, hours, minutes, and seconds.
-
- Examples:
- >>> _last_change_format(1434)
- '23m54s'
- >>> _last_change_format(93734)
- '1d0h23m54s'
- >>> _last_change_format(85434)
- '23h23m54s'
- """
- days, hours, minutes, seconds = _convert_seconds(seconds)
- time_format = ""
-
- if days:
- time_format += f"{days}d"
- if hours:
- time_format += f"{hours}h"
- if minutes:
- time_format += f"{minutes}m"
- if seconds:
- time_format += f"{seconds}s"
-
- return time_format
-
-
-def _get_json_data():
- """Get haproxy data format JSON"""
- return _execute_haproxy_command('show stat json')
-
-
-def _get_raw_data():
- """Retrieve raw data from JSON and organize it into a dictionary.
-
- Returns:
- dict: A dictionary containing the organized data categorized
- into frontend, backend, and server.
- """
-
- data = json.loads(_get_json_data())
- lb_dict = {'frontend': [], 'backend': [], 'server': []}
-
- for key in data:
- frontend = []
- backend = []
- server = []
- for entry in key:
- obj_type = entry['objType'].lower()
- position = entry['field']['pos']
- name = entry['field']['name']
- value = entry['value']['value']
-
- dict_entry = {'pos': position, 'name': {name: value}}
-
- if obj_type == 'frontend':
- frontend.append(dict_entry)
- elif obj_type == 'backend':
- backend.append(dict_entry)
- elif obj_type == 'server':
- server.append(dict_entry)
-
- if len(frontend) > 0:
- lb_dict['frontend'].append(frontend)
- if len(backend) > 0:
- lb_dict['backend'].append(backend)
- if len(server) > 0:
- lb_dict['server'].append(server)
-
- return lb_dict
-
-
-def _get_formatted_output(data):
- """
- Format the data into a tabulated output.
-
- Args:
- data (dict): The data to be formatted.
-
- Returns:
- str: The tabulated output representing the formatted data.
- """
- table = []
- headers = [
- "Proxy name", "Role", "Status", "Req rate", "Resp time", "Last change"
- ]
-
- for key in data:
- for item in data[key]:
- row = [None] * len(headers)
-
- for element in item:
- if 'pxname' in element['name']:
- row[0] = element['name']['pxname']
- elif 'svname' in element['name']:
- row[1] = element['name']['svname']
- elif 'status' in element['name']:
- row[2] = element['name']['status']
- elif 'req_rate' in element['name']:
- row[3] = element['name']['req_rate']
- elif 'rtime' in element['name']:
- row[4] = f"{element['name']['rtime']} ms"
- elif 'lastchg' in element['name']:
- row[5] = _last_change_format(element['name']['lastchg'])
- table.append(row)
-
- out = tabulate(table, headers, numalign="left")
- return out
-
-
-def show(raw: bool):
- config = ConfigTreeQuery()
- if not config.exists('load-balancing reverse-proxy'):
- raise vyos.opmode.UnconfiguredSubsystem('Reverse-proxy is not configured')
-
- data = _get_raw_data()
- if raw:
- return data
- else:
- return _get_formatted_output(data)
-
-
-if __name__ == '__main__':
- try:
- res = vyos.opmode.run(sys.modules[__name__])
- if res:
- print(res)
- except (ValueError, vyos.opmode.Error) as e:
- print(e)
- sys.exit(1)
--
cgit v1.2.3
From 8943446c8c79c6daad2326ddc7a59950123b35d2 Mon Sep 17 00:00:00 2001
From: Nataliia Solomko
Date: Fri, 18 Oct 2024 15:42:59 +0300
Subject: pki: T4914: Rewrite the PKI op mode in the new style
---
op-mode-definitions/pki.xml.in | 108 +++++++-------
python/vyos/configsession.py | 3 +-
python/vyos/opmode.py | 5 +-
src/op_mode/pki.py | 295 +++++++++++++++++++++------------------
src/services/api/rest/routers.py | 4 +-
5 files changed, 224 insertions(+), 191 deletions(-)
(limited to 'src/op_mode')
diff --git a/op-mode-definitions/pki.xml.in b/op-mode-definitions/pki.xml.in
index 254ef08cc..866f482bf 100644
--- a/op-mode-definitions/pki.xml.in
+++ b/op-mode-definitions/pki.xml.in
@@ -27,7 +27,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --file
@@ -36,10 +36,10 @@
<certificate name>
- ${vyos_op_scripts_dir}/pki.py --action generate --ca "$7" --sign "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$7" --sign "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname" --sign "$5"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --sign "$5"
@@ -48,7 +48,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --file
@@ -57,10 +57,10 @@
<CA name>
- ${vyos_op_scripts_dir}/pki.py --action generate --ca "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca --name "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --ca "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ca
@@ -79,7 +79,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --file
@@ -88,10 +88,10 @@
<certificate name>
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$6" --self-sign --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$6" --self-sign --install
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --self-sign
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --self-sign
@@ -108,7 +108,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --file
@@ -117,10 +117,10 @@
<certificate name>
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$7" --sign "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$7" --sign "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname" --sign "$5"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --sign "$5"
@@ -129,7 +129,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --file
@@ -138,10 +138,10 @@
<certificate name>
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate --name "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --certificate "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type certificate
@@ -158,16 +158,16 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --file
Commands for installing generated CRL into running configuration
- ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --crl "$4"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type crl --name "$4"
@@ -181,7 +181,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --file
@@ -190,10 +190,10 @@
<DH name>
- ${vyos_op_scripts_dir}/pki.py --action generate --dh "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh --name "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --dh "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type dh
@@ -207,7 +207,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --file
@@ -216,10 +216,10 @@
<key name>
- ${vyos_op_scripts_dir}/pki.py --action generate --keypair "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair --name "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --keypair "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type key-pair
@@ -238,7 +238,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --file
@@ -247,10 +247,10 @@
<key name>
- ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "$6" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn --name "$6" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --openvpn "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type openvpn
@@ -266,7 +266,7 @@
<filename>
- sudo -E ${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --file
+ sudo -E ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --file
@@ -275,10 +275,10 @@
<key name>
- ${vyos_op_scripts_dir}/pki.py --action generate --ssh "$5" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh --name "$5" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --ssh "noname"
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type ssh
@@ -302,12 +302,12 @@
interfaces wireguard
- ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key --interface "$7" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key --interface "$7" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --key
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --key
@@ -334,14 +334,14 @@
interfaces wireguard ${COMP_WORDS[COMP_CWORD-2]} peer
- ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk --interface "$7" --peer "$9" --install
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk --interface "$7" --peer "$9" --install
- ${vyos_op_scripts_dir}/pki.py --action generate --wireguard --psk
+ ${vyos_op_scripts_dir}/pki.py generate_pki --pki-type wireguard --psk
@@ -371,13 +371,13 @@
Path to CA certificate file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --filename "$6"
Path to private key file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --ca "$4" --key-filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type ca --name "$4" --key-filename "$6"
@@ -393,13 +393,13 @@
Path to certificate file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --filename "$6"
Path to private key file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --certificate "$4" --key-filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type certificate --name "$4" --key-filename "$6"
@@ -415,7 +415,7 @@
Path to CRL file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --crl "$4" --filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type crl --name "$4" --filename "$6"
@@ -431,7 +431,7 @@
Path to DH parameters file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --dh "$4" --filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type dh --name "$4" --filename "$6"
@@ -447,13 +447,13 @@
Path to public key file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --filename "$6"
Path to private key file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --keypair "$4" --key-filename "$6"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type key-pair --name "$4" --key-filename "$6"
@@ -474,7 +474,7 @@
Path to shared secret key file
- sudo -E ${vyos_op_scripts_dir}/pki.py --action import --openvpn "$5" --filename "$7"
+ sudo -E ${vyos_op_scripts_dir}/pki.py import_pki --pki-type openvpn --name "$5" --filename "$7"
@@ -490,13 +490,13 @@
Show PKI x509 certificates
- sudo ${vyos_op_scripts_dir}/pki.py --action show
+ sudo ${vyos_op_scripts_dir}/pki.py show_all
Show x509 CA certificates
- sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "all"
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority
@@ -505,13 +505,13 @@
pki ca
- sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4"
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4"
Show x509 CA certificate in PEM format
- sudo ${vyos_op_scripts_dir}/pki.py --action show --ca "$4" --pem
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate_authority --name "$4" --pem
@@ -519,7 +519,7 @@
Show x509 certificates
- sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "all"
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate
@@ -528,13 +528,13 @@
pki certificate
- sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4"
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4"
Show x509 certificate in PEM format
- sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --pem
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --pem
@@ -543,7 +543,7 @@
sha256 sha384 sha512
- sudo ${vyos_op_scripts_dir}/pki.py --action show --certificate "$4" --fingerprint "$6"
+ sudo ${vyos_op_scripts_dir}/pki.py show_certificate --name "$4" --fingerprint "$6"
@@ -551,7 +551,7 @@
Show x509 certificate revocation lists
- ${vyos_op_scripts_dir}/pki.py --action show --crl "all"
+ ${vyos_op_scripts_dir}/pki.py show_crl
@@ -560,13 +560,13 @@
pki ca
- ${vyos_op_scripts_dir}/pki.py --action show --crl "$4"
+ ${vyos_op_scripts_dir}/pki.py show_crl --name "$4"
Show x509 certificate revocation lists by CA name in PEM format
- ${vyos_op_scripts_dir}/pki.py --action show --crl "$4" --pem
+ ${vyos_op_scripts_dir}/pki.py show_crl --name "$4" --pem
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 9c56d246a..5876dc5b0 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -42,8 +42,7 @@ INSTALL_IMAGE = [
IMPORT_PKI = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'import']
IMPORT_PKI_NO_PROMPT = [
'/usr/libexec/vyos/op_mode/pki.py',
- '--action',
- 'import',
+ 'import_pki',
'--no-prompt',
]
REMOVE_IMAGE = [
diff --git a/python/vyos/opmode.py b/python/vyos/opmode.py
index 066c8058f..136ac35e2 100644
--- a/python/vyos/opmode.py
+++ b/python/vyos/opmode.py
@@ -89,7 +89,10 @@ class InternalError(Error):
def _is_op_mode_function_name(name):
- if re.match(r"^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute)", name):
+ if re.match(
+ r'^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute|import)',
+ name,
+ ):
return True
else:
return False
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 5652a5d74..56b873bb1 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -14,16 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import argparse
import ipaddress
import os
import re
import sys
import tabulate
+import typing
from cryptography import x509
from cryptography.x509.oid import ExtendedKeyUsageOID
+import vyos.opmode
+
from vyos.config import Config
from vyos.config import config_dict_mangle_acme
from vyos.pki import encode_certificate
@@ -51,8 +53,36 @@ from vyos.utils.process import cmd
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
auth_dir = '/config/auth'
+ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl']
+ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']]
+ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512']
+
# Helper Functions
conf = Config()
+
+
+def _verify(target):
+ """Decorator checks if config for PKI exists"""
+ from functools import wraps
+
+ if target not in ['ca', 'certificate']:
+ raise ValueError('Invalid PKI')
+
+ def _verify_target(func):
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ name = kwargs.get('name')
+ unconf_message = f'PKI {target} "{name}" does not exist!'
+ if name:
+ if not conf.exists(['pki', target, name]):
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+
+ return _wrapper
+
+ return _verify_target
+
+
def get_default_values():
# Fetch default x509 values
base = ['pki', 'x509', 'default']
@@ -872,8 +902,111 @@ def import_openvpn_secret(name, path):
install_openvpn_key(name, key_data, key_version)
-# Show functions
-def show_certificate_authority(name=None, pem=False):
+
+def generate_pki(
+ raw: bool,
+ pki_type: ArgsPkiTypeGen,
+ name: typing.Optional[str],
+ file: typing.Optional[bool],
+ install: typing.Optional[bool],
+ sign: typing.Optional[str],
+ self_sign: typing.Optional[bool],
+ key: typing.Optional[bool],
+ psk: typing.Optional[bool],
+ interface: typing.Optional[str],
+ peer: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ if sign:
+ generate_ca_certificate_sign(name, sign, install=install, file=file)
+ else:
+ generate_ca_certificate(name, install=install, file=file)
+ elif pki_type == 'certificate':
+ if sign:
+ generate_certificate_sign(name, sign, install=install, file=file)
+ elif self_sign:
+ generate_certificate_selfsign(name, install=install, file=file)
+ else:
+ generate_certificate_request(name=name, install=install, file=file)
+
+ elif pki_type == 'crl':
+ generate_certificate_revocation_list(name, install=install, file=file)
+
+ elif pki_type == 'ssh':
+ generate_ssh_keypair(name, install=install, file=file)
+
+ elif pki_type == 'dh':
+ generate_dh_parameters(name, install=install, file=file)
+
+ elif pki_type == 'key-pair':
+ generate_keypair(name, install=install, file=file)
+
+ elif pki_type == 'openvpn':
+ generate_openvpn_key(name, install=install, file=file)
+
+ elif pki_type == 'wireguard':
+ # WireGuard supports writing key directly into the CLI, but this
+ # requires the vyos_libexec_dir environment variable to be set
+ os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos'
+
+ if key:
+ generate_wireguard_key(interface, install=install)
+ if psk:
+ generate_wireguard_psk(interface, peer=peer, install=install)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+def import_pki(
+ name: str,
+ pki_type: ArgsPkiType,
+ filename: typing.Optional[str],
+ key_filename: typing.Optional[str],
+ no_prompt: typing.Optional[bool],
+ passphrase: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ import_ca_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'certificate':
+ import_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'crl':
+ import_crl(name, filename)
+ elif pki_type == 'dh':
+ import_dh_parameters(name, filename)
+ elif pki_type == 'key-pair':
+ import_keypair(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'openvpn':
+ import_openvpn_secret(name, filename)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+@_verify('ca')
+def show_certificate_authority(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
data = []
certs = get_config_ca_certificate()
@@ -905,7 +1038,14 @@ def show_certificate_authority(name=None, pem=False):
print("Certificate Authorities:")
print(tabulate.tabulate(data, headers))
-def show_certificate(name=None, pem=False, fingerprint_hash=None):
+
+@_verify('certificate')
+def show_certificate(
+ raw: bool,
+ name: typing.Optional[str] = None,
+ pem: typing.Optional[bool] = False,
+ fingerprint: typing.Optional[ArgsFingerprint] = None,
+):
headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']
data = []
certs = get_config_certificate()
@@ -926,8 +1066,8 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
if name and pem:
print(encode_certificate(cert))
return
- elif name and fingerprint_hash:
- print(get_certificate_fingerprint(cert, fingerprint_hash))
+ elif name and fingerprint:
+ print(get_certificate_fingerprint(cert, fingerprint))
return
ca_name = get_certificate_ca(cert, ca_certs)
@@ -955,7 +1095,10 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
print("Certificates:")
print(tabulate.tabulate(data, headers))
-def show_crl(name=None, pem=False):
+
+def show_crl(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
headers = ['CA Name', 'Updated', 'Revokes']
data = []
certs = get_config_ca_certificate()
@@ -989,132 +1132,20 @@ def show_crl(name=None, pem=False):
print("Certificate Revocation Lists:")
print(tabulate.tabulate(data, headers))
-if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--action', help='PKI action', required=True)
-
- # X509
- parser.add_argument('--ca', help='Certificate Authority', required=False)
- parser.add_argument('--certificate', help='Certificate', required=False)
- parser.add_argument('--crl', help='Certificate Revocation List', required=False)
- parser.add_argument('--sign', help='Sign certificate with specified CA', required=False)
- parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true')
- parser.add_argument('--pem', help='Output using PEM encoding', action='store_true')
- parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store')
- # SSH
- parser.add_argument('--ssh', help='SSH Key', required=False)
+def show_all(raw: bool):
+ show_certificate_authority(raw)
+ print('\n')
+ show_certificate(raw)
+ print('\n')
+ show_crl(raw)
- # DH
- parser.add_argument('--dh', help='DH Parameters', required=False)
-
- # Key pair
- parser.add_argument('--keypair', help='Key pair', required=False)
-
- # OpenVPN
- parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False)
-
- # WireGuard
- parser.add_argument('--wireguard', help='Wireguard', action='store_true')
- group = parser.add_mutually_exclusive_group()
- group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False)
- group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False)
- parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store')
- parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store')
-
- # Global
- parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
- parser.add_argument('--install', help='Install generated keys into running-config', action='store_true')
-
- parser.add_argument('--filename', help='Write certificate into specified filename', action='store')
- parser.add_argument('--key-filename', help='Write key into specified filename', action='store')
-
- parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively')
- parser.add_argument('--passphrase', help='A passphrase to decrypt the private key')
-
- args = parser.parse_args()
+if __name__ == '__main__':
try:
- if args.action == 'generate':
- if args.ca:
- if args.sign:
- generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file)
- else:
- generate_ca_certificate(args.ca, install=args.install, file=args.file)
- elif args.certificate:
- if args.sign:
- generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file)
- elif args.self_sign:
- generate_certificate_selfsign(args.certificate, install=args.install, file=args.file)
- else:
- generate_certificate_request(name=args.certificate, install=args.install, file=args.file)
-
- elif args.crl:
- generate_certificate_revocation_list(args.crl, install=args.install, file=args.file)
-
- elif args.ssh:
- generate_ssh_keypair(args.ssh, install=args.install, file=args.file)
-
- elif args.dh:
- generate_dh_parameters(args.dh, install=args.install, file=args.file)
-
- elif args.keypair:
- generate_keypair(args.keypair, install=args.install, file=args.file)
-
- elif args.openvpn:
- generate_openvpn_key(args.openvpn, install=args.install, file=args.file)
-
- elif args.wireguard:
- # WireGuard supports writing key directly into the CLI, but this
- # requires the vyos_libexec_dir environment variable to be set
- os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos"
-
- if args.key:
- generate_wireguard_key(args.interface, install=args.install)
- if args.psk:
- generate_wireguard_psk(args.interface, peer=args.peer, install=args.install)
- elif args.action == 'import':
- if args.ca:
- import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.certificate:
- import_certificate(args.certificate, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.crl:
- import_crl(args.crl, args.filename)
- elif args.dh:
- import_dh_parameters(args.dh, args.filename)
- elif args.keypair:
- import_keypair(args.keypair, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.openvpn:
- import_openvpn_secret(args.openvpn, args.filename)
- elif args.action == 'show':
- if args.ca:
- ca_name = None if args.ca == 'all' else args.ca
- if ca_name:
- if not conf.exists(['pki', 'ca', ca_name]):
- print(f'CA "{ca_name}" does not exist!')
- exit(1)
- show_certificate_authority(ca_name, args.pem)
- elif args.certificate:
- cert_name = None if args.certificate == 'all' else args.certificate
- if cert_name:
- if not conf.exists(['pki', 'certificate', cert_name]):
- print(f'Certificate "{cert_name}" does not exist!')
- exit(1)
- if args.fingerprint is None:
- show_certificate(None if args.certificate == 'all' else args.certificate, args.pem)
- else:
- show_certificate(args.certificate, fingerprint_hash=args.fingerprint)
- elif args.crl:
- show_crl(None if args.crl == 'all' else args.crl, args.pem)
- else:
- show_certificate_authority()
- print('\n')
- show_certificate()
- print('\n')
- show_crl()
- except KeyboardInterrupt:
- print("Aborted")
- sys.exit(0)
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py
index da981d5bf..5612e947c 100644
--- a/src/services/api/rest/routers.py
+++ b/src/services/api/rest/routers.py
@@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path):
correct_paths = ['ca', 'certificate', 'key-pair']
if path[1] not in correct_paths:
return False
- path[1] = '--' + path[1].replace('-', '')
path[3] = '--key-filename'
- return path[1:]
+ path.insert(2, '--name')
+ return ['--pki-type'] + path[1:]
@router.post('/configure')
--
cgit v1.2.3
From 52e3ba35c09237744340c2cce4cba1783ec3f2be Mon Sep 17 00:00:00 2001
From: Nataliia Solomko
Date: Fri, 18 Oct 2024 16:46:39 +0300
Subject: pki: T4914: reformat file by linter rules
---
src/op_mode/pki.py | 517 ++++++++++++++++++++++++++++++++++++++---------------
1 file changed, 375 insertions(+), 142 deletions(-)
(limited to 'src/op_mode')
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 56b873bb1..49a461e9e 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -86,13 +86,17 @@ def _verify(target):
def get_default_values():
# Fetch default x509 values
base = ['pki', 'x509', 'default']
- x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'),
- no_tag_node_value_mangle=True,
- get_first_key=True,
- with_recursive_defaults=True)
+ x509_defaults = conf.get_config_dict(
+ base,
+ key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True,
+ with_recursive_defaults=True,
+ )
return x509_defaults
+
def get_config_ca_certificate(name=None):
# Fetch ca certificates from config
base = ['pki', 'ca']
@@ -101,12 +105,15 @@ def get_config_ca_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- return conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ return conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
+
def get_config_certificate(name=None):
# Get certificates from config
@@ -116,18 +123,21 @@ def get_config_certificate(name=None):
if name:
base = base + [name]
- if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']):
+ if not conf.exists(base + ['private', 'key']) or not conf.exists(
+ base + ['certificate']
+ ):
return False
- pki = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ pki = conf.get_config_dict(
+ base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True
+ )
if pki:
for certificate in pki:
pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate])
return pki
+
def get_certificate_ca(cert, ca_certs):
# Find CA certificate for given certificate
if not ca_certs:
@@ -146,6 +156,7 @@ def get_certificate_ca(cert, ca_certs):
return ca_name
return None
+
def get_config_revoked_certificates():
# Fetch revoked certificates from config
ca_base = ['pki', 'ca']
@@ -154,19 +165,26 @@ def get_config_revoked_certificates():
certs = []
if conf.exists(ca_base):
- ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ ca_certificates = conf.get_config_dict(
+ ca_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(ca_certificates.values())
if conf.exists(cert_base):
- certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
+ certificates = conf.get_config_dict(
+ cert_base,
+ key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ )
certs.extend(certificates.values())
return [cert_dict for cert_dict in certs if 'revoke' in cert_dict]
+
def get_revoked_by_serial_numbers(serial_numbers=[]):
# Return serial numbers of revoked certificates
certs_out = []
@@ -190,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):
certs_out.append(cert_name)
return certs_out
-def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False):
+
+def install_certificate(
+ name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False
+):
# Show/install conf commands for certificate
prefix = 'ca' if is_ca else 'certificate'
- base = f"pki {prefix} {name}"
+ base = f'pki {prefix} {name}'
config_paths = []
if cert:
- cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1])
+ cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1])
config_paths.append(f"{base} certificate '{cert_pem}'")
if private_key:
- key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1])
+ key_pem = ''.join(
+ encode_private_key(private_key, passphrase=key_passphrase)
+ .strip()
+ .split('\n')[1:-1]
+ )
config_paths.append(f"{base} private key '{key_pem}'")
if key_passphrase:
- config_paths.append(f"{base} private password-protected")
+ config_paths.append(f'{base} private password-protected')
install_into_config(conf, config_paths)
+
def install_crl(ca_name, crl):
# Show/install conf commands for crl
- crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1])
+ crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"])
+
def install_dh_parameters(name, params):
# Show/install conf commands for dh params
- dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1])
+ dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1])
install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"])
+
def install_ssh_key(name, public_key, private_key, passphrase=None):
# Show/install conf commands for ssh key
- key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')
+ key_openssh = encode_public_key(
+ public_key, encoding='OpenSSH', key_format='OpenSSH'
+ )
username = os.getlogin()
- type_key_split = key_openssh.split(" ")
-
- base = f"system login user {username} authentication public-keys {name}"
- install_into_config(conf, [
- f"{base} key '{type_key_split[1]}'",
- f"{base} type '{type_key_split[0]}'"
- ])
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
-
-def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):
+ type_key_split = key_openssh.split(' ')
+
+ base = f'system login user {username} authentication public-keys {name}'
+ install_into_config(
+ conf,
+ [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"],
+ )
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
+
+
+def install_keypair(
+ name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True
+):
# Show/install conf commands for key-pair
config_paths = []
if public_key:
- install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True)
+ install_public_key = not prompt or ask_yes_no(
+ 'Do you want to install the public key?', default=True
+ )
public_key_pem = encode_public_key(public_key)
if install_public_key:
- install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'")
+ install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} public key '{install_public_pem}'"
+ )
else:
- print("Public key:")
+ print('Public key:')
print(public_key_pem)
if private_key:
- install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True)
+ install_private_key = not prompt or ask_yes_no(
+ 'Do you want to install the private key?', default=True
+ )
private_key_pem = encode_private_key(private_key, passphrase=passphrase)
if install_private_key:
- install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1])
- config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'")
+ install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1])
+ config_paths.append(
+ f"pki key-pair {name} private key '{install_private_pem}'"
+ )
if passphrase:
- config_paths.append(f"pki key-pair {name} private password-protected")
+ config_paths.append(f'pki key-pair {name} private password-protected')
else:
- print("Private key:")
+ print('Private key:')
print(private_key_pem)
install_into_config(conf, config_paths)
+
def install_openvpn_key(name, key_data, key_version='1'):
config_paths = [
f"pki openvpn shared-secret {name} key '{key_data}'",
- f"pki openvpn shared-secret {name} version '{key_version}'"
+ f"pki openvpn shared-secret {name} version '{key_version}'",
]
install_into_config(conf, config_paths)
+
def install_wireguard_key(interface, private_key, public_key):
# Show conf commands for installing wireguard key pairs
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]
+ )
print(f"Corresponding public-key to use on peer system is: '{public_key}'")
+
def install_wireguard_psk(interface, peer, psk):
from vyos.ifconfig import Section
+
if Section.section(interface) != 'wireguard':
print(f'"{interface}" is not a WireGuard interface name!')
exit(1)
# Check if we are running in a config session - if yes, we can directly write to the CLI
- install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"])
+ install_into_config(
+ conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]
+ )
+
def ask_passphrase():
passphrase = None
- print("Note: If you plan to use the generated key on this router, do not encrypt the private key.")
+ print(
+ 'Note: If you plan to use the generated key on this router, do not encrypt the private key.'
+ )
if ask_yes_no('Do you want to encrypt the private key with a passphrase?'):
passphrase = ask_input('Enter passphrase:')
return passphrase
+
def write_file(filename, contents):
full_path = os.path.join(auth_dir, filename)
directory = os.path.dirname(full_path)
@@ -305,7 +363,9 @@ def write_file(filename, contents):
print('Failed to write file: directory does not exist')
return False
- if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'):
+ if os.path.exists(full_path) and not ask_yes_no(
+ 'Do you want to overwrite the existing file?'
+ ):
return False
with open(full_path, 'w') as f:
@@ -313,10 +373,14 @@ def write_file(filename, contents):
print(f'File written to {full_path}')
-# Generation functions
+# Generation functions
def generate_private_key():
- key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec'])
+ key_type = ask_input(
+ 'Enter private key type: [rsa, dsa, ec]',
+ default='rsa',
+ valid_responses=['rsa', 'dsa', 'ec'],
+ )
size_valid = []
size_default = 0
@@ -328,28 +392,43 @@ def generate_private_key():
size_default = 256
size_valid = [224, 256, 384, 521]
- size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid)
+ size = ask_input(
+ 'Enter private key bits:',
+ default=size_default,
+ numeric_only=True,
+ valid_responses=size_valid,
+ )
return create_private_key(key_type, size), key_type
+
def parse_san_string(san_string):
if not san_string:
return None
output = []
- san_split = san_string.strip().split(",")
+ san_split = san_string.strip().split(',')
for pair_str in san_split:
- tag, value = pair_str.strip().split(":", 1)
+ tag, value = pair_str.strip().split(':', 1)
if tag == 'ipv4':
output.append(ipaddress.IPv4Address(value))
elif tag == 'ipv6':
output.append(ipaddress.IPv6Address(value))
elif tag == 'dns' or tag == 'rfc822':
output.append(value)
- return output
+ return
-def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True):
+
+def generate_certificate_request(
+ private_key=None,
+ key_type=None,
+ return_request=False,
+ name=None,
+ install=False,
+ file=False,
+ ask_san=True,
+):
if not private_key:
private_key, key_type = generate_private_key()
@@ -358,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
while True:
country = ask_input('Enter country code:', default=default_values['country'])
if len(country) != 2:
- print("Country name must be a 2 character country code")
+ print('Country name must be a 2 character country code')
continue
subject['country'] = country
break
subject['state'] = ask_input('Enter state:', default=default_values['state'])
- subject['locality'] = ask_input('Enter locality:', default=default_values['locality'])
- subject['organization'] = ask_input('Enter organization name:', default=default_values['organization'])
+ subject['locality'] = ask_input(
+ 'Enter locality:', default=default_values['locality']
+ )
+ subject['organization'] = ask_input(
+ 'Enter organization name:', default=default_values['organization']
+ )
subject['common_name'] = ask_input('Enter common name:', default='vyos.io')
subject_alt_names = None
if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'):
- print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net")
+ print(
+ 'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net'
+ )
san_string = ask_input('Enter Subject Alternative Names:')
subject_alt_names = parse_san_string(san_string)
@@ -386,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request
return None
if install:
- print("Certificate request:")
- print(encode_certificate(cert_req) + "\n")
- install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ print('Certificate request:')
+ print(encode_certificate(cert_req) + '\n')
+ install_certificate(
+ name,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.csr', encode_certificate(cert_req))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
-def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False):
- valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True)
+
+def generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False
+):
+ valid_days = ask_input(
+ 'Enter how many days certificate will be valid:',
+ default='365' if not is_ca else '1825',
+ numeric_only=True,
+ )
cert_type = None
if not is_ca:
- cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server'])
- return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca)
+ cert_type = ask_input(
+ 'Enter certificate type: (client, server)',
+ default='server',
+ valid_responses=['client', 'server'],
+ )
+ return create_certificate(
+ cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca
+ )
+
def generate_ca_certificate(name, install=False, file=False):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True)
passphrase = ask_passphrase()
@@ -413,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -429,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load signing CA certificate, aborting")
+ print('Failed to load signing CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter signing CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load signing CA private key, aborting")
+ print('Failed to load signing CA private key, aborting')
return None
private_key = None
@@ -448,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True, ask_san=False
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -460,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
- cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True)
+ cert = generate_certificate(
+ cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True
+ )
passphrase = None
if private_key is not None:
@@ -483,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -500,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
private_key = None
@@ -519,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
cert_req = None
if not ask_yes_no('Do you already have a certificate request?'):
private_key, key_type = generate_private_key()
- cert_req = generate_certificate_request(private_key, key_type, return_request=True)
+ cert_req = generate_certificate_request(
+ private_key, key_type, return_request=True
+ )
else:
- print("Paste certificate request and press enter:")
+ print('Paste certificate request and press enter:')
lines = []
curr_line = ''
while True:
@@ -531,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
lines.append(curr_line)
if not lines:
- print("Aborted")
+ print('Aborted')
return None
- wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing
- cert_req = load_certificate_request("\n".join(lines), wrap)
+ wrap = (
+ lines[0].find('-----') < 0
+ ) # Only base64 pasted, add the CSR tags for parsing
+ cert_req = load_certificate_request('\n'.join(lines), wrap)
if not cert_req:
- print("Invalid certificate request")
+ print('Invalid certificate request')
return None
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False)
-
+
passphrase = None
if private_key is not None:
passphrase = ask_passphrase()
@@ -554,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
if private_key is not None:
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_selfsign(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -573,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False):
return None
if install:
- install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False)
+ install_certificate(
+ name,
+ cert,
+ private_key=private_key,
+ key_type=key_type,
+ key_passphrase=passphrase,
+ is_ca=False,
+ )
if file:
write_file(f'{name}.pem', encode_certificate(cert))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_dict = get_config_ca_certificate(ca_name)
@@ -589,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
ca_cert = load_certificate(ca_dict['certificate'])
if not ca_cert:
- print("Failed to load CA certificate, aborting")
+ print('Failed to load CA certificate, aborting')
return None
ca_private = ca_dict['private']
ca_private_passphrase = None
if 'password_protected' in ca_private:
ca_private_passphrase = ask_input('Enter CA private key passphrase:')
- ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase)
+ ca_private_key = load_private_key(
+ ca_private['key'], passphrase=ca_private_passphrase
+ )
if not ca_private_key:
- print("Failed to load CA private key, aborting")
+ print('Failed to load CA private key, aborting')
return None
revoked_certs = get_config_revoked_certificates()
@@ -620,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
continue
if not to_revoke:
- print("No revoked certificates to add to the CRL")
+ print('No revoked certificates to add to the CRL')
return None
crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke)
if not crl:
- print("Failed to create CRL")
+ print('Failed to create CRL')
return None
if not install and not file:
@@ -637,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False):
install_crl(ca_name, crl)
if file:
- write_file(f'{name}.crl', encode_certificate(crl))
+ write_file(f'{ca_name}.crl', encode_certificate(crl))
+
def generate_ssh_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
@@ -646,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- print("")
- print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ print('')
+ print(
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ )
+ )
return None
if install:
install_ssh_key(name, public_key, private_key, passphrase)
if file:
- write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'))
- write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
+ write_file(
+ f'{name}.pem',
+ encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'),
+ )
+ write_file(
+ f'{name}.key',
+ encode_private_key(
+ private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase
+ ),
+ )
+
def generate_dh_parameters(name, install=False, file=False):
bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True)
- print("Generating parameters...")
+ print('Generating parameters...')
dh_params = create_dh_parameters(bits)
if not dh_params:
- print("Failed to create DH parameters")
+ print('Failed to create DH parameters')
return None
if not install and not file:
- print("DH Parameters:")
+ print('DH Parameters:')
print(encode_dh_parameters(dh_params))
if install:
@@ -677,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_dh_parameters(dh_params))
+
def generate_keypair(name, install=False, file=False):
private_key, key_type = generate_private_key()
public_key = private_key.public_key()
@@ -684,7 +849,7 @@ def generate_keypair(name, install=False, file=False):
if not install and not file:
print(encode_public_key(public_key))
- print("")
+ print('')
print(encode_private_key(private_key, passphrase=passphrase))
return None
@@ -693,13 +858,16 @@ def generate_keypair(name, install=False, file=False):
if file:
write_file(f'{name}.pem', encode_public_key(public_key))
- write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase))
+ write_file(
+ f'{name}.key', encode_private_key(private_key, passphrase=passphrase)
+ )
+
def generate_openvpn_key(name, install=False, file=False):
result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"')
if not result:
- print("Failed to generate OpenVPN key")
+ print('Failed to generate OpenVPN key')
return None
if not install and not file:
@@ -707,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False):
return None
if install:
- key_lines = result.split("\n")
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
+ key_lines = result.split('\n')
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
key_version = '1'
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', result
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
@@ -720,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False):
if file:
write_file(f'{name}.key', result)
+
def generate_wireguard_key(interface=None, install=False):
private_key = cmd('wg genkey')
public_key = cmd('wg pubkey', input=private_key)
@@ -730,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False):
print(f'Private key: {private_key}')
print(f'Public key: {public_key}', end='\n\n')
+
def generate_wireguard_psk(interface=None, peer=None, install=False):
psk = cmd('wg genpsk')
if interface and peer and install:
@@ -737,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):
else:
print(f'Pre-shared key: {psk}')
+
# Import functions
-def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+def import_ca_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -775,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp
install_certificate(name, private_key=key, is_ca=True)
-def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None):
+
+def import_certificate(
+ name, path=None, key_path=None, no_prompt=False, passphrase=None
+):
if path:
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -812,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra
install_certificate(name, private_key=key, is_ca=False)
+
def import_crl(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -829,6 +1008,7 @@ def import_crl(name, path):
install_crl(name, crl)
+
def import_dh_parameters(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -846,6 +1026,7 @@ def import_dh_parameters(name, path):
install_dh_parameters(name, dh)
+
def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None):
if path:
if not os.path.exists(path):
@@ -883,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N
install_keypair(name, None, private_key=key, prompt=False)
+
def import_openvpn_secret(name, path):
if not os.path.exists(path):
print(f'File not found: {path}')
@@ -892,11 +1074,15 @@ def import_openvpn_secret(name, path):
key_version = '1'
with open(path) as f:
- key_lines = f.read().strip().split("\n")
- key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines
- key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
-
- version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully)
+ key_lines = f.read().strip().split('\n')
+ key_lines = list(
+ filter(lambda line: not line.strip().startswith('#'), key_lines)
+ ) # Remove commented lines
+ key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings
+
+ version_search = re.search(
+ r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]
+ ) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
@@ -1007,7 +1193,15 @@ def import_pki(
def show_certificate_authority(
raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
):
- headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
+ headers = [
+ 'Name',
+ 'Subject',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Private Key',
+ 'Parent',
+ ]
data = []
certs = get_config_ca_certificate()
if certs:
@@ -1024,7 +1218,7 @@ def show_certificate_authority(
return
parent_ca_name = get_certificate_ca(cert, certs)
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
if not parent_ca_name or parent_ca_name == cert_name:
parent_ca_name = 'N/A'
@@ -1032,10 +1226,24 @@ def show_certificate_authority(
if not cert:
continue
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
- data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name])
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
+ data.append(
+ [
+ cert_name,
+ cert.subject.rfc4514_string(),
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ have_private,
+ parent_ca_name,
+ ]
+ )
- print("Certificate Authorities:")
+ print('Certificate Authorities:')
print(tabulate.tabulate(data, headers))
@@ -1046,7 +1254,17 @@ def show_certificate(
pem: typing.Optional[bool] = False,
fingerprint: typing.Optional[ArgsFingerprint] = None,
):
- headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']
+ headers = [
+ 'Name',
+ 'Type',
+ 'Subject CN',
+ 'Issuer CN',
+ 'Issued',
+ 'Expiry',
+ 'Revoked',
+ 'Private Key',
+ 'CA Present',
+ ]
data = []
certs = get_config_certificate()
if certs:
@@ -1071,8 +1289,8 @@ def show_certificate(
return
ca_name = get_certificate_ca(cert, ca_certs)
- cert_subject_cn = cert.subject.rfc4514_string().split(",")[0]
- cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0]
+ cert_subject_cn = cert.subject.rfc4514_string().split(',')[0]
+ cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0]
cert_type = 'Unknown'
try:
@@ -1081,18 +1299,31 @@ def show_certificate(
cert_type = 'Server'
elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value:
cert_type = 'Client'
- except:
+ except Exception:
pass
revoked = 'Yes' if 'revoke' in cert_dict else 'No'
- have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No'
+ have_private = (
+ 'Yes'
+ if 'private' in cert_dict and 'key' in cert_dict['private']
+ else 'No'
+ )
have_ca = f'Yes ({ca_name})' if ca_name else 'No'
- data.append([
- cert_name, cert_type, cert_subject_cn, cert_issuer_cn,
- cert.not_valid_before, cert.not_valid_after,
- revoked, have_private, have_ca])
+ data.append(
+ [
+ cert_name,
+ cert_type,
+ cert_subject_cn,
+ cert_issuer_cn,
+ cert.not_valid_before,
+ cert.not_valid_after,
+ revoked,
+ have_private,
+ have_ca,
+ ]
+ )
- print("Certificates:")
+ print('Certificates:')
print(tabulate.tabulate(data, headers))
@@ -1123,13 +1354,15 @@ def show_crl(
print(encode_certificate(crl))
continue
- certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl])
- data.append([cert_name, crl.last_update, ", ".join(certs)])
+ certs = get_revoked_by_serial_numbers(
+ [revoked.serial_number for revoked in crl]
+ )
+ data.append([cert_name, crl.last_update, ', '.join(certs)])
if name and pem:
return
- print("Certificate Revocation Lists:")
+ print('Certificate Revocation Lists:')
print(tabulate.tabulate(data, headers))
--
cgit v1.2.3