summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorIndrajit Raychaudhuri <irc@indrajit.com>2025-01-13 19:34:56 -0600
committerIndrajit Raychaudhuri <irc@indrajit.com>2025-01-16 22:42:12 -0600
commit4fbdd9191d419d91e97118159d1e3cfa67336b3d (patch)
treee8c5ce8346080830bbc60aa8ecdae2ef56f1ec45 /python
parent99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8 (diff)
downloadvyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.tar.gz
vyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.zip
dhcp: T7052: Refactor kea dhcp op-mode functions to vyos.kea
Relocate the kea dhcp op-mode functions to kea helper functions in vyos.kea. This allows the functions to be reused by other scripts, not just op-mode wrappers. This moves the source of truth for the op-mode commands to the actual running kea instance, rather than VyOS config path. Also, apply some minor code cleanup and make some of the mappings consistent across the functions.
Diffstat (limited to 'python')
-rw-r--r--python/vyos/kea.py131
1 files changed, 130 insertions, 1 deletions
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index addfdba49..32a118f68 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -1,4 +1,4 @@
-# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -17,6 +17,9 @@ import json
import os
import socket
+from datetime import datetime
+from datetime import timezone
+
from vyos.template import is_ipv6
from vyos.template import isc_static_route
from vyos.template import netmask_from_cidr
@@ -57,6 +60,27 @@ kea6_options = {
kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
+def _format_hex_string(in_str):
+ out_str = ""
+ # if input is divisible by 2, add : every 2 chars
+ if len(in_str) > 0 and len(in_str) % 2 == 0:
+ out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
+ else:
+ out_str = in_str
+
+ return out_str
+
+def _find_list_of_dict_index(lst, key='ip', value=''):
+ """
+ Find the index entry of list of dict matching the dict value
+ Exampe:
+ % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}]
+ % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2')
+ % 1
+ """
+ idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
+ return idx
+
def kea_parse_options(config):
options = []
@@ -347,6 +371,10 @@ def kea_get_active_config(inet):
return config
+def kea_get_dhcp_pools(config, inet):
+ shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+ return [network['name'] for network in shared_networks] if shared_networks else []
+
def kea_get_pool_from_subnet_id(config, inet, subnet_id):
shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
@@ -362,3 +390,104 @@ def kea_get_pool_from_subnet_id(config, inet, subnet_id):
return network['name']
return None
+
+
+def kea_get_static_mappings(config, inet, pools=[]) -> list:
+ """
+ Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration
+ :return list
+ """
+ shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+
+ mappings = []
+
+ if shared_networks:
+ for network in shared_networks:
+ if f'subnet{inet}' not in network:
+ continue
+
+ for p in pools:
+ if network['name'] == p:
+ for subnet in network[f'subnet{inet}']:
+ if 'reservations' in subnet:
+ for reservation in subnet['reservations']:
+ mapping = {'pool': p, 'subnet': subnet['subnet']}
+ mapping.update(reservation)
+ # rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address'
+ mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None))
+ # rename 'hw-address' to 'mac'
+ mapping['mac'] = mapping.pop('hw-address', None)
+ mappings.append(mapping)
+
+ return mappings
+
+
+def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list:
+ """
+ Get DHCP server leases from active Kea DHCPv4 or DHCPv6 configuration
+ :return list
+ """
+ leases = kea_get_leases(inet)
+
+ data = []
+ for lease in leases:
+ lifetime = lease['valid-lft']
+ expiry = (lease['cltt'] + lifetime)
+
+ lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc)
+ lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
+
+ data_lease = {}
+ data_lease['ip'] = lease['ip-address']
+ lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
+ data_lease['state'] = lease_state_long[lease['state']]
+ data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-'
+ data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
+ data_lease['origin'] = 'local' # TODO: Determine remote in HA
+ # remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client`
+ data_lease['hostname'] = lease.get('hostname', '-').rstrip('.')
+
+ if inet == '4':
+ data_lease['mac'] = lease['hw-address']
+ data_lease['start'] = lease['start_timestamp'].timestamp()
+
+ if inet == '6':
+ data_lease['last_communication'] = lease['start_timestamp'].timestamp()
+ data_lease['duid'] = _format_hex_string(lease['duid'])
+ data_lease['type'] = lease['type']
+
+ if lease['type'] == 'IA_PD':
+ prefix_len = lease['prefix-len']
+ data_lease['ip'] += f'/{prefix_len}'
+
+ data_lease['remaining'] = '-'
+
+ if lease['valid-lft'] > 0:
+ data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
+
+ if data_lease['remaining'].days >= 0:
+ # substraction gives us a timedelta object which can't be formatted with strftime
+ # so we use str(), split gets rid of the microseconds
+ data_lease['remaining'] = str(data_lease['remaining']).split('.')[0]
+
+ # Do not add old leases
+ if (
+ data_lease['remaining']
+ and data_lease['pool'] in pools
+ and data_lease['state'] != 'free'
+ and (not state or state == 'all' or data_lease['state'] in state)
+ ):
+ data.append(data_lease)
+
+ # deduplicate
+ checked = []
+ for entry in data:
+ addr = entry.get('ip')
+ if addr not in checked:
+ checked.append(addr)
+ else:
+ idx = _find_list_of_dict_index(data, key='ip', value=addr)
+ if idx is not None:
+ data.pop(idx)
+
+ return data