summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/accel_ppp_util.py32
-rw-r--r--python/vyos/kea.py35
-rw-r--r--python/vyos/pki.py31
3 files changed, 57 insertions, 41 deletions
diff --git a/python/vyos/accel_ppp_util.py b/python/vyos/accel_ppp_util.py
index bd0c46a19..845b2f5f0 100644
--- a/python/vyos/accel_ppp_util.py
+++ b/python/vyos/accel_ppp_util.py
@@ -106,7 +106,26 @@ def get_pools_in_order(data: dict) -> list:
return pools
-def verify_accel_ppp_base_service(config, local_users=True):
+def verify_accel_ppp_name_servers(config):
+ if "name_server_ipv4" in config:
+ if len(config["name_server_ipv4"]) > 2:
+ raise ConfigError(
+ "Not more then two IPv4 DNS name-servers " "can be configured"
+ )
+ if "name_server_ipv6" in config:
+ if len(config["name_server_ipv6"]) > 3:
+ raise ConfigError(
+ "Not more then three IPv6 DNS name-servers " "can be configured"
+ )
+
+
+def verify_accel_ppp_wins_servers(config):
+ if 'wins_server' in config and len(config['wins_server']) > 2:
+ raise ConfigError(
+ 'Not more then two WINS name-servers can be configured')
+
+
+def verify_accel_ppp_authentication(config, local_users=True):
"""
Common helper function which must be used by all Accel-PPP services based
on get_config_dict()
@@ -148,17 +167,6 @@ def verify_accel_ppp_base_service(config, local_users=True):
if not dict_search('authentication.radius.dynamic_author.key', config):
raise ConfigError('DAE/CoA server key required!')
- if "name_server_ipv4" in config:
- if len(config["name_server_ipv4"]) > 2:
- raise ConfigError(
- "Not more then two IPv4 DNS name-servers " "can be configured"
- )
-
- if "name_server_ipv6" in config:
- if len(config["name_server_ipv6"]) > 3:
- raise ConfigError(
- "Not more then three IPv6 DNS name-servers " "can be configured"
- )
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index 7365c1f02..894ac9e9a 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -17,8 +17,6 @@ import json
import os
import socket
-from datetime import datetime
-
from vyos.template import is_ipv6
from vyos.template import isc_static_route
from vyos.template import netmask_from_cidr
@@ -293,29 +291,6 @@ def kea6_parse_subnet(subnet, config):
return out
-def kea_parse_leases(lease_path):
- contents = read_file(lease_path)
- lines = contents.split("\n")
- output = []
-
- if len(lines) < 2:
- return output
-
- headers = lines[0].split(",")
-
- for line in lines[1:]:
- line_out = dict(zip(headers, line.split(",")))
-
- lifetime = int(line_out['valid_lifetime'])
- expiry = int(line_out['expire'])
-
- line_out['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime)
- line_out['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None
-
- output.append(line_out)
-
- return output
-
def _ctrl_socket_command(path, command, args=None):
if not os.path.exists(path):
return None
@@ -340,6 +315,16 @@ def _ctrl_socket_command(path, command, args=None):
return json.loads(result.decode('utf-8'))
+def kea_get_leases(inet):
+ ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket'
+
+ leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all')
+
+ if not leases or 'result' not in leases or leases['result'] != 0:
+ return []
+
+ return leases['arguments']['leases']
+
def kea_get_active_config(inet):
ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket'
diff --git a/python/vyos/pki.py b/python/vyos/pki.py
index 792e24b76..02dece471 100644
--- a/python/vyos/pki.py
+++ b/python/vyos/pki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2023 VyOS maintainers and contributors
+# Copyright (C) 2023-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -20,7 +20,9 @@ import ipaddress
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.x509.extensions import ExtensionNotFound
-from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID
+from cryptography.x509.oid import NameOID
+from cryptography.x509.oid import ExtendedKeyUsageOID
+from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dh
@@ -45,6 +47,8 @@ DH_BEGIN='-----BEGIN DH PARAMETERS-----\n'
DH_END='\n-----END DH PARAMETERS-----'
OVPN_BEGIN = '-----BEGIN OpenVPN Static key V{0}-----\n'
OVPN_END = '\n-----END OpenVPN Static key V{0}-----'
+OPENSSH_KEY_BEGIN='-----BEGIN OPENSSH PRIVATE KEY-----\n'
+OPENSSH_KEY_END='\n-----END OPENSSH PRIVATE KEY-----'
# Print functions
@@ -229,6 +233,12 @@ def wrap_public_key(raw_data):
def wrap_private_key(raw_data, passphrase=None):
return (KEY_ENC_BEGIN if passphrase else KEY_BEGIN) + raw_data + (KEY_ENC_END if passphrase else KEY_END)
+def wrap_openssh_public_key(raw_data, type):
+ return f'{type} {raw_data}'
+
+def wrap_openssh_private_key(raw_data):
+ return OPENSSH_KEY_BEGIN + raw_data + OPENSSH_KEY_END
+
def wrap_certificate_request(raw_data):
return CSR_BEGIN + raw_data + CSR_END
@@ -245,7 +255,6 @@ def wrap_openvpn_key(raw_data, version='1'):
return OVPN_BEGIN.format(version) + raw_data + OVPN_END.format(version)
# Load functions
-
def load_public_key(raw_data, wrap_tags=True):
if wrap_tags:
raw_data = wrap_public_key(raw_data)
@@ -267,6 +276,21 @@ def load_private_key(raw_data, passphrase=None, wrap_tags=True):
except ValueError:
return False
+def load_openssh_public_key(raw_data, type):
+ try:
+ return serialization.load_ssh_public_key(bytes(f'{type} {raw_data}', 'utf-8'))
+ except ValueError:
+ return False
+
+def load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True):
+ if wrap_tags:
+ raw_data = wrap_openssh_private_key(raw_data)
+
+ try:
+ return serialization.load_ssh_private_key(bytes(raw_data, 'utf-8'), password=passphrase)
+ except ValueError:
+ return False
+
def load_certificate_request(raw_data, wrap_tags=True):
if wrap_tags:
raw_data = wrap_certificate_request(raw_data)
@@ -429,4 +453,3 @@ def sort_ca_chain(ca_names, pki_node):
from functools import cmp_to_key
return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node)))
-