summaryrefslogtreecommitdiff
path: root/src/op_mode/dhcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode/dhcp.py')
-rwxr-xr-xsrc/op_mode/dhcp.py201
1 files changed, 158 insertions, 43 deletions
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py
index 07e9b7d6c..77f38992b 100755
--- a/src/op_mode/dhcp.py
+++ b/src/op_mode/dhcp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022 VyOS maintainers and contributors
+# Copyright (C) 2022-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,35 +14,35 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import sys
-from ipaddress import ip_address
import typing
from datetime import datetime
-from sys import exit
-from tabulate import tabulate
+from glob import glob
+from ipaddress import ip_address
from isc_dhcp_leases import IscDhcpLeases
+from tabulate import tabulate
+
+import vyos.opmode
from vyos.base import Warning
from vyos.configquery import ConfigTreeQuery
-from vyos.util import cmd
-from vyos.util import dict_search
-from vyos.util import is_systemd_service_running
-
-import vyos.opmode
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+from vyos.utils.process import cmd
+from vyos.utils.process import is_systemd_service_running
+time_string = "%a %b %d %H:%M:%S %Z %Y"
config = ConfigTreeQuery()
-pool_key = "shared-networkname"
-
-
-def _in_pool(lease, pool):
- if pool_key in lease.sets:
- if lease.sets[pool_key] == pool:
- return True
- return False
+lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
+sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state']
+sort_valid_inet6 = ['end', 'iaid_duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type']
+ArgFamily = typing.Literal['inet', 'inet6']
+ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
def _utc_to_local(utc_dt):
return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds())
@@ -71,7 +71,7 @@ def _find_list_of_dict_index(lst, key='ip', value='') -> int:
return idx
-def _get_raw_server_leases(family, pool=None) -> list:
+def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[]) -> list:
"""
Get DHCP server leases
:return list
@@ -79,18 +79,21 @@ def _get_raw_server_leases(family, pool=None) -> list:
lease_file = '/config/dhcpdv6.leases' if family == 'inet6' else '/config/dhcpd.leases'
data = []
leases = IscDhcpLeases(lease_file).get()
- if pool is not None:
- if config.exists(f'service dhcp-server shared-network-name {pool}'):
- leases = list(filter(lambda x: _in_pool(x, pool), leases))
+
+ if pool is None:
+ pool = _get_dhcp_pools(family=family)
+ else:
+ pool = [pool]
+
for lease in leases:
data_lease = {}
data_lease['ip'] = lease.ip
data_lease['state'] = lease.binding_state
data_lease['pool'] = lease.sets.get('shared-networkname', '')
- data_lease['end'] = lease.end.timestamp()
+ data_lease['end'] = lease.end.timestamp() if lease.end else None
if family == 'inet':
- data_lease['hardware'] = lease.ethernet
+ data_lease['mac'] = lease.ethernet
data_lease['start'] = lease.start.timestamp()
data_lease['hostname'] = lease.hostname
@@ -100,18 +103,20 @@ def _get_raw_server_leases(family, pool=None) -> list:
lease_types_long = {'na': 'non-temporary', 'ta': 'temporary', 'pd': 'prefix delegation'}
data_lease['type'] = lease_types_long[lease.type]
- data_lease['remaining'] = lease.end - datetime.utcnow()
+ data_lease['remaining'] = '-'
- if data_lease['remaining'].days >= 0:
- # substraction gives us a timedelta object which can't be formatted with strftime
- # so we use str(), split gets rid of the microseconds
- data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0]
- else:
- data_lease['remaining'] = ''
+ if lease.end:
+ data_lease['remaining'] = lease.end - datetime.utcnow()
+
+ if data_lease['remaining'].days >= 0:
+ # substraction gives us a timedelta object which can't be formatted with strftime
+ # so we use str(), split gets rid of the microseconds
+ data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0]
# Do not add old leases
- if data_lease['remaining'] != '':
- data.append(data_lease)
+ if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free':
+ if not state or data_lease['state'] in state:
+ data.append(data_lease)
# deduplicate
checked = []
@@ -123,26 +128,31 @@ def _get_raw_server_leases(family, pool=None) -> list:
idx = _find_list_of_dict_index(data, key='ip', value=addr)
data.pop(idx)
+ if sorted:
+ if sorted == 'ip':
+ data.sort(key = lambda x:ip_address(x['ip']))
+ else:
+ data.sort(key = lambda x:x[sorted])
return data
-def _get_formatted_server_leases(raw_data, family):
+def _get_formatted_server_leases(raw_data, family='inet'):
data_entries = []
if family == 'inet':
for lease in raw_data:
ipaddr = lease.get('ip')
- hw_addr = lease.get('hardware')
+ hw_addr = lease.get('mac')
state = lease.get('state')
start = lease.get('start')
start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
end = lease.get('end')
- end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S')
+ end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-'
remain = lease.get('remaining')
pool = lease.get('pool')
hostname = lease.get('hostname')
data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname])
- headers = ['IP Address', 'Hardware address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool',
+ headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool',
'Hostname']
if family == 'inet6':
@@ -247,7 +257,7 @@ def _verify(func):
@_verify
-def show_pool_statistics(raw: bool, family: str, pool: typing.Optional[str]):
+def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]):
pool_data = _get_raw_pool_statistics(family=family, pool=pool)
if raw:
return pool_data
@@ -256,17 +266,122 @@ def show_pool_statistics(raw: bool, family: str, pool: typing.Optional[str]):
@_verify
-def show_server_leases(raw: bool, family: str):
+def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
+ sorted: typing.Optional[str], state: typing.Optional[ArgState]):
# if dhcp server is down, inactive leases may still be shown as active, so warn the user.
- if not is_systemd_service_running('isc-dhcp-server.service'):
- Warning('DHCP server is configured but not started. Data may be stale.')
+ v = '6' if family == 'inet6' else ''
+ service_name = 'DHCPv6' if family == 'inet6' else 'DHCP'
+ if not is_systemd_service_running(f'isc-dhcp-server{v}.service'):
+ Warning(f'{service_name} server is configured but not started. Data may be stale.')
+
+ v = 'v6' if family == 'inet6' else ''
+ if pool and pool not in _get_dhcp_pools(family=family):
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
+
+ if state and state not in lease_valid_states:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
+
+ sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet
+ if sorted and sorted not in sort_valid:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- leases = _get_raw_server_leases(family)
+ lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state)
if raw:
- return leases
+ return lease_data
else:
- return _get_formatted_server_leases(leases, family)
+ return _get_formatted_server_leases(lease_data, family=family)
+
+
+def _get_raw_client_leases(family='inet', interface=None):
+ from time import mktime
+ from datetime import datetime
+ from vyos.defaults import directories
+ from vyos.utils.network import get_interface_vrf
+
+ lease_dir = directories['isc_dhclient_dir']
+ lease_files = []
+ lease_data = []
+
+ if interface:
+ tmp = f'{lease_dir}/dhclient_{interface}.lease'
+ if os.path.exists(tmp):
+ lease_files.append(tmp)
+ else:
+ # All DHCP leases
+ lease_files = glob(f'{lease_dir}/dhclient_*.lease')
+
+ for lease in lease_files:
+ tmp = {}
+ with open(lease, 'r') as f:
+ for line in f.readlines():
+ line = line.rstrip()
+ if 'last_update' not in tmp:
+ # ISC dhcp client contains least_update timestamp in human readable
+ # format this makes less sense for an API and also the expiry
+ # timestamp is provided in UNIX time. Convert string (e.g. Sun Jul
+ # 30 18:13:44 CEST 2023) to UNIX time (1690733624)
+ tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))})
+ continue
+
+ k, v = line.split('=')
+ tmp.update({k : v.replace("'", "")})
+
+ if 'interface' in tmp:
+ vrf = get_interface_vrf(tmp['interface'])
+ if vrf: tmp.update({'vrf' : vrf})
+ lease_data.append(tmp)
+
+ return lease_data
+
+def _get_formatted_client_leases(lease_data, family):
+ from time import localtime
+ from time import strftime
+
+ from vyos.utils.network import is_intf_addr_assigned
+
+ data_entries = []
+ for lease in lease_data:
+ if not lease.get('new_ip_address'):
+ continue
+ data_entries.append(["Interface", lease['interface']])
+ if 'new_ip_address' in lease:
+ tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]'
+ data_entries.append(["IP address", lease['new_ip_address'], tmp])
+ if 'new_subnet_mask' in lease:
+ data_entries.append(["Subnet Mask", lease['new_subnet_mask']])
+ if 'new_domain_name' in lease:
+ data_entries.append(["Domain Name", lease['new_domain_name']])
+ if 'new_routers' in lease:
+ data_entries.append(["Router", lease['new_routers']])
+ if 'new_domain_name_servers' in lease:
+ data_entries.append(["Name Server", lease['new_domain_name_servers']])
+ if 'new_dhcp_server_identifier' in lease:
+ data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']])
+ if 'new_dhcp_lease_time' in lease:
+ data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']])
+ if 'vrf' in lease:
+ data_entries.append(["VRF", lease['vrf']])
+ if 'last_update' in lease:
+ tmp = strftime(time_string, localtime(int(lease['last_update'])))
+ data_entries.append(["Last Update", tmp])
+ if 'new_expiry' in lease:
+ tmp = strftime(time_string, localtime(int(lease['new_expiry'])))
+ data_entries.append(["Expiry", tmp])
+
+ # Add empty marker
+ data_entries.append([''])
+
+ output = tabulate(data_entries, tablefmt='plain')
+
+ return output
+
+def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]):
+ lease_data = _get_raw_client_leases(family=family, interface=interface)
+ if raw:
+ return lease_data
+ else:
+ return _get_formatted_client_leases(lease_data, family=family)
if __name__ == '__main__':
try: