diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/accel_ppp_util.py | 32 | ||||
-rw-r--r-- | python/vyos/kea.py | 35 | ||||
-rw-r--r-- | python/vyos/pki.py | 31 |
3 files changed, 57 insertions, 41 deletions
diff --git a/python/vyos/accel_ppp_util.py b/python/vyos/accel_ppp_util.py index bd0c46a19..845b2f5f0 100644 --- a/python/vyos/accel_ppp_util.py +++ b/python/vyos/accel_ppp_util.py @@ -106,7 +106,26 @@ def get_pools_in_order(data: dict) -> list: return pools -def verify_accel_ppp_base_service(config, local_users=True): +def verify_accel_ppp_name_servers(config): + if "name_server_ipv4" in config: + if len(config["name_server_ipv4"]) > 2: + raise ConfigError( + "Not more then two IPv4 DNS name-servers " "can be configured" + ) + if "name_server_ipv6" in config: + if len(config["name_server_ipv6"]) > 3: + raise ConfigError( + "Not more then three IPv6 DNS name-servers " "can be configured" + ) + + +def verify_accel_ppp_wins_servers(config): + if 'wins_server' in config and len(config['wins_server']) > 2: + raise ConfigError( + 'Not more then two WINS name-servers can be configured') + + +def verify_accel_ppp_authentication(config, local_users=True): """ Common helper function which must be used by all Accel-PPP services based on get_config_dict() @@ -148,17 +167,6 @@ def verify_accel_ppp_base_service(config, local_users=True): if not dict_search('authentication.radius.dynamic_author.key', config): raise ConfigError('DAE/CoA server key required!') - if "name_server_ipv4" in config: - if len(config["name_server_ipv4"]) > 2: - raise ConfigError( - "Not more then two IPv4 DNS name-servers " "can be configured" - ) - - if "name_server_ipv6" in config: - if len(config["name_server_ipv6"]) > 3: - raise ConfigError( - "Not more then three IPv6 DNS name-servers " "can be configured" - ) diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 7365c1f02..894ac9e9a 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -17,8 +17,6 @@ import json import os import socket -from datetime import datetime - from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr @@ -293,29 +291,6 @@ def kea6_parse_subnet(subnet, config): return out -def kea_parse_leases(lease_path): - contents = read_file(lease_path) - lines = contents.split("\n") - output = [] - - if len(lines) < 2: - return output - - headers = lines[0].split(",") - - for line in lines[1:]: - line_out = dict(zip(headers, line.split(","))) - - lifetime = int(line_out['valid_lifetime']) - expiry = int(line_out['expire']) - - line_out['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime) - line_out['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None - - output.append(line_out) - - return output - def _ctrl_socket_command(path, command, args=None): if not os.path.exists(path): return None @@ -340,6 +315,16 @@ def _ctrl_socket_command(path, command, args=None): return json.loads(result.decode('utf-8')) +def kea_get_leases(inet): + ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' + + leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all') + + if not leases or 'result' not in leases or leases['result'] != 0: + return [] + + return leases['arguments']['leases'] + def kea_get_active_config(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' diff --git a/python/vyos/pki.py b/python/vyos/pki.py index 792e24b76..02dece471 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 VyOS maintainers and contributors +# Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,7 +20,9 @@ import ipaddress from cryptography import x509 from cryptography.exceptions import InvalidSignature from cryptography.x509.extensions import ExtensionNotFound -from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID +from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtendedKeyUsageOID +from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh @@ -45,6 +47,8 @@ DH_BEGIN='-----BEGIN DH PARAMETERS-----\n' DH_END='\n-----END DH PARAMETERS-----' OVPN_BEGIN = '-----BEGIN OpenVPN Static key V{0}-----\n' OVPN_END = '\n-----END OpenVPN Static key V{0}-----' +OPENSSH_KEY_BEGIN='-----BEGIN OPENSSH PRIVATE KEY-----\n' +OPENSSH_KEY_END='\n-----END OPENSSH PRIVATE KEY-----' # Print functions @@ -229,6 +233,12 @@ def wrap_public_key(raw_data): def wrap_private_key(raw_data, passphrase=None): return (KEY_ENC_BEGIN if passphrase else KEY_BEGIN) + raw_data + (KEY_ENC_END if passphrase else KEY_END) +def wrap_openssh_public_key(raw_data, type): + return f'{type} {raw_data}' + +def wrap_openssh_private_key(raw_data): + return OPENSSH_KEY_BEGIN + raw_data + OPENSSH_KEY_END + def wrap_certificate_request(raw_data): return CSR_BEGIN + raw_data + CSR_END @@ -245,7 +255,6 @@ def wrap_openvpn_key(raw_data, version='1'): return OVPN_BEGIN.format(version) + raw_data + OVPN_END.format(version) # Load functions - def load_public_key(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_public_key(raw_data) @@ -267,6 +276,21 @@ def load_private_key(raw_data, passphrase=None, wrap_tags=True): except ValueError: return False +def load_openssh_public_key(raw_data, type): + try: + return serialization.load_ssh_public_key(bytes(f'{type} {raw_data}', 'utf-8')) + except ValueError: + return False + +def load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True): + if wrap_tags: + raw_data = wrap_openssh_private_key(raw_data) + + try: + return serialization.load_ssh_private_key(bytes(raw_data, 'utf-8'), password=passphrase) + except ValueError: + return False + def load_certificate_request(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_certificate_request(raw_data) @@ -429,4 +453,3 @@ def sort_ca_chain(ca_names, pki_node): from functools import cmp_to_key return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node))) - |