summaryrefslogtreecommitdiff
path: root/src/op_mode/pki.py
diff options
context:
space:
mode:
authorKim <kim.sidney@gmail.com>2021-10-07 16:52:56 +0200
committerGitHub <noreply@github.com>2021-10-07 16:52:56 +0200
commit2274dbf9047493a00a6f30346b38dacd8cfcf965 (patch)
treef431f5f6f1b2770c98ed9047e1cec9209e536366 /src/op_mode/pki.py
parent2acfffab8b98238e7d869673a858a4ae21651f0b (diff)
parentadc7ef387d40e92bd7163ee6b401e99e554394a3 (diff)
downloadvyos-1x-2274dbf9047493a00a6f30346b38dacd8cfcf965.tar.gz
vyos-1x-2274dbf9047493a00a6f30346b38dacd8cfcf965.zip
Merge branch 'current' into 2fa
Diffstat (limited to 'src/op_mode/pki.py')
-rwxr-xr-xsrc/op_mode/pki.py189
1 files changed, 113 insertions, 76 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 297270cf1..2283cd820 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -17,7 +17,6 @@
import argparse
import ipaddress
import os
-import re
import sys
import tabulate
@@ -25,6 +24,7 @@ from cryptography import x509
from cryptography.x509.oid import ExtendedKeyUsageOID
from vyos.config import Config
+from vyos.configquery import ConfigTreeQuery
from vyos.configdict import dict_merge
from vyos.pki import encode_certificate, encode_public_key, encode_private_key, encode_dh_parameters
from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list
@@ -37,25 +37,24 @@ from vyos.util import ask_input, ask_yes_no
from vyos.util import cmd
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
-
auth_dir = '/config/auth'
# Helper Functions
-
+conf = ConfigTreeQuery()
def get_default_values():
# Fetch default x509 values
- conf = Config()
base = ['pki', 'x509', 'default']
x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
default_values = defaults(base)
- return dict_merge(default_values, x509_defaults)
+ x509_defaults = dict_merge(default_values, x509_defaults)
+
+ return x509_defaults
def get_config_ca_certificate(name=None):
# Fetch ca certificates from config
- conf = Config()
base = ['pki', 'ca']
-
if not conf.exists(base):
return False
@@ -65,13 +64,12 @@ def get_config_ca_certificate(name=None):
return False
return conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
def get_config_certificate(name=None):
# Get certificates from config
- conf = Config()
base = ['pki', 'certificate']
-
if not conf.exists(base):
return False
@@ -81,7 +79,8 @@ def get_config_certificate(name=None):
return False
return conf.get_config_dict(base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
def get_certificate_ca(cert, ca_certs):
# Find CA certificate for given certificate
@@ -100,7 +99,6 @@ def get_certificate_ca(cert, ca_certs):
def get_config_revoked_certificates():
# Fetch revoked certificates from config
- conf = Config()
ca_base = ['pki', 'ca']
cert_base = ['pki', 'certificate']
@@ -108,12 +106,14 @@ def get_config_revoked_certificates():
if conf.exists(ca_base):
ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
certs.extend(ca_certificates.values())
if conf.exists(cert_base):
certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'),
- get_first_key=True, no_tag_node_value_mangle=True)
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
certs.extend(certificates.values())
return [cert_dict for cert_dict in certs if 'revoke' in cert_dict]
@@ -144,39 +144,41 @@ def get_revoked_by_serial_numbers(serial_numbers=[]):
def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False):
# Show conf commands for installing certificate
prefix = 'ca' if is_ca else 'certificate'
- print("Configure mode commands to install:")
+ print('Configure mode commands to install:')
+ base = f"set pki {prefix} {name}"
if cert:
cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1])
- print("set pki %s %s certificate '%s'" % (prefix, name, cert_pem))
+ print(f"{base} certificate '{cert_pem}'")
if private_key:
key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1])
- print("set pki %s %s private key '%s'" % (prefix, name, key_pem))
+ print(f"{base} private key '{key_pem}'")
if key_passphrase:
- print("set pki %s %s private password-protected" % (prefix, name))
+ print(f"{base} private password-protected")
def install_crl(ca_name, crl):
# Show conf commands for installing crl
print("Configure mode commands to install CRL:")
crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1])
- print("set pki ca %s crl '%s'" % (ca_name, crl_pem))
+ print(f"set pki ca {ca_name} crl '{crl_pem}'")
def install_dh_parameters(name, params):
# Show conf commands for installing dh params
print("Configure mode commands to install DH parameters:")
dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1])
- print("set pki dh %s parameters '%s'" % (name, dh_pem))
+ print(f"set pki dh {name} parameters '{dh_pem}'")
def install_ssh_key(name, public_key, private_key, passphrase=None):
# Show conf commands for installing ssh key
key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')
username = os.getlogin()
type_key_split = key_openssh.split(" ")
+
+ base = f"set system login user {username} authentication public-keys {name}"
print("Configure mode commands to install SSH key:")
- print("set system login user %s authentication public-keys %s key '%s'" % (username, name, type_key_split[1]))
- print("set system login user %s authentication public-keys %s type '%s'" % (username, name, type_key_split[0]))
- print("")
+ print(f"{base} key '{type_key_split[1]}'")
+ print(f"{base} type '{type_key_split[0]}'", end="\n\n")
print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None):
@@ -189,7 +191,7 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras
if install_public_key:
install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1])
- print("set pki key-pair %s public key '%s'" % (name, install_public_pem))
+ print(f"set pki key-pair {name} public key '{install_public_pem}'")
else:
print("Public key:")
print(public_key_pem)
@@ -200,30 +202,53 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras
if install_private_key:
install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1])
- print("set pki key-pair %s private key '%s'" % (name, install_private_pem))
+ print(f"set pki key-pair {name} private key '{install_private_pem}'")
if passphrase:
- print("set pki key-pair %s private password-protected" % (name,))
+ print(f"set pki key-pair {name} private password-protected")
else:
print("Private key:")
print(private_key_pem)
-def install_wireguard_key(name, private_key, public_key):
+def install_wireguard_key(interface, private_key, public_key):
# Show conf commands for installing wireguard key pairs
- is_interface = re.match(r'^wg[\d]+$', name)
+ from vyos.ifconfig import Section
+ if Section.section(interface) != 'wireguard':
+ print(f'"{interface}" is not a WireGuard interface name!')
+ exit(1)
+
+ # Check if we are running in a config session - if yes, we can directly write to the CLI
+ cli_string = f"interfaces wireguard {interface} private-key '{private_key}'"
+ if Config().in_session():
+ cmd(f"/opt/vyatta/sbin/my_set {cli_string}")
+
+ print('"generate" CLI command executed from config session.\nGenerated private-key was imported to CLI!',end='\n\n')
+ print(f'Use the following command to verify: show interfaces wireguard {interface}')
+ else:
+ print('"generate" CLI command executed from operational level.\n'
+ 'Generated private-key is not stored to CLI, use configure mode commands to install key:', end='\n\n')
+ print(f"set {cli_string}", end="\n\n")
- print("Configure mode commands to install key:")
- if is_interface:
- print("set interfaces wireguard %s private-key '%s'" % (name, private_key))
- print("")
- print("Public key for use on peer configuration: " + public_key)
+ print(f"Corresponding public-key to use on peer system is: '{public_key}'")
+
+
+def install_wireguard_psk(interface, peer, psk):
+ from vyos.ifconfig import Section
+ if Section.section(interface) != 'wireguard':
+ print(f'"{interface}" is not a WireGuard interface name!')
+ exit(1)
+
+ # Check if we are running in a config session - if yes, we can directly write to the CLI
+ cli_string = f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"
+ if Config().in_session():
+ cmd(f"/opt/vyatta/sbin/my_set {cli_string}")
+
+ print('"generate" CLI command executed from config session.\nGenerated preshared-key was imported to CLI!',end='\n\n')
+ print(f'Use the following command to verify: show interfaces wireguard {interface}')
else:
- print("set interfaces wireguard [INTERFACE] peer %s public-key '%s'" % (name, public_key))
- print("")
- print("Private key for use on peer configuration: " + private_key)
+ print('"generate" CLI command executed from operational level.\n'
+ 'Generated preshared-key is not stored to CLI, use configure mode commands to install key:', end='\n\n')
+ print(f"set {cli_string}", end="\n\n")
-def install_wireguard_psk(name, psk):
- # Show conf commands for installing wireguard psk
- print("set interfaces wireguard [INTERFACE] peer %s preshared-key '%s'" % (name, psk))
def ask_passphrase():
passphrase = None
@@ -464,7 +489,7 @@ def generate_certificate_sign(name, ca_name, install=False, file=False):
if not cert_req:
print("Invalid certificate request")
return None
-
+
cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False)
passphrase = ask_passphrase()
@@ -630,49 +655,37 @@ def generate_openvpn_key(name, install=False, file=False):
key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
key_version = '1'
+ import re
version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
+ base = f"set pki openvpn shared-secret {name}"
print("Configure mode commands to install OpenVPN key:")
- print("set pki openvpn shared-secret %s key '%s'" % (name, key_data))
- print("set pki openvpn shared-secret %s version '%s'" % (name, key_version))
+ print(f"{base} key '{key_data}'")
+ print(f"{base} version '{key_version}'")
if file:
write_file(f'{name}.key', result)
-def generate_wireguard_key(name, install=False, file=False):
+def generate_wireguard_key(interface=None, install=False):
private_key = cmd('wg genkey')
public_key = cmd('wg pubkey', input=private_key)
- if not install:
- print("Private key: " + private_key)
- print("Public key: " + public_key)
- return None
-
- if install:
- install_wireguard_key(name, private_key, public_key)
-
- if file:
- write_file(f'{name}_public.key', public_key)
- write_file(f'{name}_private.key', private_key)
+ if interface and install:
+ install_wireguard_key(interface, private_key, public_key)
+ else:
+ print(f'Private key: {private_key}')
+ print(f'Public key: {public_key}', end='\n\n')
-def generate_wireguard_psk(name, install=False, file=False):
+def generate_wireguard_psk(interface=None, peer=None, install=False):
psk = cmd('wg genpsk')
-
- if not install and not file:
- print("Pre-shared key:")
- print(psk)
- return None
-
- if install:
- install_wireguard_psk(name, psk)
-
- if file:
- write_file(f'{name}.key', psk)
+ if interface and peer and install:
+ install_wireguard_psk(interface, peer, psk)
+ else:
+ print(f'Pre-shared key: {psk}')
# Show functions
-
def show_certificate_authority(name=None):
headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
data = []
@@ -789,10 +802,13 @@ if __name__ == '__main__':
# OpenVPN
parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False)
- # Wireguard
+ # WireGuard
parser.add_argument('--wireguard', help='Wireguard', action='store_true')
- parser.add_argument('--key', help='Wireguard key pair', required=False)
- parser.add_argument('--psk', help='Wireguard pre shared key', required=False)
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False)
+ group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False)
+ parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store')
+ parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store')
# Global
parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
@@ -813,26 +829,47 @@ if __name__ == '__main__':
elif args.self_sign:
generate_certificate_selfsign(args.certificate, install=args.install, file=args.file)
else:
- generate_certificate_request(name=args.certificate, install=args.install)
+ generate_certificate_request(name=args.certificate, install=args.install, file=args.file)
+
elif args.crl:
generate_certificate_revocation_list(args.crl, install=args.install, file=args.file)
+
elif args.ssh:
generate_ssh_keypair(args.ssh, install=args.install, file=args.file)
+
elif args.dh:
generate_dh_parameters(args.dh, install=args.install, file=args.file)
+
elif args.keypair:
generate_keypair(args.keypair, install=args.install, file=args.file)
+
elif args.openvpn:
generate_openvpn_key(args.openvpn, install=args.install, file=args.file)
+
elif args.wireguard:
+ # WireGuard supports writing key directly into the CLI, but this
+ # requires the vyos_libexec_dir environment variable to be set
+ os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos"
+
if args.key:
- generate_wireguard_key(args.key, install=args.install, file=args.file)
- elif args.psk:
- generate_wireguard_psk(args.psk, install=args.install, file=args.file)
+ generate_wireguard_key(args.interface, install=args.install)
+ if args.psk:
+ generate_wireguard_psk(args.interface, peer=args.peer, install=args.install)
+
elif args.action == 'show':
if args.ca:
- show_certificate_authority(None if args.ca == 'all' else args.ca)
+ ca_name = None if args.ca == 'all' else args.ca
+ if ca_name:
+ if not conf.exists(['pki', 'ca', ca_name]):
+ print(f'CA "{ca_name}" does not exist!')
+ exit(1)
+ show_certificate_authority(ca_name)
elif args.certificate:
+ cert_name = None if args.certificate == 'all' else args.certificate
+ if cert_name:
+ if not conf.exists(['pki', 'certificate', cert_name]):
+ print(f'Certificate "{cert_name}" does not exist!')
+ exit(1)
show_certificate(None if args.certificate == 'all' else args.certificate)
elif args.crl:
show_crl(None if args.crl == 'all' else args.crl)