diff options
Diffstat (limited to 'src/op_mode')
-rwxr-xr-x | src/op_mode/dhcp.py | 18 | ||||
-rwxr-xr-x | src/op_mode/firewall.py | 159 | ||||
-rwxr-xr-x | src/op_mode/image_installer.py | 193 | ||||
-rwxr-xr-x | src/op_mode/interfaces.py | 138 | ||||
-rwxr-xr-x | src/op_mode/load-balancing_wan.py | 117 | ||||
-rwxr-xr-x | src/op_mode/qos.py | 2 | ||||
-rwxr-xr-x | src/op_mode/restart.py | 5 | ||||
-rwxr-xr-x | src/op_mode/stp.py | 185 | ||||
-rw-r--r-- | src/op_mode/tech_support.py | 10 |
9 files changed, 724 insertions, 103 deletions
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 8eed2c6cd..725bfc75b 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -205,7 +205,7 @@ def _get_raw_server_pool_statistics(config, family='inet', pool=None): return stats -def _get_formatted_server_pool_statistics(pool_data, family='inet'): +def _get_formatted_server_pool_statistics(pool_data): data_entries = [] for entry in pool_data: pool = entry.get('pool') @@ -235,7 +235,7 @@ def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=Non return mappings -def _get_formatted_server_static_mappings(raw_data, family='inet'): +def _get_formatted_server_static_mappings(raw_data): data_entries = [] for entry in raw_data: @@ -245,10 +245,8 @@ def _get_formatted_server_static_mappings(raw_data, family='inet'): ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') - description = entry.get('description', 'N/A') - data_entries.append( - [pool, subnet, hostname, ip_addr, mac_addr, duid, description] - ) + desc = entry.get('description', 'N/A') + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, desc]) headers = [ 'Pool', @@ -327,7 +325,7 @@ def show_server_pool_statistics( if raw: return pool_data else: - return _get_formatted_server_pool_statistics(pool_data, family=family) + return _get_formatted_server_pool_statistics(pool_data) @_verify_server @@ -408,7 +406,7 @@ def show_server_static_mappings( if raw: return static_mappings else: - return _get_formatted_server_static_mappings(static_mappings, family=family) + return _get_formatted_server_static_mappings(static_mappings) def _lease_valid(inet, address): @@ -482,7 +480,7 @@ def _get_raw_client_leases(family='inet', interface=None): return lease_data -def _get_formatted_client_leases(lease_data, family): +def _get_formatted_client_leases(lease_data): from time import localtime from time import strftime @@ -534,7 +532,7 @@ def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[ if raw: return lease_data else: - return _get_formatted_client_leases(lease_data, family=family) + return _get_formatted_client_leases(lease_data) @_verify_client diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index c197ca434..f3309ee34 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -18,6 +18,7 @@ import argparse import ipaddress import json import re +from signal import signal, SIGPIPE, SIG_DFL import tabulate import textwrap @@ -25,6 +26,9 @@ from vyos.config import Config from vyos.utils.process import cmd from vyos.utils.dict import dict_search_args +signal(SIGPIPE, SIG_DFL) + + def get_config_node(conf, node=None, family=None, hook=None, priority=None): if node == 'nat': if family == 'ipv6': @@ -148,6 +152,38 @@ def get_nftables_group_members(family, table, name): return out +def get_nftables_remote_group_members(family, table, name): + prefix = 'ip6' if family == 'ipv6' else 'ip' + out = [] + + try: + results_str = cmd(f'nft -j list set {prefix} {table} {name}') + results = json.loads(results_str) + except: + return out + + if 'nftables' not in results: + return out + + for obj in results['nftables']: + if 'set' not in obj: + continue + + set_obj = obj['set'] + if 'elem' in set_obj: + for elem in set_obj['elem']: + # search for single IP elements + if isinstance(elem, str): + out.append(elem) + # search for prefix elements + elif isinstance(elem, dict) and 'prefix' in elem: + out.append(f"{elem['prefix']['addr']}/{elem['prefix']['len']}") + # search for IP range elements + elif isinstance(elem, dict) and 'range' in elem: + out.append(f"{elem['range'][0]}-{elem['range'][1]}") + + return out + def output_firewall_vertical(rules, headers, adjust=True): for rule in rules: adjusted_rule = rule + [""] * (len(headers) - len(rule)) if adjust else rule # account for different header length, like default-action @@ -253,15 +289,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group') if not source_addr: - source_addr = dict_search_args(rule_conf, 'source', 'fqdn') + source_addr = dict_search_args(rule_conf, 'source', 'group', 'remote_group') if not source_addr: - source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') - if source_addr: - source_addr = str(source_addr)[1:-1].replace('\'','') - if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): - source_addr = 'NOT ' + str(source_addr) + source_addr = dict_search_args(rule_conf, 'source', 'fqdn') if not source_addr: - source_addr = 'any' + source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') + if source_addr: + source_addr = str(source_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): + source_addr = 'NOT ' + str(source_addr) + if not source_addr: + source_addr = 'any' # Get destination dest_addr = dict_search_args(rule_conf, 'destination', 'address') @@ -272,15 +310,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group') if not dest_addr: - dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') + dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'remote_group') if not dest_addr: - dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') - if dest_addr: - dest_addr = str(dest_addr)[1:-1].replace('\'','') - if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): - dest_addr = 'NOT ' + str(dest_addr) + dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') if not dest_addr: - dest_addr = 'any' + dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') + if dest_addr: + dest_addr = str(dest_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): + dest_addr = 'NOT ' + str(dest_addr) + if not dest_addr: + dest_addr = 'any' # Get inbound interface iiface = dict_search_args(rule_conf, 'inbound_interface', 'name') @@ -552,30 +592,8 @@ def show_firewall_group(name=None): header_tail = [] for group_type, group_type_conf in firewall['group'].items(): - ## - if group_type != 'dynamic_group': - - for group_name, group_conf in group_type_conf.items(): - if name and name != group_name: - continue - - references = find_references(group_type, group_name) - row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] - if 'address' in group_conf: - row.append("\n".join(sorted(group_conf['address']))) - elif 'network' in group_conf: - row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) - elif 'mac_address' in group_conf: - row.append("\n".join(sorted(group_conf['mac_address']))) - elif 'port' in group_conf: - row.append("\n".join(sorted(group_conf['port']))) - elif 'interface' in group_conf: - row.append("\n".join(sorted(group_conf['interface']))) - else: - row.append('N/D') - rows.append(row) - - else: + # interate over dynamic-groups + if group_type == 'dynamic_group': if not args.detail: header_tail = ['Timeout', 'Expires'] @@ -584,6 +602,9 @@ def show_firewall_group(name=None): prefix = 'DA_' if dynamic_type == 'address_group' else 'DA6_' if dynamic_type in firewall['group']['dynamic_group']: for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items(): + if name and name != dynamic_name: + continue + references = find_references(dynamic_type, dynamic_name) row = [dynamic_name, textwrap.fill(dynamic_conf.get('description') or '', 50), dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D'] @@ -622,6 +643,68 @@ def show_firewall_group(name=None): header_tail += [""] * (len(members) - 1) rows.append(row) + # iterate over remote-groups + elif group_type == 'remote_group': + for remote_name, remote_conf in group_type_conf.items(): + if name and name != remote_name: + continue + + references = find_references(group_type, remote_name) + row = [remote_name, textwrap.fill(remote_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] + members = get_nftables_remote_group_members("ipv4", 'vyos_filter', f'R_{remote_name}') + members6 = get_nftables_remote_group_members("ipv6", 'vyos_filter', f'R6_{remote_name}') + + if 'url' in remote_conf: + # display only the url if no members are found for both views + if not members and not members6: + if args.detail: + header_tail = ['IPv6 Members', 'Remote URL'] + row.append('N/D') + row.append('N/D') + row.append(remote_conf['url']) + else: + row.append(remote_conf['url']) + rows.append(row) + else: + # display all table elements in detail view + if args.detail: + header_tail = ['IPv6 Members', 'Remote URL'] + if members: + row.append(' '.join(members)) + else: + row.append('N/D') + if members6: + row.append(' '.join(members6)) + else: + row.append('N/D') + row.append(remote_conf['url']) + rows.append(row) + else: + row.append(remote_conf['url']) + rows.append(row) + + # catch the rest of the group types + else: + for group_name, group_conf in group_type_conf.items(): + if name and name != group_name: + continue + + references = find_references(group_type, group_name) + row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] + if 'address' in group_conf: + row.append("\n".join(sorted(group_conf['address']))) + elif 'network' in group_conf: + row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) + elif 'mac_address' in group_conf: + row.append("\n".join(sorted(group_conf['mac_address']))) + elif 'port' in group_conf: + row.append("\n".join(sorted(group_conf['port']))) + elif 'interface' in group_conf: + row.append("\n".join(sorted(group_conf['interface']))) + else: + row.append('N/D') + rows.append(row) + if rows: print('Firewall Groups\n') if args.detail: diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 1da112673..ac5a84419 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io> # # This file is part of VyOS. # @@ -24,7 +24,9 @@ from glob import glob from sys import exit from os import environ from os import readlink -from os import getpid, getppid +from os import getpid +from os import getppid +from json import loads from typing import Union from urllib.parse import urlparse from passlib.hosts import linux_context @@ -32,12 +34,26 @@ from errno import ENOSPC from psutil import disk_partitions +from vyos.base import Warning from vyos.configtree import ConfigTree from vyos.remote import download -from vyos.system import disk, grub, image, compat, raid, SYSTEM_CFG_VER +from vyos.system import disk +from vyos.system import grub +from vyos.system import image +from vyos.system import compat +from vyos.system import raid +from vyos.system import SYSTEM_CFG_VER +from vyos.system import grub_util from vyos.template import render +from vyos.utils.auth import ( + DEFAULT_PASSWORD, + EPasswdStrength, + evaluate_strength +) +from vyos.utils.dict import dict_search from vyos.utils.io import ask_input, ask_yes_no, select_entry from vyos.utils.file import chmod_2775 +from vyos.utils.file import read_file from vyos.utils.process import cmd, run, rc_cmd from vyos.version import get_version_data @@ -46,7 +62,13 @@ MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' MSG_ERR_IMPROPER_IMAGE: str = 'Missing sha256sum.txt.\nEither this image is corrupted, or of era 1.2.x (md5sum) and would downgrade image tools;\ndisallowed in either case.' -MSG_ERR_ARCHITECTURE_MISMATCH: str = 'Upgrading to a different image architecture will break your system.' +MSG_ERR_INCOMPATIBLE_IMAGE: str = 'Image compatibility check failed, aborting installation.' +MSG_ERR_ARCHITECTURE_MISMATCH: str = 'The current architecture is "{0}", the new image is for "{1}". Upgrading to a different image architecture will break your system.' +MSG_ERR_FLAVOR_MISMATCH: str = 'The current image flavor is "{0}", the new image is "{1}". Upgrading to a non-matching flavor can have unpredictable consequences.' +MSG_ERR_MISSING_ARCHITECTURE: str = 'The new image version data does not specify architecture, cannot check compatibility (is it a legacy release image?)' +MSG_ERR_MISSING_FLAVOR: str = 'The new image version data does not specify flavor, cannot check compatibility (is it a legacy release image?)' +MSG_ERR_CORRUPT_CURRENT_IMAGE: str = 'Version data in the current image is malformed: missing flavor and/or architecture fields. Upgrade compatibility cannot be checked.' +MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE: str = 'Unsupported signature type, signature cannot be verified.' MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install VyOS to your permanent storage.' MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation' MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.' @@ -62,6 +84,7 @@ MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like MSG_INPUT_CONFIG_CHOICE: str = 'The following config files are available for boot:' MSG_INPUT_CONFIG_CHOOSE: str = 'Which file would you like as boot config?' MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' +MSG_INPUT_IMAGE_NAME_TAKEN: str = 'There is already an installed image by that name; please choose again' MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set the new image as the default one for boot?' MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user:' MSG_INPUT_PASSWORD_CONFIRM: str = 'Please confirm password for the "vyos" user:' @@ -78,8 +101,10 @@ MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.' MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n'\ 'It must be between 1 and 64 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' + +MSG_WARN_CHANGE_PASSWORD: str = 'Default password used. Consider changing ' \ + 'it on next login.' MSG_WARN_PASSWORD_CONFIRM: str = 'The entered values did not match. Try again' -MSG_WARN_FLAVOR_MISMATCH: str = 'The running image flavor is "{0}". The new image flavor is "{1}".\n' \ 'Installing a different image flavor may cause functionality degradation or break your system.\n' \ 'Do you want to continue with installation?' CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB @@ -95,7 +120,7 @@ DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src' DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst' DIR_KERNEL_SRC: str = '/boot/' FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs' -ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso' +ISO_DOWNLOAD_PATH: str = '' external_download_script = '/usr/libexec/vyos/simple-download.py' external_latest_image_url_script = '/usr/libexec/vyos/latest-image-url.py' @@ -462,6 +487,29 @@ def setup_grub(root_dir: str) -> None: render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) +def get_cli_kernel_options(config_file: str) -> list: + config = ConfigTree(read_file(config_file)) + config_dict = loads(config.to_json()) + kernel_options = dict_search('system.option.kernel', config_dict) + if kernel_options is None: + kernel_options = {} + cmdline_options = [] + + # XXX: This code path and if statements must be kept in sync with the Kernel + # option handling in system_options.py:generate(). This occurance is used + # for having the appropriate options passed to GRUB after an image upgrade! + if 'disable-mitigations' in kernel_options: + cmdline_options.append('mitigations=off') + if 'disable-power-saving' in kernel_options: + cmdline_options.append('intel_idle.max_cstate=0 processor.max_cstate=1') + if 'amd-pstate-driver' in kernel_options: + mode = kernel_options['amd-pstate-driver'] + cmdline_options.append( + f'initcall_blacklist=acpi_cpufreq_init amd_pstate={mode}') + if 'quiet' in kernel_options: + cmdline_options.append('quiet') + + return cmdline_options def configure_authentication(config_file: str, password: str) -> None: """Write encrypted password to config file @@ -476,10 +524,7 @@ def configure_authentication(config_file: str, password: str) -> None: plaintext exposed """ encrypted_password = linux_context.hash(password) - - with open(config_file) as f: - config_string = f.read() - + config_string = read_file(config_file) config = ConfigTree(config_string) config.set([ 'system', 'login', 'user', 'vyos', 'authentication', @@ -501,7 +546,6 @@ def validate_signature(file_path: str, sign_type: str) -> None: """ print('Validating signature') signature_valid: bool = False - # validate with minisig if sign_type == 'minisig': pub_key_list = glob('/usr/share/vyos/keys/*.minisign.pub') for pubkey in pub_key_list: @@ -510,11 +554,8 @@ def validate_signature(file_path: str, sign_type: str) -> None: signature_valid = True break Path(f'{file_path}.minisig').unlink() - # validate with GPG - if sign_type == 'asc': - if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0: - signature_valid = True - Path(f'{file_path}.asc').unlink() + else: + exit(MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE) # warn or pass if not signature_valid: @@ -524,21 +565,18 @@ def validate_signature(file_path: str, sign_type: str) -> None: print('Signature is valid') def download_file(local_file: str, remote_path: str, vrf: str, - username: str, password: str, progressbar: bool = False, check_space: bool = False): - environ['REMOTE_USERNAME'] = username - environ['REMOTE_PASSWORD'] = password + # Server credentials are implicitly passed in environment variables + # that are set by add_image if vrf is None: download(local_file, remote_path, progressbar=progressbar, check_space=check_space, raise_error=True) else: - remote_auth = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password}' vrf_cmd = f'ip vrf exec {vrf} {external_download_script} \ --local-file {local_file} --remote-path {remote_path}' - cmd(vrf_cmd, auth=remote_auth) + cmd(vrf_cmd, env=environ) def image_fetch(image_path: str, vrf: str = None, - username: str = '', password: str = '', no_prompt: bool = False) -> Path: """Fetch an ISO image @@ -548,13 +586,17 @@ def image_fetch(image_path: str, vrf: str = None, Returns: Path: a path to a local file """ + import os.path + from uuid import uuid4 + + global ISO_DOWNLOAD_PATH + # Latest version gets url from configured "system update-check url" if image_path == 'latest': command = external_latest_image_url_script if vrf: - command = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password} \ - ip vrf exec {vrf} ' + command - code, output = rc_cmd(command) + command = f'ip vrf exec {vrf} {command}' + code, output = rc_cmd(command, env=environ) if code: print(output) exit(MSG_INFO_INSTALL_EXIT) @@ -563,23 +605,25 @@ def image_fetch(image_path: str, vrf: str = None, try: # check a type of path if urlparse(image_path).scheme: - # download an image + # Download the image file + ISO_DOWNLOAD_PATH = os.path.join(os.path.expanduser("~"), '{0}.iso'.format(uuid4())) download_file(ISO_DOWNLOAD_PATH, image_path, vrf, - username, password, progressbar=True, check_space=True) - # download a signature + # Download the image signature + # VyOS only supports minisign signatures at the moment, + # but we keep the logic for multiple signatures + # in case we add something new in the future sign_file = (False, '') - for sign_type in ['minisig', 'asc']: + for sign_type in ['minisig']: try: download_file(f'{ISO_DOWNLOAD_PATH}.{sign_type}', - f'{image_path}.{sign_type}', vrf, - username, password) + f'{image_path}.{sign_type}', vrf) sign_file = (True, sign_type) break except Exception: - print(f'{sign_type} signature is not available') - # validate a signature if it is available + print(f'Could not download {sign_type} signature') + # Validate the signature if it is available if sign_file[0]: validate_signature(ISO_DOWNLOAD_PATH, sign_file[1]) else: @@ -701,30 +745,48 @@ def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) - return False -def validate_compatibility(iso_path: str) -> None: +def validate_compatibility(iso_path: str, force: bool = False) -> None: """Check architecture and flavor compatibility with the running image Args: iso_path (str): a path to the mounted ISO image """ - old_data = get_version_data() - old_flavor = old_data.get('flavor', '') - old_architecture = old_data.get('architecture') or cmd('dpkg --print-architecture') + current_data = get_version_data() + current_flavor = current_data.get('flavor') + current_architecture = current_data.get('architecture') or cmd('dpkg --print-architecture') new_data = get_version_data(f'{iso_path}/version.json') - new_flavor = new_data.get('flavor', '') - new_architecture = new_data.get('architecture', '') + new_flavor = new_data.get('flavor') + new_architecture = new_data.get('architecture') - if not old_architecture == new_architecture: - print(MSG_ERR_ARCHITECTURE_MISMATCH) + if not current_flavor or not current_architecture: + # This may only happen if someone modified the version file. + # Unlikely but not impossible. + print(MSG_ERR_CORRUPT_CURRENT_IMAGE) cleanup() exit(MSG_INFO_INSTALL_EXIT) - if not old_flavor == new_flavor: - if not ask_yes_no(MSG_WARN_FLAVOR_MISMATCH.format(old_flavor, new_flavor), default=False): - cleanup() - exit(MSG_INFO_INSTALL_EXIT) + success = True + + if current_architecture != new_architecture: + success = False + if not new_architecture: + print(MSG_ERR_MISSING_ARCHITECTURE) + else: + print(MSG_ERR_ARCHITECTURE_MISMATCH.format(current_architecture, new_architecture)) + + if current_flavor != new_flavor: + if not force: + success = False + if not new_flavor: + print(MSG_ERR_MISSING_FLAVOR) + else: + print(MSG_ERR_FLAVOR_MISMATCH.format(current_flavor, new_flavor)) + if not success: + print(MSG_ERR_INCOMPATIBLE_IMAGE) + cleanup() + exit(MSG_INFO_INSTALL_EXIT) def install_image() -> None: """Install an image to a disk @@ -746,14 +808,25 @@ def install_image() -> None: break print(MSG_WARN_IMAGE_NAME_WRONG) + failed_check_status = [EPasswdStrength.WEAK, EPasswdStrength.ERROR] # ask for password while True: user_password: str = ask_input(MSG_INPUT_PASSWORD, no_echo=True, non_empty=True) + + if user_password == DEFAULT_PASSWORD: + Warning(MSG_WARN_CHANGE_PASSWORD) + else: + result = evaluate_strength(user_password) + if result['strength'] in failed_check_status: + Warning(result['error']) + confirm: str = ask_input(MSG_INPUT_PASSWORD_CONFIRM, no_echo=True, non_empty=True) + if user_password == confirm: break + print(MSG_WARN_PASSWORD_CONFIRM) # ask for default console @@ -849,8 +922,7 @@ def install_image() -> None: for disk_target in l: disk.partition_mount(disk_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') grub.install(disk_target.name, f'{DIR_DST_ROOT}/boot/', - f'{DIR_DST_ROOT}/boot/efi', - id=f'VyOS (RAID disk {l.index(disk_target) + 1})') + f'{DIR_DST_ROOT}/boot/efi') disk.partition_umount(disk_target.partition['efi']) else: print('Installing GRUB to the drive') @@ -893,7 +965,7 @@ def install_image() -> None: @compat.grub_cfg_update def add_image(image_path: str, vrf: str = None, username: str = '', - password: str = '', no_prompt: bool = False) -> None: + password: str = '', no_prompt: bool = False, force: bool = False) -> None: """Add a new image Args: @@ -902,15 +974,18 @@ def add_image(image_path: str, vrf: str = None, username: str = '', if image.is_live_boot(): exit(MSG_ERR_LIVE) + environ['REMOTE_USERNAME'] = username + environ['REMOTE_PASSWORD'] = password + # fetch an image - iso_path: Path = image_fetch(image_path, vrf, username, password, no_prompt) + iso_path: Path = image_fetch(image_path, vrf, no_prompt) try: # mount an ISO Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') print('Validating image compatibility') - validate_compatibility(DIR_ISO_MOUNT) + validate_compatibility(DIR_ISO_MOUNT, force=force) # check sums print('Validating image checksums') @@ -936,8 +1011,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '', f'Adding image would downgrade image tools to v.{cfg_ver}; disallowed') if not no_prompt: + versions = grub.version_list() while True: image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) + if image_name in versions: + print(MSG_INPUT_IMAGE_NAME_TAKEN) + continue if image.validate_name(image_name): break print(MSG_WARN_IMAGE_NAME_WRONG) @@ -959,7 +1038,7 @@ def add_image(image_path: str, vrf: str = None, username: str = '', Path(target_config_dir).mkdir(parents=True) chown(target_config_dir, group='vyattacfg') chmod_2775(target_config_dir) - copytree('/opt/vyatta/etc/config/', target_config_dir, + copytree('/opt/vyatta/etc/config/', target_config_dir, symlinks=True, copy_function=copy_preserve_owner, dirs_exist_ok=True) else: Path(target_config_dir).mkdir(parents=True) @@ -992,6 +1071,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '', if set_as_default: grub.set_default(image_name, root_dir) + cmdline_options = get_cli_kernel_options( + f'{target_config_dir}/config.boot') + grub_util.update_kernel_cmdline_options(' '.join(cmdline_options), + root_dir=root_dir, + version=image_name) + except OSError as e: # if no space error, remove image dir and cleanup if e.errno == ENOSPC: @@ -1031,6 +1116,9 @@ def parse_arguments() -> Namespace: parser.add_argument('--image-path', help='a path (HTTP or local file) to an image that needs to be installed' ) + parser.add_argument('--force', action='store_true', + help='Ignore flavor compatibility requirements.' + ) # parser.add_argument('--image_new_name', help='a new name for image') args: Namespace = parser.parse_args() # Validate arguments @@ -1047,7 +1135,8 @@ if __name__ == '__main__': install_image() if args.action == 'add': add_image(args.image_path, args.vrf, - args.username, args.password, args.no_prompt) + args.username, args.password, + args.no_prompt, args.force) exit() diff --git a/src/op_mode/interfaces.py b/src/op_mode/interfaces.py index e7afc4caa..c97f3b129 100755 --- a/src/op_mode/interfaces.py +++ b/src/op_mode/interfaces.py @@ -29,6 +29,7 @@ from vyos.ifconfig import Section from vyos.ifconfig import Interface from vyos.ifconfig import VRRP from vyos.utils.process import cmd +from vyos.utils.network import interface_exists from vyos.utils.process import rc_cmd from vyos.utils.process import call @@ -84,6 +85,14 @@ def filtered_interfaces(ifnames: typing.Union[str, list], yield interface +def detailed_output(dataset, headers): + for data in dataset: + adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action + transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char + + print(tabulate(transformed_rule, tablefmt="presto")) + print() + def _split_text(text, used=0): """ take a string and attempt to split it to fit with the width of the screen @@ -296,6 +305,114 @@ def _get_counter_data(ifname: typing.Optional[str], return ret +def _get_kernel_data(raw, ifname = None, detail = False): + if ifname: + # Check if the interface exists + if not interface_exists(ifname): + raise vyos.opmode.IncorrectValue(f"{ifname} does not exist!") + int_name = f'dev {ifname}' + else: + int_name = '' + + kernel_interface = json.loads(cmd(f'ip -j -d -s address show {int_name}')) + + # Return early if raw + if raw: + return kernel_interface, None + + # Format the kernel data + kernel_interface_out = _format_kernel_data(kernel_interface, detail) + + return kernel_interface, kernel_interface_out + +def _format_kernel_data(data, detail): + output_list = [] + tmpInfo = {} + + # Sort interfaces by name + for interface in sorted(data, key=lambda x: x.get('ifname', '')): + if interface.get('linkinfo', {}).get('info_kind') == 'vrf': + continue + + # Get the device model; ex. Intel Corporation Ethernet Controller I225-V + dev_model = interface.get('parentdev', '') + if 'parentdev' in interface: + parentdev = interface['parentdev'] + if re.match(r'^[0-9a-fA-F]{4}:', parentdev): + dev_model = cmd(f'lspci -nn -s {parentdev}').split(']:')[1].strip() + + # Get the IP addresses on interface + ip_list = [] + has_global = False + + for ip in interface['addr_info']: + if ip.get('scope') in ('global', 'host'): + has_global = True + local = ip.get('local', '-') + prefixlen = ip.get('prefixlen', '') + ip_list.append(f"{local}/{prefixlen}") + + + # If no global IP address, add '-'; indicates no IP address on interface + if not has_global: + ip_list.append('-') + + sl_status = ('A' if not 'UP' in interface['flags'] else 'u') + '/' + ('D' if interface['operstate'] == 'DOWN' else 'u') + + # Generate temporary dict to hold data + tmpInfo['ifname'] = interface.get('ifname', '') + tmpInfo['ip'] = ip_list + tmpInfo['mac'] = interface.get('address', '') + tmpInfo['mtu'] = interface.get('mtu', '') + tmpInfo['vrf'] = interface.get('master', 'default') + tmpInfo['status'] = sl_status + tmpInfo['description'] = interface.get('ifalias', '') + tmpInfo['device'] = dev_model + tmpInfo['alternate_names'] = interface.get('altnames', '') + tmpInfo['minimum_mtu'] = interface.get('min_mtu', '') + tmpInfo['maximum_mtu'] = interface.get('max_mtu', '') + rx_stats = interface.get('stats64', {}).get('rx') + tx_stats = interface.get('stats64', {}).get('tx') + tmpInfo['rx_packets'] = rx_stats.get('packets', "") + tmpInfo['rx_bytes'] = rx_stats.get('bytes', "") + tmpInfo['rx_errors'] = rx_stats.get('errors', "") + tmpInfo['rx_dropped'] = rx_stats.get('dropped', "") + tmpInfo['rx_over_errors'] = rx_stats.get('over_errors', '') + tmpInfo['multicast'] = rx_stats.get('multicast', "") + tmpInfo['tx_packets'] = tx_stats.get('packets', "") + tmpInfo['tx_bytes'] = tx_stats.get('bytes', "") + tmpInfo['tx_errors'] = tx_stats.get('errors', "") + tmpInfo['tx_dropped'] = tx_stats.get('dropped', "") + tmpInfo['tx_carrier_errors'] = tx_stats.get('carrier_errors', "") + tmpInfo['tx_collisions'] = tx_stats.get('collisions', "") + + # Generate output list; detail adds more fields + output_list.append([tmpInfo['ifname'], + '\n'.join(tmpInfo['ip']), + tmpInfo['mac'], + tmpInfo['vrf'], + tmpInfo['mtu'], + tmpInfo['status'], + tmpInfo['description'], + *([tmpInfo['device']] if detail else []), + *(['\n'.join(tmpInfo['alternate_names'])] if detail else []), + *([tmpInfo['minimum_mtu']] if detail else []), + *([tmpInfo['maximum_mtu']] if detail else []), + *([tmpInfo['rx_packets']] if detail else []), + *([tmpInfo['rx_bytes']] if detail else []), + *([tmpInfo['rx_errors']] if detail else []), + *([tmpInfo['rx_dropped']] if detail else []), + *([tmpInfo['rx_over_errors']] if detail else []), + *([tmpInfo['multicast']] if detail else []), + *([tmpInfo['tx_packets']] if detail else []), + *([tmpInfo['tx_bytes']] if detail else []), + *([tmpInfo['tx_errors']] if detail else []), + *([tmpInfo['tx_dropped']] if detail else []), + *([tmpInfo['tx_carrier_errors']] if detail else []), + *([tmpInfo['tx_collisions']] if detail else [])]) + + return output_list + @catch_broken_pipe def _format_show_data(data: list): unhandled = [] @@ -445,6 +562,27 @@ def _format_show_counters(data: list): print (output) return output +def show_kernel(raw: bool, intf_name: typing.Optional[str], detail: bool): + raw_data, data = _get_kernel_data(raw, intf_name, detail) + + # Return early if raw + if raw: + return raw_data + + # Normal headers; show interfaces kernel + headers = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description'] + + # Detail headers; show interfaces kernel detail + detail_header = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description', + 'Device', 'Alternate Names','Minimum MTU', 'Maximum MTU', 'RX_Packets', + 'RX_Bytes', 'RX_Errors', 'RX_Dropped', 'Receive Overrun Errors', 'Received Multicast', + 'TX_Packets', 'TX_Bytes', 'TX_Errors', 'TX_Dropped', 'Transmit Carrier Errors', + 'Transmit Collisions'] + + if detail: + detailed_output(data, detail_header) + else: + print(tabulate(data, headers)) def _show_raw(data: list, intf_name: str): if intf_name is not None and len(data) <= 1: diff --git a/src/op_mode/load-balancing_wan.py b/src/op_mode/load-balancing_wan.py new file mode 100755 index 000000000..9fa473802 --- /dev/null +++ b/src/op_mode/load-balancing_wan.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import re +import sys + +from datetime import datetime + +from vyos.config import Config +from vyos.utils.process import cmd + +import vyos.opmode + +wlb_status_file = '/run/wlb_status.json' + +status_format = '''Interface: {ifname} +Status: {status} +Last Status Change: {last_change} +Last Interface Success: {last_success} +Last Interface Failure: {last_failure} +Interface Failures: {failures} +''' + +def _verify(func): + """Decorator checks if WLB config exists""" + from functools import wraps + + @wraps(func) + def _wrapper(*args, **kwargs): + config = Config() + if not config.exists(['load-balancing', 'wan']): + unconf_message = 'WAN load-balancing is not configured' + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + return _wrapper + +def _get_raw_data(): + with open(wlb_status_file, 'r') as f: + data = json.loads(f.read()) + if not data: + return {} + return data + +def _get_formatted_output(raw_data): + for ifname, if_data in raw_data.items(): + latest_change = if_data['last_success'] if if_data['last_success'] > if_data['last_failure'] else if_data['last_failure'] + + change_dt = datetime.fromtimestamp(latest_change) if latest_change > 0 else None + success_dt = datetime.fromtimestamp(if_data['last_success']) if if_data['last_success'] > 0 else None + failure_dt = datetime.fromtimestamp(if_data['last_failure']) if if_data['last_failure'] > 0 else None + now = datetime.utcnow() + + fmt_data = { + 'ifname': ifname, + 'status': "active" if if_data['state'] else "failed", + 'last_change': change_dt.strftime("%Y-%m-%d %H:%M:%S") if change_dt else 'N/A', + 'last_success': str(now - success_dt) if success_dt else 'N/A', + 'last_failure': str(now - failure_dt) if failure_dt else 'N/A', + 'failures': if_data['failure_count'] + } + print(status_format.format(**fmt_data)) + +@_verify +def show_summary(raw: bool): + data = _get_raw_data() + + if raw: + return data + else: + return _get_formatted_output(data) + +@_verify +def show_connection(raw: bool): + res = cmd('sudo conntrack -L -n') + lines = res.split("\n") + filtered_lines = [line for line in lines if re.search(r' mark=[1-9]', line)] + + if raw: + return filtered_lines + + for line in lines: + print(line) + +@_verify +def show_status(raw: bool): + res = cmd('sudo nft list chain ip vyos_wanloadbalance wlb_mangle_prerouting') + lines = res.split("\n") + filtered_lines = [line.replace("\t", "") for line in lines[3:-2] if 'meta mark set' not in line] + + if raw: + return filtered_lines + + for line in filtered_lines: + print(line) + +if __name__ == "__main__": + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/qos.py b/src/op_mode/qos.py index b8ca149a0..464b552ee 100755 --- a/src/op_mode/qos.py +++ b/src/op_mode/qos.py @@ -38,7 +38,7 @@ def get_tc_info(interface_dict, interface_name, policy_type): if not policy_name: return None, None - class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], key_mangling=('-', '_'), + class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], get_first_key=True) if not class_dict: return None, None diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py index 3b0031f34..efa835485 100755 --- a/src/op_mode/restart.py +++ b/src/op_mode/restart.py @@ -53,6 +53,10 @@ service_map = { 'systemd_service': 'strongswan', 'path': ['vpn', 'ipsec'], }, + 'load-balancing_wan': { + 'systemd_service': 'vyos-wan-load-balance', + 'path': ['load-balancing', 'wan'], + }, 'mdns_repeater': { 'systemd_service': 'avahi-daemon', 'path': ['service', 'mdns', 'repeater'], @@ -86,6 +90,7 @@ services = typing.Literal[ 'haproxy', 'igmp_proxy', 'ipsec', + 'load-balancing_wan', 'mdns_repeater', 'router_advert', 'snmp', diff --git a/src/op_mode/stp.py b/src/op_mode/stp.py new file mode 100755 index 000000000..fb57bd7ee --- /dev/null +++ b/src/op_mode/stp.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import typing +import json +from tabulate import tabulate + +import vyos.opmode +from vyos.utils.process import cmd +from vyos.utils.network import interface_exists + +def detailed_output(dataset, headers): + for data in dataset: + adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action + transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char + + print(tabulate(transformed_rule, tablefmt="presto")) + print() + +def _get_bridge_vlan_data(iface): + allowed_vlans = [] + native_vlan = None + vlanData = json.loads(cmd(f"bridge -j -d vlan show")) + for vlans in vlanData: + if vlans['ifname'] == iface: + for allowed in vlans['vlans']: + if "flags" in allowed and "PVID" in allowed["flags"]: + native_vlan = allowed['vlan'] + elif allowed.get('vlanEnd', None): + allowed_vlans.append(f"{allowed['vlan']}-{allowed['vlanEnd']}") + else: + allowed_vlans.append(str(allowed['vlan'])) + + if not allowed_vlans: + allowed_vlans = ["none"] + if not native_vlan: + native_vlan = "none" + + return ",".join(allowed_vlans), native_vlan + +def _get_stp_data(ifname, brInfo, brStatus): + tmpInfo = {} + + tmpInfo['bridge_name'] = brInfo.get('ifname') + tmpInfo['up_state'] = brInfo.get('operstate') + tmpInfo['priority'] = brInfo.get('linkinfo').get('info_data').get('priority') + tmpInfo['vlan_filtering'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('vlan_filtering') == 1 else "Disabled" + tmpInfo['vlan_protocol'] = brInfo.get('linkinfo').get('info_data').get('vlan_protocol') + + # The version of VyOS I tested had am issue with the "ip -d link show type bridge" + # output. The root_id was always the local bridge, even though the underlying system + # understood when it wasn't. Could be an upstream Bug. I pull from the "/sys/class/net" + # structure instead. This can be changed later if the "ip link" behavior is corrected. + + #tmpInfo['bridge_id'] = brInfo.get('linkinfo').get('info_data').get('bridge_id') + #tmpInfo['root_id'] = brInfo.get('linkinfo').get('info_data').get('root_id') + + tmpInfo['bridge_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/bridge_id").split('.') + tmpInfo['root_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/root_id").split('.') + + # The "/sys/class/net" structure stores the IDs without seperators like ':' or '.' + # This adds a ':' after every 2 characters to make it resemble a MAC Address + tmpInfo['bridge_id'][1] = ':'.join(tmpInfo['bridge_id'][1][i:i+2] for i in range(0, len(tmpInfo['bridge_id'][1]), 2)) + tmpInfo['root_id'][1] = ':'.join(tmpInfo['root_id'][1][i:i+2] for i in range(0, len(tmpInfo['root_id'][1]), 2)) + + tmpInfo['stp_state'] = "Enabled" if brInfo.get('linkinfo', {}).get('info_data', {}).get('stp_state') == 1 else "Disabled" + + # I don't call any of these values, but I created them to be called within raw output if desired + + tmpInfo['mcast_snooping'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('mcast_snooping') == 1 else "Disabled" + tmpInfo['rxbytes'] = brInfo.get('stats64').get('rx').get('bytes') + tmpInfo['rxpackets'] = brInfo.get('stats64').get('rx').get('packets') + tmpInfo['rxerrors'] = brInfo.get('stats64').get('rx').get('errors') + tmpInfo['rxdropped'] = brInfo.get('stats64').get('rx').get('dropped') + tmpInfo['rxover_errors'] = brInfo.get('stats64').get('rx').get('over_errors') + tmpInfo['rxmulticast'] = brInfo.get('stats64').get('rx').get('multicast') + tmpInfo['txbytes'] = brInfo.get('stats64').get('tx').get('bytes') + tmpInfo['txpackets'] = brInfo.get('stats64').get('tx').get('packets') + tmpInfo['txerrors'] = brInfo.get('stats64').get('tx').get('errors') + tmpInfo['txdropped'] = brInfo.get('stats64').get('tx').get('dropped') + tmpInfo['txcarrier_errors'] = brInfo.get('stats64').get('tx').get('carrier_errors') + tmpInfo['txcollosions'] = brInfo.get('stats64').get('tx').get('collisions') + + tmpStatus = [] + for members in brStatus: + if members.get('master') == brInfo.get('ifname'): + allowed_vlans, native_vlan = _get_bridge_vlan_data(members['ifname']) + tmpStatus.append({'interface': members.get('ifname'), + 'state': members.get('state').capitalize(), + 'mtu': members.get('mtu'), + 'pathcost': members.get('cost'), + 'bpduguard': "Enabled" if members.get('guard') == True else "Disabled", + 'rootguard': "Enabled" if members.get('root_block') == True else "Disabled", + 'mac_learning': "Enabled" if members.get('learning') == True else "Disabled", + 'neigh_suppress': "Enabled" if members.get('neigh_suppress') == True else "Disabled", + 'vlan_tunnel': "Enabled" if members.get('vlan_tunnel') == True else "Disabled", + 'isolated': "Enabled" if members.get('isolated') == True else "Disabled", + **({'allowed_vlans': allowed_vlans} if allowed_vlans else {}), + **({'native_vlan': native_vlan} if native_vlan else {})}) + + tmpInfo['members'] = tmpStatus + return tmpInfo + +def show_stp(raw: bool, ifname: typing.Optional[str], detail: bool): + rawList = [] + rawDict = {'stp': []} + + if ifname: + if not interface_exists(ifname): + raise vyos.opmode.Error(f"{ifname} does not exist!") + else: + ifname = "" + + bridgeInfo = json.loads(cmd(f"ip -j -d -s link show type bridge {ifname}")) + + if not bridgeInfo: + raise vyos.opmode.Error(f"No Bridges configured!") + + bridgeStatus = json.loads(cmd(f"bridge -j -s -d link show")) + + for bridges in bridgeInfo: + output_list = [] + amRoot = "" + bridgeDict = _get_stp_data(ifname, bridges, bridgeStatus) + + if bridgeDict['bridge_id'][1] == bridgeDict['root_id'][1]: + amRoot = " (This bridge is the root)" + + print('-' * 80) + print(f"Bridge interface {bridgeDict['bridge_name']} ({bridgeDict['up_state']}):\n") + print(f"Spanning Tree is {bridgeDict['stp_state']}") + print(f"Bridge ID {bridgeDict['bridge_id'][1]}, Priority {int(bridgeDict['bridge_id'][0], 16)}") + print(f"Root ID {bridgeDict['root_id'][1]}, Priority {int(bridgeDict['root_id'][0], 16)}{amRoot}") + print(f"VLANs {bridgeDict['vlan_filtering'].capitalize()}, Protocol {bridgeDict['vlan_protocol']}") + print() + + for members in bridgeDict['members']: + output_list.append([members['interface'], + members['state'], + *([members['pathcost']] if detail else []), + members['bpduguard'], + members['rootguard'], + members['mac_learning'], + *([members['neigh_suppress']] if detail else []), + *([members['vlan_tunnel']] if detail else []), + *([members['isolated']] if detail else []), + *([members['allowed_vlans']] if detail else []), + *([members['native_vlan']] if detail else [])]) + + if raw: + rawList.append(bridgeDict) + elif detail: + headers = ['Interface', 'State', 'Pathcost', 'BPDU_Guard', 'Root_Guard', 'Learning', 'Neighbor_Suppression', 'Q-in-Q', 'Port_Isolation', 'Allowed VLANs', 'Native VLAN'] + detailed_output(output_list, headers) + else: + headers = ['Interface', 'State', 'BPDU_Guard', 'Root_Guard', 'Learning'] + print(tabulate(output_list, headers)) + print() + + if raw: + rawDict['stp'] = rawList + return rawDict + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/tech_support.py b/src/op_mode/tech_support.py index 24ac0af1b..c4496dfa3 100644 --- a/src/op_mode/tech_support.py +++ b/src/op_mode/tech_support.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2024 VyOS maintainers and contributors +# Copyright (C) 2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,6 +20,7 @@ import json import vyos.opmode from vyos.utils.process import cmd +from vyos.base import Warning def _get_version_data(): from vyos.version import get_version_data @@ -51,7 +52,12 @@ def _get_storage(): def _get_devices(): devices = {} devices["pci"] = cmd("lspci") - devices["usb"] = cmd("lsusb") + + try: + devices["usb"] = cmd("lsusb") + except OSError: + Warning("Could not retrieve information about USB devices") + devices["usb"] = {} return devices |