diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/kea.py | 38 | ||||
-rw-r--r-- | python/vyos/pki.py | 31 |
2 files changed, 40 insertions, 29 deletions
diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 720bebec3..894ac9e9a 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -17,8 +17,6 @@ import json import os import socket -from datetime import datetime - from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr @@ -212,6 +210,9 @@ def kea6_parse_subnet(subnet, config): if 'option' in config: out['option-data'] = kea6_parse_options(config['option']) + if 'interface' in config: + out['interface'] = config['interface'] + if 'range' in config: pools = [] for num, range_config in config['range'].items(): @@ -290,29 +291,6 @@ def kea6_parse_subnet(subnet, config): return out -def kea_parse_leases(lease_path): - contents = read_file(lease_path) - lines = contents.split("\n") - output = [] - - if len(lines) < 2: - return output - - headers = lines[0].split(",") - - for line in lines[1:]: - line_out = dict(zip(headers, line.split(","))) - - lifetime = int(line_out['valid_lifetime']) - expiry = int(line_out['expire']) - - line_out['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime) - line_out['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None - - output.append(line_out) - - return output - def _ctrl_socket_command(path, command, args=None): if not os.path.exists(path): return None @@ -337,6 +315,16 @@ def _ctrl_socket_command(path, command, args=None): return json.loads(result.decode('utf-8')) +def kea_get_leases(inet): + ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' + + leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all') + + if not leases or 'result' not in leases or leases['result'] != 0: + return [] + + return leases['arguments']['leases'] + def kea_get_active_config(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' diff --git a/python/vyos/pki.py b/python/vyos/pki.py index 792e24b76..02dece471 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 VyOS maintainers and contributors +# Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,7 +20,9 @@ import ipaddress from cryptography import x509 from cryptography.exceptions import InvalidSignature from cryptography.x509.extensions import ExtensionNotFound -from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID +from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtendedKeyUsageOID +from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh @@ -45,6 +47,8 @@ DH_BEGIN='-----BEGIN DH PARAMETERS-----\n' DH_END='\n-----END DH PARAMETERS-----' OVPN_BEGIN = '-----BEGIN OpenVPN Static key V{0}-----\n' OVPN_END = '\n-----END OpenVPN Static key V{0}-----' +OPENSSH_KEY_BEGIN='-----BEGIN OPENSSH PRIVATE KEY-----\n' +OPENSSH_KEY_END='\n-----END OPENSSH PRIVATE KEY-----' # Print functions @@ -229,6 +233,12 @@ def wrap_public_key(raw_data): def wrap_private_key(raw_data, passphrase=None): return (KEY_ENC_BEGIN if passphrase else KEY_BEGIN) + raw_data + (KEY_ENC_END if passphrase else KEY_END) +def wrap_openssh_public_key(raw_data, type): + return f'{type} {raw_data}' + +def wrap_openssh_private_key(raw_data): + return OPENSSH_KEY_BEGIN + raw_data + OPENSSH_KEY_END + def wrap_certificate_request(raw_data): return CSR_BEGIN + raw_data + CSR_END @@ -245,7 +255,6 @@ def wrap_openvpn_key(raw_data, version='1'): return OVPN_BEGIN.format(version) + raw_data + OVPN_END.format(version) # Load functions - def load_public_key(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_public_key(raw_data) @@ -267,6 +276,21 @@ def load_private_key(raw_data, passphrase=None, wrap_tags=True): except ValueError: return False +def load_openssh_public_key(raw_data, type): + try: + return serialization.load_ssh_public_key(bytes(f'{type} {raw_data}', 'utf-8')) + except ValueError: + return False + +def load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True): + if wrap_tags: + raw_data = wrap_openssh_private_key(raw_data) + + try: + return serialization.load_ssh_private_key(bytes(raw_data, 'utf-8'), password=passphrase) + except ValueError: + return False + def load_certificate_request(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_certificate_request(raw_data) @@ -429,4 +453,3 @@ def sort_ca_chain(ca_names, pki_node): from functools import cmp_to_key return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node))) - |