summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/vyos/kea.py148
-rwxr-xr-xsrc/op_mode/dhcp.py221
2 files changed, 267 insertions, 102 deletions
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index 32a118f68..951c83693 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -43,7 +43,7 @@ kea4_options = {
'time_offset': 'time-offset',
'wpad_url': 'wpad-url',
'ipv6_only_preferred': 'v6-only-preferred',
- 'captive_portal': 'v4-captive-portal'
+ 'captive_portal': 'v4-captive-portal',
}
kea6_options = {
@@ -55,21 +55,23 @@ kea6_options = {
'nisplus_domain': 'nisp-domain-name',
'nisplus_server': 'nisp-servers',
'sntp_server': 'sntp-servers',
- 'captive_portal': 'v6-captive-portal'
+ 'captive_portal': 'v6-captive-portal',
}
kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
+
def _format_hex_string(in_str):
- out_str = ""
+ out_str = ''
# if input is divisible by 2, add : every 2 chars
if len(in_str) > 0 and len(in_str) % 2 == 0:
- out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
+ out_str = ':'.join(a + b for a, b in zip(in_str[::2], in_str[1::2]))
else:
out_str = in_str
return out_str
+
def _find_list_of_dict_index(lst, key='ip', value=''):
"""
Find the index entry of list of dict matching the dict value
@@ -81,6 +83,7 @@ def _find_list_of_dict_index(lst, key='ip', value=''):
idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
return idx
+
def kea_parse_options(config):
options = []
@@ -88,14 +91,21 @@ def kea_parse_options(config):
if node not in config:
continue
- value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
+ value = (
+ ', '.join(config[node]) if isinstance(config[node], list) else config[node]
+ )
options.append({'name': option_name, 'data': value})
if 'client_prefix_length' in config:
- options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])})
+ options.append(
+ {
+ 'name': 'subnet-mask',
+ 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length']),
+ }
+ )
if 'ip_forwarding' in config:
- options.append({'name': 'ip-forwarding', 'data': "true"})
+ options.append({'name': 'ip-forwarding', 'data': 'true'})
if 'static_route' in config:
default_route = ''
@@ -103,31 +113,41 @@ def kea_parse_options(config):
if 'default_router' in config:
default_route = isc_static_route('0.0.0.0/0', config['default_router'])
- routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()]
-
- options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])})
- options.append({'name': 'windows-static-route', 'data': ", ".join(routes)})
+ routes = [
+ isc_static_route(route, route_options['next_hop'])
+ for route, route_options in config['static_route'].items()
+ ]
+
+ options.append(
+ {
+ 'name': 'rfc3442-static-route',
+ 'data': ', '.join(
+ routes if not default_route else routes + [default_route]
+ ),
+ }
+ )
+ options.append({'name': 'windows-static-route', 'data': ', '.join(routes)})
if 'time_zone' in config:
- with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f:
- tz_string = f.read().split(b"\n")[-2].decode("utf-8")
+ with open('/usr/share/zoneinfo/' + config['time_zone'], 'rb') as f:
+ tz_string = f.read().split(b'\n')[-2].decode('utf-8')
options.append({'name': 'pcode', 'data': tz_string})
options.append({'name': 'tcode', 'data': config['time_zone']})
- unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller')
+ unifi_controller = dict_search_args(
+ config, 'vendor_option', 'ubiquiti', 'unifi_controller'
+ )
if unifi_controller:
- options.append({
- 'name': 'unifi-controller',
- 'data': unifi_controller,
- 'space': 'ubnt'
- })
+ options.append(
+ {'name': 'unifi-controller', 'data': unifi_controller, 'space': 'ubnt'}
+ )
return options
+
def kea_parse_subnet(subnet, config):
out = {'subnet': subnet, 'id': int(config['subnet_id'])}
- options = []
if 'option' in config:
out['option-data'] = kea_parse_options(config['option'])
@@ -149,9 +169,7 @@ def kea_parse_subnet(subnet, config):
pools = []
for num, range_config in config['range'].items():
start, stop = range_config['start'], range_config['stop']
- pool = {
- 'pool': f'{start} - {stop}'
- }
+ pool = {'pool': f'{start} - {stop}'}
if 'option' in range_config:
pool['option-data'] = kea_parse_options(range_config['option'])
@@ -188,16 +206,21 @@ def kea_parse_subnet(subnet, config):
reservation['option-data'] = kea_parse_options(host_config['option'])
if 'bootfile_name' in host_config['option']:
- reservation['boot-file-name'] = host_config['option']['bootfile_name']
+ reservation['boot-file-name'] = host_config['option'][
+ 'bootfile_name'
+ ]
if 'bootfile_server' in host_config['option']:
- reservation['next-server'] = host_config['option']['bootfile_server']
+ reservation['next-server'] = host_config['option'][
+ 'bootfile_server'
+ ]
reservations.append(reservation)
out['reservations'] = reservations
return out
+
def kea6_parse_options(config):
options = []
@@ -205,7 +228,9 @@ def kea6_parse_options(config):
if node not in config:
continue
- value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
+ value = (
+ ', '.join(config[node]) if isinstance(config[node], list) else config[node]
+ )
options.append({'name': option_name, 'data': value})
if 'sip_server' in config:
@@ -221,17 +246,20 @@ def kea6_parse_options(config):
hosts.append(server)
if addrs:
- options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)})
+ options.append({'name': 'sip-server-addr', 'data': ', '.join(addrs)})
if hosts:
- options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)})
+ options.append({'name': 'sip-server-dns', 'data': ', '.join(hosts)})
cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server')
if cisco_tftp:
- options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp})
+ options.append(
+ {'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp}
+ )
return options
+
def kea6_parse_subnet(subnet, config):
out = {'subnet': subnet, 'id': int(config['subnet_id'])}
@@ -269,12 +297,14 @@ def kea6_parse_subnet(subnet, config):
pd_pool = {
'prefix': prefix,
'prefix-len': int(pd_conf['prefix_length']),
- 'delegated-len': int(pd_conf['delegated_length'])
+ 'delegated-len': int(pd_conf['delegated_length']),
}
if 'excluded_prefix' in pd_conf:
pd_pool['excluded-prefix'] = pd_conf['excluded_prefix']
- pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length'])
+ pd_pool['excluded-prefix-len'] = int(
+ pd_conf['excluded_prefix_length']
+ )
pd_pools.append(pd_pool)
@@ -294,9 +324,7 @@ def kea6_parse_subnet(subnet, config):
if 'disable' in host_config:
continue
- reservation = {
- 'hostname': host
- }
+ reservation = {'hostname': host}
if 'mac' in host_config:
reservation['hw-address'] = host_config['mac']
@@ -305,10 +333,10 @@ def kea6_parse_subnet(subnet, config):
reservation['duid'] = host_config['duid']
if 'ipv6_address' in host_config:
- reservation['ip-addresses'] = [ host_config['ipv6_address'] ]
+ reservation['ip-addresses'] = [host_config['ipv6_address']]
if 'ipv6_prefix' in host_config:
- reservation['prefixes'] = [ host_config['ipv6_prefix'] ]
+ reservation['prefixes'] = [host_config['ipv6_prefix']]
if 'option' in host_config:
reservation['option-data'] = kea6_parse_options(host_config['option'])
@@ -319,6 +347,7 @@ def kea6_parse_subnet(subnet, config):
return out
+
def _ctrl_socket_command(inet, command, args=None):
path = kea_ctrl_socket.format(inet=inet)
@@ -345,6 +374,7 @@ def _ctrl_socket_command(inet, command, args=None):
return json.loads(result.decode('utf-8'))
+
def kea_get_leases(inet):
leases = _ctrl_socket_command(inet, f'lease{inet}-get-all')
@@ -353,6 +383,7 @@ def kea_get_leases(inet):
return leases['arguments']['leases']
+
def kea_delete_lease(inet, ip_address):
args = {'ip-address': ip_address}
@@ -363,6 +394,7 @@ def kea_delete_lease(inet, ip_address):
return False
+
def kea_get_active_config(inet):
config = _ctrl_socket_command(inet, 'config-get')
@@ -371,12 +403,18 @@ def kea_get_active_config(inet):
return config
+
def kea_get_dhcp_pools(config, inet):
- shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+ shared_networks = dict_search_args(
+ config, 'arguments', f'Dhcp{inet}', 'shared-networks'
+ )
return [network['name'] for network in shared_networks] if shared_networks else []
+
def kea_get_pool_from_subnet_id(config, inet, subnet_id):
- shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+ shared_networks = dict_search_args(
+ config, 'arguments', f'Dhcp{inet}', 'shared-networks'
+ )
if not shared_networks:
return None
@@ -397,7 +435,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list:
Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration
:return list
"""
- shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+ shared_networks = dict_search_args(
+ config, 'arguments', f'Dhcp{inet}', 'shared-networks'
+ )
mappings = []
@@ -414,7 +454,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list:
mapping = {'pool': p, 'subnet': subnet['subnet']}
mapping.update(reservation)
# rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address'
- mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None))
+ mapping['ip'] = mapping.pop(
+ 'ipv6-address', mapping.pop('ip-address', None)
+ )
# rename 'hw-address' to 'mac'
mapping['mac'] = mapping.pop('hw-address', None)
mappings.append(mapping)
@@ -432,18 +474,28 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list
data = []
for lease in leases:
lifetime = lease['valid-lft']
- expiry = (lease['cltt'] + lifetime)
+ expiry = lease['cltt'] + lifetime
- lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc)
- lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
+ lease['start_timestamp'] = datetime.fromtimestamp(
+ expiry - lifetime, timezone.utc
+ )
+ lease['expire_timestamp'] = (
+ datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
+ )
data_lease = {}
data_lease['ip'] = lease['ip-address']
lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
data_lease['state'] = lease_state_long[lease['state']]
- data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-'
- data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
- data_lease['origin'] = 'local' # TODO: Determine remote in HA
+ data_lease['pool'] = (
+ kea_get_pool_from_subnet_id(config, inet, lease['subnet-id'])
+ if config
+ else '-'
+ )
+ data_lease['end'] = (
+ lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
+ )
+ data_lease['origin'] = 'local' # TODO: Determine remote in HA
# remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client`
data_lease['hostname'] = lease.get('hostname', '-').rstrip('.')
@@ -463,7 +515,9 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list
data_lease['remaining'] = '-'
if lease['valid-lft'] > 0:
- data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
+ data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(
+ timezone.utc
+ )
if data_lease['remaining'].days >= 0:
# substraction gives us a timedelta object which can't be formatted with strftime
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py
index 0717fc333..b3d7d4dd3 100755
--- a/src/op_mode/dhcp.py
+++ b/src/op_mode/dhcp.py
@@ -37,25 +37,59 @@ from vyos.kea import kea_delete_lease
from vyos.utils.process import call
from vyos.utils.process import is_systemd_service_running
-time_string = "%a %b %d %H:%M:%S %Z %Y"
+time_string = '%a %b %d %H:%M:%S %Z %Y'
config = ConfigTreeQuery()
-lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
-sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state']
-sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type']
+lease_valid_states = [
+ 'all',
+ 'active',
+ 'free',
+ 'expired',
+ 'released',
+ 'abandoned',
+ 'reset',
+ 'backup',
+]
+sort_valid_inet = [
+ 'end',
+ 'mac',
+ 'hostname',
+ 'ip',
+ 'pool',
+ 'remaining',
+ 'start',
+ 'state',
+]
+sort_valid_inet6 = [
+ 'end',
+ 'duid',
+ 'ip',
+ 'last_communication',
+ 'pool',
+ 'remaining',
+ 'state',
+ 'type',
+]
mapping_sort_valid = ['mac', 'ip', 'pool', 'duid']
stale_warn_msg = 'DHCP server is configured but not started. Data may be stale.'
ArgFamily = typing.Literal['inet', 'inet6']
-ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
+ArgState = typing.Literal[
+ 'all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'
+]
ArgOrigin = typing.Literal['local', 'remote']
+
def _utc_to_local(utc_dt):
- return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds())
+ return datetime.fromtimestamp(
+ (datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()
+ )
-def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=[], origin=None) -> list:
+def _get_raw_server_leases(
+ config, family='inet', pool=None, sorted=None, state=[], origin=None
+) -> list:
inet_suffix = '6' if family == 'inet6' else '4'
pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
@@ -63,9 +97,9 @@ def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=
if sorted:
if sorted == 'ip':
- mappings.sort(key = lambda x:ip_address(x['ip']))
+ mappings.sort(key=lambda x: ip_address(x['ip']))
else:
- mappings.sort(key = lambda x:x[sorted])
+ mappings.sort(key=lambda x: x[sorted])
return mappings
@@ -77,34 +111,55 @@ def _get_formatted_server_leases(raw_data, family='inet'):
hw_addr = lease.get('mac')
state = lease.get('state')
start = lease.get('start')
- start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
+ start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
end = lease.get('end')
- end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-'
+ end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-'
remain = lease.get('remaining')
pool = lease.get('pool')
hostname = lease.get('hostname')
origin = lease.get('origin')
- data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin])
-
- headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool',
- 'Hostname', 'Origin']
+ data_entries.append(
+ [ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin]
+ )
+
+ headers = [
+ 'IP Address',
+ 'MAC address',
+ 'State',
+ 'Lease start',
+ 'Lease expiration',
+ 'Remaining',
+ 'Pool',
+ 'Hostname',
+ 'Origin',
+ ]
if family == 'inet6':
for lease in raw_data:
ipaddr = lease.get('ip')
state = lease.get('state')
start = lease.get('last_communication')
- start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
+ start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
end = lease.get('end')
- end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S')
+ end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S')
remain = lease.get('remaining')
lease_type = lease.get('type')
pool = lease.get('pool')
host_identifier = lease.get('duid')
- data_entries.append([ipaddr, state, start, end, remain, lease_type, pool, host_identifier])
-
- headers = ['IPv6 address', 'State', 'Last communication', 'Lease expiration', 'Remaining', 'Type', 'Pool',
- 'DUID']
+ data_entries.append(
+ [ipaddr, state, start, end, remain, lease_type, pool, host_identifier]
+ )
+
+ headers = [
+ 'IPv6 address',
+ 'State',
+ 'Last communication',
+ 'Lease expiration',
+ 'Remaining',
+ 'Type',
+ 'Pool',
+ 'DUID',
+ ]
output = tabulate(data_entries, headers, numalign='left')
return output
@@ -138,8 +193,13 @@ def _get_raw_server_pool_statistics(config, family='inet', pool=None):
size = _get_pool_size(family=family, pool=p)
leases = len(_get_raw_server_leases(config, family=family, pool=p))
use_percentage = round(leases / size * 100) if size != 0 else 0
- pool_stats = {'pool': p, 'size': size, 'leases': leases,
- 'available': (size - leases), 'use_percentage': use_percentage}
+ pool_stats = {
+ 'pool': p,
+ 'size': size,
+ 'leases': leases,
+ 'available': (size - leases),
+ 'use_percentage': use_percentage,
+ }
stats.append(pool_stats)
return stats
@@ -155,13 +215,12 @@ def _get_formatted_server_pool_statistics(pool_data, family='inet'):
use_percentage = f'{use_percentage}%'
data_entries.append([pool, size, leases, available, use_percentage])
- headers = ['Pool', 'Size','Leases', 'Available', 'Usage']
+ headers = ['Pool', 'Size', 'Leases', 'Available', 'Usage']
output = tabulate(data_entries, headers, numalign='left')
return output
def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None):
-
inet_suffix = '6' if family == 'inet6' else '4'
pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
@@ -169,9 +228,9 @@ def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=Non
if sorted:
if sorted == 'ip':
- mappings.sort(key = lambda x:ip_address(x['ip']))
+ mappings.sort(key=lambda x: ip_address(x['ip']))
else:
- mappings.sort(key = lambda x:x[sorted])
+ mappings.sort(key=lambda x: x[sorted])
return mappings
@@ -186,12 +245,23 @@ def _get_formatted_server_static_mappings(raw_data, family='inet'):
mac_addr = entry.get('mac', 'N/A')
duid = entry.get('duid', 'N/A')
description = entry.get('description', 'N/A')
- data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description])
-
- headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description']
+ data_entries.append(
+ [pool, subnet, hostname, ip_addr, mac_addr, duid, description]
+ )
+
+ headers = [
+ 'Pool',
+ 'Subnet',
+ 'Hostname',
+ 'IP Address',
+ 'MAC Address',
+ 'DUID',
+ 'Description',
+ ]
output = tabulate(data_entries, headers, numalign='left')
return output
+
def _verify_server(func):
"""Decorator checks if DHCP(v6) config exists"""
from functools import wraps
@@ -206,8 +276,10 @@ def _verify_server(func):
if not config.exists(f'service dhcp{v}-server'):
raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
return func(*args, **kwargs)
+
return _wrapper
+
def _verify_client(func):
"""Decorator checks if interface is configured as DHCP client"""
from functools import wraps
@@ -226,11 +298,14 @@ def _verify_client(func):
if not config.exists(f'interfaces {interface_path} address dhcp{v}'):
raise vyos.opmode.UnconfiguredObject(unconf_message)
return func(*args, **kwargs)
+
return _wrapper
-@_verify_server
-def show_server_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]):
+@_verify_server
+def show_server_pool_statistics(
+ raw: bool, family: ArgFamily, pool: typing.Optional[str]
+):
v = 'v6' if family == 'inet6' else ''
inet_suffix = '6' if family == 'inet6' else '4'
@@ -255,10 +330,14 @@ def show_server_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optio
@_verify_server
-def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
- sorted: typing.Optional[str], state: typing.Optional[ArgState],
- origin: typing.Optional[ArgOrigin] ):
-
+def show_server_leases(
+ raw: bool,
+ family: ArgFamily,
+ pool: typing.Optional[str],
+ sorted: typing.Optional[str],
+ state: typing.Optional[ArgState],
+ origin: typing.Optional[ArgOrigin],
+):
v = 'v6' if family == 'inet6' else ''
inet_suffix = '6' if family == 'inet6' else '4'
@@ -282,15 +361,27 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
if state and state not in lease_valid_states:
raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
- lease_data = _get_raw_server_leases(config=active_config, family=family, pool=pool, sorted=sorted, state=state, origin=origin)
+ lease_data = _get_raw_server_leases(
+ config=active_config,
+ family=family,
+ pool=pool,
+ sorted=sorted,
+ state=state,
+ origin=origin,
+ )
if raw:
return lease_data
else:
return _get_formatted_server_leases(lease_data, family=family)
+
@_verify_server
-def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str],
- sorted: typing.Optional[str]):
+def show_server_static_mappings(
+ raw: bool,
+ family: ArgFamily,
+ pool: typing.Optional[str],
+ sorted: typing.Optional[str],
+):
v = 'v6' if family == 'inet6' else ''
inet_suffix = '6' if family == 'inet6' else '4'
@@ -310,16 +401,20 @@ def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optio
if sorted and sorted not in mapping_sort_valid:
raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- static_mappings = _get_raw_server_static_mappings(config=active_config, family=family, pool=pool, sorted=sorted)
+ static_mappings = _get_raw_server_static_mappings(
+ config=active_config, family=family, pool=pool, sorted=sorted
+ )
if raw:
return static_mappings
else:
return _get_formatted_server_static_mappings(static_mappings, family=family)
+
def _lease_valid(inet, address):
leases = kea_get_leases(inet)
return any(lease['ip-address'] == address for lease in leases)
+
@_verify_server
def clear_dhcp_server_lease(family: ArgFamily, address: str):
v = 'v6' if family == 'inet6' else ''
@@ -335,6 +430,7 @@ def clear_dhcp_server_lease(family: ArgFamily, address: str):
print(f'Lease "{address}" has been cleared')
+
def _get_raw_client_leases(family='inet', interface=None):
from time import mktime
from datetime import datetime
@@ -363,21 +459,28 @@ def _get_raw_client_leases(family='inet', interface=None):
# format this makes less sense for an API and also the expiry
# timestamp is provided in UNIX time. Convert string (e.g. Sun Jul
# 30 18:13:44 CEST 2023) to UNIX time (1690733624)
- tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))})
+ tmp.update(
+ {
+ 'last_update': int(
+ mktime(datetime.strptime(line, time_string).timetuple())
+ )
+ }
+ )
continue
k, v = line.split('=')
- tmp.update({k : v.replace("'", "")})
+ tmp.update({k: v.replace("'", '')})
if 'interface' in tmp:
vrf = get_interface_vrf(tmp['interface'])
if vrf:
- tmp.update({'vrf' : vrf})
+ tmp.update({'vrf': vrf})
lease_data.append(tmp)
return lease_data
+
def _get_formatted_client_leases(lease_data, family):
from time import localtime
from time import strftime
@@ -388,30 +491,34 @@ def _get_formatted_client_leases(lease_data, family):
for lease in lease_data:
if not lease.get('new_ip_address'):
continue
- data_entries.append(["Interface", lease['interface']])
+ data_entries.append(['Interface', lease['interface']])
if 'new_ip_address' in lease:
- tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]'
- data_entries.append(["IP address", lease['new_ip_address'], tmp])
+ tmp = (
+ '[Active]'
+ if is_intf_addr_assigned(lease['interface'], lease['new_ip_address'])
+ else '[Inactive]'
+ )
+ data_entries.append(['IP address', lease['new_ip_address'], tmp])
if 'new_subnet_mask' in lease:
- data_entries.append(["Subnet Mask", lease['new_subnet_mask']])
+ data_entries.append(['Subnet Mask', lease['new_subnet_mask']])
if 'new_domain_name' in lease:
- data_entries.append(["Domain Name", lease['new_domain_name']])
+ data_entries.append(['Domain Name', lease['new_domain_name']])
if 'new_routers' in lease:
- data_entries.append(["Router", lease['new_routers']])
+ data_entries.append(['Router', lease['new_routers']])
if 'new_domain_name_servers' in lease:
- data_entries.append(["Name Server", lease['new_domain_name_servers']])
+ data_entries.append(['Name Server', lease['new_domain_name_servers']])
if 'new_dhcp_server_identifier' in lease:
- data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']])
+ data_entries.append(['DHCP Server', lease['new_dhcp_server_identifier']])
if 'new_dhcp_lease_time' in lease:
- data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']])
+ data_entries.append(['DHCP Server', lease['new_dhcp_lease_time']])
if 'vrf' in lease:
- data_entries.append(["VRF", lease['vrf']])
+ data_entries.append(['VRF', lease['vrf']])
if 'last_update' in lease:
tmp = strftime(time_string, localtime(int(lease['last_update'])))
- data_entries.append(["Last Update", tmp])
+ data_entries.append(['Last Update', tmp])
if 'new_expiry' in lease:
tmp = strftime(time_string, localtime(int(lease['new_expiry'])))
- data_entries.append(["Expiry", tmp])
+ data_entries.append(['Expiry', tmp])
# Add empty marker
data_entries.append([''])
@@ -420,6 +527,7 @@ def _get_formatted_client_leases(lease_data, family):
return output
+
def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]):
lease_data = _get_raw_client_leases(family=family, interface=interface)
if raw:
@@ -427,6 +535,7 @@ def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[
else:
return _get_formatted_client_leases(lease_data, family=family)
+
@_verify_client
def renew_client_lease(raw: bool, family: ArgFamily, interface: str):
if not raw:
@@ -437,6 +546,7 @@ def renew_client_lease(raw: bool, family: ArgFamily, interface: str):
else:
call(f'systemctl restart dhclient@{interface}.service')
+
@_verify_client
def release_client_lease(raw: bool, family: ArgFamily, interface: str):
if not raw:
@@ -447,6 +557,7 @@ def release_client_lease(raw: bool, family: ArgFamily, interface: str):
else:
call(f'systemctl stop dhclient@{interface}.service')
+
if __name__ == '__main__':
try:
res = vyos.opmode.run(sys.modules[__name__])