summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNataliia Solomko <natalirs1985@gmail.com>2024-10-18 15:42:59 +0300
committerNataliia Solomko <natalirs1985@gmail.com>2024-10-18 15:42:59 +0300
commit8943446c8c79c6daad2326ddc7a59950123b35d2 (patch)
treeeca41b73cac77d5bc6362f3f13d3b6adf531978c /src
parente5d2ac54150922640c08bacab124e96c7bbd1f7f (diff)
downloadvyos-1x-8943446c8c79c6daad2326ddc7a59950123b35d2.tar.gz
vyos-1x-8943446c8c79c6daad2326ddc7a59950123b35d2.zip
pki: T4914: Rewrite the PKI op mode in the new style
Diffstat (limited to 'src')
-rwxr-xr-xsrc/op_mode/pki.py295
-rw-r--r--src/services/api/rest/routers.py4
2 files changed, 165 insertions, 134 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 5652a5d74..56b873bb1 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -14,16 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import argparse
import ipaddress
import os
import re
import sys
import tabulate
+import typing
from cryptography import x509
from cryptography.x509.oid import ExtendedKeyUsageOID
+import vyos.opmode
+
from vyos.config import Config
from vyos.config import config_dict_mangle_acme
from vyos.pki import encode_certificate
@@ -51,8 +53,36 @@ from vyos.utils.process import cmd
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
auth_dir = '/config/auth'
+ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl']
+ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']]
+ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512']
+
# Helper Functions
conf = Config()
+
+
+def _verify(target):
+ """Decorator checks if config for PKI exists"""
+ from functools import wraps
+
+ if target not in ['ca', 'certificate']:
+ raise ValueError('Invalid PKI')
+
+ def _verify_target(func):
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ name = kwargs.get('name')
+ unconf_message = f'PKI {target} "{name}" does not exist!'
+ if name:
+ if not conf.exists(['pki', target, name]):
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+
+ return _wrapper
+
+ return _verify_target
+
+
def get_default_values():
# Fetch default x509 values
base = ['pki', 'x509', 'default']
@@ -872,8 +902,111 @@ def import_openvpn_secret(name, path):
install_openvpn_key(name, key_data, key_version)
-# Show functions
-def show_certificate_authority(name=None, pem=False):
+
+def generate_pki(
+ raw: bool,
+ pki_type: ArgsPkiTypeGen,
+ name: typing.Optional[str],
+ file: typing.Optional[bool],
+ install: typing.Optional[bool],
+ sign: typing.Optional[str],
+ self_sign: typing.Optional[bool],
+ key: typing.Optional[bool],
+ psk: typing.Optional[bool],
+ interface: typing.Optional[str],
+ peer: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ if sign:
+ generate_ca_certificate_sign(name, sign, install=install, file=file)
+ else:
+ generate_ca_certificate(name, install=install, file=file)
+ elif pki_type == 'certificate':
+ if sign:
+ generate_certificate_sign(name, sign, install=install, file=file)
+ elif self_sign:
+ generate_certificate_selfsign(name, install=install, file=file)
+ else:
+ generate_certificate_request(name=name, install=install, file=file)
+
+ elif pki_type == 'crl':
+ generate_certificate_revocation_list(name, install=install, file=file)
+
+ elif pki_type == 'ssh':
+ generate_ssh_keypair(name, install=install, file=file)
+
+ elif pki_type == 'dh':
+ generate_dh_parameters(name, install=install, file=file)
+
+ elif pki_type == 'key-pair':
+ generate_keypair(name, install=install, file=file)
+
+ elif pki_type == 'openvpn':
+ generate_openvpn_key(name, install=install, file=file)
+
+ elif pki_type == 'wireguard':
+ # WireGuard supports writing key directly into the CLI, but this
+ # requires the vyos_libexec_dir environment variable to be set
+ os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos'
+
+ if key:
+ generate_wireguard_key(interface, install=install)
+ if psk:
+ generate_wireguard_psk(interface, peer=peer, install=install)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+def import_pki(
+ name: str,
+ pki_type: ArgsPkiType,
+ filename: typing.Optional[str],
+ key_filename: typing.Optional[str],
+ no_prompt: typing.Optional[bool],
+ passphrase: typing.Optional[str],
+):
+ try:
+ if pki_type == 'ca':
+ import_ca_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'certificate':
+ import_certificate(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'crl':
+ import_crl(name, filename)
+ elif pki_type == 'dh':
+ import_dh_parameters(name, filename)
+ elif pki_type == 'key-pair':
+ import_keypair(
+ name,
+ path=filename,
+ key_path=key_filename,
+ no_prompt=no_prompt,
+ passphrase=passphrase,
+ )
+ elif pki_type == 'openvpn':
+ import_openvpn_secret(name, filename)
+ except KeyboardInterrupt:
+ print('Aborted')
+ sys.exit(0)
+
+
+@_verify('ca')
+def show_certificate_authority(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
data = []
certs = get_config_ca_certificate()
@@ -905,7 +1038,14 @@ def show_certificate_authority(name=None, pem=False):
print("Certificate Authorities:")
print(tabulate.tabulate(data, headers))
-def show_certificate(name=None, pem=False, fingerprint_hash=None):
+
+@_verify('certificate')
+def show_certificate(
+ raw: bool,
+ name: typing.Optional[str] = None,
+ pem: typing.Optional[bool] = False,
+ fingerprint: typing.Optional[ArgsFingerprint] = None,
+):
headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present']
data = []
certs = get_config_certificate()
@@ -926,8 +1066,8 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
if name and pem:
print(encode_certificate(cert))
return
- elif name and fingerprint_hash:
- print(get_certificate_fingerprint(cert, fingerprint_hash))
+ elif name and fingerprint:
+ print(get_certificate_fingerprint(cert, fingerprint))
return
ca_name = get_certificate_ca(cert, ca_certs)
@@ -955,7 +1095,10 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None):
print("Certificates:")
print(tabulate.tabulate(data, headers))
-def show_crl(name=None, pem=False):
+
+def show_crl(
+ raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False
+):
headers = ['CA Name', 'Updated', 'Revokes']
data = []
certs = get_config_ca_certificate()
@@ -989,132 +1132,20 @@ def show_crl(name=None, pem=False):
print("Certificate Revocation Lists:")
print(tabulate.tabulate(data, headers))
-if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--action', help='PKI action', required=True)
-
- # X509
- parser.add_argument('--ca', help='Certificate Authority', required=False)
- parser.add_argument('--certificate', help='Certificate', required=False)
- parser.add_argument('--crl', help='Certificate Revocation List', required=False)
- parser.add_argument('--sign', help='Sign certificate with specified CA', required=False)
- parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true')
- parser.add_argument('--pem', help='Output using PEM encoding', action='store_true')
- parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store')
- # SSH
- parser.add_argument('--ssh', help='SSH Key', required=False)
+def show_all(raw: bool):
+ show_certificate_authority(raw)
+ print('\n')
+ show_certificate(raw)
+ print('\n')
+ show_crl(raw)
- # DH
- parser.add_argument('--dh', help='DH Parameters', required=False)
-
- # Key pair
- parser.add_argument('--keypair', help='Key pair', required=False)
-
- # OpenVPN
- parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False)
-
- # WireGuard
- parser.add_argument('--wireguard', help='Wireguard', action='store_true')
- group = parser.add_mutually_exclusive_group()
- group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False)
- group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False)
- parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store')
- parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store')
-
- # Global
- parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
- parser.add_argument('--install', help='Install generated keys into running-config', action='store_true')
-
- parser.add_argument('--filename', help='Write certificate into specified filename', action='store')
- parser.add_argument('--key-filename', help='Write key into specified filename', action='store')
-
- parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively')
- parser.add_argument('--passphrase', help='A passphrase to decrypt the private key')
-
- args = parser.parse_args()
+if __name__ == '__main__':
try:
- if args.action == 'generate':
- if args.ca:
- if args.sign:
- generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file)
- else:
- generate_ca_certificate(args.ca, install=args.install, file=args.file)
- elif args.certificate:
- if args.sign:
- generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file)
- elif args.self_sign:
- generate_certificate_selfsign(args.certificate, install=args.install, file=args.file)
- else:
- generate_certificate_request(name=args.certificate, install=args.install, file=args.file)
-
- elif args.crl:
- generate_certificate_revocation_list(args.crl, install=args.install, file=args.file)
-
- elif args.ssh:
- generate_ssh_keypair(args.ssh, install=args.install, file=args.file)
-
- elif args.dh:
- generate_dh_parameters(args.dh, install=args.install, file=args.file)
-
- elif args.keypair:
- generate_keypair(args.keypair, install=args.install, file=args.file)
-
- elif args.openvpn:
- generate_openvpn_key(args.openvpn, install=args.install, file=args.file)
-
- elif args.wireguard:
- # WireGuard supports writing key directly into the CLI, but this
- # requires the vyos_libexec_dir environment variable to be set
- os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos"
-
- if args.key:
- generate_wireguard_key(args.interface, install=args.install)
- if args.psk:
- generate_wireguard_psk(args.interface, peer=args.peer, install=args.install)
- elif args.action == 'import':
- if args.ca:
- import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.certificate:
- import_certificate(args.certificate, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.crl:
- import_crl(args.crl, args.filename)
- elif args.dh:
- import_dh_parameters(args.dh, args.filename)
- elif args.keypair:
- import_keypair(args.keypair, path=args.filename, key_path=args.key_filename,
- no_prompt=args.no_prompt, passphrase=args.passphrase)
- elif args.openvpn:
- import_openvpn_secret(args.openvpn, args.filename)
- elif args.action == 'show':
- if args.ca:
- ca_name = None if args.ca == 'all' else args.ca
- if ca_name:
- if not conf.exists(['pki', 'ca', ca_name]):
- print(f'CA "{ca_name}" does not exist!')
- exit(1)
- show_certificate_authority(ca_name, args.pem)
- elif args.certificate:
- cert_name = None if args.certificate == 'all' else args.certificate
- if cert_name:
- if not conf.exists(['pki', 'certificate', cert_name]):
- print(f'Certificate "{cert_name}" does not exist!')
- exit(1)
- if args.fingerprint is None:
- show_certificate(None if args.certificate == 'all' else args.certificate, args.pem)
- else:
- show_certificate(args.certificate, fingerprint_hash=args.fingerprint)
- elif args.crl:
- show_crl(None if args.crl == 'all' else args.crl, args.pem)
- else:
- show_certificate_authority()
- print('\n')
- show_certificate()
- print('\n')
- show_crl()
- except KeyboardInterrupt:
- print("Aborted")
- sys.exit(0)
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/services/api/rest/routers.py b/src/services/api/rest/routers.py
index da981d5bf..5612e947c 100644
--- a/src/services/api/rest/routers.py
+++ b/src/services/api/rest/routers.py
@@ -423,9 +423,9 @@ def create_path_import_pki_no_prompt(path):
correct_paths = ['ca', 'certificate', 'key-pair']
if path[1] not in correct_paths:
return False
- path[1] = '--' + path[1].replace('-', '')
path[3] = '--key-filename'
- return path[1:]
+ path.insert(2, '--name')
+ return ['--pki-type'] + path[1:]
@router.post('/configure')