diff options
-rw-r--r-- | op-mode-definitions/show-interfaces-wireless.xml.in | 10 | ||||
-rwxr-xr-x | src/op_mode/interfaces_wireless.py | 186 | ||||
-rwxr-xr-x | src/op_mode/show_wireless.py | 149 |
3 files changed, 191 insertions, 154 deletions
diff --git a/op-mode-definitions/show-interfaces-wireless.xml.in b/op-mode-definitions/show-interfaces-wireless.xml.in index 27c0f43db..09c9a7895 100644 --- a/op-mode-definitions/show-interfaces-wireless.xml.in +++ b/op-mode-definitions/show-interfaces-wireless.xml.in @@ -20,7 +20,7 @@ <properties> <help>Show wireless interface configuration</help> </properties> - <command>${vyos_op_scripts_dir}/show_wireless.py --brief</command> + <command>${vyos_op_scripts_dir}/interfaces_wireless.py show_info</command> </leafNode> </children> </node> @@ -35,15 +35,15 @@ <children> <leafNode name="brief"> <properties> - <help>Show summary of the specified wireless interface information</help> + <help>Show brief summary of the specified wireless interface</help> </properties> <command>${vyos_op_scripts_dir}/interfaces.py show_summary --intf-name="$4" --intf-type=wireless</command> </leafNode> <node name="scan"> <properties> - <help>Show summary of the specified wireless interface information</help> + <help>Scan for networks via specified wireless interface</help> </properties> - <command>sudo ${vyos_op_scripts_dir}/show_wireless.py --scan "$4"</command> + <command>sudo ${vyos_op_scripts_dir}/interfaces_wireless.py show_scan --intf-name="$4"</command> <children> <leafNode name="detail"> <properties> @@ -57,7 +57,7 @@ <properties> <help>Show specified Wireless interface information</help> </properties> - <command>${vyos_op_scripts_dir}/show_wireless.py --stations "$4"</command> + <command>${vyos_op_scripts_dir}/interfaces_wireless.py show_stations --intf-name="$4"</command> </leafNode> <tagNode name="vif"> <properties> diff --git a/src/op_mode/interfaces_wireless.py b/src/op_mode/interfaces_wireless.py new file mode 100755 index 000000000..dfe50e2cb --- /dev/null +++ b/src/op_mode/interfaces_wireless.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import sys +import typing +import vyos.opmode + +from copy import deepcopy +from tabulate import tabulate +from vyos.utils.process import popen +from vyos.configquery import ConfigTreeQuery + +def _verify(func): + """Decorator checks if Wireless LAN config exists""" + from functools import wraps + + @wraps(func) + def _wrapper(*args, **kwargs): + config = ConfigTreeQuery() + if not config.exists(['interfaces', 'wireless']): + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + return _wrapper + +def _get_raw_info_data(): + output_data = [] + + config = ConfigTreeQuery() + raw = config.get_config_dict(['interfaces', 'wireless'], effective=True, + get_first_key=True, key_mangling=('-', '_')) + for interface, interface_config in raw.items(): + tmp = {'name' : interface} + + if 'type' in interface_config: + tmp.update({'type' : interface_config['type']}) + else: + tmp.update({'type' : '-'}) + + if 'ssid' in interface_config: + tmp.update({'ssid' : interface_config['ssid']}) + else: + tmp.update({'ssid' : '-'}) + + if 'channel' in interface_config: + tmp.update({'channel' : interface_config['channel']}) + else: + tmp.update({'channel' : '-'}) + + output_data.append(tmp) + + return output_data + +def _get_formatted_info_output(raw_data): + output=[] + for ssid in raw_data: + output.append([ssid['name'], ssid['type'], ssid['ssid'], ssid['channel']]) + + headers = ["Interface", "Type", "SSID", "Channel"] + print(tabulate(output, headers, numalign="left")) + +def _get_raw_scan_data(intf_name): + # XXX: This ignores errors + tmp, _ = popen(f'iw dev {intf_name} scan ap-force') + networks = [] + data = { + 'ssid': '', + 'mac': '', + 'channel': '', + 'signal': '' + } + re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})') + for line in tmp.splitlines(): + if line.startswith('BSS '): + ssid = deepcopy(data) + ssid['mac'] = re.search(re_mac, line).group() + + elif line.lstrip().startswith('SSID: '): + # SSID can be " SSID: WLAN-57 6405", thus strip all leading whitespaces + ssid['ssid'] = line.lstrip().split(':')[-1].lstrip() + + elif line.lstrip().startswith('signal: '): + # Siganl can be " signal: -67.00 dBm", thus strip all leading whitespaces + ssid['signal'] = line.lstrip().split(':')[-1].split()[0] + + elif line.lstrip().startswith('DS Parameter set: channel'): + # Channel can be " DS Parameter set: channel 6" , thus + # strip all leading whitespaces + ssid['channel'] = line.lstrip().split(':')[-1].split()[-1] + networks.append(ssid) + continue + + return networks + +def _format_scan_data(raw_data): + output=[] + for ssid in raw_data: + output.append([ssid['mac'], ssid['ssid'], ssid['channel'], ssid['signal']]) + headers = ["Address", "SSID", "Channel", "Signal (dbm)"] + return tabulate(output, headers, numalign="left") + +def _get_raw_station_data(intf_name): + # XXX: This ignores errors + tmp, _ = popen(f'iw dev {intf_name} station dump') + clients = [] + data = { + 'mac': '', + 'signal': '', + 'rx_bytes': '', + 'rx_packets': '', + 'tx_bytes': '', + 'tx_packets': '' + } + re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})') + for line in tmp.splitlines(): + if line.startswith('Station'): + client = deepcopy(data) + client['mac'] = re.search(re_mac, line).group() + + elif line.lstrip().startswith('signal avg:'): + client['signal'] = line.lstrip().split(':')[-1].lstrip().split()[0] + + elif line.lstrip().startswith('rx bytes:'): + client['rx_bytes'] = line.lstrip().split(':')[-1].lstrip() + + elif line.lstrip().startswith('rx packets:'): + client['rx_packets'] = line.lstrip().split(':')[-1].lstrip() + + elif line.lstrip().startswith('tx bytes:'): + client['tx_bytes'] = line.lstrip().split(':')[-1].lstrip() + + elif line.lstrip().startswith('tx packets:'): + client['tx_packets'] = line.lstrip().split(':')[-1].lstrip() + clients.append(client) + continue + + return clients + +def _format_station_data(raw_data): + output=[] + for ssid in raw_data: + output.append([ssid['mac'], ssid['signal'], ssid['rx_bytes'], ssid['rx_packets'], ssid['tx_bytes'], ssid['tx_packets']]) + headers = ["Station", "Signal", "RX bytes", "RX packets", "TX bytes", "TX packets"] + return tabulate(output, headers, numalign="left") + +@_verify +def show_info(raw: bool): + info_data = _get_raw_info_data() + if raw: + return info_data + return _get_formatted_info_output(info_data) + +def show_scan(raw: bool, intf_name: str): + data = _get_raw_scan_data(intf_name) + if raw: + return data + return _format_scan_data(data) + +@_verify +def show_stations(raw: bool, intf_name: str): + data = _get_raw_station_data(intf_name) + if raw: + return data + return _format_station_data(data) + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/show_wireless.py b/src/op_mode/show_wireless.py deleted file mode 100755 index 340163057..000000000 --- a/src/op_mode/show_wireless.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019-2023 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import argparse -import re - -from sys import exit -from copy import deepcopy - -from vyos.config import Config -from vyos.utils.process import popen - -parser = argparse.ArgumentParser() -parser.add_argument("-s", "--scan", help="Scan for Wireless APs on given interface, e.g. 'wlan0'") -parser.add_argument("-b", "--brief", action="store_true", help="Show wireless configuration") -parser.add_argument("-c", "--stations", help="Show wireless clients connected on interface, e.g. 'wlan0'") - -def show_brief(): - config = Config() - if len(config.list_effective_nodes('interfaces wireless')) == 0: - print("No Wireless interfaces configured") - exit(0) - - interfaces = [] - for intf in config.list_effective_nodes('interfaces wireless'): - config.set_level(f'interfaces wireless {intf}') - data = { 'name': intf } - data['type'] = config.return_effective_value('type') or '-' - data['ssid'] = config.return_effective_value('ssid') or '-' - data['channel'] = config.return_effective_value('channel') or '-' - interfaces.append(data) - - return interfaces - -def ssid_scan(intf): - # XXX: This ignores errors - tmp, _ = popen(f'/sbin/iw dev {intf} scan ap-force') - networks = [] - data = { - 'ssid': '', - 'mac': '', - 'channel': '', - 'signal': '' - } - re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})') - for line in tmp.splitlines(): - if line.startswith('BSS '): - ssid = deepcopy(data) - ssid['mac'] = re.search(re_mac, line).group() - - elif line.lstrip().startswith('SSID: '): - # SSID can be " SSID: WLAN-57 6405", thus strip all leading whitespaces - ssid['ssid'] = line.lstrip().split(':')[-1].lstrip() - - elif line.lstrip().startswith('signal: '): - # Siganl can be " signal: -67.00 dBm", thus strip all leading whitespaces - ssid['signal'] = line.lstrip().split(':')[-1].split()[0] - - elif line.lstrip().startswith('DS Parameter set: channel'): - # Channel can be " DS Parameter set: channel 6" , thus - # strip all leading whitespaces - ssid['channel'] = line.lstrip().split(':')[-1].split()[-1] - networks.append(ssid) - continue - - return networks - -def show_clients(intf): - # XXX: This ignores errors - tmp, _ = popen(f'/sbin/iw dev {intf} station dump') - clients = [] - data = { - 'mac': '', - 'signal': '', - 'rx_bytes': '', - 'rx_packets': '', - 'tx_bytes': '', - 'tx_packets': '' - } - re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})') - for line in tmp.splitlines(): - if line.startswith('Station'): - client = deepcopy(data) - client['mac'] = re.search(re_mac, line).group() - - elif line.lstrip().startswith('signal avg:'): - client['signal'] = line.lstrip().split(':')[-1].lstrip().split()[0] - - elif line.lstrip().startswith('rx bytes:'): - client['rx_bytes'] = line.lstrip().split(':')[-1].lstrip() - - elif line.lstrip().startswith('rx packets:'): - client['rx_packets'] = line.lstrip().split(':')[-1].lstrip() - - elif line.lstrip().startswith('tx bytes:'): - client['tx_bytes'] = line.lstrip().split(':')[-1].lstrip() - - elif line.lstrip().startswith('tx packets:'): - client['tx_packets'] = line.lstrip().split(':')[-1].lstrip() - clients.append(client) - continue - - return clients - -if __name__ == '__main__': - args = parser.parse_args() - - if args.scan: - print("Address SSID Channel Signal (dbm)") - for network in ssid_scan(args.scan): - print("{:<17} {:<32} {:>3} {}".format(network['mac'], - network['ssid'], - network['channel'], - network['signal'])) - exit(0) - - elif args.brief: - print("Interface Type SSID Channel") - for intf in show_brief(): - print("{:<9} {:<12} {:<32} {:>3}".format(intf['name'], - intf['type'], - intf['ssid'], - intf['channel'])) - exit(0) - - elif args.stations: - print("Station Signal RX: bytes packets TX: bytes packets") - for client in show_clients(args.stations): - print("{:<17} {:>3} {:>15} {:>9} {:>15} {:>10} ".format(client['mac'], - client['signal'], client['rx_bytes'], client['rx_packets'], client['tx_bytes'], client['tx_packets'])) - - exit(0) - - else: - parser.print_help() - exit(1) |