1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
#!/usr/bin/env python3
#
# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import argparse
import glob
from datetime import datetime
from pathlib import Path
from shutil import rmtree
from socket import gethostname
from sys import exit
from tarfile import open as tar_open
from vyos.utils.process import rc_cmd
from vyos.remote import upload
def op(cmd: str) -> str:
    """
    Build a command line that runs a command through the op-mode wrapper
    :param cmd: operational mode command, for example 'show version'
    :type cmd: str
    :return: the command prefixed with the path of the wrapper executable
    :rtype: str
    """
    wrapper = '/opt/vyatta/bin/vyatta-op-cmd-wrapper'
    return f'{wrapper} {cmd}'
def save_stdout(command: str, file: Path) -> None:
    """
    Run a command and append its exit code and output to a report file
    :param command: command line passed to rc_cmd
    :type command: str
    :param file: report file; it is opened in append mode
    :type file: Path
    """
    exit_code, output = rc_cmd(command)
    # The trailing empty item makes the joined text end with a newline
    report_lines = (
        f'### {command} ###',
        f'Command: {command}',
        f'Exit code: {exit_code}',
        'Stdout:',
        f'{output}',
        '',
    )
    with file.open(mode='a') as report:
        report.write('\n'.join(report_lines))
def __rotate_logs(path: str, log_pattern:str):
    """
    Remove the oldest matching file once more than five of them exist
    :param path: directory that is searched
    :type path: str
    :param log_pattern: glob pattern selecting the files subject to rotation
    :type log_pattern: str
    """
    candidates = glob.glob(f'{path}/{log_pattern}')
    if len(candidates) <= 5:
        return
    # Only one file is removed per call: the one with the smallest ctime
    stalest = min(candidates, key=os.path.getctime)
    os.remove(stalest)
def __generate_archived_files(location_path: str) -> None:
    """
    Generate archives of main directories

    One '<name>.tar.gz' file per directory is created inside location_path.
    Entries located inside location_path itself are never archived, so that
    archives written earlier in the loop are not copied into later ones when
    location_path lives below one of the archived directories (e.g. /tmp).

    :param location_path: path to temporary directory
    :type location_path: str
    :raises FileExistsError: if one of the archive files already exists
        (the archives are opened in exclusive-creation mode)
    """
    # Dictionary archive_name:directory_to_archive
    archive_dict = {
        'etc': '/etc',
        'home': '/home',
        'var-log': '/var/log',
        'root': '/root',
        'tmp': '/tmp',
        'core-dump': '/var/core',
        'config': '/opt/vyatta/etc/config'
    }
    # Dictionary archive_name:excluding pattern
    archive_excludes = {
        # Old location of archives
        'config': 'tech-support-archive',
        # New locations of archives
        'tmp': 'tech-support-archive'
    }

    # tarfile stores member names without the leading '/', so the working
    # directory is normalised the same way before it is compared
    workdir_member = os.path.abspath(location_path).lstrip('/')

    def _make_filter(pattern):
        """Return a tarfile filter dropping excluded and working-dir entries."""
        def _member_filter(member):
            name = str(member.name)
            if workdir_member and (name == workdir_member
                                   or name.startswith(f'{workdir_member}/')):
                return None
            if pattern is not None and str(pattern) in name:
                return None
            return member
        return _member_filter

    for archive_name, path in archive_dict.items():
        archive_file: str = f'{location_path}/{archive_name}.tar.gz'
        member_filter = _make_filter(archive_excludes.get(archive_name))
        with tar_open(name=archive_file, mode='x:gz') as tar_file:
            tar_file.add(path, filter=member_filter)
def __generate_main_archive_file(archive_file: str, tmp_dir_path: str) -> None:
    """
    Pack a directory into a single gzip-compressed tar archive
    :param archive_file: path of the archive file to create
    :type archive_file: str
    :param tmp_dir_path: path to the directory added as archive member
    :type tmp_dir_path: str
    """
    # Mode 'x:gz' creates the file exclusively, an existing file is an error
    with tar_open(name=archive_file, mode='x:gz') as bundle:
        # Store the directory under its own name instead of its full path
        member_name = os.path.basename(tmp_dir_path)
        bundle.add(tmp_dir_path, arcname=member_name)
if __name__ == '__main__':
    # Script entry point: write the tech-support report and archives of the
    # main system directories into a temporary directory, pack that directory
    # into one tarball and optionally upload the tarball to a remote site.
    default_tmp_dir = '/tmp'

    parser = argparse.ArgumentParser()
    parser.add_argument("path", nargs='?', default=default_tmp_dir)
    args = parser.parse_args()

    # Strip one trailing slash so the path can be joined with '/'
    location_path = args.path[:-1] if args.path[-1] == '/' else args.path

    hostname: str = gethostname()
    # Colons are replaced with dashes; the timestamp is used in file names
    time_now: str = datetime.now().isoformat(timespec='seconds').replace(":", "-")

    # For a remote destination the archive is built in the default temporary
    # directory and uploaded afterwards; otherwise it is built in place
    remote = 'ftp://' in args.path or 'scp://' in args.path
    tmp_path = default_tmp_dir if remote else location_path

    archive_pattern = '_tech-support-archive_'
    archive_file_name = f'{hostname}{archive_pattern}{time_now}.tar.gz'
    archive_file_path = f'{tmp_path}/{archive_file_name}'

    # Log rotation in tmp directory
    if tmp_path == default_tmp_dir:
        __rotate_logs(tmp_path, f'*{archive_pattern}*')

    # Temporary directory creation. This stays outside the try block so the
    # cleanup below can never remove a directory this run did not create.
    tmp_dir_path = f'{tmp_path}/drops-debug_{time_now}'
    tmp_dir: Path = Path(tmp_dir_path)
    tmp_dir.mkdir()

    report_file: Path = Path(f'{tmp_dir_path}/show_tech-support_report.txt')
    report_file.touch()

    # Reported to the caller on exit; switched to 1 when generation fails
    exit_code = 0
    try:
        save_stdout(op('show tech-support report'), report_file)
        # Generate included archives
        __generate_archived_files(tmp_dir_path)

        # Generate main archive
        __generate_main_archive_file(archive_file_path, tmp_dir_path)
        # Delete temporary directory
        rmtree(tmp_dir)
        # Upload to remote site if it is specified
        if remote:
            upload(archive_file_path, args.path)
        print(f'Debug file is generated and located in {location_path}/{archive_file_name}')
    except Exception as err:
        print(f'Error during generating a debug file: {err}')
        exit_code = 1
    finally:
        # Cleanup also runs when the script is interrupted, not only when an
        # Exception was handled above
        if tmp_dir.exists():
            rmtree(tmp_dir)

    exit(exit_code)
|