summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/kea.py131
1 files changed, 130 insertions, 1 deletions
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index addfdba49..32a118f68 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -1,4 +1,4 @@
-# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -17,6 +17,9 @@ import json
import os
import socket
+from datetime import datetime
+from datetime import timezone
+
from vyos.template import is_ipv6
from vyos.template import isc_static_route
from vyos.template import netmask_from_cidr
@@ -57,6 +60,27 @@ kea6_options = {
kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
+def _format_hex_string(in_str):
+ out_str = ""
+ # if input is divisible by 2, add : every 2 chars
+ if len(in_str) > 0 and len(in_str) % 2 == 0:
+ out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
+ else:
+ out_str = in_str
+
+ return out_str
+
+def _find_list_of_dict_index(lst, key='ip', value=''):
+ """
+ Find the index entry of list of dict matching the dict value
+ Exampe:
+ % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}]
+ % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2')
+ % 1
+ """
+ idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
+ return idx
+
def kea_parse_options(config):
options = []
@@ -347,6 +371,10 @@ def kea_get_active_config(inet):
return config
+def kea_get_dhcp_pools(config, inet):
+ shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+ return [network['name'] for network in shared_networks] if shared_networks else []
+
def kea_get_pool_from_subnet_id(config, inet, subnet_id):
shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
@@ -362,3 +390,104 @@ def kea_get_pool_from_subnet_id(config, inet, subnet_id):
return network['name']
return None
+
+
+def kea_get_static_mappings(config, inet, pools=[]) -> list:
+ """
+ Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration
+ :return list
+ """
+ shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+
+ mappings = []
+
+ if shared_networks:
+ for network in shared_networks:
+ if f'subnet{inet}' not in network:
+ continue
+
+ for p in pools:
+ if network['name'] == p:
+ for subnet in network[f'subnet{inet}']:
+ if 'reservations' in subnet:
+ for reservation in subnet['reservations']:
+ mapping = {'pool': p, 'subnet': subnet['subnet']}
+ mapping.update(reservation)
+ # rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address'
+ mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None))
+ # rename 'hw-address' to 'mac'
+ mapping['mac'] = mapping.pop('hw-address', None)
+ mappings.append(mapping)
+
+ return mappings
+
+
+def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list:
+ """
+ Get DHCP server leases from active Kea DHCPv4 or DHCPv6 configuration
+ :return list
+ """
+ leases = kea_get_leases(inet)
+
+ data = []
+ for lease in leases:
+ lifetime = lease['valid-lft']
+ expiry = (lease['cltt'] + lifetime)
+
+ lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc)
+ lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
+
+ data_lease = {}
+ data_lease['ip'] = lease['ip-address']
+ lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
+ data_lease['state'] = lease_state_long[lease['state']]
+ data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-'
+ data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
+ data_lease['origin'] = 'local' # TODO: Determine remote in HA
+ # remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client`
+ data_lease['hostname'] = lease.get('hostname', '-').rstrip('.')
+
+ if inet == '4':
+ data_lease['mac'] = lease['hw-address']
+ data_lease['start'] = lease['start_timestamp'].timestamp()
+
+ if inet == '6':
+ data_lease['last_communication'] = lease['start_timestamp'].timestamp()
+ data_lease['duid'] = _format_hex_string(lease['duid'])
+ data_lease['type'] = lease['type']
+
+ if lease['type'] == 'IA_PD':
+ prefix_len = lease['prefix-len']
+ data_lease['ip'] += f'/{prefix_len}'
+
+ data_lease['remaining'] = '-'
+
+ if lease['valid-lft'] > 0:
+ data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
+
+ if data_lease['remaining'].days >= 0:
+ # substraction gives us a timedelta object which can't be formatted with strftime
+ # so we use str(), split gets rid of the microseconds
+ data_lease['remaining'] = str(data_lease['remaining']).split('.')[0]
+
+ # Do not add old leases
+ if (
+ data_lease['remaining']
+ and data_lease['pool'] in pools
+ and data_lease['state'] != 'free'
+ and (not state or state == 'all' or data_lease['state'] in state)
+ ):
+ data.append(data_lease)
+
+ # deduplicate
+ checked = []
+ for entry in data:
+ addr = entry.get('ip')
+ if addr not in checked:
+ checked.append(addr)
+ else:
+ idx = _find_list_of_dict_index(data, key='ip', value=addr)
+ if idx is not None:
+ data.pop(idx)
+
+ return data