diff options
Diffstat (limited to 'src/op_mode/dhcp.py')
-rwxr-xr-x | src/op_mode/dhcp.py | 71 |
1 files changed, 42 insertions, 29 deletions
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 07e9b7d6c..b9e6e7bc9 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -15,13 +15,14 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys -from ipaddress import ip_address import typing from datetime import datetime -from sys import exit -from tabulate import tabulate +from ipaddress import ip_address from isc_dhcp_leases import IscDhcpLeases +from tabulate import tabulate + +import vyos.opmode from vyos.base import Warning from vyos.configquery import ConfigTreeQuery @@ -30,19 +31,10 @@ from vyos.util import cmd from vyos.util import dict_search from vyos.util import is_systemd_service_running -import vyos.opmode - - config = ConfigTreeQuery() -pool_key = "shared-networkname" - - -def _in_pool(lease, pool): - if pool_key in lease.sets: - if lease.sets[pool_key] == pool: - return True - return False - +lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] +sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state'] +sort_valid_inet6 = ['end', 'iaid_duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] def _utc_to_local(utc_dt): return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) @@ -71,7 +63,7 @@ def _find_list_of_dict_index(lst, key='ip', value='') -> int: return idx -def _get_raw_server_leases(family, pool=None) -> list: +def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[]) -> list: """ Get DHCP server leases :return list @@ -79,9 +71,12 @@ def _get_raw_server_leases(family, pool=None) -> list: lease_file = '/config/dhcpdv6.leases' if family == 'inet6' else '/config/dhcpd.leases' data = [] leases = IscDhcpLeases(lease_file).get() - if pool is not None: - if config.exists(f'service dhcp-server shared-network-name {pool}'): - leases = list(filter(lambda x: _in_pool(x, pool), leases)) + + if pool is None: + pool = _get_dhcp_pools(family=family) + else: + pool = [pool] + for lease in leases: data_lease = {} data_lease['ip'] = lease.ip @@ -90,7 +85,7 @@ def _get_raw_server_leases(family, pool=None) -> list: data_lease['end'] = lease.end.timestamp() if family == 'inet': - data_lease['hardware'] = lease.ethernet + data_lease['mac'] = lease.ethernet data_lease['start'] = lease.start.timestamp() data_lease['hostname'] = lease.hostname @@ -110,8 +105,9 @@ def _get_raw_server_leases(family, pool=None) -> list: data_lease['remaining'] = '' # Do not add old leases - if data_lease['remaining'] != '': - data.append(data_lease) + if data_lease['remaining'] != '' and data_lease['pool'] in pool: + if not state or data_lease['state'] in state: + data.append(data_lease) # deduplicate checked = [] @@ -123,15 +119,20 @@ def _get_raw_server_leases(family, pool=None) -> list: idx = _find_list_of_dict_index(data, key='ip', value=addr) data.pop(idx) + if sorted: + if sorted == 'ip': + data.sort(key = lambda x:ip_address(x['ip'])) + else: + data.sort(key = lambda x:x[sorted]) return data -def _get_formatted_server_leases(raw_data, family): +def _get_formatted_server_leases(raw_data, family='inet'): data_entries = [] if family == 'inet': for lease in raw_data: ipaddr = lease.get('ip') - hw_addr = lease.get('hardware') + hw_addr = lease.get('mac') state = lease.get('state') start = lease.get('start') start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') @@ -142,7 +143,7 @@ def _get_formatted_server_leases(raw_data, family): hostname = lease.get('hostname') data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname]) - headers = ['IP Address', 'Hardware address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool', + headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool', 'Hostname'] if family == 'inet6': @@ -256,16 +257,28 @@ def show_pool_statistics(raw: bool, family: str, pool: typing.Optional[str]): @_verify -def show_server_leases(raw: bool, family: str): +def show_server_leases(raw: bool, family: str, pool: typing.Optional[str], + sorted: typing.Optional[str], state: typing.Optional[str]): # if dhcp server is down, inactive leases may still be shown as active, so warn the user. if not is_systemd_service_running('isc-dhcp-server.service'): Warning('DHCP server is configured but not started. Data may be stale.') - leases = _get_raw_server_leases(family) + v = 'v6' if family == 'inet6' else '' + if pool and pool not in _get_dhcp_pools(family=family): + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + + if state and state not in lease_valid_states: + raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + + sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet + if sorted and sorted not in sort_valid: + raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') + + lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state) if raw: - return leases + return lease_data else: - return _get_formatted_server_leases(leases, family) + return _get_formatted_server_leases(lease_data, family=family) if __name__ == '__main__': |