diff options
author | Indrajit Raychaudhuri <irc@indrajit.com> | 2025-01-13 19:34:56 -0600 |
---|---|---|
committer | Indrajit Raychaudhuri <irc@indrajit.com> | 2025-01-16 22:42:12 -0600 |
commit | 4fbdd9191d419d91e97118159d1e3cfa67336b3d (patch) | |
tree | e8c5ce8346080830bbc60aa8ecdae2ef56f1ec45 | |
parent | 99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8 (diff) | |
download | vyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.tar.gz vyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.zip |
dhcp: T7052: Refactor kea dhcp op-mode functions to vyos.kea
Relocate the kea dhcp op-mode functions to kea helper
functions in vyos.kea. This allows the functions to
be reused by other scripts, not just op-mode wrappers.
This moves the source of truth for the op-mode
commands to the actual running kea instance,
rather than VyOS config path.
Also, apply some minor code cleanup and make some
of the mappings consistent across the functions.
-rw-r--r-- | python/vyos/kea.py | 131 | ||||
-rwxr-xr-x | src/op_mode/dhcp.py | 249 |
2 files changed, 212 insertions, 168 deletions
diff --git a/python/vyos/kea.py b/python/vyos/kea.py index addfdba49..32a118f68 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -1,4 +1,4 @@ -# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,6 +17,9 @@ import json import os import socket +from datetime import datetime +from datetime import timezone + from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr @@ -57,6 +60,27 @@ kea6_options = { kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket' +def _format_hex_string(in_str): + out_str = "" + # if input is divisible by 2, add : every 2 chars + if len(in_str) > 0 and len(in_str) % 2 == 0: + out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) + else: + out_str = in_str + + return out_str + +def _find_list_of_dict_index(lst, key='ip', value=''): + """ + Find the index entry of list of dict matching the dict value + Exampe: + % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] + % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') + % 1 + """ + idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) + return idx + def kea_parse_options(config): options = [] @@ -347,6 +371,10 @@ def kea_get_active_config(inet): return config +def kea_get_dhcp_pools(config, inet): + shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + return [network['name'] for network in shared_networks] if shared_networks else [] + def kea_get_pool_from_subnet_id(config, inet, subnet_id): shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') @@ -362,3 +390,104 @@ def kea_get_pool_from_subnet_id(config, inet, subnet_id): return network['name'] return None + + +def kea_get_static_mappings(config, inet, pools=[]) -> list: + """ + Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration + :return list + """ + shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + + mappings = [] + + if shared_networks: + for network in shared_networks: + if f'subnet{inet}' not in network: + continue + + for p in pools: + if network['name'] == p: + for subnet in network[f'subnet{inet}']: + if 'reservations' in subnet: + for reservation in subnet['reservations']: + mapping = {'pool': p, 'subnet': subnet['subnet']} + mapping.update(reservation) + # rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address' + mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None)) + # rename 'hw-address' to 'mac' + mapping['mac'] = mapping.pop('hw-address', None) + mappings.append(mapping) + + return mappings + + +def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list: + """ + Get DHCP server leases from active Kea DHCPv4 or DHCPv6 configuration + :return list + """ + leases = kea_get_leases(inet) + + data = [] + for lease in leases: + lifetime = lease['valid-lft'] + expiry = (lease['cltt'] + lifetime) + + lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc) + lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None + + data_lease = {} + data_lease['ip'] = lease['ip-address'] + lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} + data_lease['state'] = lease_state_long[lease['state']] + data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-' + data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None + data_lease['origin'] = 'local' # TODO: Determine remote in HA + # remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client` + data_lease['hostname'] = lease.get('hostname', '-').rstrip('.') + + if inet == '4': + data_lease['mac'] = lease['hw-address'] + data_lease['start'] = lease['start_timestamp'].timestamp() + + if inet == '6': + data_lease['last_communication'] = lease['start_timestamp'].timestamp() + data_lease['duid'] = _format_hex_string(lease['duid']) + data_lease['type'] = lease['type'] + + if lease['type'] == 'IA_PD': + prefix_len = lease['prefix-len'] + data_lease['ip'] += f'/{prefix_len}' + + data_lease['remaining'] = '-' + + if lease['valid-lft'] > 0: + data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc) + + if data_lease['remaining'].days >= 0: + # substraction gives us a timedelta object which can't be formatted with strftime + # so we use str(), split gets rid of the microseconds + data_lease['remaining'] = str(data_lease['remaining']).split('.')[0] + + # Do not add old leases + if ( + data_lease['remaining'] + and data_lease['pool'] in pools + and data_lease['state'] != 'free' + and (not state or state == 'all' or data_lease['state'] in state) + ): + data.append(data_lease) + + # deduplicate + checked = [] + for entry in data: + addr = entry.get('ip') + if addr not in checked: + checked.append(addr) + else: + idx = _find_list_of_dict_index(data, key='ip', value=addr) + if idx is not None: + data.pop(idx) + + return data diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 45de86cab..fde89c706 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022-2024 VyOS maintainers and contributors +# Copyright (C) 2022-2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,7 +19,6 @@ import sys import typing from datetime import datetime -from datetime import timezone from glob import glob from ipaddress import ip_address from tabulate import tabulate @@ -30,11 +29,13 @@ from vyos.base import Warning from vyos.configquery import ConfigTreeQuery from vyos.kea import kea_get_active_config +from vyos.kea import kea_get_dhcp_pools from vyos.kea import kea_get_leases -from vyos.kea import kea_get_pool_from_subnet_id +from vyos.kea import kea_get_server_leases +from vyos.kea import kea_get_static_mappings from vyos.kea import kea_delete_lease -from vyos.utils.process import is_systemd_service_running from vyos.utils.process import call +from vyos.utils.process import is_systemd_service_running time_string = "%a %b %d %H:%M:%S %Z %Y" @@ -52,115 +53,18 @@ def _utc_to_local(utc_dt): return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) -def _format_hex_string(in_str): - out_str = "" - # if input is divisible by 2, add : every 2 chars - if len(in_str) > 0 and len(in_str) % 2 == 0: - out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) - else: - out_str = in_str - - return out_str - - -def _find_list_of_dict_index(lst, key='ip', value=''): - """ - Find the index entry of list of dict matching the dict value - Exampe: - % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] - % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') - % 1 - """ - idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) - return idx - - -def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list: - """ - Get DHCP server leases - :return list - """ +def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=[], origin=None) -> list: inet_suffix = '6' if family == 'inet6' else '4' - try: - leases = kea_get_leases(inet_suffix) - except Exception: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server lease information') + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - - try: - active_config = kea_get_active_config(inet_suffix) - except Exception: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') - - data = [] - for lease in leases: - lifetime = lease['valid-lft'] - expiry = (lease['cltt'] + lifetime) - - lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc) - lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None - - data_lease = {} - data_lease['ip'] = lease['ip-address'] - lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} - data_lease['state'] = lease_state_long[lease['state']] - data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-' - data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None - data_lease['origin'] = 'local' # TODO: Determine remote in HA - data_lease['hostname'] = lease.get('hostname', '-') - # remove trailing dot to ensure consistency for `vyos-hostsd-client` - if data_lease['hostname'] and data_lease['hostname'][-1] == '.': - data_lease['hostname'] = data_lease['hostname'][:-1] - - if family == 'inet': - data_lease['mac'] = lease['hw-address'] - data_lease['start'] = lease['start_timestamp'].timestamp() - - if family == 'inet6': - data_lease['last_communication'] = lease['start_timestamp'].timestamp() - data_lease['duid'] = _format_hex_string(lease['duid']) - data_lease['type'] = lease['type'] - - if lease['type'] == 'IA_PD': - prefix_len = lease['prefix-len'] - data_lease['ip'] += f'/{prefix_len}' - - data_lease['remaining'] = '-' - - if lease['valid-lft'] > 0: - data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc) - - if data_lease['remaining'].days >= 0: - # substraction gives us a timedelta object which can't be formatted with strftime - # so we use str(), split gets rid of the microseconds - data_lease['remaining'] = str(data_lease['remaining']).split('.')[0] - - # Do not add old leases - if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free': - if not state or state == 'all' or data_lease['state'] in state: - data.append(data_lease) - - # deduplicate - checked = [] - for entry in data: - addr = entry.get('ip') - if addr not in checked: - checked.append(addr) - else: - idx = _find_list_of_dict_index(data, key='ip', value=addr) - if idx is not None: - data.pop(idx) + mappings = kea_get_server_leases(config, inet_suffix, pools, state, origin) if sorted: if sorted == 'ip': - data.sort(key = lambda x:ip_address(x['ip'])) + mappings.sort(key = lambda x:ip_address(x['ip'])) else: - data.sort(key = lambda x:x[sorted]) - return data + mappings.sort(key = lambda x:x[sorted]) + return mappings def _get_formatted_server_leases(raw_data, family='inet'): @@ -204,12 +108,6 @@ def _get_formatted_server_leases(raw_data, family='inet'): return output -def _get_dhcp_pools(family='inet') -> list: - v = 'v6' if family == 'inet6' else '' - pools = config.list_nodes(f'service dhcp{v}-server shared-network-name') - return pools - - def _get_pool_size(pool, family='inet'): v = 'v6' if family == 'inet6' else '' base = f'service dhcp{v}-server shared-network-name {pool}' @@ -229,21 +127,17 @@ def _get_pool_size(pool, family='inet'): return size -def _get_raw_pool_statistics(family='inet', pool=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] +def _get_raw_pool_statistics(config, family='inet', pool=None): + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - v = 'v6' if family == 'inet6' else '' stats = [] - for p in pool: - subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet') + for p in pools: size = _get_pool_size(family=family, pool=p) - leases = len(_get_raw_server_leases(family=family, pool=p)) + leases = len(_get_raw_server_leases(config, family=family, pool=p)) use_percentage = round(leases / size * 100) if size != 0 else 0 pool_stats = {'pool': p, 'size': size, 'leases': leases, - 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet} + 'available': (size - leases), 'use_percentage': use_percentage} stats.append(pool_stats) return stats @@ -263,59 +157,46 @@ def _get_formatted_pool_statistics(pool_data, family='inet'): output = tabulate(data_entries, headers, numalign='left') return output -def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - v = 'v6' if family == 'inet6' else '' - mappings = [] - for p in pool: - pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p], - get_first_key=True) - if 'subnet' in pool_config: - for subnet, subnet_config in pool_config['subnet'].items(): - if 'static-mapping' in subnet_config: - for name, mapping_config in subnet_config['static-mapping'].items(): - mapping = {'pool': p, 'subnet': subnet, 'name': name} - mapping.update(mapping_config) - mappings.append(mapping) +def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None): + + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) + + mappings = kea_get_static_mappings(config, inet_suffix, pools) if sorted: if sorted == 'ip': - if family == 'inet6': - mappings.sort(key = lambda x:ip_address(x['ipv6-address'])) - else: - mappings.sort(key = lambda x:ip_address(x['ip-address'])) + mappings.sort(key = lambda x:ip_address(x['ip'])) else: mappings.sort(key = lambda x:x[sorted]) return mappings + def _get_formatted_server_static_mappings(raw_data, family='inet'): data_entries = [] if family == 'inet': for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') - name = entry.get('name') - ip_addr = entry.get('ip-address', 'N/A') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) elif family == 'inet6': for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') - name = entry.get('name') - ip_addr = entry.get('ipv6-address', 'N/A') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) - headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description'] + headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description'] output = tabulate(data_entries, headers, numalign='left') return output @@ -357,7 +238,24 @@ def _verify_client(func): @_verify def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): - pool_data = _get_raw_pool_statistics(family=family, pool=pool) + + v = 'v6' if family == 'inet6' else '' + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + + pool_data = _get_raw_pool_statistics(active_config, family=family, pool=pool) if raw: return pool_data else: @@ -368,23 +266,31 @@ def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str], state: typing.Optional[ArgState], origin: typing.Optional[ArgOrigin] ): - # if dhcp server is down, inactive leases may still be shown as active, so warn the user. - v = '6' if family == 'inet6' else '4' - if not is_systemd_service_running(f'kea-dhcp{v}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): - raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + inet_suffix = '6' if family == 'inet6' else '4' - if state and state not in lease_valid_states: - raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet if sorted and sorted not in sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin) + if state and state not in lease_valid_states: + raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + + lease_data = _get_raw_server_leases(config=active_config, family=family, pool=pool, sorted=sorted, state=state, origin=origin) if raw: return lease_data else: @@ -394,13 +300,25 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str]): v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') if sorted and sorted not in mapping_sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted) + static_mappings = _get_raw_server_static_mappings(config=active_config, family=family, pool=pool, sorted=sorted) if raw: return static_mappings else: @@ -408,10 +326,7 @@ def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optio def _lease_valid(inet, address): leases = kea_get_leases(inet) - for lease in leases: - if address == lease['ip-address']: - return True - return False + return any(lease['ip-address'] == address for lease in leases) @_verify def clear_dhcp_server_lease(family: ArgFamily, address: str): |