summaryrefslogtreecommitdiff
path: root/src/op_mode
diff options
context:
space:
mode:
authorIndrajit Raychaudhuri <irc@indrajit.com>2025-01-13 19:34:56 -0600
committerIndrajit Raychaudhuri <irc@indrajit.com>2025-01-16 22:42:12 -0600
commit4fbdd9191d419d91e97118159d1e3cfa67336b3d (patch)
treee8c5ce8346080830bbc60aa8ecdae2ef56f1ec45 /src/op_mode
parent99d0c7a804ea3cf7f843f0d4810e6772cf7ceeb8 (diff)
downloadvyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.tar.gz
vyos-1x-4fbdd9191d419d91e97118159d1e3cfa67336b3d.zip
dhcp: T7052: Refactor kea dhcp op-mode functions to vyos.kea
Relocate the kea dhcp op-mode functions to kea helper functions in vyos.kea. This allows the functions to be reused by other scripts, not just op-mode wrappers. This moves the source of truth for the op-mode commands to the actual running kea instance, rather than VyOS config path. Also, apply some minor code cleanup and make some of the mappings consistent across the functions.
Diffstat (limited to 'src/op_mode')
-rwxr-xr-xsrc/op_mode/dhcp.py249
1 files changed, 82 insertions, 167 deletions
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py
index 45de86cab..fde89c706 100755
--- a/src/op_mode/dhcp.py
+++ b/src/op_mode/dhcp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022-2024 VyOS maintainers and contributors
+# Copyright (C) 2022-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -19,7 +19,6 @@ import sys
import typing
from datetime import datetime
-from datetime import timezone
from glob import glob
from ipaddress import ip_address
from tabulate import tabulate
@@ -30,11 +29,13 @@ from vyos.base import Warning
from vyos.configquery import ConfigTreeQuery
from vyos.kea import kea_get_active_config
+from vyos.kea import kea_get_dhcp_pools
from vyos.kea import kea_get_leases
-from vyos.kea import kea_get_pool_from_subnet_id
+from vyos.kea import kea_get_server_leases
+from vyos.kea import kea_get_static_mappings
from vyos.kea import kea_delete_lease
-from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import call
+from vyos.utils.process import is_systemd_service_running
time_string = "%a %b %d %H:%M:%S %Z %Y"
@@ -52,115 +53,18 @@ def _utc_to_local(utc_dt):
return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds())
-def _format_hex_string(in_str):
- out_str = ""
- # if input is divisible by 2, add : every 2 chars
- if len(in_str) > 0 and len(in_str) % 2 == 0:
- out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
- else:
- out_str = in_str
-
- return out_str
-
-
-def _find_list_of_dict_index(lst, key='ip', value=''):
- """
- Find the index entry of list of dict matching the dict value
- Exampe:
- % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}]
- % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2')
- % 1
- """
- idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
- return idx
-
-
-def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list:
- """
- Get DHCP server leases
- :return list
- """
+def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=[], origin=None) -> list:
inet_suffix = '6' if family == 'inet6' else '4'
- try:
- leases = kea_get_leases(inet_suffix)
- except Exception:
- raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server lease information')
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
-
- try:
- active_config = kea_get_active_config(inet_suffix)
- except Exception:
- raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
-
- data = []
- for lease in leases:
- lifetime = lease['valid-lft']
- expiry = (lease['cltt'] + lifetime)
-
- lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc)
- lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
-
- data_lease = {}
- data_lease['ip'] = lease['ip-address']
- lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
- data_lease['state'] = lease_state_long[lease['state']]
- data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-'
- data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
- data_lease['origin'] = 'local' # TODO: Determine remote in HA
- data_lease['hostname'] = lease.get('hostname', '-')
- # remove trailing dot to ensure consistency for `vyos-hostsd-client`
- if data_lease['hostname'] and data_lease['hostname'][-1] == '.':
- data_lease['hostname'] = data_lease['hostname'][:-1]
-
- if family == 'inet':
- data_lease['mac'] = lease['hw-address']
- data_lease['start'] = lease['start_timestamp'].timestamp()
-
- if family == 'inet6':
- data_lease['last_communication'] = lease['start_timestamp'].timestamp()
- data_lease['duid'] = _format_hex_string(lease['duid'])
- data_lease['type'] = lease['type']
-
- if lease['type'] == 'IA_PD':
- prefix_len = lease['prefix-len']
- data_lease['ip'] += f'/{prefix_len}'
-
- data_lease['remaining'] = '-'
-
- if lease['valid-lft'] > 0:
- data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
-
- if data_lease['remaining'].days >= 0:
- # substraction gives us a timedelta object which can't be formatted with strftime
- # so we use str(), split gets rid of the microseconds
- data_lease['remaining'] = str(data_lease['remaining']).split('.')[0]
-
- # Do not add old leases
- if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free':
- if not state or state == 'all' or data_lease['state'] in state:
- data.append(data_lease)
-
- # deduplicate
- checked = []
- for entry in data:
- addr = entry.get('ip')
- if addr not in checked:
- checked.append(addr)
- else:
- idx = _find_list_of_dict_index(data, key='ip', value=addr)
- if idx is not None:
- data.pop(idx)
+ mappings = kea_get_server_leases(config, inet_suffix, pools, state, origin)
if sorted:
if sorted == 'ip':
- data.sort(key = lambda x:ip_address(x['ip']))
+ mappings.sort(key = lambda x:ip_address(x['ip']))
else:
- data.sort(key = lambda x:x[sorted])
- return data
+ mappings.sort(key = lambda x:x[sorted])
+ return mappings
def _get_formatted_server_leases(raw_data, family='inet'):
@@ -204,12 +108,6 @@ def _get_formatted_server_leases(raw_data, family='inet'):
return output
-def _get_dhcp_pools(family='inet') -> list:
- v = 'v6' if family == 'inet6' else ''
- pools = config.list_nodes(f'service dhcp{v}-server shared-network-name')
- return pools
-
-
def _get_pool_size(pool, family='inet'):
v = 'v6' if family == 'inet6' else ''
base = f'service dhcp{v}-server shared-network-name {pool}'
@@ -229,21 +127,17 @@ def _get_pool_size(pool, family='inet'):
return size
-def _get_raw_pool_statistics(family='inet', pool=None):
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
+def _get_raw_pool_statistics(config, family='inet', pool=None):
+ inet_suffix = '6' if family == 'inet6' else '4'
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
- v = 'v6' if family == 'inet6' else ''
stats = []
- for p in pool:
- subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet')
+ for p in pools:
size = _get_pool_size(family=family, pool=p)
- leases = len(_get_raw_server_leases(family=family, pool=p))
+ leases = len(_get_raw_server_leases(config, family=family, pool=p))
use_percentage = round(leases / size * 100) if size != 0 else 0
pool_stats = {'pool': p, 'size': size, 'leases': leases,
- 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet}
+ 'available': (size - leases), 'use_percentage': use_percentage}
stats.append(pool_stats)
return stats
@@ -263,59 +157,46 @@ def _get_formatted_pool_statistics(pool_data, family='inet'):
output = tabulate(data_entries, headers, numalign='left')
return output
-def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None):
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
- v = 'v6' if family == 'inet6' else ''
- mappings = []
- for p in pool:
- pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p],
- get_first_key=True)
- if 'subnet' in pool_config:
- for subnet, subnet_config in pool_config['subnet'].items():
- if 'static-mapping' in subnet_config:
- for name, mapping_config in subnet_config['static-mapping'].items():
- mapping = {'pool': p, 'subnet': subnet, 'name': name}
- mapping.update(mapping_config)
- mappings.append(mapping)
+def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None):
+
+ inet_suffix = '6' if family == 'inet6' else '4'
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
+
+ mappings = kea_get_static_mappings(config, inet_suffix, pools)
if sorted:
if sorted == 'ip':
- if family == 'inet6':
- mappings.sort(key = lambda x:ip_address(x['ipv6-address']))
- else:
- mappings.sort(key = lambda x:ip_address(x['ip-address']))
+ mappings.sort(key = lambda x:ip_address(x['ip']))
else:
mappings.sort(key = lambda x:x[sorted])
return mappings
+
def _get_formatted_server_static_mappings(raw_data, family='inet'):
data_entries = []
if family == 'inet':
for entry in raw_data:
pool = entry.get('pool')
subnet = entry.get('subnet')
- name = entry.get('name')
- ip_addr = entry.get('ip-address', 'N/A')
+ hostname = entry.get('hostname')
+ ip_addr = entry.get('ip', 'N/A')
mac_addr = entry.get('mac', 'N/A')
duid = entry.get('duid', 'N/A')
description = entry.get('description', 'N/A')
- data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description])
+ data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description])
elif family == 'inet6':
for entry in raw_data:
pool = entry.get('pool')
subnet = entry.get('subnet')
- name = entry.get('name')
- ip_addr = entry.get('ipv6-address', 'N/A')
+ hostname = entry.get('hostname')
+ ip_addr = entry.get('ip', 'N/A')
mac_addr = entry.get('mac', 'N/A')
duid = entry.get('duid', 'N/A')
description = entry.get('description', 'N/A')
- data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description])
+ data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description])
- headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description']
+ headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description']
output = tabulate(data_entries, headers, numalign='left')
return output
@@ -357,7 +238,24 @@ def _verify_client(func):
@_verify
def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]):
- pool_data = _get_raw_pool_statistics(family=family, pool=pool)
+
+ v = 'v6' if family == 'inet6' else ''
+ inet_suffix = '6' if family == 'inet6' else '4'
+
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning('DHCP server is configured but not started. Data may be stale.')
+
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
+
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
+
+ if pool and active_pools and pool not in active_pools:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
+
+ pool_data = _get_raw_pool_statistics(active_config, family=family, pool=pool)
if raw:
return pool_data
else:
@@ -368,23 +266,31 @@ def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str
def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
sorted: typing.Optional[str], state: typing.Optional[ArgState],
origin: typing.Optional[ArgOrigin] ):
- # if dhcp server is down, inactive leases may still be shown as active, so warn the user.
- v = '6' if family == 'inet6' else '4'
- if not is_systemd_service_running(f'kea-dhcp{v}-server.service'):
- Warning('DHCP server is configured but not started. Data may be stale.')
v = 'v6' if family == 'inet6' else ''
- if pool and pool not in _get_dhcp_pools(family=family):
- raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
+ inet_suffix = '6' if family == 'inet6' else '4'
- if state and state not in lease_valid_states:
- raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning('DHCP server is configured but not started. Data may be stale.')
+
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
+
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
+
+ if pool and active_pools and pool not in active_pools:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet
if sorted and sorted not in sort_valid:
raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin)
+ if state and state not in lease_valid_states:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
+
+ lease_data = _get_raw_server_leases(config=active_config, family=family, pool=pool, sorted=sorted, state=state, origin=origin)
if raw:
return lease_data
else:
@@ -394,13 +300,25 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str],
sorted: typing.Optional[str]):
v = 'v6' if family == 'inet6' else ''
- if pool and pool not in _get_dhcp_pools(family=family):
+ inet_suffix = '6' if family == 'inet6' else '4'
+
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning('DHCP server is configured but not started. Data may be stale.')
+
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
+
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
+
+ if pool and active_pools and pool not in active_pools:
raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
if sorted and sorted not in mapping_sort_valid:
raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted)
+ static_mappings = _get_raw_server_static_mappings(config=active_config, family=family, pool=pool, sorted=sorted)
if raw:
return static_mappings
else:
@@ -408,10 +326,7 @@ def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optio
def _lease_valid(inet, address):
leases = kea_get_leases(inet)
- for lease in leases:
- if address == lease['ip-address']:
- return True
- return False
+ return any(lease['ip-address'] == address for lease in leases)
@_verify
def clear_dhcp_server_lease(family: ArgFamily, address: str):