diff options
Diffstat (limited to 'src/op_mode')
-rwxr-xr-x | src/op_mode/bgp.py | 3 | ||||
-rwxr-xr-x | src/op_mode/conntrack.py | 4 | ||||
-rwxr-xr-x | src/op_mode/dhcp.py | 32 | ||||
-rwxr-xr-x | src/op_mode/nat.py | 10 | ||||
-rwxr-xr-x | src/op_mode/neighbor.py | 8 | ||||
-rwxr-xr-x | src/op_mode/openvpn.py | 7 | ||||
-rwxr-xr-x | src/op_mode/route.py | 6 |
7 files changed, 44 insertions, 26 deletions
diff --git a/src/op_mode/bgp.py b/src/op_mode/bgp.py index 23001a9d7..3f6d45dd7 100755 --- a/src/op_mode/bgp.py +++ b/src/op_mode/bgp.py @@ -30,6 +30,7 @@ from vyos.configquery import ConfigTreeQuery import vyos.opmode +ArgFamily = typing.Literal['inet', 'inet6'] frr_command_template = Template(""" {% if family %} @@ -75,7 +76,7 @@ def _verify(func): @_verify def show_neighbors(raw: bool, - family: str, + family: ArgFamily, peer: typing.Optional[str], vrf: typing.Optional[str]): kwargs = dict(locals()) diff --git a/src/op_mode/conntrack.py b/src/op_mode/conntrack.py index df213cc5a..ea7c4c208 100755 --- a/src/op_mode/conntrack.py +++ b/src/op_mode/conntrack.py @@ -15,6 +15,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys +import typing import xmltodict from tabulate import tabulate @@ -23,6 +24,7 @@ from vyos.util import run import vyos.opmode +ArgFamily = typing.Literal['inet', 'inet6'] def _get_xml_data(family): """ @@ -126,7 +128,7 @@ def get_formatted_output(dict_data): return output -def show(raw: bool, family: str): +def show(raw: bool, family: ArgFamily): family = 'ipv6' if family == 'inet6' else 'ipv4' conntrack_data = _get_raw_data(family) if raw: diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index b9e6e7bc9..41da14065 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022 VyOS maintainers and contributors +# Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -36,6 +36,9 @@ lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state'] sort_valid_inet6 = ['end', 'iaid_duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] +ArgFamily = typing.Literal['inet', 'inet6'] +ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] + def _utc_to_local(utc_dt): return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) @@ -82,7 +85,7 @@ def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[]) -> l data_lease['ip'] = lease.ip data_lease['state'] = lease.binding_state data_lease['pool'] = lease.sets.get('shared-networkname', '') - data_lease['end'] = lease.end.timestamp() + data_lease['end'] = lease.end.timestamp() if lease.end else None if family == 'inet': data_lease['mac'] = lease.ethernet @@ -95,17 +98,18 @@ def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[]) -> l lease_types_long = {'na': 'non-temporary', 'ta': 'temporary', 'pd': 'prefix delegation'} data_lease['type'] = lease_types_long[lease.type] - data_lease['remaining'] = lease.end - datetime.utcnow() + data_lease['remaining'] = '-' - if data_lease['remaining'].days >= 0: - # substraction gives us a timedelta object which can't be formatted with strftime - # so we use str(), split gets rid of the microseconds - data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0] - else: - data_lease['remaining'] = '' + if lease.end: + data_lease['remaining'] = lease.end - datetime.utcnow() + + if data_lease['remaining'].days >= 0: + # substraction gives us a timedelta object which can't be formatted with strftime + # so we use str(), split gets rid of the microseconds + data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0] # Do not add old leases - if data_lease['remaining'] != '' and data_lease['pool'] in pool: + if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free': if not state or data_lease['state'] in state: data.append(data_lease) @@ -137,7 +141,7 @@ def _get_formatted_server_leases(raw_data, family='inet'): start = lease.get('start') start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') end = lease.get('end') - end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') + end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-' remain = lease.get('remaining') pool = lease.get('pool') hostname = lease.get('hostname') @@ -248,7 +252,7 @@ def _verify(func): @_verify -def show_pool_statistics(raw: bool, family: str, pool: typing.Optional[str]): +def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): pool_data = _get_raw_pool_statistics(family=family, pool=pool) if raw: return pool_data @@ -257,8 +261,8 @@ def show_pool_statistics(raw: bool, family: str, pool: typing.Optional[str]): @_verify -def show_server_leases(raw: bool, family: str, pool: typing.Optional[str], - sorted: typing.Optional[str], state: typing.Optional[str]): +def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], + sorted: typing.Optional[str], state: typing.Optional[ArgState]): # if dhcp server is down, inactive leases may still be shown as active, so warn the user. if not is_systemd_service_running('isc-dhcp-server.service'): Warning('DHCP server is configured but not started. Data may be stale.') diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py index cf06de0e9..c92795745 100755 --- a/src/op_mode/nat.py +++ b/src/op_mode/nat.py @@ -31,6 +31,8 @@ from vyos.util import dict_search base = 'nat' unconf_message = 'NAT is not configured' +ArgDirection = typing.Literal['source', 'destination'] +ArgFamily = typing.Literal['inet', 'inet6'] def _get_xml_translation(direction, family, address=None): """ @@ -298,7 +300,7 @@ def _verify(func): @_verify -def show_rules(raw: bool, direction: str, family: str): +def show_rules(raw: bool, direction: ArgDirection, family: ArgFamily): nat_rules = _get_raw_data_rules(direction, family) if raw: return nat_rules @@ -307,7 +309,7 @@ def show_rules(raw: bool, direction: str, family: str): @_verify -def show_statistics(raw: bool, direction: str, family: str): +def show_statistics(raw: bool, direction: ArgDirection, family: ArgFamily): nat_statistics = _get_raw_data_rules(direction, family) if raw: return nat_statistics @@ -316,8 +318,8 @@ def show_statistics(raw: bool, direction: str, family: str): @_verify -def show_translations(raw: bool, direction: - str, family: str, +def show_translations(raw: bool, direction: ArgDirection, + family: ArgFamily, address: typing.Optional[str], verbose: typing.Optional[bool]): family = 'ipv6' if family == 'inet6' else 'ipv4' diff --git a/src/op_mode/neighbor.py b/src/op_mode/neighbor.py index 264dbdc72..b329ea280 100755 --- a/src/op_mode/neighbor.py +++ b/src/op_mode/neighbor.py @@ -32,6 +32,9 @@ import typing import vyos.opmode +ArgFamily = typing.Literal['inet', 'inet6'] +ArgState = typing.Literal['reachable', 'stale', 'failed', 'permanent'] + def interface_exists(interface): import os return os.path.exists(f'/sys/class/net/{interface}') @@ -88,7 +91,8 @@ def format_neighbors(neighs, interface=None): headers = ["Address", "Interface", "Link layer address", "State"] return tabulate(neighs, headers) -def show(raw: bool, family: str, interface: typing.Optional[str], state: typing.Optional[str]): +def show(raw: bool, family: ArgFamily, interface: typing.Optional[str], + state: typing.Optional[ArgState]): """ Display neighbor table contents """ data = get_raw_data(family, interface, state=state) @@ -97,7 +101,7 @@ def show(raw: bool, family: str, interface: typing.Optional[str], state: typing. else: return format_neighbors(data, interface) -def reset(family: str, interface: typing.Optional[str], address: typing.Optional[str]): +def reset(family: ArgFamily, interface: typing.Optional[str], address: typing.Optional[str]): from vyos.util import run if address and interface: diff --git a/src/op_mode/openvpn.py b/src/op_mode/openvpn.py index 79130c7c0..8f88ab422 100755 --- a/src/op_mode/openvpn.py +++ b/src/op_mode/openvpn.py @@ -18,6 +18,7 @@ import os import sys +import typing from tabulate import tabulate import vyos.opmode @@ -26,6 +27,8 @@ from vyos.util import commit_in_progress from vyos.util import call from vyos.config import Config +ArgMode = typing.Literal['client', 'server', 'site_to_site'] + def _get_tunnel_address(peer_host, peer_port, status_file): peer = peer_host + ':' + peer_port lst = [] @@ -155,7 +158,7 @@ def _get_raw_data(mode: str) -> dict: d['local_port'] = conf_dict[intf].get('local-port', '') if conf.exists(f'interfaces openvpn {intf} server client'): d['configured_clients'] = conf.list_nodes(f'interfaces openvpn {intf} server client') - if mode in ['client', 'site-to-site']: + if mode in ['client', 'site_to_site']: for client in d['clients']: if 'shared-secret-key-file' in list(conf_dict[intf]): client['name'] = 'None (PSK)' @@ -198,7 +201,7 @@ def _format_openvpn(data: dict) -> str: return out -def show(raw: bool, mode: str) -> str: +def show(raw: bool, mode: ArgMode) -> str: openvpn_data = _get_raw_data(mode) if raw: diff --git a/src/op_mode/route.py b/src/op_mode/route.py index 7f0f9cbac..d6d6b7d6f 100755 --- a/src/op_mode/route.py +++ b/src/op_mode/route.py @@ -54,7 +54,9 @@ frr_command_template = Template(""" {% endif %} """) -def show_summary(raw: bool, family: str, table: typing.Optional[int], vrf: typing.Optional[str]): +ArgFamily = typing.Literal['inet', 'inet6'] + +def show_summary(raw: bool, family: ArgFamily, table: typing.Optional[int], vrf: typing.Optional[str]): from vyos.util import cmd if family == 'inet': @@ -94,7 +96,7 @@ def show_summary(raw: bool, family: str, table: typing.Optional[int], vrf: typin return output def show(raw: bool, - family: str, + family: ArgFamily, net: typing.Optional[str], table: typing.Optional[int], protocol: typing.Optional[str], |