summaryrefslogtreecommitdiff
path: root/src/op_mode
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode')
-rwxr-xr-xsrc/op_mode/dhcp.py499
-rwxr-xr-xsrc/op_mode/firewall.py159
-rwxr-xr-xsrc/op_mode/image_installer.py193
-rwxr-xr-xsrc/op_mode/interfaces.py138
-rwxr-xr-xsrc/op_mode/ipsec.py23
-rwxr-xr-xsrc/op_mode/load-balancing_wan.py117
-rwxr-xr-xsrc/op_mode/nhrp.py101
-rwxr-xr-xsrc/op_mode/qos.py2
-rwxr-xr-xsrc/op_mode/reset_wireguard.py55
-rwxr-xr-xsrc/op_mode/restart.py5
-rwxr-xr-xsrc/op_mode/stp.py185
-rw-r--r--src/op_mode/tech_support.py29
-rwxr-xr-xsrc/op_mode/vtysh_wrapper.sh2
-rw-r--r--src/op_mode/zone.py11
14 files changed, 1049 insertions, 470 deletions
diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py
index 1429fd7b1..725bfc75b 100755
--- a/src/op_mode/dhcp.py
+++ b/src/op_mode/dhcp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022-2024 VyOS maintainers and contributors
+# Copyright (C) 2022-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -30,137 +30,72 @@ from vyos.base import Warning
from vyos.configquery import ConfigTreeQuery
from vyos.kea import kea_get_active_config
+from vyos.kea import kea_get_dhcp_pools
from vyos.kea import kea_get_leases
-from vyos.kea import kea_get_pool_from_subnet_id
+from vyos.kea import kea_get_server_leases
+from vyos.kea import kea_get_static_mappings
from vyos.kea import kea_delete_lease
-from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import call
+from vyos.utils.process import is_systemd_service_running
-time_string = "%a %b %d %H:%M:%S %Z %Y"
+time_string = '%a %b %d %H:%M:%S %Z %Y'
config = ConfigTreeQuery()
-lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
-sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state']
-sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type']
+lease_valid_states = [
+ 'all',
+ 'active',
+ 'free',
+ 'expired',
+ 'released',
+ 'abandoned',
+ 'reset',
+ 'backup',
+]
+sort_valid_inet = [
+ 'end',
+ 'mac',
+ 'hostname',
+ 'ip',
+ 'pool',
+ 'remaining',
+ 'start',
+ 'state',
+]
+sort_valid_inet6 = [
+ 'end',
+ 'duid',
+ 'ip',
+ 'last_communication',
+ 'pool',
+ 'remaining',
+ 'state',
+ 'type',
+]
mapping_sort_valid = ['mac', 'ip', 'pool', 'duid']
+stale_warn_msg = 'DHCP server is configured but not started. Data may be stale.'
+
ArgFamily = typing.Literal['inet', 'inet6']
-ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup']
+ArgState = typing.Literal[
+ 'all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'
+]
ArgOrigin = typing.Literal['local', 'remote']
-def _utc_to_local(utc_dt):
- return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds())
-
-
-def _format_hex_string(in_str):
- out_str = ""
- # if input is divisible by 2, add : every 2 chars
- if len(in_str) > 0 and len(in_str) % 2 == 0:
- out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
- else:
- out_str = in_str
- return out_str
-
-
-def _find_list_of_dict_index(lst, key='ip', value=''):
- """
- Find the index entry of list of dict matching the dict value
- Exampe:
- % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}]
- % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2')
- % 1
- """
- idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
- return idx
-
-
-def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list:
- """
- Get DHCP server leases
- :return list
- """
+def _get_raw_server_leases(
+ config, family='inet', pool=None, sorted=None, state=[], origin=None
+) -> list:
inet_suffix = '6' if family == 'inet6' else '4'
- try:
- leases = kea_get_leases(inet_suffix)
- except Exception:
- raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server lease information')
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
-
- try:
- active_config = kea_get_active_config(inet_suffix)
- except Exception:
- raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
-
- data = []
- for lease in leases:
- lifetime = lease['valid-lft']
- expiry = (lease['cltt'] + lifetime)
-
- lease['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime)
- lease['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None
-
- data_lease = {}
- data_lease['ip'] = lease['ip-address']
- lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
- data_lease['state'] = lease_state_long[lease['state']]
- data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-'
- data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
- data_lease['origin'] = 'local' # TODO: Determine remote in HA
- data_lease['hostname'] = lease.get('hostname', '-')
- # remove trailing dot to ensure consistency for `vyos-hostsd-client`
- if data_lease['hostname'][-1] == '.':
- data_lease['hostname'] = data_lease['hostname'][:-1]
-
- if family == 'inet':
- data_lease['mac'] = lease['hw-address']
- data_lease['start'] = lease['start_timestamp'].timestamp()
-
- if family == 'inet6':
- data_lease['last_communication'] = lease['start_timestamp'].timestamp()
- data_lease['duid'] = _format_hex_string(lease['duid'])
- data_lease['type'] = lease['type']
-
- if lease['type'] == 'IA_PD':
- prefix_len = lease['prefix-len']
- data_lease['ip'] += f'/{prefix_len}'
-
- data_lease['remaining'] = '-'
-
- if lease['valid-lft'] > 0:
- data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
-
- if data_lease['remaining'].days >= 0:
- # substraction gives us a timedelta object which can't be formatted with strftime
- # so we use str(), split gets rid of the microseconds
- data_lease['remaining'] = str(data_lease['remaining']).split('.')[0]
-
- # Do not add old leases
- if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free':
- if not state or state == 'all' or data_lease['state'] in state:
- data.append(data_lease)
-
- # deduplicate
- checked = []
- for entry in data:
- addr = entry.get('ip')
- if addr not in checked:
- checked.append(addr)
- else:
- idx = _find_list_of_dict_index(data, key='ip', value=addr)
- if idx is not None:
- data.pop(idx)
+ mappings = kea_get_server_leases(config, inet_suffix, pools, state, origin)
if sorted:
if sorted == 'ip':
- data.sort(key = lambda x:ip_address(x['ip']))
+ mappings.sort(key=lambda x: ip_address(x['ip']))
else:
- data.sort(key = lambda x:x[sorted])
- return data
+ mappings.sort(key=lambda x: x[sorted])
+ return mappings
def _get_formatted_server_leases(raw_data, family='inet'):
@@ -170,46 +105,67 @@ def _get_formatted_server_leases(raw_data, family='inet'):
ipaddr = lease.get('ip')
hw_addr = lease.get('mac')
state = lease.get('state')
- start = lease.get('start')
- start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
- end = lease.get('end')
- end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-'
+ start = datetime.fromtimestamp(lease.get('start'), timezone.utc)
+ end = (
+ datetime.fromtimestamp(lease.get('end'), timezone.utc)
+ if lease.get('end')
+ else '-'
+ )
remain = lease.get('remaining')
pool = lease.get('pool')
hostname = lease.get('hostname')
origin = lease.get('origin')
- data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin])
-
- headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool',
- 'Hostname', 'Origin']
+ data_entries.append(
+ [ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin]
+ )
+
+ headers = [
+ 'IP Address',
+ 'MAC address',
+ 'State',
+ 'Lease start',
+ 'Lease expiration',
+ 'Remaining',
+ 'Pool',
+ 'Hostname',
+ 'Origin',
+ ]
if family == 'inet6':
for lease in raw_data:
ipaddr = lease.get('ip')
state = lease.get('state')
- start = lease.get('last_communication')
- start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S')
- end = lease.get('end')
- end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S')
+ start = datetime.fromtimestamp(
+ lease.get('last_communication'), timezone.utc
+ )
+ end = (
+ datetime.fromtimestamp(lease.get('end'), timezone.utc)
+ if lease.get('end')
+ else '-'
+ )
remain = lease.get('remaining')
lease_type = lease.get('type')
pool = lease.get('pool')
host_identifier = lease.get('duid')
- data_entries.append([ipaddr, state, start, end, remain, lease_type, pool, host_identifier])
-
- headers = ['IPv6 address', 'State', 'Last communication', 'Lease expiration', 'Remaining', 'Type', 'Pool',
- 'DUID']
+ data_entries.append(
+ [ipaddr, state, start, end, remain, lease_type, pool, host_identifier]
+ )
+
+ headers = [
+ 'IPv6 address',
+ 'State',
+ 'Last communication',
+ 'Lease expiration',
+ 'Remaining',
+ 'Type',
+ 'Pool',
+ 'DUID',
+ ]
output = tabulate(data_entries, headers, numalign='left')
return output
-def _get_dhcp_pools(family='inet') -> list:
- v = 'v6' if family == 'inet6' else ''
- pools = config.list_nodes(f'service dhcp{v}-server shared-network-name')
- return pools
-
-
def _get_pool_size(pool, family='inet'):
v = 'v6' if family == 'inet6' else ''
base = f'service dhcp{v}-server shared-network-name {pool}'
@@ -229,26 +185,27 @@ def _get_pool_size(pool, family='inet'):
return size
-def _get_raw_pool_statistics(family='inet', pool=None):
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
+def _get_raw_server_pool_statistics(config, family='inet', pool=None):
+ inet_suffix = '6' if family == 'inet6' else '4'
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
- v = 'v6' if family == 'inet6' else ''
stats = []
- for p in pool:
- subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet')
+ for p in pools:
size = _get_pool_size(family=family, pool=p)
- leases = len(_get_raw_server_leases(family=family, pool=p))
+ leases = len(_get_raw_server_leases(config, family=family, pool=p))
use_percentage = round(leases / size * 100) if size != 0 else 0
- pool_stats = {'pool': p, 'size': size, 'leases': leases,
- 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet}
+ pool_stats = {
+ 'pool': p,
+ 'size': size,
+ 'leases': leases,
+ 'available': (size - leases),
+ 'use_percentage': use_percentage,
+ }
stats.append(pool_stats)
return stats
-def _get_formatted_pool_statistics(pool_data, family='inet'):
+def _get_formatted_server_pool_statistics(pool_data):
data_entries = []
for entry in pool_data:
pool = entry.get('pool')
@@ -259,67 +216,52 @@ def _get_formatted_pool_statistics(pool_data, family='inet'):
use_percentage = f'{use_percentage}%'
data_entries.append([pool, size, leases, available, use_percentage])
- headers = ['Pool', 'Size','Leases', 'Available', 'Usage']
+ headers = ['Pool', 'Size', 'Leases', 'Available', 'Usage']
output = tabulate(data_entries, headers, numalign='left')
return output
-def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None):
- if pool is None:
- pool = _get_dhcp_pools(family=family)
- else:
- pool = [pool]
- v = 'v6' if family == 'inet6' else ''
- mappings = []
- for p in pool:
- pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p],
- get_first_key=True)
- if 'subnet' in pool_config:
- for subnet, subnet_config in pool_config['subnet'].items():
- if 'static-mapping' in subnet_config:
- for name, mapping_config in subnet_config['static-mapping'].items():
- mapping = {'pool': p, 'subnet': subnet, 'name': name}
- mapping.update(mapping_config)
- mappings.append(mapping)
+def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None):
+ inet_suffix = '6' if family == 'inet6' else '4'
+ pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix)
+
+ mappings = kea_get_static_mappings(config, inet_suffix, pools)
if sorted:
if sorted == 'ip':
- if family == 'inet6':
- mappings.sort(key = lambda x:ip_address(x['ipv6-address']))
- else:
- mappings.sort(key = lambda x:ip_address(x['ip-address']))
+ mappings.sort(key=lambda x: ip_address(x['ip']))
else:
- mappings.sort(key = lambda x:x[sorted])
+ mappings.sort(key=lambda x: x[sorted])
return mappings
-def _get_formatted_server_static_mappings(raw_data, family='inet'):
+
+def _get_formatted_server_static_mappings(raw_data):
data_entries = []
- if family == 'inet':
- for entry in raw_data:
- pool = entry.get('pool')
- subnet = entry.get('subnet')
- name = entry.get('name')
- ip_addr = entry.get('ip-address', 'N/A')
- mac_addr = entry.get('mac', 'N/A')
- duid = entry.get('duid', 'N/A')
- description = entry.get('description', 'N/A')
- data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description])
- elif family == 'inet6':
- for entry in raw_data:
- pool = entry.get('pool')
- subnet = entry.get('subnet')
- name = entry.get('name')
- ip_addr = entry.get('ipv6-address', 'N/A')
- mac_addr = entry.get('mac', 'N/A')
- duid = entry.get('duid', 'N/A')
- description = entry.get('description', 'N/A')
- data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description])
-
- headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description']
+
+ for entry in raw_data:
+ pool = entry.get('pool')
+ subnet = entry.get('subnet')
+ hostname = entry.get('hostname')
+ ip_addr = entry.get('ip', 'N/A')
+ mac_addr = entry.get('mac', 'N/A')
+ duid = entry.get('duid', 'N/A')
+ desc = entry.get('description', 'N/A')
+ data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, desc])
+
+ headers = [
+ 'Pool',
+ 'Subnet',
+ 'Hostname',
+ 'IP Address',
+ 'MAC Address',
+ 'DUID',
+ 'Description',
+ ]
output = tabulate(data_entries, headers, numalign='left')
return output
-def _verify(func):
+
+def _verify_server(func):
"""Decorator checks if DHCP(v6) config exists"""
from functools import wraps
@@ -333,8 +275,10 @@ def _verify(func):
if not config.exists(f'service dhcp{v}-server'):
raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
return func(*args, **kwargs)
+
return _wrapper
+
def _verify_client(func):
"""Decorator checks if interface is configured as DHCP client"""
from functools import wraps
@@ -353,67 +297,124 @@ def _verify_client(func):
if not config.exists(f'interfaces {interface_path} address dhcp{v}'):
raise vyos.opmode.UnconfiguredObject(unconf_message)
return func(*args, **kwargs)
+
return _wrapper
-@_verify
-def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]):
- pool_data = _get_raw_pool_statistics(family=family, pool=pool)
+
+@_verify_server
+def show_server_pool_statistics(
+ raw: bool, family: ArgFamily, pool: typing.Optional[str]
+):
+ v = 'v6' if family == 'inet6' else ''
+ inet_suffix = '6' if family == 'inet6' else '4'
+
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning(stale_warn_msg)
+
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
+
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
+
+ if pool and active_pools and pool not in active_pools:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
+
+ pool_data = _get_raw_server_pool_statistics(active_config, family=family, pool=pool)
if raw:
return pool_data
else:
- return _get_formatted_pool_statistics(pool_data, family=family)
+ return _get_formatted_server_pool_statistics(pool_data)
+
+
+@_verify_server
+def show_server_leases(
+ raw: bool,
+ family: ArgFamily,
+ pool: typing.Optional[str],
+ sorted: typing.Optional[str],
+ state: typing.Optional[ArgState],
+ origin: typing.Optional[ArgOrigin],
+):
+ v = 'v6' if family == 'inet6' else ''
+ inet_suffix = '6' if family == 'inet6' else '4'
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning(stale_warn_msg)
-@_verify
-def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str],
- sorted: typing.Optional[str], state: typing.Optional[ArgState],
- origin: typing.Optional[ArgOrigin] ):
- # if dhcp server is down, inactive leases may still be shown as active, so warn the user.
- v = '6' if family == 'inet6' else '4'
- if not is_systemd_service_running(f'kea-dhcp{v}-server.service'):
- Warning('DHCP server is configured but not started. Data may be stale.')
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
- v = 'v6' if family == 'inet6' else ''
- if pool and pool not in _get_dhcp_pools(family=family):
- raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
- if state and state not in lease_valid_states:
- raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
+ if pool and active_pools and pool not in active_pools:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet
if sorted and sorted not in sort_valid:
raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin)
+ if state and state not in lease_valid_states:
+ raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!')
+
+ lease_data = _get_raw_server_leases(
+ config=active_config,
+ family=family,
+ pool=pool,
+ sorted=sorted,
+ state=state,
+ origin=origin,
+ )
if raw:
return lease_data
else:
return _get_formatted_server_leases(lease_data, family=family)
-@_verify
-def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str],
- sorted: typing.Optional[str]):
+
+@_verify_server
+def show_server_static_mappings(
+ raw: bool,
+ family: ArgFamily,
+ pool: typing.Optional[str],
+ sorted: typing.Optional[str],
+):
v = 'v6' if family == 'inet6' else ''
- if pool and pool not in _get_dhcp_pools(family=family):
+ inet_suffix = '6' if family == 'inet6' else '4'
+
+ if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'):
+ Warning(stale_warn_msg)
+
+ try:
+ active_config = kea_get_active_config(inet_suffix)
+ except Exception:
+ raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration')
+
+ active_pools = kea_get_dhcp_pools(active_config, inet_suffix)
+
+ if pool and active_pools and pool not in active_pools:
raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!')
if sorted and sorted not in mapping_sort_valid:
raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!')
- static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted)
+ static_mappings = _get_raw_server_static_mappings(
+ config=active_config, family=family, pool=pool, sorted=sorted
+ )
if raw:
return static_mappings
else:
- return _get_formatted_server_static_mappings(static_mappings, family=family)
+ return _get_formatted_server_static_mappings(static_mappings)
+
def _lease_valid(inet, address):
leases = kea_get_leases(inet)
- for lease in leases:
- if address == lease['ip-address']:
- return True
- return False
+ return any(lease['ip-address'] == address for lease in leases)
+
-@_verify
+@_verify_server
def clear_dhcp_server_lease(family: ArgFamily, address: str):
v = 'v6' if family == 'inet6' else ''
inet = '6' if family == 'inet6' else '4'
@@ -428,6 +429,7 @@ def clear_dhcp_server_lease(family: ArgFamily, address: str):
print(f'Lease "{address}" has been cleared')
+
def _get_raw_client_leases(family='inet', interface=None):
from time import mktime
from datetime import datetime
@@ -456,22 +458,29 @@ def _get_raw_client_leases(family='inet', interface=None):
# format this makes less sense for an API and also the expiry
# timestamp is provided in UNIX time. Convert string (e.g. Sun Jul
# 30 18:13:44 CEST 2023) to UNIX time (1690733624)
- tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))})
+ tmp.update(
+ {
+ 'last_update': int(
+ mktime(datetime.strptime(line, time_string).timetuple())
+ )
+ }
+ )
continue
k, v = line.split('=')
- tmp.update({k : v.replace("'", "")})
+ tmp.update({k: v.replace("'", '')})
if 'interface' in tmp:
vrf = get_interface_vrf(tmp['interface'])
if vrf:
- tmp.update({'vrf' : vrf})
+ tmp.update({'vrf': vrf})
lease_data.append(tmp)
return lease_data
-def _get_formatted_client_leases(lease_data, family):
+
+def _get_formatted_client_leases(lease_data):
from time import localtime
from time import strftime
@@ -481,30 +490,34 @@ def _get_formatted_client_leases(lease_data, family):
for lease in lease_data:
if not lease.get('new_ip_address'):
continue
- data_entries.append(["Interface", lease['interface']])
+ data_entries.append(['Interface', lease['interface']])
if 'new_ip_address' in lease:
- tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]'
- data_entries.append(["IP address", lease['new_ip_address'], tmp])
+ tmp = (
+ '[Active]'
+ if is_intf_addr_assigned(lease['interface'], lease['new_ip_address'])
+ else '[Inactive]'
+ )
+ data_entries.append(['IP address', lease['new_ip_address'], tmp])
if 'new_subnet_mask' in lease:
- data_entries.append(["Subnet Mask", lease['new_subnet_mask']])
+ data_entries.append(['Subnet Mask', lease['new_subnet_mask']])
if 'new_domain_name' in lease:
- data_entries.append(["Domain Name", lease['new_domain_name']])
+ data_entries.append(['Domain Name', lease['new_domain_name']])
if 'new_routers' in lease:
- data_entries.append(["Router", lease['new_routers']])
+ data_entries.append(['Router', lease['new_routers']])
if 'new_domain_name_servers' in lease:
- data_entries.append(["Name Server", lease['new_domain_name_servers']])
+ data_entries.append(['Name Server', lease['new_domain_name_servers']])
if 'new_dhcp_server_identifier' in lease:
- data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']])
+ data_entries.append(['DHCP Server', lease['new_dhcp_server_identifier']])
if 'new_dhcp_lease_time' in lease:
- data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']])
+ data_entries.append(['DHCP Server', lease['new_dhcp_lease_time']])
if 'vrf' in lease:
- data_entries.append(["VRF", lease['vrf']])
+ data_entries.append(['VRF', lease['vrf']])
if 'last_update' in lease:
tmp = strftime(time_string, localtime(int(lease['last_update'])))
- data_entries.append(["Last Update", tmp])
+ data_entries.append(['Last Update', tmp])
if 'new_expiry' in lease:
tmp = strftime(time_string, localtime(int(lease['new_expiry'])))
- data_entries.append(["Expiry", tmp])
+ data_entries.append(['Expiry', tmp])
# Add empty marker
data_entries.append([''])
@@ -513,12 +526,14 @@ def _get_formatted_client_leases(lease_data, family):
return output
+
def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]):
lease_data = _get_raw_client_leases(family=family, interface=interface)
if raw:
return lease_data
else:
- return _get_formatted_client_leases(lease_data, family=family)
+ return _get_formatted_client_leases(lease_data)
+
@_verify_client
def renew_client_lease(raw: bool, family: ArgFamily, interface: str):
@@ -530,6 +545,7 @@ def renew_client_lease(raw: bool, family: ArgFamily, interface: str):
else:
call(f'systemctl restart dhclient@{interface}.service')
+
@_verify_client
def release_client_lease(raw: bool, family: ArgFamily, interface: str):
if not raw:
@@ -540,6 +556,7 @@ def release_client_lease(raw: bool, family: ArgFamily, interface: str):
else:
call(f'systemctl stop dhclient@{interface}.service')
+
if __name__ == '__main__':
try:
res = vyos.opmode.run(sys.modules[__name__])
diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py
index c197ca434..f3309ee34 100755
--- a/src/op_mode/firewall.py
+++ b/src/op_mode/firewall.py
@@ -18,6 +18,7 @@ import argparse
import ipaddress
import json
import re
+from signal import signal, SIGPIPE, SIG_DFL
import tabulate
import textwrap
@@ -25,6 +26,9 @@ from vyos.config import Config
from vyos.utils.process import cmd
from vyos.utils.dict import dict_search_args
+signal(SIGPIPE, SIG_DFL)
+
+
def get_config_node(conf, node=None, family=None, hook=None, priority=None):
if node == 'nat':
if family == 'ipv6':
@@ -148,6 +152,38 @@ def get_nftables_group_members(family, table, name):
return out
+def get_nftables_remote_group_members(family, table, name):
+ prefix = 'ip6' if family == 'ipv6' else 'ip'
+ out = []
+
+ try:
+ results_str = cmd(f'nft -j list set {prefix} {table} {name}')
+ results = json.loads(results_str)
+ except:
+ return out
+
+ if 'nftables' not in results:
+ return out
+
+ for obj in results['nftables']:
+ if 'set' not in obj:
+ continue
+
+ set_obj = obj['set']
+ if 'elem' in set_obj:
+ for elem in set_obj['elem']:
+ # search for single IP elements
+ if isinstance(elem, str):
+ out.append(elem)
+ # search for prefix elements
+ elif isinstance(elem, dict) and 'prefix' in elem:
+ out.append(f"{elem['prefix']['addr']}/{elem['prefix']['len']}")
+ # search for IP range elements
+ elif isinstance(elem, dict) and 'range' in elem:
+ out.append(f"{elem['range'][0]}-{elem['range'][1]}")
+
+ return out
+
def output_firewall_vertical(rules, headers, adjust=True):
for rule in rules:
adjusted_rule = rule + [""] * (len(headers) - len(rule)) if adjust else rule # account for different header length, like default-action
@@ -253,15 +289,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule
if not source_addr:
source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group')
if not source_addr:
- source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
+ source_addr = dict_search_args(rule_conf, 'source', 'group', 'remote_group')
if not source_addr:
- source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
- if source_addr:
- source_addr = str(source_addr)[1:-1].replace('\'','')
- if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
- source_addr = 'NOT ' + str(source_addr)
+ source_addr = dict_search_args(rule_conf, 'source', 'fqdn')
if not source_addr:
- source_addr = 'any'
+ source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code')
+ if source_addr:
+ source_addr = str(source_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'):
+ source_addr = 'NOT ' + str(source_addr)
+ if not source_addr:
+ source_addr = 'any'
# Get destination
dest_addr = dict_search_args(rule_conf, 'destination', 'address')
@@ -272,15 +310,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule
if not dest_addr:
dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group')
if not dest_addr:
- dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
+ dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'remote_group')
if not dest_addr:
- dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
- if dest_addr:
- dest_addr = str(dest_addr)[1:-1].replace('\'','')
- if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
- dest_addr = 'NOT ' + str(dest_addr)
+ dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn')
if not dest_addr:
- dest_addr = 'any'
+ dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code')
+ if dest_addr:
+ dest_addr = str(dest_addr)[1:-1].replace('\'','')
+ if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'):
+ dest_addr = 'NOT ' + str(dest_addr)
+ if not dest_addr:
+ dest_addr = 'any'
# Get inbound interface
iiface = dict_search_args(rule_conf, 'inbound_interface', 'name')
@@ -552,30 +592,8 @@ def show_firewall_group(name=None):
header_tail = []
for group_type, group_type_conf in firewall['group'].items():
- ##
- if group_type != 'dynamic_group':
-
- for group_name, group_conf in group_type_conf.items():
- if name and name != group_name:
- continue
-
- references = find_references(group_type, group_name)
- row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D']
- if 'address' in group_conf:
- row.append("\n".join(sorted(group_conf['address'])))
- elif 'network' in group_conf:
- row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network)))
- elif 'mac_address' in group_conf:
- row.append("\n".join(sorted(group_conf['mac_address'])))
- elif 'port' in group_conf:
- row.append("\n".join(sorted(group_conf['port'])))
- elif 'interface' in group_conf:
- row.append("\n".join(sorted(group_conf['interface'])))
- else:
- row.append('N/D')
- rows.append(row)
-
- else:
+ # interate over dynamic-groups
+ if group_type == 'dynamic_group':
if not args.detail:
header_tail = ['Timeout', 'Expires']
@@ -584,6 +602,9 @@ def show_firewall_group(name=None):
prefix = 'DA_' if dynamic_type == 'address_group' else 'DA6_'
if dynamic_type in firewall['group']['dynamic_group']:
for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items():
+ if name and name != dynamic_name:
+ continue
+
references = find_references(dynamic_type, dynamic_name)
row = [dynamic_name, textwrap.fill(dynamic_conf.get('description') or '', 50), dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D']
@@ -622,6 +643,68 @@ def show_firewall_group(name=None):
header_tail += [""] * (len(members) - 1)
rows.append(row)
+ # iterate over remote-groups
+ elif group_type == 'remote_group':
+ for remote_name, remote_conf in group_type_conf.items():
+ if name and name != remote_name:
+ continue
+
+ references = find_references(group_type, remote_name)
+ row = [remote_name, textwrap.fill(remote_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D']
+ members = get_nftables_remote_group_members("ipv4", 'vyos_filter', f'R_{remote_name}')
+ members6 = get_nftables_remote_group_members("ipv6", 'vyos_filter', f'R6_{remote_name}')
+
+ if 'url' in remote_conf:
+ # display only the url if no members are found for both views
+ if not members and not members6:
+ if args.detail:
+ header_tail = ['IPv6 Members', 'Remote URL']
+ row.append('N/D')
+ row.append('N/D')
+ row.append(remote_conf['url'])
+ else:
+ row.append(remote_conf['url'])
+ rows.append(row)
+ else:
+ # display all table elements in detail view
+ if args.detail:
+ header_tail = ['IPv6 Members', 'Remote URL']
+ if members:
+ row.append(' '.join(members))
+ else:
+ row.append('N/D')
+ if members6:
+ row.append(' '.join(members6))
+ else:
+ row.append('N/D')
+ row.append(remote_conf['url'])
+ rows.append(row)
+ else:
+ row.append(remote_conf['url'])
+ rows.append(row)
+
+ # catch the rest of the group types
+ else:
+ for group_name, group_conf in group_type_conf.items():
+ if name and name != group_name:
+ continue
+
+ references = find_references(group_type, group_name)
+ row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D']
+ if 'address' in group_conf:
+ row.append("\n".join(sorted(group_conf['address'])))
+ elif 'network' in group_conf:
+ row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network)))
+ elif 'mac_address' in group_conf:
+ row.append("\n".join(sorted(group_conf['mac_address'])))
+ elif 'port' in group_conf:
+ row.append("\n".join(sorted(group_conf['port'])))
+ elif 'interface' in group_conf:
+ row.append("\n".join(sorted(group_conf['interface'])))
+ else:
+ row.append('N/D')
+ rows.append(row)
+
if rows:
print('Firewall Groups\n')
if args.detail:
diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py
index 1da112673..ac5a84419 100755
--- a/src/op_mode/image_installer.py
+++ b/src/op_mode/image_installer.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This file is part of VyOS.
#
@@ -24,7 +24,9 @@ from glob import glob
from sys import exit
from os import environ
from os import readlink
-from os import getpid, getppid
+from os import getpid
+from os import getppid
+from json import loads
from typing import Union
from urllib.parse import urlparse
from passlib.hosts import linux_context
@@ -32,12 +34,26 @@ from errno import ENOSPC
from psutil import disk_partitions
+from vyos.base import Warning
from vyos.configtree import ConfigTree
from vyos.remote import download
-from vyos.system import disk, grub, image, compat, raid, SYSTEM_CFG_VER
+from vyos.system import disk
+from vyos.system import grub
+from vyos.system import image
+from vyos.system import compat
+from vyos.system import raid
+from vyos.system import SYSTEM_CFG_VER
+from vyos.system import grub_util
from vyos.template import render
+from vyos.utils.auth import (
+ DEFAULT_PASSWORD,
+ EPasswdStrength,
+ evaluate_strength
+)
+from vyos.utils.dict import dict_search
from vyos.utils.io import ask_input, ask_yes_no, select_entry
from vyos.utils.file import chmod_2775
+from vyos.utils.file import read_file
from vyos.utils.process import cmd, run, rc_cmd
from vyos.version import get_version_data
@@ -46,7 +62,13 @@ MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system
MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.'
MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.'
MSG_ERR_IMPROPER_IMAGE: str = 'Missing sha256sum.txt.\nEither this image is corrupted, or of era 1.2.x (md5sum) and would downgrade image tools;\ndisallowed in either case.'
-MSG_ERR_ARCHITECTURE_MISMATCH: str = 'Upgrading to a different image architecture will break your system.'
+MSG_ERR_INCOMPATIBLE_IMAGE: str = 'Image compatibility check failed, aborting installation.'
+MSG_ERR_ARCHITECTURE_MISMATCH: str = 'The current architecture is "{0}", the new image is for "{1}". Upgrading to a different image architecture will break your system.'
+MSG_ERR_FLAVOR_MISMATCH: str = 'The current image flavor is "{0}", the new image is "{1}". Upgrading to a non-matching flavor can have unpredictable consequences.'
+MSG_ERR_MISSING_ARCHITECTURE: str = 'The new image version data does not specify architecture, cannot check compatibility (is it a legacy release image?)'
+MSG_ERR_MISSING_FLAVOR: str = 'The new image version data does not specify flavor, cannot check compatibility (is it a legacy release image?)'
+MSG_ERR_CORRUPT_CURRENT_IMAGE: str = 'Version data in the current image is malformed: missing flavor and/or architecture fields. Upgrade compatibility cannot be checked.'
+MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE: str = 'Unsupported signature type, signature cannot be verified.'
MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install VyOS to your permanent storage.'
MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation'
MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.'
@@ -62,6 +84,7 @@ MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like
MSG_INPUT_CONFIG_CHOICE: str = 'The following config files are available for boot:'
MSG_INPUT_CONFIG_CHOOSE: str = 'Which file would you like as boot config?'
MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?'
+MSG_INPUT_IMAGE_NAME_TAKEN: str = 'There is already an installed image by that name; please choose again'
MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set the new image as the default one for boot?'
MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user:'
MSG_INPUT_PASSWORD_CONFIRM: str = 'Please confirm password for the "vyos" user:'
@@ -78,8 +101,10 @@ MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.'
MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again'
MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n'\
'It must be between 1 and 64 characters long and contains only the next characters: .+-_ a-z A-Z 0-9'
+
+MSG_WARN_CHANGE_PASSWORD: str = 'Default password used. Consider changing ' \
+ 'it on next login.'
MSG_WARN_PASSWORD_CONFIRM: str = 'The entered values did not match. Try again'
-MSG_WARN_FLAVOR_MISMATCH: str = 'The running image flavor is "{0}". The new image flavor is "{1}".\n' \
'Installing a different image flavor may cause functionality degradation or break your system.\n' \
'Do you want to continue with installation?'
CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB
@@ -95,7 +120,7 @@ DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src'
DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst'
DIR_KERNEL_SRC: str = '/boot/'
FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs'
-ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso'
+ISO_DOWNLOAD_PATH: str = ''
external_download_script = '/usr/libexec/vyos/simple-download.py'
external_latest_image_url_script = '/usr/libexec/vyos/latest-image-url.py'
@@ -462,6 +487,29 @@ def setup_grub(root_dir: str) -> None:
render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {})
render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {})
+def get_cli_kernel_options(config_file: str) -> list:
+ config = ConfigTree(read_file(config_file))
+ config_dict = loads(config.to_json())
+ kernel_options = dict_search('system.option.kernel', config_dict)
+ if kernel_options is None:
+ kernel_options = {}
+ cmdline_options = []
+
+ # XXX: This code path and if statements must be kept in sync with the Kernel
+ # option handling in system_options.py:generate(). This occurance is used
+ # for having the appropriate options passed to GRUB after an image upgrade!
+ if 'disable-mitigations' in kernel_options:
+ cmdline_options.append('mitigations=off')
+ if 'disable-power-saving' in kernel_options:
+ cmdline_options.append('intel_idle.max_cstate=0 processor.max_cstate=1')
+ if 'amd-pstate-driver' in kernel_options:
+ mode = kernel_options['amd-pstate-driver']
+ cmdline_options.append(
+ f'initcall_blacklist=acpi_cpufreq_init amd_pstate={mode}')
+ if 'quiet' in kernel_options:
+ cmdline_options.append('quiet')
+
+ return cmdline_options
def configure_authentication(config_file: str, password: str) -> None:
"""Write encrypted password to config file
@@ -476,10 +524,7 @@ def configure_authentication(config_file: str, password: str) -> None:
plaintext exposed
"""
encrypted_password = linux_context.hash(password)
-
- with open(config_file) as f:
- config_string = f.read()
-
+ config_string = read_file(config_file)
config = ConfigTree(config_string)
config.set([
'system', 'login', 'user', 'vyos', 'authentication',
@@ -501,7 +546,6 @@ def validate_signature(file_path: str, sign_type: str) -> None:
"""
print('Validating signature')
signature_valid: bool = False
- # validate with minisig
if sign_type == 'minisig':
pub_key_list = glob('/usr/share/vyos/keys/*.minisign.pub')
for pubkey in pub_key_list:
@@ -510,11 +554,8 @@ def validate_signature(file_path: str, sign_type: str) -> None:
signature_valid = True
break
Path(f'{file_path}.minisig').unlink()
- # validate with GPG
- if sign_type == 'asc':
- if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0:
- signature_valid = True
- Path(f'{file_path}.asc').unlink()
+ else:
+ exit(MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE)
# warn or pass
if not signature_valid:
@@ -524,21 +565,18 @@ def validate_signature(file_path: str, sign_type: str) -> None:
print('Signature is valid')
def download_file(local_file: str, remote_path: str, vrf: str,
- username: str, password: str,
progressbar: bool = False, check_space: bool = False):
- environ['REMOTE_USERNAME'] = username
- environ['REMOTE_PASSWORD'] = password
+ # Server credentials are implicitly passed in environment variables
+ # that are set by add_image
if vrf is None:
download(local_file, remote_path, progressbar=progressbar,
check_space=check_space, raise_error=True)
else:
- remote_auth = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password}'
vrf_cmd = f'ip vrf exec {vrf} {external_download_script} \
--local-file {local_file} --remote-path {remote_path}'
- cmd(vrf_cmd, auth=remote_auth)
+ cmd(vrf_cmd, env=environ)
def image_fetch(image_path: str, vrf: str = None,
- username: str = '', password: str = '',
no_prompt: bool = False) -> Path:
"""Fetch an ISO image
@@ -548,13 +586,17 @@ def image_fetch(image_path: str, vrf: str = None,
Returns:
Path: a path to a local file
"""
+ import os.path
+ from uuid import uuid4
+
+ global ISO_DOWNLOAD_PATH
+
# Latest version gets url from configured "system update-check url"
if image_path == 'latest':
command = external_latest_image_url_script
if vrf:
- command = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password} \
- ip vrf exec {vrf} ' + command
- code, output = rc_cmd(command)
+ command = f'ip vrf exec {vrf} {command}'
+ code, output = rc_cmd(command, env=environ)
if code:
print(output)
exit(MSG_INFO_INSTALL_EXIT)
@@ -563,23 +605,25 @@ def image_fetch(image_path: str, vrf: str = None,
try:
# check a type of path
if urlparse(image_path).scheme:
- # download an image
+ # Download the image file
+ ISO_DOWNLOAD_PATH = os.path.join(os.path.expanduser("~"), '{0}.iso'.format(uuid4()))
download_file(ISO_DOWNLOAD_PATH, image_path, vrf,
- username, password,
progressbar=True, check_space=True)
- # download a signature
+ # Download the image signature
+ # VyOS only supports minisign signatures at the moment,
+ # but we keep the logic for multiple signatures
+ # in case we add something new in the future
sign_file = (False, '')
- for sign_type in ['minisig', 'asc']:
+ for sign_type in ['minisig']:
try:
download_file(f'{ISO_DOWNLOAD_PATH}.{sign_type}',
- f'{image_path}.{sign_type}', vrf,
- username, password)
+ f'{image_path}.{sign_type}', vrf)
sign_file = (True, sign_type)
break
except Exception:
- print(f'{sign_type} signature is not available')
- # validate a signature if it is available
+ print(f'Could not download {sign_type} signature')
+ # Validate the signature if it is available
if sign_file[0]:
validate_signature(ISO_DOWNLOAD_PATH, sign_file[1])
else:
@@ -701,30 +745,48 @@ def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) -
return False
-def validate_compatibility(iso_path: str) -> None:
+def validate_compatibility(iso_path: str, force: bool = False) -> None:
"""Check architecture and flavor compatibility with the running image
Args:
iso_path (str): a path to the mounted ISO image
"""
- old_data = get_version_data()
- old_flavor = old_data.get('flavor', '')
- old_architecture = old_data.get('architecture') or cmd('dpkg --print-architecture')
+ current_data = get_version_data()
+ current_flavor = current_data.get('flavor')
+ current_architecture = current_data.get('architecture') or cmd('dpkg --print-architecture')
new_data = get_version_data(f'{iso_path}/version.json')
- new_flavor = new_data.get('flavor', '')
- new_architecture = new_data.get('architecture', '')
+ new_flavor = new_data.get('flavor')
+ new_architecture = new_data.get('architecture')
- if not old_architecture == new_architecture:
- print(MSG_ERR_ARCHITECTURE_MISMATCH)
+ if not current_flavor or not current_architecture:
+ # This may only happen if someone modified the version file.
+ # Unlikely but not impossible.
+ print(MSG_ERR_CORRUPT_CURRENT_IMAGE)
cleanup()
exit(MSG_INFO_INSTALL_EXIT)
- if not old_flavor == new_flavor:
- if not ask_yes_no(MSG_WARN_FLAVOR_MISMATCH.format(old_flavor, new_flavor), default=False):
- cleanup()
- exit(MSG_INFO_INSTALL_EXIT)
+ success = True
+
+ if current_architecture != new_architecture:
+ success = False
+ if not new_architecture:
+ print(MSG_ERR_MISSING_ARCHITECTURE)
+ else:
+ print(MSG_ERR_ARCHITECTURE_MISMATCH.format(current_architecture, new_architecture))
+
+ if current_flavor != new_flavor:
+ if not force:
+ success = False
+ if not new_flavor:
+ print(MSG_ERR_MISSING_FLAVOR)
+ else:
+ print(MSG_ERR_FLAVOR_MISMATCH.format(current_flavor, new_flavor))
+ if not success:
+ print(MSG_ERR_INCOMPATIBLE_IMAGE)
+ cleanup()
+ exit(MSG_INFO_INSTALL_EXIT)
def install_image() -> None:
"""Install an image to a disk
@@ -746,14 +808,25 @@ def install_image() -> None:
break
print(MSG_WARN_IMAGE_NAME_WRONG)
+ failed_check_status = [EPasswdStrength.WEAK, EPasswdStrength.ERROR]
# ask for password
while True:
user_password: str = ask_input(MSG_INPUT_PASSWORD, no_echo=True,
non_empty=True)
+
+ if user_password == DEFAULT_PASSWORD:
+ Warning(MSG_WARN_CHANGE_PASSWORD)
+ else:
+ result = evaluate_strength(user_password)
+ if result['strength'] in failed_check_status:
+ Warning(result['error'])
+
confirm: str = ask_input(MSG_INPUT_PASSWORD_CONFIRM, no_echo=True,
non_empty=True)
+
if user_password == confirm:
break
+
print(MSG_WARN_PASSWORD_CONFIRM)
# ask for default console
@@ -849,8 +922,7 @@ def install_image() -> None:
for disk_target in l:
disk.partition_mount(disk_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi')
grub.install(disk_target.name, f'{DIR_DST_ROOT}/boot/',
- f'{DIR_DST_ROOT}/boot/efi',
- id=f'VyOS (RAID disk {l.index(disk_target) + 1})')
+ f'{DIR_DST_ROOT}/boot/efi')
disk.partition_umount(disk_target.partition['efi'])
else:
print('Installing GRUB to the drive')
@@ -893,7 +965,7 @@ def install_image() -> None:
@compat.grub_cfg_update
def add_image(image_path: str, vrf: str = None, username: str = '',
- password: str = '', no_prompt: bool = False) -> None:
+ password: str = '', no_prompt: bool = False, force: bool = False) -> None:
"""Add a new image
Args:
@@ -902,15 +974,18 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
if image.is_live_boot():
exit(MSG_ERR_LIVE)
+ environ['REMOTE_USERNAME'] = username
+ environ['REMOTE_PASSWORD'] = password
+
# fetch an image
- iso_path: Path = image_fetch(image_path, vrf, username, password, no_prompt)
+ iso_path: Path = image_fetch(image_path, vrf, no_prompt)
try:
# mount an ISO
Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True)
disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660')
print('Validating image compatibility')
- validate_compatibility(DIR_ISO_MOUNT)
+ validate_compatibility(DIR_ISO_MOUNT, force=force)
# check sums
print('Validating image checksums')
@@ -936,8 +1011,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
f'Adding image would downgrade image tools to v.{cfg_ver}; disallowed')
if not no_prompt:
+ versions = grub.version_list()
while True:
image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name)
+ if image_name in versions:
+ print(MSG_INPUT_IMAGE_NAME_TAKEN)
+ continue
if image.validate_name(image_name):
break
print(MSG_WARN_IMAGE_NAME_WRONG)
@@ -959,7 +1038,7 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
Path(target_config_dir).mkdir(parents=True)
chown(target_config_dir, group='vyattacfg')
chmod_2775(target_config_dir)
- copytree('/opt/vyatta/etc/config/', target_config_dir,
+ copytree('/opt/vyatta/etc/config/', target_config_dir, symlinks=True,
copy_function=copy_preserve_owner, dirs_exist_ok=True)
else:
Path(target_config_dir).mkdir(parents=True)
@@ -992,6 +1071,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
if set_as_default:
grub.set_default(image_name, root_dir)
+ cmdline_options = get_cli_kernel_options(
+ f'{target_config_dir}/config.boot')
+ grub_util.update_kernel_cmdline_options(' '.join(cmdline_options),
+ root_dir=root_dir,
+ version=image_name)
+
except OSError as e:
# if no space error, remove image dir and cleanup
if e.errno == ENOSPC:
@@ -1031,6 +1116,9 @@ def parse_arguments() -> Namespace:
parser.add_argument('--image-path',
help='a path (HTTP or local file) to an image that needs to be installed'
)
+ parser.add_argument('--force', action='store_true',
+ help='Ignore flavor compatibility requirements.'
+ )
# parser.add_argument('--image_new_name', help='a new name for image')
args: Namespace = parser.parse_args()
# Validate arguments
@@ -1047,7 +1135,8 @@ if __name__ == '__main__':
install_image()
if args.action == 'add':
add_image(args.image_path, args.vrf,
- args.username, args.password, args.no_prompt)
+ args.username, args.password,
+ args.no_prompt, args.force)
exit()
diff --git a/src/op_mode/interfaces.py b/src/op_mode/interfaces.py
index e7afc4caa..c97f3b129 100755
--- a/src/op_mode/interfaces.py
+++ b/src/op_mode/interfaces.py
@@ -29,6 +29,7 @@ from vyos.ifconfig import Section
from vyos.ifconfig import Interface
from vyos.ifconfig import VRRP
from vyos.utils.process import cmd
+from vyos.utils.network import interface_exists
from vyos.utils.process import rc_cmd
from vyos.utils.process import call
@@ -84,6 +85,14 @@ def filtered_interfaces(ifnames: typing.Union[str, list],
yield interface
+def detailed_output(dataset, headers):
+ for data in dataset:
+ adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action
+ transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char
+
+ print(tabulate(transformed_rule, tablefmt="presto"))
+ print()
+
def _split_text(text, used=0):
"""
take a string and attempt to split it to fit with the width of the screen
@@ -296,6 +305,114 @@ def _get_counter_data(ifname: typing.Optional[str],
return ret
+def _get_kernel_data(raw, ifname = None, detail = False):
+ if ifname:
+ # Check if the interface exists
+ if not interface_exists(ifname):
+ raise vyos.opmode.IncorrectValue(f"{ifname} does not exist!")
+ int_name = f'dev {ifname}'
+ else:
+ int_name = ''
+
+ kernel_interface = json.loads(cmd(f'ip -j -d -s address show {int_name}'))
+
+ # Return early if raw
+ if raw:
+ return kernel_interface, None
+
+ # Format the kernel data
+ kernel_interface_out = _format_kernel_data(kernel_interface, detail)
+
+ return kernel_interface, kernel_interface_out
+
+def _format_kernel_data(data, detail):
+ output_list = []
+ tmpInfo = {}
+
+ # Sort interfaces by name
+ for interface in sorted(data, key=lambda x: x.get('ifname', '')):
+ if interface.get('linkinfo', {}).get('info_kind') == 'vrf':
+ continue
+
+ # Get the device model; ex. Intel Corporation Ethernet Controller I225-V
+ dev_model = interface.get('parentdev', '')
+ if 'parentdev' in interface:
+ parentdev = interface['parentdev']
+ if re.match(r'^[0-9a-fA-F]{4}:', parentdev):
+ dev_model = cmd(f'lspci -nn -s {parentdev}').split(']:')[1].strip()
+
+ # Get the IP addresses on interface
+ ip_list = []
+ has_global = False
+
+ for ip in interface['addr_info']:
+ if ip.get('scope') in ('global', 'host'):
+ has_global = True
+ local = ip.get('local', '-')
+ prefixlen = ip.get('prefixlen', '')
+ ip_list.append(f"{local}/{prefixlen}")
+
+
+ # If no global IP address, add '-'; indicates no IP address on interface
+ if not has_global:
+ ip_list.append('-')
+
+ sl_status = ('A' if not 'UP' in interface['flags'] else 'u') + '/' + ('D' if interface['operstate'] == 'DOWN' else 'u')
+
+ # Generate temporary dict to hold data
+ tmpInfo['ifname'] = interface.get('ifname', '')
+ tmpInfo['ip'] = ip_list
+ tmpInfo['mac'] = interface.get('address', '')
+ tmpInfo['mtu'] = interface.get('mtu', '')
+ tmpInfo['vrf'] = interface.get('master', 'default')
+ tmpInfo['status'] = sl_status
+ tmpInfo['description'] = interface.get('ifalias', '')
+ tmpInfo['device'] = dev_model
+ tmpInfo['alternate_names'] = interface.get('altnames', '')
+ tmpInfo['minimum_mtu'] = interface.get('min_mtu', '')
+ tmpInfo['maximum_mtu'] = interface.get('max_mtu', '')
+ rx_stats = interface.get('stats64', {}).get('rx')
+ tx_stats = interface.get('stats64', {}).get('tx')
+ tmpInfo['rx_packets'] = rx_stats.get('packets', "")
+ tmpInfo['rx_bytes'] = rx_stats.get('bytes', "")
+ tmpInfo['rx_errors'] = rx_stats.get('errors', "")
+ tmpInfo['rx_dropped'] = rx_stats.get('dropped', "")
+ tmpInfo['rx_over_errors'] = rx_stats.get('over_errors', '')
+ tmpInfo['multicast'] = rx_stats.get('multicast', "")
+ tmpInfo['tx_packets'] = tx_stats.get('packets', "")
+ tmpInfo['tx_bytes'] = tx_stats.get('bytes', "")
+ tmpInfo['tx_errors'] = tx_stats.get('errors', "")
+ tmpInfo['tx_dropped'] = tx_stats.get('dropped', "")
+ tmpInfo['tx_carrier_errors'] = tx_stats.get('carrier_errors', "")
+ tmpInfo['tx_collisions'] = tx_stats.get('collisions', "")
+
+ # Generate output list; detail adds more fields
+ output_list.append([tmpInfo['ifname'],
+ '\n'.join(tmpInfo['ip']),
+ tmpInfo['mac'],
+ tmpInfo['vrf'],
+ tmpInfo['mtu'],
+ tmpInfo['status'],
+ tmpInfo['description'],
+ *([tmpInfo['device']] if detail else []),
+ *(['\n'.join(tmpInfo['alternate_names'])] if detail else []),
+ *([tmpInfo['minimum_mtu']] if detail else []),
+ *([tmpInfo['maximum_mtu']] if detail else []),
+ *([tmpInfo['rx_packets']] if detail else []),
+ *([tmpInfo['rx_bytes']] if detail else []),
+ *([tmpInfo['rx_errors']] if detail else []),
+ *([tmpInfo['rx_dropped']] if detail else []),
+ *([tmpInfo['rx_over_errors']] if detail else []),
+ *([tmpInfo['multicast']] if detail else []),
+ *([tmpInfo['tx_packets']] if detail else []),
+ *([tmpInfo['tx_bytes']] if detail else []),
+ *([tmpInfo['tx_errors']] if detail else []),
+ *([tmpInfo['tx_dropped']] if detail else []),
+ *([tmpInfo['tx_carrier_errors']] if detail else []),
+ *([tmpInfo['tx_collisions']] if detail else [])])
+
+ return output_list
+
@catch_broken_pipe
def _format_show_data(data: list):
unhandled = []
@@ -445,6 +562,27 @@ def _format_show_counters(data: list):
print (output)
return output
+def show_kernel(raw: bool, intf_name: typing.Optional[str], detail: bool):
+ raw_data, data = _get_kernel_data(raw, intf_name, detail)
+
+ # Return early if raw
+ if raw:
+ return raw_data
+
+ # Normal headers; show interfaces kernel
+ headers = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description']
+
+ # Detail headers; show interfaces kernel detail
+ detail_header = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description',
+ 'Device', 'Alternate Names','Minimum MTU', 'Maximum MTU', 'RX_Packets',
+ 'RX_Bytes', 'RX_Errors', 'RX_Dropped', 'Receive Overrun Errors', 'Received Multicast',
+ 'TX_Packets', 'TX_Bytes', 'TX_Errors', 'TX_Dropped', 'Transmit Carrier Errors',
+ 'Transmit Collisions']
+
+ if detail:
+ detailed_output(data, detail_header)
+ else:
+ print(tabulate(data, headers))
def _show_raw(data: list, intf_name: str):
if intf_name is not None and len(data) <= 1:
diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py
index 02ba126b4..1ab50b105 100755
--- a/src/op_mode/ipsec.py
+++ b/src/op_mode/ipsec.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2022-2024 VyOS maintainers and contributors
+# Copyright (C) 2022-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -700,15 +700,6 @@ def reset_profile_dst(profile: str, tunnel: str, nbma_dst: str):
]
)
)
- # initiate IKE SAs
- for ike in sa_nbma_list:
- if ike_sa_name in ike:
- vyos.ipsec.vici_initiate(
- ike_sa_name,
- 'dmvpn',
- ike[ike_sa_name]['local-host'],
- ike[ike_sa_name]['remote-host'],
- )
print(
f'Profile {profile} tunnel {tunnel} remote-host {nbma_dst} reset result: success'
)
@@ -732,18 +723,6 @@ def reset_profile_all(profile: str, tunnel: str):
)
# terminate IKE SAs
vyos.ipsec.terminate_vici_by_name(ike_sa_name, None)
- # initiate IKE SAs
- for ike in sa_list:
- if ike_sa_name in ike:
- vyos.ipsec.vici_initiate(
- ike_sa_name,
- 'dmvpn',
- ike[ike_sa_name]['local-host'],
- ike[ike_sa_name]['remote-host'],
- )
- print(
- f'Profile {profile} tunnel {tunnel} remote-host {ike[ike_sa_name]["remote-host"]} reset result: success'
- )
print(f'Profile {profile} tunnel {tunnel} reset result: success')
except vyos.ipsec.ViciInitiateError as err:
raise vyos.opmode.UnconfiguredSubsystem(err)
diff --git a/src/op_mode/load-balancing_wan.py b/src/op_mode/load-balancing_wan.py
new file mode 100755
index 000000000..9fa473802
--- /dev/null
+++ b/src/op_mode/load-balancing_wan.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import re
+import sys
+
+from datetime import datetime
+
+from vyos.config import Config
+from vyos.utils.process import cmd
+
+import vyos.opmode
+
+wlb_status_file = '/run/wlb_status.json'
+
+status_format = '''Interface: {ifname}
+Status: {status}
+Last Status Change: {last_change}
+Last Interface Success: {last_success}
+Last Interface Failure: {last_failure}
+Interface Failures: {failures}
+'''
+
+def _verify(func):
+ """Decorator checks if WLB config exists"""
+ from functools import wraps
+
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ config = Config()
+ if not config.exists(['load-balancing', 'wan']):
+ unconf_message = 'WAN load-balancing is not configured'
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+ return _wrapper
+
+def _get_raw_data():
+ with open(wlb_status_file, 'r') as f:
+ data = json.loads(f.read())
+ if not data:
+ return {}
+ return data
+
+def _get_formatted_output(raw_data):
+ for ifname, if_data in raw_data.items():
+ latest_change = if_data['last_success'] if if_data['last_success'] > if_data['last_failure'] else if_data['last_failure']
+
+ change_dt = datetime.fromtimestamp(latest_change) if latest_change > 0 else None
+ success_dt = datetime.fromtimestamp(if_data['last_success']) if if_data['last_success'] > 0 else None
+ failure_dt = datetime.fromtimestamp(if_data['last_failure']) if if_data['last_failure'] > 0 else None
+ now = datetime.utcnow()
+
+ fmt_data = {
+ 'ifname': ifname,
+ 'status': "active" if if_data['state'] else "failed",
+ 'last_change': change_dt.strftime("%Y-%m-%d %H:%M:%S") if change_dt else 'N/A',
+ 'last_success': str(now - success_dt) if success_dt else 'N/A',
+ 'last_failure': str(now - failure_dt) if failure_dt else 'N/A',
+ 'failures': if_data['failure_count']
+ }
+ print(status_format.format(**fmt_data))
+
+@_verify
+def show_summary(raw: bool):
+ data = _get_raw_data()
+
+ if raw:
+ return data
+ else:
+ return _get_formatted_output(data)
+
+@_verify
+def show_connection(raw: bool):
+ res = cmd('sudo conntrack -L -n')
+ lines = res.split("\n")
+ filtered_lines = [line for line in lines if re.search(r' mark=[1-9]', line)]
+
+ if raw:
+ return filtered_lines
+
+ for line in lines:
+ print(line)
+
+@_verify
+def show_status(raw: bool):
+ res = cmd('sudo nft list chain ip vyos_wanloadbalance wlb_mangle_prerouting')
+ lines = res.split("\n")
+ filtered_lines = [line.replace("\t", "") for line in lines[3:-2] if 'meta mark set' not in line]
+
+ if raw:
+ return filtered_lines
+
+ for line in filtered_lines:
+ print(line)
+
+if __name__ == "__main__":
+ try:
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/nhrp.py b/src/op_mode/nhrp.py
deleted file mode 100755
index e66f33079..000000000
--- a/src/op_mode/nhrp.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2023 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import sys
-import tabulate
-import vyos.opmode
-
-from vyos.utils.process import cmd
-from vyos.utils.process import process_named_running
-from vyos.utils.dict import colon_separated_to_dict
-
-
-def _get_formatted_output(output_dict: dict) -> str:
- """
- Create formatted table for CLI output
- :param output_dict: dictionary for API
- :type output_dict: dict
- :return: tabulate string
- :rtype: str
- """
- print(f"Status: {output_dict['Status']}")
- output: str = tabulate.tabulate(output_dict['routes'], headers='keys',
- numalign="left")
- return output
-
-
-def _get_formatted_dict(output_string: str) -> dict:
- """
- Format string returned from CMD to API list
- :param output_string: String received by CMD
- :type output_string: str
- :return: dictionary for API
- :rtype: dict
- """
- formatted_dict: dict = {
- 'Status': '',
- 'routes': []
- }
- output_list: list = output_string.split('\n\n')
- for list_a in output_list:
- output_dict = colon_separated_to_dict(list_a, True)
- if 'Status' in output_dict:
- formatted_dict['Status'] = output_dict['Status']
- else:
- formatted_dict['routes'].append(output_dict)
- return formatted_dict
-
-
-def show_interface(raw: bool):
- """
- Command 'show nhrp interface'
- :param raw: if API
- :type raw: bool
- """
- if not process_named_running('opennhrp'):
- raise vyos.opmode.UnconfiguredSubsystem('OpenNHRP is not running.')
- interface_string: str = cmd('sudo opennhrpctl interface show')
- interface_dict: dict = _get_formatted_dict(interface_string)
- if raw:
- return interface_dict
- else:
- return _get_formatted_output(interface_dict)
-
-
-def show_tunnel(raw: bool):
- """
- Command 'show nhrp tunnel'
- :param raw: if API
- :type raw: bool
- """
- if not process_named_running('opennhrp'):
- raise vyos.opmode.UnconfiguredSubsystem('OpenNHRP is not running.')
- tunnel_string: str = cmd('sudo opennhrpctl show')
- tunnel_dict: list = _get_formatted_dict(tunnel_string)
- if raw:
- return tunnel_dict
- else:
- return _get_formatted_output(tunnel_dict)
-
-
-if __name__ == '__main__':
- try:
- res = vyos.opmode.run(sys.modules[__name__])
- if res:
- print(res)
- except (ValueError, vyos.opmode.Error) as e:
- print(e)
- sys.exit(1)
diff --git a/src/op_mode/qos.py b/src/op_mode/qos.py
index b8ca149a0..464b552ee 100755
--- a/src/op_mode/qos.py
+++ b/src/op_mode/qos.py
@@ -38,7 +38,7 @@ def get_tc_info(interface_dict, interface_name, policy_type):
if not policy_name:
return None, None
- class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], key_mangling=('-', '_'),
+ class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name],
get_first_key=True)
if not class_dict:
return None, None
diff --git a/src/op_mode/reset_wireguard.py b/src/op_mode/reset_wireguard.py
new file mode 100755
index 000000000..1fcfb31b5
--- /dev/null
+++ b/src/op_mode/reset_wireguard.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2025 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import typing
+
+import vyos.opmode
+
+from vyos.ifconfig import WireGuardIf
+from vyos.configquery import ConfigTreeQuery
+
+
+def _verify(func):
+ """Decorator checks if WireGuard interface config exists"""
+ from functools import wraps
+
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ config = ConfigTreeQuery()
+ interface = kwargs.get('interface')
+ if not config.exists(['interfaces', 'wireguard', interface]):
+ unconf_message = f'WireGuard interface {interface} is not configured'
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+
+ return _wrapper
+
+
+@_verify
+def reset_peer(interface: str, peer: typing.Optional[str] = None):
+ intf = WireGuardIf(interface, create=False, debug=False)
+ return intf.operational.reset_peer(peer)
+
+
+if __name__ == '__main__':
+ try:
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py
index 3b0031f34..efa835485 100755
--- a/src/op_mode/restart.py
+++ b/src/op_mode/restart.py
@@ -53,6 +53,10 @@ service_map = {
'systemd_service': 'strongswan',
'path': ['vpn', 'ipsec'],
},
+ 'load-balancing_wan': {
+ 'systemd_service': 'vyos-wan-load-balance',
+ 'path': ['load-balancing', 'wan'],
+ },
'mdns_repeater': {
'systemd_service': 'avahi-daemon',
'path': ['service', 'mdns', 'repeater'],
@@ -86,6 +90,7 @@ services = typing.Literal[
'haproxy',
'igmp_proxy',
'ipsec',
+ 'load-balancing_wan',
'mdns_repeater',
'router_advert',
'snmp',
diff --git a/src/op_mode/stp.py b/src/op_mode/stp.py
new file mode 100755
index 000000000..fb57bd7ee
--- /dev/null
+++ b/src/op_mode/stp.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2025 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import typing
+import json
+from tabulate import tabulate
+
+import vyos.opmode
+from vyos.utils.process import cmd
+from vyos.utils.network import interface_exists
+
+def detailed_output(dataset, headers):
+ for data in dataset:
+ adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action
+ transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char
+
+ print(tabulate(transformed_rule, tablefmt="presto"))
+ print()
+
+def _get_bridge_vlan_data(iface):
+ allowed_vlans = []
+ native_vlan = None
+ vlanData = json.loads(cmd(f"bridge -j -d vlan show"))
+ for vlans in vlanData:
+ if vlans['ifname'] == iface:
+ for allowed in vlans['vlans']:
+ if "flags" in allowed and "PVID" in allowed["flags"]:
+ native_vlan = allowed['vlan']
+ elif allowed.get('vlanEnd', None):
+ allowed_vlans.append(f"{allowed['vlan']}-{allowed['vlanEnd']}")
+ else:
+ allowed_vlans.append(str(allowed['vlan']))
+
+ if not allowed_vlans:
+ allowed_vlans = ["none"]
+ if not native_vlan:
+ native_vlan = "none"
+
+ return ",".join(allowed_vlans), native_vlan
+
+def _get_stp_data(ifname, brInfo, brStatus):
+ tmpInfo = {}
+
+ tmpInfo['bridge_name'] = brInfo.get('ifname')
+ tmpInfo['up_state'] = brInfo.get('operstate')
+ tmpInfo['priority'] = brInfo.get('linkinfo').get('info_data').get('priority')
+ tmpInfo['vlan_filtering'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('vlan_filtering') == 1 else "Disabled"
+ tmpInfo['vlan_protocol'] = brInfo.get('linkinfo').get('info_data').get('vlan_protocol')
+
+ # The version of VyOS I tested had am issue with the "ip -d link show type bridge"
+ # output. The root_id was always the local bridge, even though the underlying system
+ # understood when it wasn't. Could be an upstream Bug. I pull from the "/sys/class/net"
+ # structure instead. This can be changed later if the "ip link" behavior is corrected.
+
+ #tmpInfo['bridge_id'] = brInfo.get('linkinfo').get('info_data').get('bridge_id')
+ #tmpInfo['root_id'] = brInfo.get('linkinfo').get('info_data').get('root_id')
+
+ tmpInfo['bridge_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/bridge_id").split('.')
+ tmpInfo['root_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/root_id").split('.')
+
+ # The "/sys/class/net" structure stores the IDs without seperators like ':' or '.'
+ # This adds a ':' after every 2 characters to make it resemble a MAC Address
+ tmpInfo['bridge_id'][1] = ':'.join(tmpInfo['bridge_id'][1][i:i+2] for i in range(0, len(tmpInfo['bridge_id'][1]), 2))
+ tmpInfo['root_id'][1] = ':'.join(tmpInfo['root_id'][1][i:i+2] for i in range(0, len(tmpInfo['root_id'][1]), 2))
+
+ tmpInfo['stp_state'] = "Enabled" if brInfo.get('linkinfo', {}).get('info_data', {}).get('stp_state') == 1 else "Disabled"
+
+ # I don't call any of these values, but I created them to be called within raw output if desired
+
+ tmpInfo['mcast_snooping'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('mcast_snooping') == 1 else "Disabled"
+ tmpInfo['rxbytes'] = brInfo.get('stats64').get('rx').get('bytes')
+ tmpInfo['rxpackets'] = brInfo.get('stats64').get('rx').get('packets')
+ tmpInfo['rxerrors'] = brInfo.get('stats64').get('rx').get('errors')
+ tmpInfo['rxdropped'] = brInfo.get('stats64').get('rx').get('dropped')
+ tmpInfo['rxover_errors'] = brInfo.get('stats64').get('rx').get('over_errors')
+ tmpInfo['rxmulticast'] = brInfo.get('stats64').get('rx').get('multicast')
+ tmpInfo['txbytes'] = brInfo.get('stats64').get('tx').get('bytes')
+ tmpInfo['txpackets'] = brInfo.get('stats64').get('tx').get('packets')
+ tmpInfo['txerrors'] = brInfo.get('stats64').get('tx').get('errors')
+ tmpInfo['txdropped'] = brInfo.get('stats64').get('tx').get('dropped')
+ tmpInfo['txcarrier_errors'] = brInfo.get('stats64').get('tx').get('carrier_errors')
+ tmpInfo['txcollosions'] = brInfo.get('stats64').get('tx').get('collisions')
+
+ tmpStatus = []
+ for members in brStatus:
+ if members.get('master') == brInfo.get('ifname'):
+ allowed_vlans, native_vlan = _get_bridge_vlan_data(members['ifname'])
+ tmpStatus.append({'interface': members.get('ifname'),
+ 'state': members.get('state').capitalize(),
+ 'mtu': members.get('mtu'),
+ 'pathcost': members.get('cost'),
+ 'bpduguard': "Enabled" if members.get('guard') == True else "Disabled",
+ 'rootguard': "Enabled" if members.get('root_block') == True else "Disabled",
+ 'mac_learning': "Enabled" if members.get('learning') == True else "Disabled",
+ 'neigh_suppress': "Enabled" if members.get('neigh_suppress') == True else "Disabled",
+ 'vlan_tunnel': "Enabled" if members.get('vlan_tunnel') == True else "Disabled",
+ 'isolated': "Enabled" if members.get('isolated') == True else "Disabled",
+ **({'allowed_vlans': allowed_vlans} if allowed_vlans else {}),
+ **({'native_vlan': native_vlan} if native_vlan else {})})
+
+ tmpInfo['members'] = tmpStatus
+ return tmpInfo
+
+def show_stp(raw: bool, ifname: typing.Optional[str], detail: bool):
+ rawList = []
+ rawDict = {'stp': []}
+
+ if ifname:
+ if not interface_exists(ifname):
+ raise vyos.opmode.Error(f"{ifname} does not exist!")
+ else:
+ ifname = ""
+
+ bridgeInfo = json.loads(cmd(f"ip -j -d -s link show type bridge {ifname}"))
+
+ if not bridgeInfo:
+ raise vyos.opmode.Error(f"No Bridges configured!")
+
+ bridgeStatus = json.loads(cmd(f"bridge -j -s -d link show"))
+
+ for bridges in bridgeInfo:
+ output_list = []
+ amRoot = ""
+ bridgeDict = _get_stp_data(ifname, bridges, bridgeStatus)
+
+ if bridgeDict['bridge_id'][1] == bridgeDict['root_id'][1]:
+ amRoot = " (This bridge is the root)"
+
+ print('-' * 80)
+ print(f"Bridge interface {bridgeDict['bridge_name']} ({bridgeDict['up_state']}):\n")
+ print(f"Spanning Tree is {bridgeDict['stp_state']}")
+ print(f"Bridge ID {bridgeDict['bridge_id'][1]}, Priority {int(bridgeDict['bridge_id'][0], 16)}")
+ print(f"Root ID {bridgeDict['root_id'][1]}, Priority {int(bridgeDict['root_id'][0], 16)}{amRoot}")
+ print(f"VLANs {bridgeDict['vlan_filtering'].capitalize()}, Protocol {bridgeDict['vlan_protocol']}")
+ print()
+
+ for members in bridgeDict['members']:
+ output_list.append([members['interface'],
+ members['state'],
+ *([members['pathcost']] if detail else []),
+ members['bpduguard'],
+ members['rootguard'],
+ members['mac_learning'],
+ *([members['neigh_suppress']] if detail else []),
+ *([members['vlan_tunnel']] if detail else []),
+ *([members['isolated']] if detail else []),
+ *([members['allowed_vlans']] if detail else []),
+ *([members['native_vlan']] if detail else [])])
+
+ if raw:
+ rawList.append(bridgeDict)
+ elif detail:
+ headers = ['Interface', 'State', 'Pathcost', 'BPDU_Guard', 'Root_Guard', 'Learning', 'Neighbor_Suppression', 'Q-in-Q', 'Port_Isolation', 'Allowed VLANs', 'Native VLAN']
+ detailed_output(output_list, headers)
+ else:
+ headers = ['Interface', 'State', 'BPDU_Guard', 'Root_Guard', 'Learning']
+ print(tabulate(output_list, headers))
+ print()
+
+ if raw:
+ rawDict['stp'] = rawList
+ return rawDict
+
+if __name__ == '__main__':
+ try:
+ res = vyos.opmode.run(sys.modules[__name__])
+ if res:
+ print(res)
+ except (ValueError, vyos.opmode.Error) as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/tech_support.py b/src/op_mode/tech_support.py
index f60bb87ff..c4496dfa3 100644
--- a/src/op_mode/tech_support.py
+++ b/src/op_mode/tech_support.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2024 VyOS maintainers and contributors
+# Copyright (C) 2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -20,6 +20,7 @@ import json
import vyos.opmode
from vyos.utils.process import cmd
+from vyos.base import Warning
def _get_version_data():
from vyos.version import get_version_data
@@ -51,7 +52,12 @@ def _get_storage():
def _get_devices():
devices = {}
devices["pci"] = cmd("lspci")
- devices["usb"] = cmd("lsusb")
+
+ try:
+ devices["usb"] = cmd("lsusb")
+ except OSError:
+ Warning("Could not retrieve information about USB devices")
+ devices["usb"] = {}
return devices
@@ -97,21 +103,22 @@ def _get_boot_config():
return strip_config_source(config)
def _get_config_scripts():
- from os import listdir
+ from os import walk
from os.path import join
from vyos.utils.file import read_file
scripts = []
dir = '/config/scripts'
- for f in listdir(dir):
- script = {}
- path = join(dir, f)
- data = read_file(path)
- script["path"] = path
- script["data"] = data
-
- scripts.append(script)
+ for dirpath, _, filenames in walk(dir):
+ for filename in filenames:
+ script = {}
+ path = join(dirpath, filename)
+ data = read_file(path)
+ script["path"] = path
+ script["data"] = data
+
+ scripts.append(script)
return scripts
diff --git a/src/op_mode/vtysh_wrapper.sh b/src/op_mode/vtysh_wrapper.sh
index 25d09ce77..bc472f7bb 100755
--- a/src/op_mode/vtysh_wrapper.sh
+++ b/src/op_mode/vtysh_wrapper.sh
@@ -2,5 +2,5 @@
declare -a tmp
# FRR uses ospf6 where we use ospfv3, and we use reset over clear for BGP,
# thus alter the commands
-tmp=$(echo $@ | sed -e "s/ospfv3/ospf6/" | sed -e "s/^reset bgp/clear bgp/" | sed -e "s/^reset ip bgp/clear ip bgp/")
+tmp=$(echo $@ | sed -e "s/ospfv3/ospf6/" | sed -e "s/^reset bgp/clear bgp/" | sed -e "s/^reset ip bgp/clear ip bgp/"| sed -e "s/^reset ip nhrp/clear ip nhrp/")
vtysh -c "$tmp"
diff --git a/src/op_mode/zone.py b/src/op_mode/zone.py
index 49fecdf28..df39549d2 100644
--- a/src/op_mode/zone.py
+++ b/src/op_mode/zone.py
@@ -56,10 +56,15 @@ def _convert_one_zone_data(zone: str, zone_config: dict) -> dict:
from_zone_dict['firewall_v6'] = dict_search(
'firewall.ipv6_name', from_zone_config)
list_of_rules.append(from_zone_dict)
+ zone_members =[]
+ interface_members = dict_search('member.interface', zone_config)
+ vrf_members = dict_search('member.vrf', zone_config)
+ zone_members += interface_members if interface_members is not None else []
+ zone_members += vrf_members if vrf_members is not None else []
zone_dict = {
'name': zone,
- 'interface': dict_search('interface', zone_config),
+ 'members': zone_members,
'type': 'LOCAL' if dict_search('local_zone',
zone_config) is not None else None,
}
@@ -126,7 +131,7 @@ def output_zone_list(zone_conf: dict) -> list:
if zone_conf['type'] == 'LOCAL':
zone_info.append('LOCAL')
else:
- zone_info.append("\n".join(zone_conf['interface']))
+ zone_info.append("\n".join(zone_conf['members']))
from_zone = []
firewall = []
@@ -175,7 +180,7 @@ def get_formatted_output(zone_policy: list) -> str:
:rtype: str
"""
headers = ["Zone",
- "Interfaces",
+ "Members",
"From Zone",
"Firewall IPv4",
"Firewall IPv6"