diff options
Diffstat (limited to 'src/op_mode/openvpn.py')
-rwxr-xr-x | src/op_mode/openvpn.py | 66 |
1 files changed, 49 insertions, 17 deletions
diff --git a/src/op_mode/openvpn.py b/src/op_mode/openvpn.py index 3797a7153..d9ae965c5 100755 --- a/src/op_mode/openvpn.py +++ b/src/op_mode/openvpn.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022 VyOS maintainers and contributors +# Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -16,16 +16,21 @@ # # +import json import os import sys +import typing from tabulate import tabulate import vyos.opmode from vyos.util import bytes_to_human from vyos.util import commit_in_progress from vyos.util import call +from vyos.util import rc_cmd from vyos.config import Config +ArgMode = typing.Literal['client', 'server', 'site_to_site'] + def _get_tunnel_address(peer_host, peer_port, status_file): peer = peer_host + ':' + peer_port lst = [] @@ -50,7 +55,7 @@ def _get_tunnel_address(peer_host, peer_port, status_file): def _get_interface_status(mode: str, interface: str) -> dict: status_file = f'/run/openvpn/{interface}.status' - data = { + data: dict = { 'mode': mode, 'intf': interface, 'local_host': '', @@ -60,7 +65,7 @@ def _get_interface_status(mode: str, interface: str) -> dict: } if not os.path.exists(status_file): - raise vyos.opmode.DataUnavailable('No information for interface {interface}') + return data with open(status_file, 'r') as f: lines = f.readlines() @@ -139,30 +144,54 @@ def _get_interface_status(mode: str, interface: str) -> dict: return data -def _get_raw_data(mode: str) -> dict: - data = {} + +def _get_interface_state(iface): + rc, out = rc_cmd(f'ip --json link show dev {iface}') + try: + data = json.loads(out) + except: + return 'DOWN' + return data[0].get('operstate', 'DOWN') + + +def _get_interface_description(iface): + rc, out = rc_cmd(f'ip --json link show dev {iface}') + try: + data = json.loads(out) + except: + return '' + return data[0].get('ifalias', '') + + +def _get_raw_data(mode: str) -> list: + data: list = [] conf = Config() conf_dict = conf.get_config_dict(['interfaces', 'openvpn'], get_first_key=True) if not conf_dict: return data - interfaces = [x for x in list(conf_dict) if conf_dict[x]['mode'] == mode] + interfaces = [x for x in list(conf_dict) if + conf_dict[x]['mode'].replace('-', '_') == mode] for intf in interfaces: - data[intf] = _get_interface_status(mode, intf) - d = data[intf] + d = _get_interface_status(mode, intf) + d['state'] = _get_interface_state(intf) + d['description'] = _get_interface_description(intf) d['local_host'] = conf_dict[intf].get('local-host', '') d['local_port'] = conf_dict[intf].get('local-port', '') - if mode in ['client', 'site-to-site']: + if conf.exists(f'interfaces openvpn {intf} server client'): + d['configured_clients'] = conf.list_nodes(f'interfaces openvpn {intf} server client') + if mode in ['client', 'site_to_site']: for client in d['clients']: if 'shared-secret-key-file' in list(conf_dict[intf]): client['name'] = 'None (PSK)' client['remote_host'] = conf_dict[intf].get('remote-host', [''])[0] client['remote_port'] = conf_dict[intf].get('remote-port', '1194') + data.append(d) return data -def _format_openvpn(data: dict) -> str: +def _format_openvpn(data: list) -> str: if not data: out = 'No OpenVPN interfaces configured' return out @@ -171,11 +200,12 @@ def _format_openvpn(data: dict) -> str: 'TX bytes', 'RX bytes', 'Connected Since'] out = '' - data_out = [] - for intf in list(data): - l_host = data[intf]['local_host'] - l_port = data[intf]['local_port'] - for client in list(data[intf]['clients']): + for d in data: + data_out = [] + intf = d['intf'] + l_host = d['local_host'] + l_port = d['local_port'] + for client in d['clients']: r_host = client['remote_host'] r_port = client['remote_port'] @@ -190,11 +220,13 @@ def _format_openvpn(data: dict) -> str: data_out.append([name, remote, tunnel, local, tx_bytes, rx_bytes, online_since]) - out += tabulate(data_out, headers) + if data_out: + out += tabulate(data_out, headers) + out += "\n" return out -def show(raw: bool, mode: str) -> str: +def show(raw: bool, mode: ArgMode) -> typing.Union[list,str]: openvpn_data = _get_raw_data(mode) if raw: |