summaryrefslogtreecommitdiff
path: root/src/op_mode
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode')
-rwxr-xr-xsrc/op_mode/generate_tech-support_archive.py148
-rwxr-xr-xsrc/op_mode/interfaces_wireless.py186
-rwxr-xr-xsrc/op_mode/lldp.py5
-rw-r--r--src/op_mode/show-ssh-fingerprints.py49
-rwxr-xr-xsrc/op_mode/show_wireless.py149
5 files changed, 387 insertions, 150 deletions
diff --git a/src/op_mode/generate_tech-support_archive.py b/src/op_mode/generate_tech-support_archive.py
new file mode 100755
index 000000000..c490b0137
--- /dev/null
+++ b/src/op_mode/generate_tech-support_archive.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import argparse
+import glob
+from datetime import datetime
+from pathlib import Path
+from shutil import rmtree
+
+from socket import gethostname
+from sys import exit
+from tarfile import open as tar_open
+from vyos.utils.process import rc_cmd
+from vyos.remote import upload
+
+def op(cmd: str) -> str:
+ """Returns a command with the VyOS operational mode wrapper."""
+ return f'/opt/vyatta/bin/vyatta-op-cmd-wrapper {cmd}'
+
+def save_stdout(command: str, file: Path) -> None:
+ rc, stdout = rc_cmd(command)
+ body: str = f'''### {command} ###
+Command: {command}
+Exit code: {rc}
+Stdout:
+{stdout}
+
+'''
+ with file.open(mode='a') as f:
+ f.write(body)
+def __rotate_logs(path: str, log_pattern:str):
+ files_list = glob.glob(f'{path}/{log_pattern}')
+ if len(files_list) > 5:
+ oldest_file = min(files_list, key=os.path.getctime)
+ os.remove(oldest_file)
+
+
+def __generate_archived_files(location_path: str) -> None:
+ """
+ Generate arhives of main directories
+ :param location_path: path to temporary directory
+ :type location_path: str
+ """
+ # Dictionary arhive_name:directory_to_arhive
+ archive_dict = {
+ 'etc': '/etc',
+ 'home': '/home',
+ 'var-log': '/var/log',
+ 'root': '/root',
+ 'tmp': '/tmp',
+ 'core-dump': '/var/core',
+ 'config': '/opt/vyatta/etc/config'
+ }
+ # Dictionary arhive_name:excluding pattern
+ archive_excludes = {
+ # Old location of archives
+ 'config': 'tech-support-archive',
+ # New locations of arhives
+ 'tmp': 'tech-support-archive'
+ }
+ for archive_name, path in archive_dict.items():
+ archive_file: str = f'{location_path}/{archive_name}.tar.gz'
+ with tar_open(name=archive_file, mode='x:gz') as tar_file:
+ if archive_name in archive_excludes:
+ tar_file.add(path, filter=lambda x: None if str(archive_excludes[archive_name]) in str(x.name) else x)
+ else:
+ tar_file.add(path)
+
+
+def __generate_main_archive_file(archive_file: str, tmp_dir_path: str) -> None:
+ """
+ Generate main arhive file
+ :param archive_file: name of arhive file
+ :type archive_file: str
+ :param tmp_dir_path: path to arhive memeber
+ :type tmp_dir_path: str
+ """
+ with tar_open(name=archive_file, mode='x:gz') as tar_file:
+ tar_file.add(tmp_dir_path, arcname=os.path.basename(tmp_dir_path))
+
+
+if __name__ == '__main__':
+ defualt_tmp_dir = '/tmp'
+ parser = argparse.ArgumentParser()
+ parser.add_argument("path", nargs='?', default=defualt_tmp_dir)
+ args = parser.parse_args()
+ location_path = args.path[:-1] if args.path[-1] == '/' else args.path
+
+ hostname: str = gethostname()
+ time_now: str = datetime.now().isoformat(timespec='seconds').replace(":", "-")
+
+ remote = False
+ tmp_path = ''
+ tmp_dir_path = ''
+ if 'ftp://' in args.path or 'scp://' in args.path:
+ remote = True
+ tmp_path = defualt_tmp_dir
+ else:
+ tmp_path = location_path
+ archive_pattern = f'_tech-support-archive_'
+ archive_file_name = f'{hostname}{archive_pattern}{time_now}.tar.gz'
+
+ # Log rotation in tmp directory
+ if tmp_path == defualt_tmp_dir:
+ __rotate_logs(tmp_path, f'*{archive_pattern}*')
+
+ # Temporary directory creation
+ tmp_dir_path = f'{tmp_path}/drops-debug_{time_now}'
+ tmp_dir: Path = Path(tmp_dir_path)
+ tmp_dir.mkdir()
+
+ report_file: Path = Path(f'{tmp_dir_path}/show_tech-support_report.txt')
+ report_file.touch()
+ try:
+
+ save_stdout(op('show tech-support report'), report_file)
+ # Generate included archives
+ __generate_archived_files(tmp_dir_path)
+
+ # Generate main archive
+ __generate_main_archive_file(f'{tmp_path}/{archive_file_name}', tmp_dir_path)
+ # Delete temporary directory
+ rmtree(tmp_dir)
+ # Upload to remote site if it is scpecified
+ if remote:
+ upload(f'{tmp_path}/{archive_file_name}', args.path)
+ print(f'Debug file is generated and located in {location_path}/{archive_file_name}')
+ except Exception as err:
+ print(f'Error during generating a debug file: {err}')
+ # cleanup
+ if tmp_dir.exists():
+ rmtree(tmp_dir)
+ finally:
+ # cleanup
+ exit()
diff --git a/src/op_mode/interfaces_wireless.py b/src/op_mode/interfaces_wireless.py
new file mode 100755
index 000000000..dfe50e2cb
--- /dev/null
+++ b/src/op_mode/interfaces_wireless.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import sys
+import typing
+import vyos.opmode
+
+from copy import deepcopy
+from tabulate import tabulate
+from vyos.utils.process import popen
+from vyos.configquery import ConfigTreeQuery
+
+def _verify(func):
+ """Decorator checks if Wireless LAN config exists"""
+ from functools import wraps
+
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ config = ConfigTreeQuery()
+ if not config.exists(['interfaces', 'wireless']):
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+ return _wrapper
+
+def _get_raw_info_data():
+ output_data = []
+
+ config = ConfigTreeQuery()
+ raw = config.get_config_dict(['interfaces', 'wireless'], effective=True,
+ get_first_key=True, key_mangling=('-', '_'))
+ for interface, interface_config in raw.items():
+ tmp = {'name' : interface}
+
+ if 'type' in interface_config:
+ tmp.update({'type' : interface_config['type']})
+ else:
+ tmp.update({'type' : '-'})
+
+ if 'ssid' in interface_config:
+ tmp.update({'ssid' : interface_config['ssid']})
+ else:
+ tmp.update({'ssid' : '-'})
+
+ if 'channel' in interface_config:
+ tmp.update({'channel' : interface_config['channel']})
+ else:
+ tmp.update({'channel' : '-'})
+
+ output_data.append(tmp)
+
+ return output_data
+
+def _get_formatted_info_output(raw_data):
+ output=[]
+ for ssid in raw_data:
+ output.append([ssid['name'], ssid['type'], ssid['ssid'], ssid['channel']])
+
+ headers = ["Interface", "Type", "SSID", "Channel"]
+ print(tabulate(output, headers, numalign="left"))
+
+def _get_raw_scan_data(intf_name):
+ # XXX: This ignores errors
+ tmp, _ = popen(f'iw dev {intf_name} scan ap-force')
+ networks = []
+ data = {
+ 'ssid': '',
+ 'mac': '',
+ 'channel': '',
+ 'signal': ''
+ }
+ re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})')
+ for line in tmp.splitlines():
+ if line.startswith('BSS '):
+ ssid = deepcopy(data)
+ ssid['mac'] = re.search(re_mac, line).group()
+
+ elif line.lstrip().startswith('SSID: '):
+ # SSID can be " SSID: WLAN-57 6405", thus strip all leading whitespaces
+ ssid['ssid'] = line.lstrip().split(':')[-1].lstrip()
+
+ elif line.lstrip().startswith('signal: '):
+ # Siganl can be " signal: -67.00 dBm", thus strip all leading whitespaces
+ ssid['signal'] = line.lstrip().split(':')[-1].split()[0]
+
+ elif line.lstrip().startswith('DS Parameter set: channel'):
+ # Channel can be " DS Parameter set: channel 6" , thus
+ # strip all leading whitespaces
+ ssid['channel'] = line.lstrip().split(':')[-1].split()[-1]
+ networks.append(ssid)
+ continue
+
+ return networks
+
+def _format_scan_data(raw_data):
+ output=[]
+ for ssid in raw_data:
+ output.append([ssid['mac'], ssid['ssid'], ssid['channel'], ssid['signal']])
+ headers = ["Address", "SSID", "Channel", "Signal (dbm)"]
+ return tabulate(output, headers, numalign="left")
+
+def _get_raw_station_data(intf_name):
+ # XXX: This ignores errors
+ tmp, _ = popen(f'iw dev {intf_name} station dump')
+ clients = []
+ data = {
+ 'mac': '',
+ 'signal': '',
+ 'rx_bytes': '',
+ 'rx_packets': '',
+ 'tx_bytes': '',
+ 'tx_packets': ''
+ }
+ re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})')
+ for line in tmp.splitlines():
+ if line.startswith('Station'):
+ client = deepcopy(data)
+ client['mac'] = re.search(re_mac, line).group()
+
+ elif line.lstrip().startswith('signal avg:'):
+ client['signal'] = line.lstrip().split(':')[-1].lstrip().split()[0]
+
+ elif line.lstrip().startswith('rx bytes:'):
+ client['rx_bytes'] = line.lstrip().split(':')[-1].lstrip()
+
+ elif line.lstrip().startswith('rx packets:'):
+ client['rx_packets'] = line.lstrip().split(':')[-1].lstrip()
+
+ elif line.lstrip().startswith('tx bytes:'):
+ client['tx_bytes'] = line.lstrip().split(':')[-1].lstrip()
+
+ elif line.lstrip().startswith('tx packets:'):
+ client['tx_packets'] = line.lstrip().split(':')[-1].lstrip()
+ clients.append(client)
+ continue
+
+ return clients
+
+def _format_station_data(raw_data):
+ output=[]
+ for ssid in raw_data:
+ output.append([ssid['mac'], ssid['signal'], ssid['rx_bytes'], ssid['rx_packets'], ssid['tx_bytes'], ssid['tx_packets']])
+ headers = ["Station", "Signal", "RX bytes", "RX packets", "TX bytes", "TX packets"]
+ return tabulate(output, headers, numalign="left")
+
+@_verify
+def show_info(raw: bool):
+ info_data = _get_raw_info_data()
+ if raw:
+ return info_data
+ return _get_formatted_info_output(info_data)
+
+def show_scan(raw: bool, intf_name: str):
+ data = _get_raw_scan_data(intf_name)
+ if raw:
+ return data
+ return _format_scan_data(data)
+
+@_verify
+def show_stations(raw: bool, intf_name: str):
+ data = _get_raw_station_data(intf_name)
+ if raw:
+ return data
+ return _format_station_data(data)
+
+if __name__ == '__main__':
+ try:
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/lldp.py b/src/op_mode/lldp.py
index c287b8fa6..58cfce443 100755
--- a/src/op_mode/lldp.py
+++ b/src/op_mode/lldp.py
@@ -114,7 +114,10 @@ def _get_formatted_output(raw_data):
# Remote software platform
platform = jmespath.search('chassis.[*][0][0].descr', values)
- tmp.append(platform[:37])
+ if platform:
+ tmp.append(platform[:37])
+ else:
+ tmp.append('')
# Remote interface
interface = jmespath.search('port.descr', values)
diff --git a/src/op_mode/show-ssh-fingerprints.py b/src/op_mode/show-ssh-fingerprints.py
new file mode 100644
index 000000000..913baae46
--- /dev/null
+++ b/src/op_mode/show-ssh-fingerprints.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+#
+# Copyright 2017-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import glob
+import argparse
+from vyos.utils.process import cmd
+
+# Parse command line
+parser = argparse.ArgumentParser()
+parser.add_argument("--ascii", help="Show visual ASCII art representation of the public key", action="store_true")
+args = parser.parse_args()
+
+# Get list of server public keys
+publickeys = glob.glob("/etc/ssh/*.pub")
+
+if publickeys:
+ print("SSH server public key fingerprints:\n", flush=True)
+ for keyfile in publickeys:
+ if args.ascii:
+ try:
+ print(cmd("ssh-keygen -l -v -E sha256 -f " + keyfile) + "\n", flush=True)
+ # Ignore invalid public keys
+ except:
+ pass
+ else:
+ try:
+ print(cmd("ssh-keygen -l -E sha256 -f " + keyfile) + "\n", flush=True)
+ # Ignore invalid public keys
+ except:
+ pass
+else:
+ print("No SSH server public keys are found.", flush=True)
+
+sys.exit(0)
diff --git a/src/op_mode/show_wireless.py b/src/op_mode/show_wireless.py
deleted file mode 100755
index 340163057..000000000
--- a/src/op_mode/show_wireless.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019-2023 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import argparse
-import re
-
-from sys import exit
-from copy import deepcopy
-
-from vyos.config import Config
-from vyos.utils.process import popen
-
-parser = argparse.ArgumentParser()
-parser.add_argument("-s", "--scan", help="Scan for Wireless APs on given interface, e.g. 'wlan0'")
-parser.add_argument("-b", "--brief", action="store_true", help="Show wireless configuration")
-parser.add_argument("-c", "--stations", help="Show wireless clients connected on interface, e.g. 'wlan0'")
-
-def show_brief():
- config = Config()
- if len(config.list_effective_nodes('interfaces wireless')) == 0:
- print("No Wireless interfaces configured")
- exit(0)
-
- interfaces = []
- for intf in config.list_effective_nodes('interfaces wireless'):
- config.set_level(f'interfaces wireless {intf}')
- data = { 'name': intf }
- data['type'] = config.return_effective_value('type') or '-'
- data['ssid'] = config.return_effective_value('ssid') or '-'
- data['channel'] = config.return_effective_value('channel') or '-'
- interfaces.append(data)
-
- return interfaces
-
-def ssid_scan(intf):
- # XXX: This ignores errors
- tmp, _ = popen(f'/sbin/iw dev {intf} scan ap-force')
- networks = []
- data = {
- 'ssid': '',
- 'mac': '',
- 'channel': '',
- 'signal': ''
- }
- re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})')
- for line in tmp.splitlines():
- if line.startswith('BSS '):
- ssid = deepcopy(data)
- ssid['mac'] = re.search(re_mac, line).group()
-
- elif line.lstrip().startswith('SSID: '):
- # SSID can be " SSID: WLAN-57 6405", thus strip all leading whitespaces
- ssid['ssid'] = line.lstrip().split(':')[-1].lstrip()
-
- elif line.lstrip().startswith('signal: '):
- # Siganl can be " signal: -67.00 dBm", thus strip all leading whitespaces
- ssid['signal'] = line.lstrip().split(':')[-1].split()[0]
-
- elif line.lstrip().startswith('DS Parameter set: channel'):
- # Channel can be " DS Parameter set: channel 6" , thus
- # strip all leading whitespaces
- ssid['channel'] = line.lstrip().split(':')[-1].split()[-1]
- networks.append(ssid)
- continue
-
- return networks
-
-def show_clients(intf):
- # XXX: This ignores errors
- tmp, _ = popen(f'/sbin/iw dev {intf} station dump')
- clients = []
- data = {
- 'mac': '',
- 'signal': '',
- 'rx_bytes': '',
- 'rx_packets': '',
- 'tx_bytes': '',
- 'tx_packets': ''
- }
- re_mac = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})')
- for line in tmp.splitlines():
- if line.startswith('Station'):
- client = deepcopy(data)
- client['mac'] = re.search(re_mac, line).group()
-
- elif line.lstrip().startswith('signal avg:'):
- client['signal'] = line.lstrip().split(':')[-1].lstrip().split()[0]
-
- elif line.lstrip().startswith('rx bytes:'):
- client['rx_bytes'] = line.lstrip().split(':')[-1].lstrip()
-
- elif line.lstrip().startswith('rx packets:'):
- client['rx_packets'] = line.lstrip().split(':')[-1].lstrip()
-
- elif line.lstrip().startswith('tx bytes:'):
- client['tx_bytes'] = line.lstrip().split(':')[-1].lstrip()
-
- elif line.lstrip().startswith('tx packets:'):
- client['tx_packets'] = line.lstrip().split(':')[-1].lstrip()
- clients.append(client)
- continue
-
- return clients
-
-if __name__ == '__main__':
- args = parser.parse_args()
-
- if args.scan:
- print("Address SSID Channel Signal (dbm)")
- for network in ssid_scan(args.scan):
- print("{:<17} {:<32} {:>3} {}".format(network['mac'],
- network['ssid'],
- network['channel'],
- network['signal']))
- exit(0)
-
- elif args.brief:
- print("Interface Type SSID Channel")
- for intf in show_brief():
- print("{:<9} {:<12} {:<32} {:>3}".format(intf['name'],
- intf['type'],
- intf['ssid'],
- intf['channel']))
- exit(0)
-
- elif args.stations:
- print("Station Signal RX: bytes packets TX: bytes packets")
- for client in show_clients(args.stations):
- print("{:<17} {:>3} {:>15} {:>9} {:>15} {:>10} ".format(client['mac'],
- client['signal'], client['rx_bytes'], client['rx_packets'], client['tx_bytes'], client['tx_packets']))
-
- exit(0)
-
- else:
- parser.print_help()
- exit(1)