diff options
Diffstat (limited to 'src/op_mode')
-rwxr-xr-x | src/op_mode/bridge.py | 47 | ||||
-rwxr-xr-x | src/op_mode/dhcp.py | 473 | ||||
-rwxr-xr-x | src/op_mode/firewall.py | 159 | ||||
-rw-r--r-- | src/op_mode/generate_psk.py | 45 | ||||
-rwxr-xr-x | src/op_mode/image_installer.py | 211 | ||||
-rwxr-xr-x | src/op_mode/interfaces.py | 138 | ||||
-rw-r--r-- | src/op_mode/interfaces_wireguard.py | 53 | ||||
-rwxr-xr-x | src/op_mode/ipsec.py | 23 | ||||
-rwxr-xr-x | src/op_mode/load-balancing_haproxy.py (renamed from src/op_mode/reverseproxy.py) | 4 | ||||
-rwxr-xr-x | src/op_mode/load-balancing_wan.py | 117 | ||||
-rw-r--r-- | src/op_mode/mtr.py | 90 | ||||
-rw-r--r-- | src/op_mode/mtr_execute.py | 217 | ||||
-rwxr-xr-x | src/op_mode/nhrp.py | 101 | ||||
-rwxr-xr-x | src/op_mode/pki.py | 837 | ||||
-rwxr-xr-x | src/op_mode/qos.py | 2 | ||||
-rwxr-xr-x | src/op_mode/reset_wireguard.py | 55 | ||||
-rwxr-xr-x | src/op_mode/restart.py | 15 | ||||
-rwxr-xr-x | src/op_mode/show_configuration_files.sh | 10 | ||||
-rwxr-xr-x | src/op_mode/stp.py | 185 | ||||
-rw-r--r-- | src/op_mode/tech_support.py | 29 | ||||
-rwxr-xr-x | src/op_mode/vrrp.py | 355 | ||||
-rwxr-xr-x | src/op_mode/vtysh_wrapper.sh | 2 | ||||
-rw-r--r-- | src/op_mode/zone.py | 11 |
23 files changed, 2331 insertions, 848 deletions
diff --git a/src/op_mode/bridge.py b/src/op_mode/bridge.py index e80b1c21d..c4293a77c 100755 --- a/src/op_mode/bridge.py +++ b/src/op_mode/bridge.py @@ -23,10 +23,11 @@ from tabulate import tabulate from vyos.utils.process import cmd from vyos.utils.process import rc_cmd -from vyos.utils.process import call +from vyos.utils.process import call import vyos.opmode + def _get_json_data(): """ Get bridge data format JSON @@ -43,7 +44,7 @@ def _get_raw_data_summary(): return data_dict -def _get_raw_data_vlan(tunnel:bool=False): +def _get_raw_data_vlan(tunnel: bool = False): """ :returns dict """ @@ -54,14 +55,18 @@ def _get_raw_data_vlan(tunnel:bool=False): data_dict = json.loads(json_data) return data_dict + def _get_raw_data_vni() -> dict: """ :returns dict """ - json_data = cmd(f'bridge --json vni show') + code, json_data = rc_cmd(f'bridge --json vni show') + if code != 0: + raise vyos.opmode.UnconfiguredObject('VNI is not configured') data_dict = json.loads(json_data) return data_dict + def _get_raw_data_fdb(bridge): """Get MAC-address for the bridge brX :returns list @@ -70,7 +75,9 @@ def _get_raw_data_fdb(bridge): # From iproute2 fdb.c, fdb_show() will only exit(-1) in case of # non-existent bridge device; raise error. if code == 255: - raise vyos.opmode.UnconfiguredObject(f"bridge {bridge} does not exist in the system") + raise vyos.opmode.UnconfiguredObject( + f'bridge {bridge} does not exist in the system' + ) data_dict = json.loads(json_data) return data_dict @@ -116,8 +123,8 @@ def _get_formatted_output_summary(data): flags = ','.join(option.get('flags')).lower() prio = option.get('priority') member_entries.append([interface, state, mtu, flags, prio]) - member_headers = ["Member", "State", "MTU", "Flags", "Prio"] - output_members = tabulate(member_entries, member_headers, numalign="left") + member_headers = ['Member', 'State', 'MTU', 'Flags', 'Prio'] + output_members = tabulate(member_entries, member_headers, numalign='left') output_bridge = f"""Bridge interface {bridge}: {output_members} @@ -138,13 +145,14 @@ def _get_formatted_output_vlan(data): vlan_end = vlan_entry.get('vlanEnd') vlan = f'{vlan}-{vlan_end}' flags_raw = vlan_entry.get('flags') - flags = ', '.join(flags_raw if isinstance(flags_raw,list) else "").lower() + flags = ', '.join(flags_raw if isinstance(flags_raw, list) else '').lower() data_entries.append([interface, vlan, flags]) - headers = ["Interface", "VLAN", "Flags"] + headers = ['Interface', 'VLAN', 'Flags'] output = tabulate(data_entries, headers) return output + def _get_formatted_output_vlan_tunnel(data): data_entries = [] for entry in data: @@ -166,10 +174,11 @@ def _get_formatted_output_vlan_tunnel(data): # 200 200 data_entries.append(['', vlan, vni]) - headers = ["Interface", "VLAN", "VNI"] + headers = ['Interface', 'VLAN', 'VNI'] output = tabulate(data_entries, headers) return output + def _get_formatted_output_vni(data): data_entries = [] for entry in data: @@ -182,10 +191,11 @@ def _get_formatted_output_vni(data): vlan = f'{vlan}-{vlan_end}' data_entries.append([interface, vlan]) - headers = ["Interface", "VNI"] + headers = ['Interface', 'VNI'] output = tabulate(data_entries, headers) return output + def _get_formatted_output_fdb(data): data_entries = [] for entry in data: @@ -195,8 +205,8 @@ def _get_formatted_output_fdb(data): flags = ','.join(entry['flags']) data_entries.append([interface, mac, state, flags]) - headers = ["Interface", "Mac address", "State", "Flags"] - output = tabulate(data_entries, headers, numalign="left") + headers = ['Interface', 'Mac address', 'State', 'Flags'] + output = tabulate(data_entries, headers, numalign='left') return output @@ -209,28 +219,33 @@ def _get_formatted_output_mdb(data): state = mdb_entry.get('state') flags = ','.join(mdb_entry.get('flags')) data_entries.append([interface, group, state, flags]) - headers = ["Interface", "Group", "State", "Flags"] + headers = ['Interface', 'Group', 'State', 'Flags'] output = tabulate(data_entries, headers) return output + def _get_bridge_detail(iface): """Get interface detail statistics""" return call(f'vtysh -c "show interface {iface}"') + def _get_bridge_detail_nexthop_group(iface): """Get interface detail nexthop_group statistics""" return call(f'vtysh -c "show interface {iface} nexthop-group"') + def _get_bridge_detail_nexthop_group_raw(iface): out = cmd(f'vtysh -c "show interface {iface} nexthop-group"') return out + def _get_bridge_detail_raw(iface): """Get interface detail json statistics""" - data = cmd(f'vtysh -c "show interface {iface} json"') + data = cmd(f'vtysh -c "show interface {iface} json"') data_dict = json.loads(data) return data_dict + def show(raw: bool): bridge_data = _get_raw_data_summary() if raw: @@ -249,6 +264,7 @@ def show_vlan(raw: bool, tunnel: typing.Optional[bool]): else: return _get_formatted_output_vlan(bridge_vlan) + def show_vni(raw: bool): bridge_vni = _get_raw_data_vni() if raw: @@ -256,6 +272,7 @@ def show_vni(raw: bool): else: return _get_formatted_output_vni(bridge_vni) + def show_fdb(raw: bool, interface: str): fdb_data = _get_raw_data_fdb(interface) if raw: @@ -271,6 +288,7 @@ def show_mdb(raw: bool, interface: str): else: return _get_formatted_output_mdb(mdb_data) + def show_detail(raw: bool, nexthop_group: typing.Optional[bool], interface: str): if raw: if nexthop_group: @@ -283,6 +301,7 @@ def show_detail(raw: bool, nexthop_group: typing.Optional[bool], interface: str) else: return _get_bridge_detail(interface) + if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__]) diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index e5455c8af..725bfc75b 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022-2024 VyOS maintainers and contributors +# Copyright (C) 2022-2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,6 +19,7 @@ import sys import typing from datetime import datetime +from datetime import timezone from glob import glob from ipaddress import ip_address from tabulate import tabulate @@ -29,133 +30,72 @@ from vyos.base import Warning from vyos.configquery import ConfigTreeQuery from vyos.kea import kea_get_active_config +from vyos.kea import kea_get_dhcp_pools from vyos.kea import kea_get_leases -from vyos.kea import kea_get_pool_from_subnet_id +from vyos.kea import kea_get_server_leases +from vyos.kea import kea_get_static_mappings from vyos.kea import kea_delete_lease -from vyos.utils.process import is_systemd_service_running from vyos.utils.process import call +from vyos.utils.process import is_systemd_service_running -time_string = "%a %b %d %H:%M:%S %Z %Y" +time_string = '%a %b %d %H:%M:%S %Z %Y' config = ConfigTreeQuery() -lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] -sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state'] -sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] +lease_valid_states = [ + 'all', + 'active', + 'free', + 'expired', + 'released', + 'abandoned', + 'reset', + 'backup', +] +sort_valid_inet = [ + 'end', + 'mac', + 'hostname', + 'ip', + 'pool', + 'remaining', + 'start', + 'state', +] +sort_valid_inet6 = [ + 'end', + 'duid', + 'ip', + 'last_communication', + 'pool', + 'remaining', + 'state', + 'type', +] mapping_sort_valid = ['mac', 'ip', 'pool', 'duid'] +stale_warn_msg = 'DHCP server is configured but not started. Data may be stale.' + ArgFamily = typing.Literal['inet', 'inet6'] -ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] +ArgState = typing.Literal[ + 'all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup' +] ArgOrigin = typing.Literal['local', 'remote'] -def _utc_to_local(utc_dt): - return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) - - -def _format_hex_string(in_str): - out_str = "" - # if input is divisible by 2, add : every 2 chars - if len(in_str) > 0 and len(in_str) % 2 == 0: - out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) - else: - out_str = in_str - - return out_str - - -def _find_list_of_dict_index(lst, key='ip', value='') -> int: - """ - Find the index entry of list of dict matching the dict value - Exampe: - % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] - % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') - % 1 - """ - idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) - return idx - -def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list: - """ - Get DHCP server leases - :return list - """ +def _get_raw_server_leases( + config, family='inet', pool=None, sorted=None, state=[], origin=None +) -> list: inet_suffix = '6' if family == 'inet6' else '4' - try: - leases = kea_get_leases(inet_suffix) - except: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server lease information') - - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - - try: - active_config = kea_get_active_config(inet_suffix) - except: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - data = [] - for lease in leases: - lifetime = lease['valid-lft'] - expiry = (lease['cltt'] + lifetime) - - lease['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime) - lease['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None - - data_lease = {} - data_lease['ip'] = lease['ip-address'] - lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} - data_lease['state'] = lease_state_long[lease['state']] - data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-' - data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None - data_lease['origin'] = 'local' # TODO: Determine remote in HA - - if family == 'inet': - data_lease['mac'] = lease['hw-address'] - data_lease['start'] = lease['start_timestamp'].timestamp() - data_lease['hostname'] = lease['hostname'] - - if family == 'inet6': - data_lease['last_communication'] = lease['start_timestamp'].timestamp() - data_lease['duid'] = _format_hex_string(lease['duid']) - data_lease['type'] = lease['type'] - - if lease['type'] == 'IA_PD': - prefix_len = lease['prefix-len'] - data_lease['ip'] += f'/{prefix_len}' - - data_lease['remaining'] = '-' - - if lease['valid-lft'] > 0: - data_lease['remaining'] = lease['expire_timestamp'] - datetime.utcnow() - - if data_lease['remaining'].days >= 0: - # substraction gives us a timedelta object which can't be formatted with strftime - # so we use str(), split gets rid of the microseconds - data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0] - - # Do not add old leases - if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free': - if not state or state == 'all' or data_lease['state'] in state: - data.append(data_lease) - - # deduplicate - checked = [] - for entry in data: - addr = entry.get('ip') - if addr not in checked: - checked.append(addr) - else: - idx = _find_list_of_dict_index(data, key='ip', value=addr) - data.pop(idx) + mappings = kea_get_server_leases(config, inet_suffix, pools, state, origin) if sorted: if sorted == 'ip': - data.sort(key = lambda x:ip_address(x['ip'])) + mappings.sort(key=lambda x: ip_address(x['ip'])) else: - data.sort(key = lambda x:x[sorted]) - return data + mappings.sort(key=lambda x: x[sorted]) + return mappings def _get_formatted_server_leases(raw_data, family='inet'): @@ -165,46 +105,67 @@ def _get_formatted_server_leases(raw_data, family='inet'): ipaddr = lease.get('ip') hw_addr = lease.get('mac') state = lease.get('state') - start = lease.get('start') - start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') - end = lease.get('end') - end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-' + start = datetime.fromtimestamp(lease.get('start'), timezone.utc) + end = ( + datetime.fromtimestamp(lease.get('end'), timezone.utc) + if lease.get('end') + else '-' + ) remain = lease.get('remaining') pool = lease.get('pool') hostname = lease.get('hostname') origin = lease.get('origin') - data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin]) - - headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool', - 'Hostname', 'Origin'] + data_entries.append( + [ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin] + ) + + headers = [ + 'IP Address', + 'MAC address', + 'State', + 'Lease start', + 'Lease expiration', + 'Remaining', + 'Pool', + 'Hostname', + 'Origin', + ] if family == 'inet6': for lease in raw_data: ipaddr = lease.get('ip') state = lease.get('state') - start = lease.get('last_communication') - start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') - end = lease.get('end') - end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') + start = datetime.fromtimestamp( + lease.get('last_communication'), timezone.utc + ) + end = ( + datetime.fromtimestamp(lease.get('end'), timezone.utc) + if lease.get('end') + else '-' + ) remain = lease.get('remaining') lease_type = lease.get('type') pool = lease.get('pool') host_identifier = lease.get('duid') - data_entries.append([ipaddr, state, start, end, remain, lease_type, pool, host_identifier]) - - headers = ['IPv6 address', 'State', 'Last communication', 'Lease expiration', 'Remaining', 'Type', 'Pool', - 'DUID'] + data_entries.append( + [ipaddr, state, start, end, remain, lease_type, pool, host_identifier] + ) + + headers = [ + 'IPv6 address', + 'State', + 'Last communication', + 'Lease expiration', + 'Remaining', + 'Type', + 'Pool', + 'DUID', + ] output = tabulate(data_entries, headers, numalign='left') return output -def _get_dhcp_pools(family='inet') -> list: - v = 'v6' if family == 'inet6' else '' - pools = config.list_nodes(f'service dhcp{v}-server shared-network-name') - return pools - - def _get_pool_size(pool, family='inet'): v = 'v6' if family == 'inet6' else '' base = f'service dhcp{v}-server shared-network-name {pool}' @@ -224,26 +185,27 @@ def _get_pool_size(pool, family='inet'): return size -def _get_raw_pool_statistics(family='inet', pool=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] +def _get_raw_server_pool_statistics(config, family='inet', pool=None): + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - v = 'v6' if family == 'inet6' else '' stats = [] - for p in pool: - subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet') + for p in pools: size = _get_pool_size(family=family, pool=p) - leases = len(_get_raw_server_leases(family=family, pool=p)) + leases = len(_get_raw_server_leases(config, family=family, pool=p)) use_percentage = round(leases / size * 100) if size != 0 else 0 - pool_stats = {'pool': p, 'size': size, 'leases': leases, - 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet} + pool_stats = { + 'pool': p, + 'size': size, + 'leases': leases, + 'available': (size - leases), + 'use_percentage': use_percentage, + } stats.append(pool_stats) return stats -def _get_formatted_pool_statistics(pool_data, family='inet'): +def _get_formatted_server_pool_statistics(pool_data): data_entries = [] for entry in pool_data: pool = entry.get('pool') @@ -254,53 +216,52 @@ def _get_formatted_pool_statistics(pool_data, family='inet'): use_percentage = f'{use_percentage}%' data_entries.append([pool, size, leases, available, use_percentage]) - headers = ['Pool', 'Size','Leases', 'Available', 'Usage'] + headers = ['Pool', 'Size', 'Leases', 'Available', 'Usage'] output = tabulate(data_entries, headers, numalign='left') return output -def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - v = 'v6' if family == 'inet6' else '' - mappings = [] - for p in pool: - pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p], - get_first_key=True) - if 'subnet' in pool_config: - for subnet, subnet_config in pool_config['subnet'].items(): - if 'static-mapping' in subnet_config: - for name, mapping_config in subnet_config['static-mapping'].items(): - mapping = {'pool': p, 'subnet': subnet, 'name': name} - mapping.update(mapping_config) - mappings.append(mapping) +def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None): + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) + + mappings = kea_get_static_mappings(config, inet_suffix, pools) if sorted: if sorted == 'ip': - data.sort(key = lambda x:ip_address(x['ip-address'])) + mappings.sort(key=lambda x: ip_address(x['ip'])) else: - data.sort(key = lambda x:x[sorted]) + mappings.sort(key=lambda x: x[sorted]) return mappings -def _get_formatted_server_static_mappings(raw_data, family='inet'): + +def _get_formatted_server_static_mappings(raw_data): data_entries = [] + for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') - name = entry.get('name') - ip_addr = entry.get('ip-address', 'N/A') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') - description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) - - headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description'] + desc = entry.get('description', 'N/A') + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, desc]) + + headers = [ + 'Pool', + 'Subnet', + 'Hostname', + 'IP Address', + 'MAC Address', + 'DUID', + 'Description', + ] output = tabulate(data_entries, headers, numalign='left') return output -def _verify(func): + +def _verify_server(func): """Decorator checks if DHCP(v6) config exists""" from functools import wraps @@ -314,8 +275,10 @@ def _verify(func): if not config.exists(f'service dhcp{v}-server'): raise vyos.opmode.UnconfiguredSubsystem(unconf_message) return func(*args, **kwargs) + return _wrapper + def _verify_client(func): """Decorator checks if interface is configured as DHCP client""" from functools import wraps @@ -334,67 +297,124 @@ def _verify_client(func): if not config.exists(f'interfaces {interface_path} address dhcp{v}'): raise vyos.opmode.UnconfiguredObject(unconf_message) return func(*args, **kwargs) + return _wrapper -@_verify -def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): - pool_data = _get_raw_pool_statistics(family=family, pool=pool) + +@_verify_server +def show_server_pool_statistics( + raw: bool, family: ArgFamily, pool: typing.Optional[str] +): + v = 'v6' if family == 'inet6' else '' + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning(stale_warn_msg) + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + + pool_data = _get_raw_server_pool_statistics(active_config, family=family, pool=pool) if raw: return pool_data else: - return _get_formatted_pool_statistics(pool_data, family=family) + return _get_formatted_server_pool_statistics(pool_data) + + +@_verify_server +def show_server_leases( + raw: bool, + family: ArgFamily, + pool: typing.Optional[str], + sorted: typing.Optional[str], + state: typing.Optional[ArgState], + origin: typing.Optional[ArgOrigin], +): + v = 'v6' if family == 'inet6' else '' + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning(stale_warn_msg) + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') -@_verify -def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], - sorted: typing.Optional[str], state: typing.Optional[ArgState], - origin: typing.Optional[ArgOrigin] ): - # if dhcp server is down, inactive leases may still be shown as active, so warn the user. - v = '6' if family == 'inet6' else '4' - if not is_systemd_service_running(f'kea-dhcp{v}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) - v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): + if pool and active_pools and pool not in active_pools: raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') - if state and state not in lease_valid_states: - raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') - sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet if sorted and sorted not in sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin) + if state and state not in lease_valid_states: + raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + + lease_data = _get_raw_server_leases( + config=active_config, + family=family, + pool=pool, + sorted=sorted, + state=state, + origin=origin, + ) if raw: return lease_data else: return _get_formatted_server_leases(lease_data, family=family) -@_verify -def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], - sorted: typing.Optional[str]): + +@_verify_server +def show_server_static_mappings( + raw: bool, + family: ArgFamily, + pool: typing.Optional[str], + sorted: typing.Optional[str], +): v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning(stale_warn_msg) + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') if sorted and sorted not in mapping_sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted) + static_mappings = _get_raw_server_static_mappings( + config=active_config, family=family, pool=pool, sorted=sorted + ) if raw: return static_mappings else: - return _get_formatted_server_static_mappings(static_mappings, family=family) + return _get_formatted_server_static_mappings(static_mappings) + def _lease_valid(inet, address): leases = kea_get_leases(inet) - for lease in leases: - if address == lease['ip-address']: - return True - return False + return any(lease['ip-address'] == address for lease in leases) -@_verify + +@_verify_server def clear_dhcp_server_lease(family: ArgFamily, address: str): v = 'v6' if family == 'inet6' else '' inet = '6' if family == 'inet6' else '4' @@ -409,6 +429,7 @@ def clear_dhcp_server_lease(family: ArgFamily, address: str): print(f'Lease "{address}" has been cleared') + def _get_raw_client_leases(family='inet', interface=None): from time import mktime from datetime import datetime @@ -437,21 +458,29 @@ def _get_raw_client_leases(family='inet', interface=None): # format this makes less sense for an API and also the expiry # timestamp is provided in UNIX time. Convert string (e.g. Sun Jul # 30 18:13:44 CEST 2023) to UNIX time (1690733624) - tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))}) + tmp.update( + { + 'last_update': int( + mktime(datetime.strptime(line, time_string).timetuple()) + ) + } + ) continue k, v = line.split('=') - tmp.update({k : v.replace("'", "")}) + tmp.update({k: v.replace("'", '')}) if 'interface' in tmp: vrf = get_interface_vrf(tmp['interface']) - if vrf: tmp.update({'vrf' : vrf}) + if vrf: + tmp.update({'vrf': vrf}) lease_data.append(tmp) return lease_data -def _get_formatted_client_leases(lease_data, family): + +def _get_formatted_client_leases(lease_data): from time import localtime from time import strftime @@ -461,30 +490,34 @@ def _get_formatted_client_leases(lease_data, family): for lease in lease_data: if not lease.get('new_ip_address'): continue - data_entries.append(["Interface", lease['interface']]) + data_entries.append(['Interface', lease['interface']]) if 'new_ip_address' in lease: - tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]' - data_entries.append(["IP address", lease['new_ip_address'], tmp]) + tmp = ( + '[Active]' + if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) + else '[Inactive]' + ) + data_entries.append(['IP address', lease['new_ip_address'], tmp]) if 'new_subnet_mask' in lease: - data_entries.append(["Subnet Mask", lease['new_subnet_mask']]) + data_entries.append(['Subnet Mask', lease['new_subnet_mask']]) if 'new_domain_name' in lease: - data_entries.append(["Domain Name", lease['new_domain_name']]) + data_entries.append(['Domain Name', lease['new_domain_name']]) if 'new_routers' in lease: - data_entries.append(["Router", lease['new_routers']]) + data_entries.append(['Router', lease['new_routers']]) if 'new_domain_name_servers' in lease: - data_entries.append(["Name Server", lease['new_domain_name_servers']]) + data_entries.append(['Name Server', lease['new_domain_name_servers']]) if 'new_dhcp_server_identifier' in lease: - data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']]) + data_entries.append(['DHCP Server', lease['new_dhcp_server_identifier']]) if 'new_dhcp_lease_time' in lease: - data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']]) + data_entries.append(['DHCP Server', lease['new_dhcp_lease_time']]) if 'vrf' in lease: - data_entries.append(["VRF", lease['vrf']]) + data_entries.append(['VRF', lease['vrf']]) if 'last_update' in lease: tmp = strftime(time_string, localtime(int(lease['last_update']))) - data_entries.append(["Last Update", tmp]) + data_entries.append(['Last Update', tmp]) if 'new_expiry' in lease: tmp = strftime(time_string, localtime(int(lease['new_expiry']))) - data_entries.append(["Expiry", tmp]) + data_entries.append(['Expiry', tmp]) # Add empty marker data_entries.append(['']) @@ -493,12 +526,14 @@ def _get_formatted_client_leases(lease_data, family): return output + def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]): lease_data = _get_raw_client_leases(family=family, interface=interface) if raw: return lease_data else: - return _get_formatted_client_leases(lease_data, family=family) + return _get_formatted_client_leases(lease_data) + @_verify_client def renew_client_lease(raw: bool, family: ArgFamily, interface: str): @@ -510,6 +545,7 @@ def renew_client_lease(raw: bool, family: ArgFamily, interface: str): else: call(f'systemctl restart dhclient@{interface}.service') + @_verify_client def release_client_lease(raw: bool, family: ArgFamily, interface: str): if not raw: @@ -520,6 +556,7 @@ def release_client_lease(raw: bool, family: ArgFamily, interface: str): else: call(f'systemctl stop dhclient@{interface}.service') + if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__]) diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index c197ca434..f3309ee34 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -18,6 +18,7 @@ import argparse import ipaddress import json import re +from signal import signal, SIGPIPE, SIG_DFL import tabulate import textwrap @@ -25,6 +26,9 @@ from vyos.config import Config from vyos.utils.process import cmd from vyos.utils.dict import dict_search_args +signal(SIGPIPE, SIG_DFL) + + def get_config_node(conf, node=None, family=None, hook=None, priority=None): if node == 'nat': if family == 'ipv6': @@ -148,6 +152,38 @@ def get_nftables_group_members(family, table, name): return out +def get_nftables_remote_group_members(family, table, name): + prefix = 'ip6' if family == 'ipv6' else 'ip' + out = [] + + try: + results_str = cmd(f'nft -j list set {prefix} {table} {name}') + results = json.loads(results_str) + except: + return out + + if 'nftables' not in results: + return out + + for obj in results['nftables']: + if 'set' not in obj: + continue + + set_obj = obj['set'] + if 'elem' in set_obj: + for elem in set_obj['elem']: + # search for single IP elements + if isinstance(elem, str): + out.append(elem) + # search for prefix elements + elif isinstance(elem, dict) and 'prefix' in elem: + out.append(f"{elem['prefix']['addr']}/{elem['prefix']['len']}") + # search for IP range elements + elif isinstance(elem, dict) and 'range' in elem: + out.append(f"{elem['range'][0]}-{elem['range'][1]}") + + return out + def output_firewall_vertical(rules, headers, adjust=True): for rule in rules: adjusted_rule = rule + [""] * (len(headers) - len(rule)) if adjust else rule # account for different header length, like default-action @@ -253,15 +289,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group') if not source_addr: - source_addr = dict_search_args(rule_conf, 'source', 'fqdn') + source_addr = dict_search_args(rule_conf, 'source', 'group', 'remote_group') if not source_addr: - source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') - if source_addr: - source_addr = str(source_addr)[1:-1].replace('\'','') - if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): - source_addr = 'NOT ' + str(source_addr) + source_addr = dict_search_args(rule_conf, 'source', 'fqdn') if not source_addr: - source_addr = 'any' + source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') + if source_addr: + source_addr = str(source_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): + source_addr = 'NOT ' + str(source_addr) + if not source_addr: + source_addr = 'any' # Get destination dest_addr = dict_search_args(rule_conf, 'destination', 'address') @@ -272,15 +310,17 @@ def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group') if not dest_addr: - dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') + dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'remote_group') if not dest_addr: - dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') - if dest_addr: - dest_addr = str(dest_addr)[1:-1].replace('\'','') - if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): - dest_addr = 'NOT ' + str(dest_addr) + dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') if not dest_addr: - dest_addr = 'any' + dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') + if dest_addr: + dest_addr = str(dest_addr)[1:-1].replace('\'','') + if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): + dest_addr = 'NOT ' + str(dest_addr) + if not dest_addr: + dest_addr = 'any' # Get inbound interface iiface = dict_search_args(rule_conf, 'inbound_interface', 'name') @@ -552,30 +592,8 @@ def show_firewall_group(name=None): header_tail = [] for group_type, group_type_conf in firewall['group'].items(): - ## - if group_type != 'dynamic_group': - - for group_name, group_conf in group_type_conf.items(): - if name and name != group_name: - continue - - references = find_references(group_type, group_name) - row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] - if 'address' in group_conf: - row.append("\n".join(sorted(group_conf['address']))) - elif 'network' in group_conf: - row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) - elif 'mac_address' in group_conf: - row.append("\n".join(sorted(group_conf['mac_address']))) - elif 'port' in group_conf: - row.append("\n".join(sorted(group_conf['port']))) - elif 'interface' in group_conf: - row.append("\n".join(sorted(group_conf['interface']))) - else: - row.append('N/D') - rows.append(row) - - else: + # interate over dynamic-groups + if group_type == 'dynamic_group': if not args.detail: header_tail = ['Timeout', 'Expires'] @@ -584,6 +602,9 @@ def show_firewall_group(name=None): prefix = 'DA_' if dynamic_type == 'address_group' else 'DA6_' if dynamic_type in firewall['group']['dynamic_group']: for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items(): + if name and name != dynamic_name: + continue + references = find_references(dynamic_type, dynamic_name) row = [dynamic_name, textwrap.fill(dynamic_conf.get('description') or '', 50), dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D'] @@ -622,6 +643,68 @@ def show_firewall_group(name=None): header_tail += [""] * (len(members) - 1) rows.append(row) + # iterate over remote-groups + elif group_type == 'remote_group': + for remote_name, remote_conf in group_type_conf.items(): + if name and name != remote_name: + continue + + references = find_references(group_type, remote_name) + row = [remote_name, textwrap.fill(remote_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] + members = get_nftables_remote_group_members("ipv4", 'vyos_filter', f'R_{remote_name}') + members6 = get_nftables_remote_group_members("ipv6", 'vyos_filter', f'R6_{remote_name}') + + if 'url' in remote_conf: + # display only the url if no members are found for both views + if not members and not members6: + if args.detail: + header_tail = ['IPv6 Members', 'Remote URL'] + row.append('N/D') + row.append('N/D') + row.append(remote_conf['url']) + else: + row.append(remote_conf['url']) + rows.append(row) + else: + # display all table elements in detail view + if args.detail: + header_tail = ['IPv6 Members', 'Remote URL'] + if members: + row.append(' '.join(members)) + else: + row.append('N/D') + if members6: + row.append(' '.join(members6)) + else: + row.append('N/D') + row.append(remote_conf['url']) + rows.append(row) + else: + row.append(remote_conf['url']) + rows.append(row) + + # catch the rest of the group types + else: + for group_name, group_conf in group_type_conf.items(): + if name and name != group_name: + continue + + references = find_references(group_type, group_name) + row = [group_name, textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] + if 'address' in group_conf: + row.append("\n".join(sorted(group_conf['address']))) + elif 'network' in group_conf: + row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) + elif 'mac_address' in group_conf: + row.append("\n".join(sorted(group_conf['mac_address']))) + elif 'port' in group_conf: + row.append("\n".join(sorted(group_conf['port']))) + elif 'interface' in group_conf: + row.append("\n".join(sorted(group_conf['interface']))) + else: + row.append('N/D') + rows.append(row) + if rows: print('Firewall Groups\n') if args.detail: diff --git a/src/op_mode/generate_psk.py b/src/op_mode/generate_psk.py new file mode 100644 index 000000000..d51293712 --- /dev/null +++ b/src/op_mode/generate_psk.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import argparse + +from vyos.utils.process import cmd + + +def validate_hex_size(value): + """Validate that the hex_size is between 32 and 512.""" + try: + value = int(value) + except ValueError: + raise argparse.ArgumentTypeError("hex_size must be integer.") + + if value < 32 or value > 512: + raise argparse.ArgumentTypeError("hex_size must be between 32 and 512.") + return value + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + "--hex_size", + type=validate_hex_size, + help='PKS value size in hex format. Default is 32 bytes.', + default=32, + + required=False, + ) + args = parser.parse_args() + + print(cmd(f'openssl rand -hex {args.hex_size}'))
\ No newline at end of file diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index bdc16de15..ac5a84419 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2025 VyOS maintainers and contributors <maintainers@vyos.io> # # This file is part of VyOS. # @@ -24,7 +24,9 @@ from glob import glob from sys import exit from os import environ from os import readlink -from os import getpid, getppid +from os import getpid +from os import getppid +from json import loads from typing import Union from urllib.parse import urlparse from passlib.hosts import linux_context @@ -32,22 +34,41 @@ from errno import ENOSPC from psutil import disk_partitions +from vyos.base import Warning from vyos.configtree import ConfigTree -from vyos.configquery import ConfigTreeQuery from vyos.remote import download -from vyos.system import disk, grub, image, compat, raid, SYSTEM_CFG_VER +from vyos.system import disk +from vyos.system import grub +from vyos.system import image +from vyos.system import compat +from vyos.system import raid +from vyos.system import SYSTEM_CFG_VER +from vyos.system import grub_util from vyos.template import render +from vyos.utils.auth import ( + DEFAULT_PASSWORD, + EPasswdStrength, + evaluate_strength +) +from vyos.utils.dict import dict_search from vyos.utils.io import ask_input, ask_yes_no, select_entry from vyos.utils.file import chmod_2775 -from vyos.utils.process import cmd, run -from vyos.version import get_remote_version, get_version_data +from vyos.utils.file import read_file +from vyos.utils.process import cmd, run, rc_cmd +from vyos.version import get_version_data # define text messages MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' MSG_ERR_IMPROPER_IMAGE: str = 'Missing sha256sum.txt.\nEither this image is corrupted, or of era 1.2.x (md5sum) and would downgrade image tools;\ndisallowed in either case.' -MSG_ERR_ARCHITECTURE_MISMATCH: str = 'Upgrading to a different image architecture will break your system.' +MSG_ERR_INCOMPATIBLE_IMAGE: str = 'Image compatibility check failed, aborting installation.' +MSG_ERR_ARCHITECTURE_MISMATCH: str = 'The current architecture is "{0}", the new image is for "{1}". Upgrading to a different image architecture will break your system.' +MSG_ERR_FLAVOR_MISMATCH: str = 'The current image flavor is "{0}", the new image is "{1}". Upgrading to a non-matching flavor can have unpredictable consequences.' +MSG_ERR_MISSING_ARCHITECTURE: str = 'The new image version data does not specify architecture, cannot check compatibility (is it a legacy release image?)' +MSG_ERR_MISSING_FLAVOR: str = 'The new image version data does not specify flavor, cannot check compatibility (is it a legacy release image?)' +MSG_ERR_CORRUPT_CURRENT_IMAGE: str = 'Version data in the current image is malformed: missing flavor and/or architecture fields. Upgrade compatibility cannot be checked.' +MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE: str = 'Unsupported signature type, signature cannot be verified.' MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install VyOS to your permanent storage.' MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation' MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.' @@ -63,6 +84,7 @@ MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like MSG_INPUT_CONFIG_CHOICE: str = 'The following config files are available for boot:' MSG_INPUT_CONFIG_CHOOSE: str = 'Which file would you like as boot config?' MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' +MSG_INPUT_IMAGE_NAME_TAKEN: str = 'There is already an installed image by that name; please choose again' MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set the new image as the default one for boot?' MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user:' MSG_INPUT_PASSWORD_CONFIRM: str = 'Please confirm password for the "vyos" user:' @@ -79,8 +101,10 @@ MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.' MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n'\ 'It must be between 1 and 64 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' + +MSG_WARN_CHANGE_PASSWORD: str = 'Default password used. Consider changing ' \ + 'it on next login.' MSG_WARN_PASSWORD_CONFIRM: str = 'The entered values did not match. Try again' -MSG_WARN_FLAVOR_MISMATCH: str = 'The running image flavor is "{0}". The new image flavor is "{1}".\n' \ 'Installing a different image flavor may cause functionality degradation or break your system.\n' \ 'Do you want to continue with installation?' CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB @@ -96,9 +120,10 @@ DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src' DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst' DIR_KERNEL_SRC: str = '/boot/' FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs' -ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso' +ISO_DOWNLOAD_PATH: str = '' external_download_script = '/usr/libexec/vyos/simple-download.py' +external_latest_image_url_script = '/usr/libexec/vyos/latest-image-url.py' # default boot variables DEFAULT_BOOT_VARS: dict[str, str] = { @@ -462,6 +487,29 @@ def setup_grub(root_dir: str) -> None: render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) +def get_cli_kernel_options(config_file: str) -> list: + config = ConfigTree(read_file(config_file)) + config_dict = loads(config.to_json()) + kernel_options = dict_search('system.option.kernel', config_dict) + if kernel_options is None: + kernel_options = {} + cmdline_options = [] + + # XXX: This code path and if statements must be kept in sync with the Kernel + # option handling in system_options.py:generate(). This occurance is used + # for having the appropriate options passed to GRUB after an image upgrade! + if 'disable-mitigations' in kernel_options: + cmdline_options.append('mitigations=off') + if 'disable-power-saving' in kernel_options: + cmdline_options.append('intel_idle.max_cstate=0 processor.max_cstate=1') + if 'amd-pstate-driver' in kernel_options: + mode = kernel_options['amd-pstate-driver'] + cmdline_options.append( + f'initcall_blacklist=acpi_cpufreq_init amd_pstate={mode}') + if 'quiet' in kernel_options: + cmdline_options.append('quiet') + + return cmdline_options def configure_authentication(config_file: str, password: str) -> None: """Write encrypted password to config file @@ -476,10 +524,7 @@ def configure_authentication(config_file: str, password: str) -> None: plaintext exposed """ encrypted_password = linux_context.hash(password) - - with open(config_file) as f: - config_string = f.read() - + config_string = read_file(config_file) config = ConfigTree(config_string) config.set([ 'system', 'login', 'user', 'vyos', 'authentication', @@ -501,7 +546,6 @@ def validate_signature(file_path: str, sign_type: str) -> None: """ print('Validating signature') signature_valid: bool = False - # validate with minisig if sign_type == 'minisig': pub_key_list = glob('/usr/share/vyos/keys/*.minisign.pub') for pubkey in pub_key_list: @@ -510,11 +554,8 @@ def validate_signature(file_path: str, sign_type: str) -> None: signature_valid = True break Path(f'{file_path}.minisig').unlink() - # validate with GPG - if sign_type == 'asc': - if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0: - signature_valid = True - Path(f'{file_path}.asc').unlink() + else: + exit(MSG_ERR_UNSUPPORTED_SIGNATURE_TYPE) # warn or pass if not signature_valid: @@ -524,21 +565,18 @@ def validate_signature(file_path: str, sign_type: str) -> None: print('Signature is valid') def download_file(local_file: str, remote_path: str, vrf: str, - username: str, password: str, progressbar: bool = False, check_space: bool = False): - environ['REMOTE_USERNAME'] = username - environ['REMOTE_PASSWORD'] = password + # Server credentials are implicitly passed in environment variables + # that are set by add_image if vrf is None: download(local_file, remote_path, progressbar=progressbar, check_space=check_space, raise_error=True) else: - vrf_cmd = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password} \ - ip vrf exec {vrf} {external_download_script} \ - --local-file {local_file} --remote-path {remote_path}' - cmd(vrf_cmd) + vrf_cmd = f'ip vrf exec {vrf} {external_download_script} \ + --local-file {local_file} --remote-path {remote_path}' + cmd(vrf_cmd, env=environ) def image_fetch(image_path: str, vrf: str = None, - username: str = '', password: str = '', no_prompt: bool = False) -> Path: """Fetch an ISO image @@ -548,34 +586,44 @@ def image_fetch(image_path: str, vrf: str = None, Returns: Path: a path to a local file """ + import os.path + from uuid import uuid4 + + global ISO_DOWNLOAD_PATH + # Latest version gets url from configured "system update-check url" if image_path == 'latest': - config = ConfigTreeQuery() - if config.exists('system update-check url'): - configured_url_version = config.value('system update-check url') - remote_url_list = get_remote_version(configured_url_version) - image_path = remote_url_list[0].get('url') + command = external_latest_image_url_script + if vrf: + command = f'ip vrf exec {vrf} {command}' + code, output = rc_cmd(command, env=environ) + if code: + print(output) + exit(MSG_INFO_INSTALL_EXIT) + image_path = output if output else image_path try: # check a type of path if urlparse(image_path).scheme: - # download an image + # Download the image file + ISO_DOWNLOAD_PATH = os.path.join(os.path.expanduser("~"), '{0}.iso'.format(uuid4())) download_file(ISO_DOWNLOAD_PATH, image_path, vrf, - username, password, progressbar=True, check_space=True) - # download a signature + # Download the image signature + # VyOS only supports minisign signatures at the moment, + # but we keep the logic for multiple signatures + # in case we add something new in the future sign_file = (False, '') - for sign_type in ['minisig', 'asc']: + for sign_type in ['minisig']: try: download_file(f'{ISO_DOWNLOAD_PATH}.{sign_type}', - f'{image_path}.{sign_type}', vrf, - username, password) + f'{image_path}.{sign_type}', vrf) sign_file = (True, sign_type) break except Exception: - print(f'{sign_type} signature is not available') - # validate a signature if it is available + print(f'Could not download {sign_type} signature') + # Validate the signature if it is available if sign_file[0]: validate_signature(ISO_DOWNLOAD_PATH, sign_file[1]) else: @@ -697,30 +745,48 @@ def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) - return False -def validate_compatibility(iso_path: str) -> None: +def validate_compatibility(iso_path: str, force: bool = False) -> None: """Check architecture and flavor compatibility with the running image Args: iso_path (str): a path to the mounted ISO image """ - old_data = get_version_data() - old_flavor = old_data.get('flavor', '') - old_architecture = old_data.get('architecture') or cmd('dpkg --print-architecture') + current_data = get_version_data() + current_flavor = current_data.get('flavor') + current_architecture = current_data.get('architecture') or cmd('dpkg --print-architecture') new_data = get_version_data(f'{iso_path}/version.json') - new_flavor = new_data.get('flavor', '') - new_architecture = new_data.get('architecture', '') + new_flavor = new_data.get('flavor') + new_architecture = new_data.get('architecture') - if not old_architecture == new_architecture: - print(MSG_ERR_ARCHITECTURE_MISMATCH) + if not current_flavor or not current_architecture: + # This may only happen if someone modified the version file. + # Unlikely but not impossible. + print(MSG_ERR_CORRUPT_CURRENT_IMAGE) cleanup() exit(MSG_INFO_INSTALL_EXIT) - if not old_flavor == new_flavor: - if not ask_yes_no(MSG_WARN_FLAVOR_MISMATCH.format(old_flavor, new_flavor), default=False): - cleanup() - exit(MSG_INFO_INSTALL_EXIT) + success = True + if current_architecture != new_architecture: + success = False + if not new_architecture: + print(MSG_ERR_MISSING_ARCHITECTURE) + else: + print(MSG_ERR_ARCHITECTURE_MISMATCH.format(current_architecture, new_architecture)) + + if current_flavor != new_flavor: + if not force: + success = False + if not new_flavor: + print(MSG_ERR_MISSING_FLAVOR) + else: + print(MSG_ERR_FLAVOR_MISMATCH.format(current_flavor, new_flavor)) + + if not success: + print(MSG_ERR_INCOMPATIBLE_IMAGE) + cleanup() + exit(MSG_INFO_INSTALL_EXIT) def install_image() -> None: """Install an image to a disk @@ -742,14 +808,25 @@ def install_image() -> None: break print(MSG_WARN_IMAGE_NAME_WRONG) + failed_check_status = [EPasswdStrength.WEAK, EPasswdStrength.ERROR] # ask for password while True: user_password: str = ask_input(MSG_INPUT_PASSWORD, no_echo=True, non_empty=True) + + if user_password == DEFAULT_PASSWORD: + Warning(MSG_WARN_CHANGE_PASSWORD) + else: + result = evaluate_strength(user_password) + if result['strength'] in failed_check_status: + Warning(result['error']) + confirm: str = ask_input(MSG_INPUT_PASSWORD_CONFIRM, no_echo=True, non_empty=True) + if user_password == confirm: break + print(MSG_WARN_PASSWORD_CONFIRM) # ask for default console @@ -845,8 +922,7 @@ def install_image() -> None: for disk_target in l: disk.partition_mount(disk_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') grub.install(disk_target.name, f'{DIR_DST_ROOT}/boot/', - f'{DIR_DST_ROOT}/boot/efi', - id=f'VyOS (RAID disk {l.index(disk_target) + 1})') + f'{DIR_DST_ROOT}/boot/efi') disk.partition_umount(disk_target.partition['efi']) else: print('Installing GRUB to the drive') @@ -889,7 +965,7 @@ def install_image() -> None: @compat.grub_cfg_update def add_image(image_path: str, vrf: str = None, username: str = '', - password: str = '', no_prompt: bool = False) -> None: + password: str = '', no_prompt: bool = False, force: bool = False) -> None: """Add a new image Args: @@ -898,15 +974,18 @@ def add_image(image_path: str, vrf: str = None, username: str = '', if image.is_live_boot(): exit(MSG_ERR_LIVE) + environ['REMOTE_USERNAME'] = username + environ['REMOTE_PASSWORD'] = password + # fetch an image - iso_path: Path = image_fetch(image_path, vrf, username, password, no_prompt) + iso_path: Path = image_fetch(image_path, vrf, no_prompt) try: # mount an ISO Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') print('Validating image compatibility') - validate_compatibility(DIR_ISO_MOUNT) + validate_compatibility(DIR_ISO_MOUNT, force=force) # check sums print('Validating image checksums') @@ -932,8 +1011,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '', f'Adding image would downgrade image tools to v.{cfg_ver}; disallowed') if not no_prompt: + versions = grub.version_list() while True: image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) + if image_name in versions: + print(MSG_INPUT_IMAGE_NAME_TAKEN) + continue if image.validate_name(image_name): break print(MSG_WARN_IMAGE_NAME_WRONG) @@ -955,7 +1038,7 @@ def add_image(image_path: str, vrf: str = None, username: str = '', Path(target_config_dir).mkdir(parents=True) chown(target_config_dir, group='vyattacfg') chmod_2775(target_config_dir) - copytree('/opt/vyatta/etc/config/', target_config_dir, + copytree('/opt/vyatta/etc/config/', target_config_dir, symlinks=True, copy_function=copy_preserve_owner, dirs_exist_ok=True) else: Path(target_config_dir).mkdir(parents=True) @@ -988,6 +1071,12 @@ def add_image(image_path: str, vrf: str = None, username: str = '', if set_as_default: grub.set_default(image_name, root_dir) + cmdline_options = get_cli_kernel_options( + f'{target_config_dir}/config.boot') + grub_util.update_kernel_cmdline_options(' '.join(cmdline_options), + root_dir=root_dir, + version=image_name) + except OSError as e: # if no space error, remove image dir and cleanup if e.errno == ENOSPC: @@ -1027,6 +1116,9 @@ def parse_arguments() -> Namespace: parser.add_argument('--image-path', help='a path (HTTP or local file) to an image that needs to be installed' ) + parser.add_argument('--force', action='store_true', + help='Ignore flavor compatibility requirements.' + ) # parser.add_argument('--image_new_name', help='a new name for image') args: Namespace = parser.parse_args() # Validate arguments @@ -1043,7 +1135,8 @@ if __name__ == '__main__': install_image() if args.action == 'add': add_image(args.image_path, args.vrf, - args.username, args.password, args.no_prompt) + args.username, args.password, + args.no_prompt, args.force) exit() diff --git a/src/op_mode/interfaces.py b/src/op_mode/interfaces.py index e7afc4caa..c97f3b129 100755 --- a/src/op_mode/interfaces.py +++ b/src/op_mode/interfaces.py @@ -29,6 +29,7 @@ from vyos.ifconfig import Section from vyos.ifconfig import Interface from vyos.ifconfig import VRRP from vyos.utils.process import cmd +from vyos.utils.network import interface_exists from vyos.utils.process import rc_cmd from vyos.utils.process import call @@ -84,6 +85,14 @@ def filtered_interfaces(ifnames: typing.Union[str, list], yield interface +def detailed_output(dataset, headers): + for data in dataset: + adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action + transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char + + print(tabulate(transformed_rule, tablefmt="presto")) + print() + def _split_text(text, used=0): """ take a string and attempt to split it to fit with the width of the screen @@ -296,6 +305,114 @@ def _get_counter_data(ifname: typing.Optional[str], return ret +def _get_kernel_data(raw, ifname = None, detail = False): + if ifname: + # Check if the interface exists + if not interface_exists(ifname): + raise vyos.opmode.IncorrectValue(f"{ifname} does not exist!") + int_name = f'dev {ifname}' + else: + int_name = '' + + kernel_interface = json.loads(cmd(f'ip -j -d -s address show {int_name}')) + + # Return early if raw + if raw: + return kernel_interface, None + + # Format the kernel data + kernel_interface_out = _format_kernel_data(kernel_interface, detail) + + return kernel_interface, kernel_interface_out + +def _format_kernel_data(data, detail): + output_list = [] + tmpInfo = {} + + # Sort interfaces by name + for interface in sorted(data, key=lambda x: x.get('ifname', '')): + if interface.get('linkinfo', {}).get('info_kind') == 'vrf': + continue + + # Get the device model; ex. Intel Corporation Ethernet Controller I225-V + dev_model = interface.get('parentdev', '') + if 'parentdev' in interface: + parentdev = interface['parentdev'] + if re.match(r'^[0-9a-fA-F]{4}:', parentdev): + dev_model = cmd(f'lspci -nn -s {parentdev}').split(']:')[1].strip() + + # Get the IP addresses on interface + ip_list = [] + has_global = False + + for ip in interface['addr_info']: + if ip.get('scope') in ('global', 'host'): + has_global = True + local = ip.get('local', '-') + prefixlen = ip.get('prefixlen', '') + ip_list.append(f"{local}/{prefixlen}") + + + # If no global IP address, add '-'; indicates no IP address on interface + if not has_global: + ip_list.append('-') + + sl_status = ('A' if not 'UP' in interface['flags'] else 'u') + '/' + ('D' if interface['operstate'] == 'DOWN' else 'u') + + # Generate temporary dict to hold data + tmpInfo['ifname'] = interface.get('ifname', '') + tmpInfo['ip'] = ip_list + tmpInfo['mac'] = interface.get('address', '') + tmpInfo['mtu'] = interface.get('mtu', '') + tmpInfo['vrf'] = interface.get('master', 'default') + tmpInfo['status'] = sl_status + tmpInfo['description'] = interface.get('ifalias', '') + tmpInfo['device'] = dev_model + tmpInfo['alternate_names'] = interface.get('altnames', '') + tmpInfo['minimum_mtu'] = interface.get('min_mtu', '') + tmpInfo['maximum_mtu'] = interface.get('max_mtu', '') + rx_stats = interface.get('stats64', {}).get('rx') + tx_stats = interface.get('stats64', {}).get('tx') + tmpInfo['rx_packets'] = rx_stats.get('packets', "") + tmpInfo['rx_bytes'] = rx_stats.get('bytes', "") + tmpInfo['rx_errors'] = rx_stats.get('errors', "") + tmpInfo['rx_dropped'] = rx_stats.get('dropped', "") + tmpInfo['rx_over_errors'] = rx_stats.get('over_errors', '') + tmpInfo['multicast'] = rx_stats.get('multicast', "") + tmpInfo['tx_packets'] = tx_stats.get('packets', "") + tmpInfo['tx_bytes'] = tx_stats.get('bytes', "") + tmpInfo['tx_errors'] = tx_stats.get('errors', "") + tmpInfo['tx_dropped'] = tx_stats.get('dropped', "") + tmpInfo['tx_carrier_errors'] = tx_stats.get('carrier_errors', "") + tmpInfo['tx_collisions'] = tx_stats.get('collisions', "") + + # Generate output list; detail adds more fields + output_list.append([tmpInfo['ifname'], + '\n'.join(tmpInfo['ip']), + tmpInfo['mac'], + tmpInfo['vrf'], + tmpInfo['mtu'], + tmpInfo['status'], + tmpInfo['description'], + *([tmpInfo['device']] if detail else []), + *(['\n'.join(tmpInfo['alternate_names'])] if detail else []), + *([tmpInfo['minimum_mtu']] if detail else []), + *([tmpInfo['maximum_mtu']] if detail else []), + *([tmpInfo['rx_packets']] if detail else []), + *([tmpInfo['rx_bytes']] if detail else []), + *([tmpInfo['rx_errors']] if detail else []), + *([tmpInfo['rx_dropped']] if detail else []), + *([tmpInfo['rx_over_errors']] if detail else []), + *([tmpInfo['multicast']] if detail else []), + *([tmpInfo['tx_packets']] if detail else []), + *([tmpInfo['tx_bytes']] if detail else []), + *([tmpInfo['tx_errors']] if detail else []), + *([tmpInfo['tx_dropped']] if detail else []), + *([tmpInfo['tx_carrier_errors']] if detail else []), + *([tmpInfo['tx_collisions']] if detail else [])]) + + return output_list + @catch_broken_pipe def _format_show_data(data: list): unhandled = [] @@ -445,6 +562,27 @@ def _format_show_counters(data: list): print (output) return output +def show_kernel(raw: bool, intf_name: typing.Optional[str], detail: bool): + raw_data, data = _get_kernel_data(raw, intf_name, detail) + + # Return early if raw + if raw: + return raw_data + + # Normal headers; show interfaces kernel + headers = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description'] + + # Detail headers; show interfaces kernel detail + detail_header = ['Interface', 'IP Address', 'MAC', 'VRF', 'MTU', 'S/L', 'Description', + 'Device', 'Alternate Names','Minimum MTU', 'Maximum MTU', 'RX_Packets', + 'RX_Bytes', 'RX_Errors', 'RX_Dropped', 'Receive Overrun Errors', 'Received Multicast', + 'TX_Packets', 'TX_Bytes', 'TX_Errors', 'TX_Dropped', 'Transmit Carrier Errors', + 'Transmit Collisions'] + + if detail: + detailed_output(data, detail_header) + else: + print(tabulate(data, headers)) def _show_raw(data: list, intf_name: str): if intf_name is not None and len(data) <= 1: diff --git a/src/op_mode/interfaces_wireguard.py b/src/op_mode/interfaces_wireguard.py new file mode 100644 index 000000000..627af0579 --- /dev/null +++ b/src/op_mode/interfaces_wireguard.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import vyos.opmode + +from vyos.ifconfig import WireGuardIf +from vyos.configquery import ConfigTreeQuery + + +def _verify(func): + """Decorator checks if WireGuard interface config exists""" + from functools import wraps + + @wraps(func) + def _wrapper(*args, **kwargs): + config = ConfigTreeQuery() + interface = kwargs.get('intf_name') + if not config.exists(['interfaces', 'wireguard', interface]): + unconf_message = f'WireGuard interface {interface} is not configured' + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + + return _wrapper + + +@_verify +def show_summary(raw: bool, intf_name: str): + intf = WireGuardIf(intf_name, create=False, debug=False) + return intf.operational.show_interface() + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py index 02ba126b4..1ab50b105 100755 --- a/src/op_mode/ipsec.py +++ b/src/op_mode/ipsec.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022-2024 VyOS maintainers and contributors +# Copyright (C) 2022-2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -700,15 +700,6 @@ def reset_profile_dst(profile: str, tunnel: str, nbma_dst: str): ] ) ) - # initiate IKE SAs - for ike in sa_nbma_list: - if ike_sa_name in ike: - vyos.ipsec.vici_initiate( - ike_sa_name, - 'dmvpn', - ike[ike_sa_name]['local-host'], - ike[ike_sa_name]['remote-host'], - ) print( f'Profile {profile} tunnel {tunnel} remote-host {nbma_dst} reset result: success' ) @@ -732,18 +723,6 @@ def reset_profile_all(profile: str, tunnel: str): ) # terminate IKE SAs vyos.ipsec.terminate_vici_by_name(ike_sa_name, None) - # initiate IKE SAs - for ike in sa_list: - if ike_sa_name in ike: - vyos.ipsec.vici_initiate( - ike_sa_name, - 'dmvpn', - ike[ike_sa_name]['local-host'], - ike[ike_sa_name]['remote-host'], - ) - print( - f'Profile {profile} tunnel {tunnel} remote-host {ike[ike_sa_name]["remote-host"]} reset result: success' - ) print(f'Profile {profile} tunnel {tunnel} reset result: success') except vyos.ipsec.ViciInitiateError as err: raise vyos.opmode.UnconfiguredSubsystem(err) diff --git a/src/op_mode/reverseproxy.py b/src/op_mode/load-balancing_haproxy.py index 19704182a..ae6734e16 100755 --- a/src/op_mode/reverseproxy.py +++ b/src/op_mode/load-balancing_haproxy.py @@ -217,8 +217,8 @@ def _get_formatted_output(data): def show(raw: bool): config = ConfigTreeQuery() - if not config.exists('load-balancing reverse-proxy'): - raise vyos.opmode.UnconfiguredSubsystem('Reverse-proxy is not configured') + if not config.exists('load-balancing haproxy'): + raise vyos.opmode.UnconfiguredSubsystem('Haproxy is not configured') data = _get_raw_data() if raw: diff --git a/src/op_mode/load-balancing_wan.py b/src/op_mode/load-balancing_wan.py new file mode 100755 index 000000000..9fa473802 --- /dev/null +++ b/src/op_mode/load-balancing_wan.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import re +import sys + +from datetime import datetime + +from vyos.config import Config +from vyos.utils.process import cmd + +import vyos.opmode + +wlb_status_file = '/run/wlb_status.json' + +status_format = '''Interface: {ifname} +Status: {status} +Last Status Change: {last_change} +Last Interface Success: {last_success} +Last Interface Failure: {last_failure} +Interface Failures: {failures} +''' + +def _verify(func): + """Decorator checks if WLB config exists""" + from functools import wraps + + @wraps(func) + def _wrapper(*args, **kwargs): + config = Config() + if not config.exists(['load-balancing', 'wan']): + unconf_message = 'WAN load-balancing is not configured' + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + return _wrapper + +def _get_raw_data(): + with open(wlb_status_file, 'r') as f: + data = json.loads(f.read()) + if not data: + return {} + return data + +def _get_formatted_output(raw_data): + for ifname, if_data in raw_data.items(): + latest_change = if_data['last_success'] if if_data['last_success'] > if_data['last_failure'] else if_data['last_failure'] + + change_dt = datetime.fromtimestamp(latest_change) if latest_change > 0 else None + success_dt = datetime.fromtimestamp(if_data['last_success']) if if_data['last_success'] > 0 else None + failure_dt = datetime.fromtimestamp(if_data['last_failure']) if if_data['last_failure'] > 0 else None + now = datetime.utcnow() + + fmt_data = { + 'ifname': ifname, + 'status': "active" if if_data['state'] else "failed", + 'last_change': change_dt.strftime("%Y-%m-%d %H:%M:%S") if change_dt else 'N/A', + 'last_success': str(now - success_dt) if success_dt else 'N/A', + 'last_failure': str(now - failure_dt) if failure_dt else 'N/A', + 'failures': if_data['failure_count'] + } + print(status_format.format(**fmt_data)) + +@_verify +def show_summary(raw: bool): + data = _get_raw_data() + + if raw: + return data + else: + return _get_formatted_output(data) + +@_verify +def show_connection(raw: bool): + res = cmd('sudo conntrack -L -n') + lines = res.split("\n") + filtered_lines = [line for line in lines if re.search(r' mark=[1-9]', line)] + + if raw: + return filtered_lines + + for line in lines: + print(line) + +@_verify +def show_status(raw: bool): + res = cmd('sudo nft list chain ip vyos_wanloadbalance wlb_mangle_prerouting') + lines = res.split("\n") + filtered_lines = [line.replace("\t", "") for line in lines[3:-2] if 'meta mark set' not in line] + + if raw: + return filtered_lines + + for line in filtered_lines: + print(line) + +if __name__ == "__main__": + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/mtr.py b/src/op_mode/mtr.py index de139f2fa..522cbe008 100644 --- a/src/op_mode/mtr.py +++ b/src/op_mode/mtr.py @@ -23,161 +23,162 @@ from vyos.utils.network import vrf_list from vyos.utils.process import call options = { - 'report': { + 'report-mode': { 'mtr': '{command} --report', 'type': 'noarg', - 'help': 'This option puts mtr into report mode. When in this mode, mtr will run for the number of cycles specified by the -c option, and then print statistics and exit.' + 'help': 'This option puts mtr into report mode. When in this mode, mtr will run for the number of cycles specified by the -c option, and then print statistics and exit.', }, 'report-wide': { 'mtr': '{command} --report-wide', 'type': 'noarg', - 'help': 'This option puts mtr into wide report mode. When in this mode, mtr will not cut hostnames in the report.' + 'help': 'This option puts mtr into wide report mode. When in this mode, mtr will not cut hostnames in the report.', }, 'raw': { 'mtr': '{command} --raw', 'type': 'noarg', - 'help': 'Use the raw output format. This format is better suited for archival of the measurement results.' + 'help': 'Use the raw output format. This format is better suited for archival of the measurement results.', }, 'json': { 'mtr': '{command} --json', 'type': 'noarg', - 'help': 'Use this option to tell mtr to use the JSON output format.' + 'help': 'Use this option to tell mtr to use the JSON output format.', }, 'split': { 'mtr': '{command} --split', 'type': 'noarg', - 'help': 'Use this option to set mtr to spit out a format that is suitable for a split-user interface.' + 'help': 'Use this option to set mtr to spit out a format that is suitable for a split-user interface.', }, 'no-dns': { 'mtr': '{command} --no-dns', 'type': 'noarg', - 'help': 'Use this option to force mtr to display numeric IP numbers and not try to resolve the host names.' + 'help': 'Use this option to force mtr to display numeric IP numbers and not try to resolve the host names.', }, 'show-ips': { 'mtr': '{command} --show-ips {value}', 'type': '<num>', - 'help': 'Use this option to tell mtr to display both the host names and numeric IP numbers.' + 'help': 'Use this option to tell mtr to display both the host names and numeric IP numbers.', }, 'ipinfo': { 'mtr': '{command} --ipinfo {value}', 'type': '<num>', - 'help': 'Displays information about each IP hop.' + 'help': 'Displays information about each IP hop.', }, 'aslookup': { 'mtr': '{command} --aslookup', 'type': 'noarg', - 'help': 'Displays the Autonomous System (AS) number alongside each hop. Equivalent to --ipinfo 0.' + 'help': 'Displays the Autonomous System (AS) number alongside each hop. Equivalent to --ipinfo 0.', }, 'interval': { 'mtr': '{command} --interval {value}', 'type': '<num>', - 'help': 'Use this option to specify the positive number of seconds between ICMP ECHO requests. The default value for this parameter is one second. The root user may choose values between zero and one.' + 'help': 'Use this option to specify the positive number of seconds between ICMP ECHO requests. The default value for this parameter is one second. The root user may choose values between zero and one.', }, 'report-cycles': { 'mtr': '{command} --report-cycles {value}', 'type': '<num>', - 'help': 'Use this option to set the number of pings sent to determine both the machines on the network and the reliability of those machines. Each cycle lasts one second.' + 'help': 'Use this option to set the number of pings sent to determine both the machines on the network and the reliability of those machines. Each cycle lasts one second.', }, 'psize': { 'mtr': '{command} --psize {value}', 'type': '<num>', - 'help': 'This option sets the packet size used for probing. It is in bytes, inclusive IP and ICMP headers. If set to a negative number, every iteration will use a different, random packet size up to that number.' + 'help': 'This option sets the packet size used for probing. It is in bytes, inclusive IP and ICMP headers. If set to a negative number, every iteration will use a different, random packet size up to that number.', }, 'bitpattern': { 'mtr': '{command} --bitpattern {value}', 'type': '<num>', - 'help': 'Specifies bit pattern to use in payload. Should be within range 0 - 255. If NUM is greater than 255, a random pattern is used.' + 'help': 'Specifies bit pattern to use in payload. Should be within range 0 - 255. If NUM is greater than 255, a random pattern is used.', }, 'gracetime': { 'mtr': '{command} --gracetime {value}', 'type': '<num>', - 'help': 'Use this option to specify the positive number of seconds to wait for responses after the final request. The default value is five seconds.' + 'help': 'Use this option to specify the positive number of seconds to wait for responses after the final request. The default value is five seconds.', }, 'tos': { 'mtr': '{command} --tos {value}', 'type': '<tos>', - 'help': 'Specifies value for type of service field in IP header. Should be within range 0 - 255.' + 'help': 'Specifies value for type of service field in IP header. Should be within range 0 - 255.', }, 'mpls': { 'mtr': '{command} --mpls {value}', 'type': 'noarg', - 'help': 'Use this option to tell mtr to display information from ICMP extensions for MPLS (RFC 4950) that are encoded in the response packets.' + 'help': 'Use this option to tell mtr to display information from ICMP extensions for MPLS (RFC 4950) that are encoded in the response packets.', }, 'interface': { 'mtr': '{command} --interface {value}', 'type': '<interface>', 'helpfunction': interface_list, - 'help': 'Use the network interface with a specific name for sending network probes. This can be useful when you have multiple network interfaces with routes to your destination, for example both wired Ethernet and WiFi, and wish to test a particular interface.' + 'help': 'Use the network interface with a specific name for sending network probes. This can be useful when you have multiple network interfaces with routes to your destination, for example both wired Ethernet and WiFi, and wish to test a particular interface.', }, 'address': { 'mtr': '{command} --address {value}', 'type': '<x.x.x.x> <h:h:h:h:h:h:h:h>', - 'help': 'Use this option to bind the outgoing socket to ADDRESS, so that all packets will be sent with ADDRESS as source address.' + 'help': 'Use this option to bind the outgoing socket to ADDRESS, so that all packets will be sent with ADDRESS as source address.', }, 'first-ttl': { 'mtr': '{command} --first-ttl {value}', 'type': '<num>', - 'help': 'Specifies with what TTL to start. Defaults to 1.' + 'help': 'Specifies with what TTL to start. Defaults to 1.', }, 'max-ttl': { 'mtr': '{command} --max-ttl {value}', 'type': '<num>', - 'help': 'Specifies the maximum number of hops or max time-to-live value mtr will probe. Default is 30.' + 'help': 'Specifies the maximum number of hops or max time-to-live value mtr will probe. Default is 30.', }, 'max-unknown': { 'mtr': '{command} --max-unknown {value}', 'type': '<num>', - 'help': 'Specifies the maximum unknown host. Default is 5.' + 'help': 'Specifies the maximum unknown host. Default is 5.', }, 'udp': { 'mtr': '{command} --udp', 'type': 'noarg', - 'help': 'Use UDP datagrams instead of ICMP ECHO.' + 'help': 'Use UDP datagrams instead of ICMP ECHO.', }, 'tcp': { 'mtr': '{command} --tcp', 'type': 'noarg', - 'help': ' Use TCP SYN packets instead of ICMP ECHO. PACKETSIZE is ignored, since SYN packets can not contain data.' + 'help': ' Use TCP SYN packets instead of ICMP ECHO. PACKETSIZE is ignored, since SYN packets can not contain data.', }, 'sctp': { 'mtr': '{command} --sctp', 'type': 'noarg', - 'help': 'Use Stream Control Transmission Protocol packets instead of ICMP ECHO.' + 'help': 'Use Stream Control Transmission Protocol packets instead of ICMP ECHO.', }, 'port': { 'mtr': '{command} --port {value}', 'type': '<port>', - 'help': 'The target port number for TCP/SCTP/UDP traces.' + 'help': 'The target port number for TCP/SCTP/UDP traces.', }, 'localport': { 'mtr': '{command} --localport {value}', 'type': '<port>', - 'help': 'The source port number for UDP traces.' + 'help': 'The source port number for UDP traces.', }, 'timeout': { 'mtr': '{command} --timeout {value}', 'type': '<num>', - 'help': ' The number of seconds to keep probe sockets open before giving up on the connection.' + 'help': ' The number of seconds to keep probe sockets open before giving up on the connection.', }, 'mark': { 'mtr': '{command} --mark {value}', 'type': '<num>', - 'help': ' Set the mark for each packet sent through this socket similar to the netfilter MARK target but socket-based. MARK is 32 unsigned integer.' + 'help': ' Set the mark for each packet sent through this socket similar to the netfilter MARK target but socket-based. MARK is 32 unsigned integer.', }, 'vrf': { 'mtr': 'sudo ip vrf exec {value} {command}', 'type': '<vrf>', 'help': 'Use specified VRF table', 'helpfunction': vrf_list, - 'dflt': 'default' - } - } + 'dflt': 'default', + }, +} mtr = { 4: '/bin/mtr -4', 6: '/bin/mtr -6', } + class List(list): def first(self): return self.pop(0) if self else '' @@ -203,8 +204,8 @@ def completion_failure(option: str) -> None: def expension_failure(option, completions): reason = 'Ambiguous' if completions else 'Invalid' sys.stderr.write( - '\n\n {} command: {} [{}]\n\n'.format(reason, ' '.join(sys.argv), - option)) + '\n\n {} command: {} [{}]\n\n'.format(reason, ' '.join(sys.argv), option) + ) if completions: sys.stderr.write(' Possible completions:\n ') sys.stderr.write('\n '.join(completions)) @@ -218,21 +219,24 @@ def complete(prefix): def convert(command, args): + to_json = False while args: shortname = args.first() longnames = complete(shortname) if len(longnames) != 1: expension_failure(shortname, longnames) longname = longnames[0] + if longname == 'json': + to_json = True if options[longname]['type'] == 'noarg': - command = options[longname]['mtr'].format( - command=command, value='') + command = options[longname]['mtr'].format(command=command, value='') elif not args: sys.exit(f'mtr: missing argument for {longname} option') else: command = options[longname]['mtr'].format( - command=command, value=args.first()) - return command + command=command, value=args.first() + ) + return command, to_json if __name__ == '__main__': @@ -240,8 +244,7 @@ if __name__ == '__main__': host = args.first() if not host: - sys.exit("mtr: Missing host") - + sys.exit('mtr: Missing host') if host == '--get-options' or host == '--get-options-nested': if host == '--get-options-nested': @@ -302,5 +305,8 @@ if __name__ == '__main__': except ValueError: sys.exit(f'mtr: Unknown host: {host}') - command = convert(mtr[version], args) - call(f'{command} --curses --displaymode 0 {host}') + command, to_json = convert(mtr[version], args) + if to_json: + call(f'{command} {host}') + else: + call(f'{command} --curses --displaymode 0 {host}') diff --git a/src/op_mode/mtr_execute.py b/src/op_mode/mtr_execute.py new file mode 100644 index 000000000..2585a7ee4 --- /dev/null +++ b/src/op_mode/mtr_execute.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import ipaddress +import socket +import sys +import typing + +from json import loads + +from vyos.utils.network import interface_list +from vyos.utils.network import vrf_list +from vyos.utils.process import cmd +from vyos.utils.process import call + +import vyos.opmode + +ArgProtocol = typing.Literal['tcp', 'udp', 'sctp'] +noargs_list = [ + 'report_mode', + 'json', + 'report_wide', + 'split', + 'raw', + 'no_dns', + 'aslookup', +] + + +def vrf_list_default(): + return vrf_list() + ['default'] + + +options = { + 'report_mode': { + 'mtr': '{command} --report', + }, + 'protocol': { + 'mtr': '{command} --{value}', + }, + 'json': { + 'mtr': '{command} --json', + }, + 'report_wide': { + 'mtr': '{command} --report-wide', + }, + 'raw': { + 'mtr': '{command} --raw', + }, + 'split': { + 'mtr': '{command} --split', + }, + 'no_dns': { + 'mtr': '{command} --no-dns', + }, + 'show_ips': { + 'mtr': '{command} --show-ips {value}', + }, + 'ipinfo': { + 'mtr': '{command} --ipinfo {value}', + }, + 'aslookup': { + 'mtr': '{command} --aslookup', + }, + 'interval': { + 'mtr': '{command} --interval {value}', + }, + 'report_cycles': { + 'mtr': '{command} --report-cycles {value}', + }, + 'psize': { + 'mtr': '{command} --psize {value}', + }, + 'bitpattern': { + 'mtr': '{command} --bitpattern {value}', + }, + 'gracetime': { + 'mtr': '{command} --gracetime {value}', + }, + 'tos': { + 'mtr': '{command} --tos {value}', + }, + 'mpls': { + 'mtr': '{command} --mpls {value}', + }, + 'interface': { + 'mtr': '{command} --interface {value}', + 'helpfunction': interface_list, + }, + 'address': { + 'mtr': '{command} --address {value}', + }, + 'first_ttl': { + 'mtr': '{command} --first-ttl {value}', + }, + 'max_ttl': { + 'mtr': '{command} --max-ttl {value}', + }, + 'max_unknown': { + 'mtr': '{command} --max-unknown {value}', + }, + 'port': { + 'mtr': '{command} --port {value}', + }, + 'localport': { + 'mtr': '{command} --localport {value}', + }, + 'timeout': { + 'mtr': '{command} --timeout {value}', + }, + 'mark': { + 'mtr': '{command} --mark {value}', + }, + 'vrf': { + 'mtr': 'sudo ip vrf exec {value} {command}', + 'helpfunction': vrf_list_default, + 'dflt': 'default', + }, +} + +mtr_command = { + 4: '/bin/mtr -4', + 6: '/bin/mtr -6', +} + + +def mtr( + host: str, + for_api: typing.Optional[bool], + report_mode: typing.Optional[bool], + protocol: typing.Optional[ArgProtocol], + report_wide: typing.Optional[bool], + raw: typing.Optional[bool], + json: typing.Optional[bool], + split: typing.Optional[bool], + no_dns: typing.Optional[bool], + show_ips: typing.Optional[str], + ipinfo: typing.Optional[str], + aslookup: typing.Optional[bool], + interval: typing.Optional[str], + report_cycles: typing.Optional[str], + psize: typing.Optional[str], + bitpattern: typing.Optional[str], + gracetime: typing.Optional[str], + tos: typing.Optional[str], + mpl: typing.Optional[bool], + interface: typing.Optional[str], + address: typing.Optional[str], + first_ttl: typing.Optional[str], + max_ttl: typing.Optional[str], + max_unknown: typing.Optional[str], + port: typing.Optional[str], + localport: typing.Optional[str], + timeout: typing.Optional[str], + mark: typing.Optional[str], + vrf: typing.Optional[str], +): + args = locals() + for name, option in options.items(): + if 'dflt' in option and not args[name]: + args[name] = option['dflt'] + + try: + ip = socket.gethostbyname(host) + except UnicodeError: + raise vyos.opmode.InternalError(f'Unknown host: {host}') + except socket.gaierror: + ip = host + + try: + version = ipaddress.ip_address(ip).version + except ValueError: + raise vyos.opmode.InternalError(f'Unknown host: {host}') + + command = mtr_command[version] + + for key, val in args.items(): + if key in options and val: + if 'helpfunction' in options[key]: + allowed_values = options[key]['helpfunction']() + if val not in allowed_values: + raise vyos.opmode.InternalError( + f'Invalid argument for option {key} - {val}' + ) + value = '' if key in noargs_list else val + command = options[key]['mtr'].format(command=command, value=val) + + if json: + output = cmd(f'{command} {host}') + if for_api: + output = loads(output) + print(output) + else: + call(f'{command} --curses --displaymode 0 {host}') + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/nhrp.py b/src/op_mode/nhrp.py deleted file mode 100755 index e66f33079..000000000 --- a/src/op_mode/nhrp.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2023 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import sys -import tabulate -import vyos.opmode - -from vyos.utils.process import cmd -from vyos.utils.process import process_named_running -from vyos.utils.dict import colon_separated_to_dict - - -def _get_formatted_output(output_dict: dict) -> str: - """ - Create formatted table for CLI output - :param output_dict: dictionary for API - :type output_dict: dict - :return: tabulate string - :rtype: str - """ - print(f"Status: {output_dict['Status']}") - output: str = tabulate.tabulate(output_dict['routes'], headers='keys', - numalign="left") - return output - - -def _get_formatted_dict(output_string: str) -> dict: - """ - Format string returned from CMD to API list - :param output_string: String received by CMD - :type output_string: str - :return: dictionary for API - :rtype: dict - """ - formatted_dict: dict = { - 'Status': '', - 'routes': [] - } - output_list: list = output_string.split('\n\n') - for list_a in output_list: - output_dict = colon_separated_to_dict(list_a, True) - if 'Status' in output_dict: - formatted_dict['Status'] = output_dict['Status'] - else: - formatted_dict['routes'].append(output_dict) - return formatted_dict - - -def show_interface(raw: bool): - """ - Command 'show nhrp interface' - :param raw: if API - :type raw: bool - """ - if not process_named_running('opennhrp'): - raise vyos.opmode.UnconfiguredSubsystem('OpenNHRP is not running.') - interface_string: str = cmd('sudo opennhrpctl interface show') - interface_dict: dict = _get_formatted_dict(interface_string) - if raw: - return interface_dict - else: - return _get_formatted_output(interface_dict) - - -def show_tunnel(raw: bool): - """ - Command 'show nhrp tunnel' - :param raw: if API - :type raw: bool - """ - if not process_named_running('opennhrp'): - raise vyos.opmode.UnconfiguredSubsystem('OpenNHRP is not running.') - tunnel_string: str = cmd('sudo opennhrpctl show') - tunnel_dict: list = _get_formatted_dict(tunnel_string) - if raw: - return tunnel_dict - else: - return _get_formatted_output(tunnel_dict) - - -if __name__ == '__main__': - try: - res = vyos.opmode.run(sys.modules[__name__]) - if res: - print(res) - except (ValueError, vyos.opmode.Error) as e: - print(e) - sys.exit(1) diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index ab613e5c4..49a461e9e 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -14,25 +14,36 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import argparse import ipaddress import os import re import sys import tabulate +import typing from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID +import vyos.opmode + from vyos.config import Config from vyos.config import config_dict_mangle_acme -from vyos.pki import encode_certificate, encode_public_key, encode_private_key, encode_dh_parameters +from vyos.pki import encode_certificate +from vyos.pki import encode_public_key +from vyos.pki import encode_private_key +from vyos.pki import encode_dh_parameters from vyos.pki import get_certificate_fingerprint -from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list +from vyos.pki import create_certificate +from vyos.pki import create_certificate_request +from vyos.pki import create_certificate_revocation_list from vyos.pki import create_private_key from vyos.pki import create_dh_parameters -from vyos.pki import load_certificate, load_certificate_request, load_private_key -from vyos.pki import load_crl, load_dh_parameters, load_public_key +from vyos.pki import load_certificate +from vyos.pki import load_certificate_request +from vyos.pki import load_private_key +from vyos.pki import load_crl +from vyos.pki import load_dh_parameters +from vyos.pki import load_public_key from vyos.pki import verify_certificate from vyos.utils.io import ask_input from vyos.utils.io import ask_yes_no @@ -42,18 +53,50 @@ from vyos.utils.process import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' auth_dir = '/config/auth' +ArgsPkiType = typing.Literal['ca', 'certificate', 'dh', 'key-pair', 'openvpn', 'crl'] +ArgsPkiTypeGen = typing.Literal[ArgsPkiType, typing.Literal['ssh', 'wireguard']] +ArgsFingerprint = typing.Literal['sha256', 'sha384', 'sha512'] + # Helper Functions conf = Config() + + +def _verify(target): + """Decorator checks if config for PKI exists""" + from functools import wraps + + if target not in ['ca', 'certificate']: + raise ValueError('Invalid PKI') + + def _verify_target(func): + @wraps(func) + def _wrapper(*args, **kwargs): + name = kwargs.get('name') + unconf_message = f'PKI {target} "{name}" does not exist!' + if name: + if not conf.exists(['pki', target, name]): + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + + return _wrapper + + return _verify_target + + def get_default_values(): # Fetch default x509 values base = ['pki', 'x509', 'default'] - x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), - no_tag_node_value_mangle=True, - get_first_key=True, - with_recursive_defaults=True) + x509_defaults = conf.get_config_dict( + base, + key_mangling=('-', '_'), + no_tag_node_value_mangle=True, + get_first_key=True, + with_recursive_defaults=True, + ) return x509_defaults + def get_config_ca_certificate(name=None): # Fetch ca certificates from config base = ['pki', 'ca'] @@ -62,12 +105,15 @@ def get_config_ca_certificate(name=None): if name: base = base + [name] - if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): + if not conf.exists(base + ['private', 'key']) or not conf.exists( + base + ['certificate'] + ): return False - return conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + return conf.get_config_dict( + base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True + ) + def get_config_certificate(name=None): # Get certificates from config @@ -77,18 +123,21 @@ def get_config_certificate(name=None): if name: base = base + [name] - if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): + if not conf.exists(base + ['private', 'key']) or not conf.exists( + base + ['certificate'] + ): return False - pki = conf.get_config_dict(base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + pki = conf.get_config_dict( + base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True + ) if pki: for certificate in pki: pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate]) return pki + def get_certificate_ca(cert, ca_certs): # Find CA certificate for given certificate if not ca_certs: @@ -107,6 +156,7 @@ def get_certificate_ca(cert, ca_certs): return ca_name return None + def get_config_revoked_certificates(): # Fetch revoked certificates from config ca_base = ['pki', 'ca'] @@ -115,19 +165,26 @@ def get_config_revoked_certificates(): certs = [] if conf.exists(ca_base): - ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + ca_certificates = conf.get_config_dict( + ca_base, + key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True, + ) certs.extend(ca_certificates.values()) if conf.exists(cert_base): - certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) + certificates = conf.get_config_dict( + cert_base, + key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True, + ) certs.extend(certificates.values()) return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] + def get_revoked_by_serial_numbers(serial_numbers=[]): # Return serial numbers of revoked certificates certs_out = [] @@ -151,113 +208,153 @@ def get_revoked_by_serial_numbers(serial_numbers=[]): certs_out.append(cert_name) return certs_out -def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): + +def install_certificate( + name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False +): # Show/install conf commands for certificate prefix = 'ca' if is_ca else 'certificate' - base = f"pki {prefix} {name}" + base = f'pki {prefix} {name}' config_paths = [] if cert: - cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) + cert_pem = ''.join(encode_certificate(cert).strip().split('\n')[1:-1]) config_paths.append(f"{base} certificate '{cert_pem}'") if private_key: - key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) + key_pem = ''.join( + encode_private_key(private_key, passphrase=key_passphrase) + .strip() + .split('\n')[1:-1] + ) config_paths.append(f"{base} private key '{key_pem}'") if key_passphrase: - config_paths.append(f"{base} private password-protected") + config_paths.append(f'{base} private password-protected') install_into_config(conf, config_paths) + def install_crl(ca_name, crl): # Show/install conf commands for crl - crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) + crl_pem = ''.join(encode_certificate(crl).strip().split('\n')[1:-1]) install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"]) + def install_dh_parameters(name, params): # Show/install conf commands for dh params - dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) + dh_pem = ''.join(encode_dh_parameters(params).strip().split('\n')[1:-1]) install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"]) + def install_ssh_key(name, public_key, private_key, passphrase=None): # Show/install conf commands for ssh key - key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') + key_openssh = encode_public_key( + public_key, encoding='OpenSSH', key_format='OpenSSH' + ) username = os.getlogin() - type_key_split = key_openssh.split(" ") - - base = f"system login user {username} authentication public-keys {name}" - install_into_config(conf, [ - f"{base} key '{type_key_split[1]}'", - f"{base} type '{type_key_split[0]}'" - ]) - print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) - -def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True): + type_key_split = key_openssh.split(' ') + + base = f'system login user {username} authentication public-keys {name}' + install_into_config( + conf, + [f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'"], + ) + print( + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ) + ) + + +def install_keypair( + name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True +): # Show/install conf commands for key-pair config_paths = [] if public_key: - install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True) + install_public_key = not prompt or ask_yes_no( + 'Do you want to install the public key?', default=True + ) public_key_pem = encode_public_key(public_key) if install_public_key: - install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) - config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'") + install_public_pem = ''.join(public_key_pem.strip().split('\n')[1:-1]) + config_paths.append( + f"pki key-pair {name} public key '{install_public_pem}'" + ) else: - print("Public key:") + print('Public key:') print(public_key_pem) if private_key: - install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True) + install_private_key = not prompt or ask_yes_no( + 'Do you want to install the private key?', default=True + ) private_key_pem = encode_private_key(private_key, passphrase=passphrase) if install_private_key: - install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) - config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'") + install_private_pem = ''.join(private_key_pem.strip().split('\n')[1:-1]) + config_paths.append( + f"pki key-pair {name} private key '{install_private_pem}'" + ) if passphrase: - config_paths.append(f"pki key-pair {name} private password-protected") + config_paths.append(f'pki key-pair {name} private password-protected') else: - print("Private key:") + print('Private key:') print(private_key_pem) install_into_config(conf, config_paths) + def install_openvpn_key(name, key_data, key_version='1'): config_paths = [ f"pki openvpn shared-secret {name} key '{key_data}'", - f"pki openvpn shared-secret {name} version '{key_version}'" + f"pki openvpn shared-secret {name} version '{key_version}'", ] install_into_config(conf, config_paths) + def install_wireguard_key(interface, private_key, public_key): # Show conf commands for installing wireguard key pairs from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI - install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]) + install_into_config( + conf, [f"interfaces wireguard {interface} private-key '{private_key}'"] + ) print(f"Corresponding public-key to use on peer system is: '{public_key}'") + def install_wireguard_psk(interface, peer, psk): from vyos.ifconfig import Section + if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI - install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]) + install_into_config( + conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"] + ) + def ask_passphrase(): passphrase = None - print("Note: If you plan to use the generated key on this router, do not encrypt the private key.") + print( + 'Note: If you plan to use the generated key on this router, do not encrypt the private key.' + ) if ask_yes_no('Do you want to encrypt the private key with a passphrase?'): passphrase = ask_input('Enter passphrase:') return passphrase + def write_file(filename, contents): full_path = os.path.join(auth_dir, filename) directory = os.path.dirname(full_path) @@ -266,7 +363,9 @@ def write_file(filename, contents): print('Failed to write file: directory does not exist') return False - if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'): + if os.path.exists(full_path) and not ask_yes_no( + 'Do you want to overwrite the existing file?' + ): return False with open(full_path, 'w') as f: @@ -274,10 +373,14 @@ def write_file(filename, contents): print(f'File written to {full_path}') -# Generation functions +# Generation functions def generate_private_key(): - key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec']) + key_type = ask_input( + 'Enter private key type: [rsa, dsa, ec]', + default='rsa', + valid_responses=['rsa', 'dsa', 'ec'], + ) size_valid = [] size_default = 0 @@ -289,28 +392,43 @@ def generate_private_key(): size_default = 256 size_valid = [224, 256, 384, 521] - size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid) + size = ask_input( + 'Enter private key bits:', + default=size_default, + numeric_only=True, + valid_responses=size_valid, + ) return create_private_key(key_type, size), key_type + def parse_san_string(san_string): if not san_string: return None output = [] - san_split = san_string.strip().split(",") + san_split = san_string.strip().split(',') for pair_str in san_split: - tag, value = pair_str.strip().split(":", 1) + tag, value = pair_str.strip().split(':', 1) if tag == 'ipv4': output.append(ipaddress.IPv4Address(value)) elif tag == 'ipv6': output.append(ipaddress.IPv6Address(value)) elif tag == 'dns' or tag == 'rfc822': output.append(value) - return output - -def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True): + return + + +def generate_certificate_request( + private_key=None, + key_type=None, + return_request=False, + name=None, + install=False, + file=False, + ask_san=True, +): if not private_key: private_key, key_type = generate_private_key() @@ -319,18 +437,24 @@ def generate_certificate_request(private_key=None, key_type=None, return_request while True: country = ask_input('Enter country code:', default=default_values['country']) if len(country) != 2: - print("Country name must be a 2 character country code") + print('Country name must be a 2 character country code') continue subject['country'] = country break subject['state'] = ask_input('Enter state:', default=default_values['state']) - subject['locality'] = ask_input('Enter locality:', default=default_values['locality']) - subject['organization'] = ask_input('Enter organization name:', default=default_values['organization']) + subject['locality'] = ask_input( + 'Enter locality:', default=default_values['locality'] + ) + subject['organization'] = ask_input( + 'Enter organization name:', default=default_values['organization'] + ) subject['common_name'] = ask_input('Enter common name:', default='vyos.io') subject_alt_names = None if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'): - print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net") + print( + 'Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net,rfc822:user@vyos.net' + ) san_string = ask_input('Enter Subject Alternative Names:') subject_alt_names = parse_san_string(san_string) @@ -347,24 +471,48 @@ def generate_certificate_request(private_key=None, key_type=None, return_request return None if install: - print("Certificate request:") - print(encode_certificate(cert_req) + "\n") - install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) + print('Certificate request:') + print(encode_certificate(cert_req) + '\n') + install_certificate( + name, + private_key=private_key, + key_type=key_type, + key_passphrase=passphrase, + is_ca=False, + ) if file: write_file(f'{name}.csr', encode_certificate(cert_req)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) - -def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False): - valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + + +def generate_certificate( + cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False +): + valid_days = ask_input( + 'Enter how many days certificate will be valid:', + default='365' if not is_ca else '1825', + numeric_only=True, + ) cert_type = None if not is_ca: - cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server']) - return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca) + cert_type = ask_input( + 'Enter certificate type: (client, server)', + default='server', + valid_responses=['client', 'server'], + ) + return create_certificate( + cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca + ) + def generate_ca_certificate(name, install=False, file=False): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True, ask_san=False + ) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True) passphrase = ask_passphrase() @@ -374,11 +522,16 @@ def generate_ca_certificate(name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_ca_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -390,17 +543,19 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load signing CA certificate, aborting") + print('Failed to load signing CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load signing CA private key, aborting") + print('Failed to load signing CA private key, aborting') return None private_key = None @@ -409,9 +564,11 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True, ask_san=False + ) else: - print("Paste certificate request and press enter:") + print('Paste certificate request and press enter:') lines = [] curr_line = '' while True: @@ -421,17 +578,21 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): lines.append(curr_line) if not lines: - print("Aborted") + print('Aborted') return None - wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing - cert_req = load_certificate_request("\n".join(lines), wrap) + wrap = ( + lines[0].find('-----') < 0 + ) # Only base64 pasted, add the CSR tags for parsing + cert_req = load_certificate_request('\n'.join(lines), wrap) if not cert_req: - print("Invalid certificate request") + print('Invalid certificate request') return None - cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) + cert = generate_certificate( + cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True + ) passphrase = None if private_key is not None: @@ -444,12 +605,17 @@ def generate_ca_certificate_sign(name, ca_name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) if private_key is not None: - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -461,17 +627,19 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load CA certificate, aborting") + print('Failed to load CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load CA private key, aborting") + print('Failed to load CA private key, aborting') return None private_key = None @@ -480,9 +648,11 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() - cert_req = generate_certificate_request(private_key, key_type, return_request=True) + cert_req = generate_certificate_request( + private_key, key_type, return_request=True + ) else: - print("Paste certificate request and press enter:") + print('Paste certificate request and press enter:') lines = [] curr_line = '' while True: @@ -492,18 +662,20 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): lines.append(curr_line) if not lines: - print("Aborted") + print('Aborted') return None - wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing - cert_req = load_certificate_request("\n".join(lines), wrap) + wrap = ( + lines[0].find('-----') < 0 + ) # Only base64 pasted, add the CSR tags for parsing + cert_req = load_certificate_request('\n'.join(lines), wrap) if not cert_req: - print("Invalid certificate request") + print('Invalid certificate request') return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) - + passphrase = None if private_key is not None: passphrase = ask_passphrase() @@ -515,12 +687,17 @@ def generate_certificate_sign(name, ca_name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False) + install_certificate( + name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) if private_key is not None: - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_selfsign(name, install=False, file=False): private_key, key_type = generate_private_key() @@ -534,11 +711,21 @@ def generate_certificate_selfsign(name, install=False, file=False): return None if install: - install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) + install_certificate( + name, + cert, + private_key=private_key, + key_type=key_type, + key_passphrase=passphrase, + is_ca=False, + ) if file: write_file(f'{name}.pem', encode_certificate(cert)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_certificate_revocation_list(ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) @@ -550,17 +737,19 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: - print("Failed to load CA certificate, aborting") + print('Failed to load CA certificate, aborting') return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') - ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) + ca_private_key = load_private_key( + ca_private['key'], passphrase=ca_private_passphrase + ) if not ca_private_key: - print("Failed to load CA private key, aborting") + print('Failed to load CA private key, aborting') return None revoked_certs = get_config_revoked_certificates() @@ -581,13 +770,13 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): continue if not to_revoke: - print("No revoked certificates to add to the CRL") + print('No revoked certificates to add to the CRL') return None crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke) if not crl: - print("Failed to create CRL") + print('Failed to create CRL') return None if not install and not file: @@ -598,7 +787,8 @@ def generate_certificate_revocation_list(ca_name, install=False, file=False): install_crl(ca_name, crl) if file: - write_file(f'{name}.crl', encode_certificate(crl)) + write_file(f'{ca_name}.crl', encode_certificate(crl)) + def generate_ssh_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() @@ -607,29 +797,42 @@ def generate_ssh_keypair(name, install=False, file=False): if not install and not file: print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) - print("") - print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) + print('') + print( + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ) + ) return None if install: install_ssh_key(name, public_key, private_key, passphrase) if file: - write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) - write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) + write_file( + f'{name}.pem', + encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH'), + ) + write_file( + f'{name}.key', + encode_private_key( + private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase + ), + ) + def generate_dh_parameters(name, install=False, file=False): bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True) - print("Generating parameters...") + print('Generating parameters...') dh_params = create_dh_parameters(bits) if not dh_params: - print("Failed to create DH parameters") + print('Failed to create DH parameters') return None if not install and not file: - print("DH Parameters:") + print('DH Parameters:') print(encode_dh_parameters(dh_params)) if install: @@ -638,6 +841,7 @@ def generate_dh_parameters(name, install=False, file=False): if file: write_file(f'{name}.pem', encode_dh_parameters(dh_params)) + def generate_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() @@ -645,7 +849,7 @@ def generate_keypair(name, install=False, file=False): if not install and not file: print(encode_public_key(public_key)) - print("") + print('') print(encode_private_key(private_key, passphrase=passphrase)) return None @@ -654,13 +858,16 @@ def generate_keypair(name, install=False, file=False): if file: write_file(f'{name}.pem', encode_public_key(public_key)) - write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) + write_file( + f'{name}.key', encode_private_key(private_key, passphrase=passphrase) + ) + def generate_openvpn_key(name, install=False, file=False): result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"') if not result: - print("Failed to generate OpenVPN key") + print('Failed to generate OpenVPN key') return None if not install and not file: @@ -668,11 +875,13 @@ def generate_openvpn_key(name, install=False, file=False): return None if install: - key_lines = result.split("\n") - key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings + key_lines = result.split('\n') + key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' - version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) + version_search = re.search( + r'BEGIN OpenVPN Static key V(\d+)', result + ) # Future-proofing (hopefully) if version_search: key_version = version_search[1] @@ -681,6 +890,7 @@ def generate_openvpn_key(name, install=False, file=False): if file: write_file(f'{name}.key', result) + def generate_wireguard_key(interface=None, install=False): private_key = cmd('wg genkey') public_key = cmd('wg pubkey', input=private_key) @@ -691,6 +901,7 @@ def generate_wireguard_key(interface=None, install=False): print(f'Private key: {private_key}') print(f'Public key: {public_key}', end='\n\n') + def generate_wireguard_psk(interface=None, peer=None, install=False): psk = cmd('wg genpsk') if interface and peer and install: @@ -698,8 +909,11 @@ def generate_wireguard_psk(interface=None, peer=None, install=False): else: print(f'Pre-shared key: {psk}') + # Import functions -def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): +def import_ca_certificate( + name, path=None, key_path=None, no_prompt=False, passphrase=None +): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -736,7 +950,10 @@ def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passp install_certificate(name, private_key=key, is_ca=True) -def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): + +def import_certificate( + name, path=None, key_path=None, no_prompt=False, passphrase=None +): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -773,6 +990,7 @@ def import_certificate(name, path=None, key_path=None, no_prompt=False, passphra install_certificate(name, private_key=key, is_ca=False) + def import_crl(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -790,6 +1008,7 @@ def import_crl(name, path): install_crl(name, crl) + def import_dh_parameters(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -807,6 +1026,7 @@ def import_dh_parameters(name, path): install_dh_parameters(name, dh) + def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None): if path: if not os.path.exists(path): @@ -844,6 +1064,7 @@ def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=N install_keypair(name, None, private_key=key, prompt=False) + def import_openvpn_secret(name, path): if not os.path.exists(path): print(f'File not found: {path}') @@ -853,19 +1074,134 @@ def import_openvpn_secret(name, path): key_version = '1' with open(path) as f: - key_lines = f.read().strip().split("\n") - key_lines = list(filter(lambda line: not line.strip().startswith('#'), key_lines)) # Remove commented lines - key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings - - version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) + key_lines = f.read().strip().split('\n') + key_lines = list( + filter(lambda line: not line.strip().startswith('#'), key_lines) + ) # Remove commented lines + key_data = ''.join(key_lines[1:-1]) # Remove wrapper tags and line endings + + version_search = re.search( + r'BEGIN OpenVPN Static key V(\d+)', key_lines[0] + ) # Future-proofing (hopefully) if version_search: key_version = version_search[1] install_openvpn_key(name, key_data, key_version) -# Show functions -def show_certificate_authority(name=None, pem=False): - headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] + +def generate_pki( + raw: bool, + pki_type: ArgsPkiTypeGen, + name: typing.Optional[str], + file: typing.Optional[bool], + install: typing.Optional[bool], + sign: typing.Optional[str], + self_sign: typing.Optional[bool], + key: typing.Optional[bool], + psk: typing.Optional[bool], + interface: typing.Optional[str], + peer: typing.Optional[str], +): + try: + if pki_type == 'ca': + if sign: + generate_ca_certificate_sign(name, sign, install=install, file=file) + else: + generate_ca_certificate(name, install=install, file=file) + elif pki_type == 'certificate': + if sign: + generate_certificate_sign(name, sign, install=install, file=file) + elif self_sign: + generate_certificate_selfsign(name, install=install, file=file) + else: + generate_certificate_request(name=name, install=install, file=file) + + elif pki_type == 'crl': + generate_certificate_revocation_list(name, install=install, file=file) + + elif pki_type == 'ssh': + generate_ssh_keypair(name, install=install, file=file) + + elif pki_type == 'dh': + generate_dh_parameters(name, install=install, file=file) + + elif pki_type == 'key-pair': + generate_keypair(name, install=install, file=file) + + elif pki_type == 'openvpn': + generate_openvpn_key(name, install=install, file=file) + + elif pki_type == 'wireguard': + # WireGuard supports writing key directly into the CLI, but this + # requires the vyos_libexec_dir environment variable to be set + os.environ['vyos_libexec_dir'] = '/usr/libexec/vyos' + + if key: + generate_wireguard_key(interface, install=install) + if psk: + generate_wireguard_psk(interface, peer=peer, install=install) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +def import_pki( + name: str, + pki_type: ArgsPkiType, + filename: typing.Optional[str], + key_filename: typing.Optional[str], + no_prompt: typing.Optional[bool], + passphrase: typing.Optional[str], +): + try: + if pki_type == 'ca': + import_ca_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'certificate': + import_certificate( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'crl': + import_crl(name, filename) + elif pki_type == 'dh': + import_dh_parameters(name, filename) + elif pki_type == 'key-pair': + import_keypair( + name, + path=filename, + key_path=key_filename, + no_prompt=no_prompt, + passphrase=passphrase, + ) + elif pki_type == 'openvpn': + import_openvpn_secret(name, filename) + except KeyboardInterrupt: + print('Aborted') + sys.exit(0) + + +@_verify('ca') +def show_certificate_authority( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): + headers = [ + 'Name', + 'Subject', + 'Issuer CN', + 'Issued', + 'Expiry', + 'Private Key', + 'Parent', + ] data = [] certs = get_config_ca_certificate() if certs: @@ -882,7 +1218,7 @@ def show_certificate_authority(name=None, pem=False): return parent_ca_name = get_certificate_ca(cert, certs) - cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] + cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0] if not parent_ca_name or parent_ca_name == cert_name: parent_ca_name = 'N/A' @@ -890,14 +1226,45 @@ def show_certificate_authority(name=None, pem=False): if not cert: continue - have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' - data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) - - print("Certificate Authorities:") + have_private = ( + 'Yes' + if 'private' in cert_dict and 'key' in cert_dict['private'] + else 'No' + ) + data.append( + [ + cert_name, + cert.subject.rfc4514_string(), + cert_issuer_cn, + cert.not_valid_before, + cert.not_valid_after, + have_private, + parent_ca_name, + ] + ) + + print('Certificate Authorities:') print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False, fingerprint_hash=None): - headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] + +@_verify('certificate') +def show_certificate( + raw: bool, + name: typing.Optional[str] = None, + pem: typing.Optional[bool] = False, + fingerprint: typing.Optional[ArgsFingerprint] = None, +): + headers = [ + 'Name', + 'Type', + 'Subject CN', + 'Issuer CN', + 'Issued', + 'Expiry', + 'Revoked', + 'Private Key', + 'CA Present', + ] data = [] certs = get_config_certificate() if certs: @@ -917,13 +1284,13 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): if name and pem: print(encode_certificate(cert)) return - elif name and fingerprint_hash: - print(get_certificate_fingerprint(cert, fingerprint_hash)) + elif name and fingerprint: + print(get_certificate_fingerprint(cert, fingerprint)) return ca_name = get_certificate_ca(cert, ca_certs) - cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] - cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] + cert_subject_cn = cert.subject.rfc4514_string().split(',')[0] + cert_issuer_cn = cert.issuer.rfc4514_string().split(',')[0] cert_type = 'Unknown' try: @@ -932,21 +1299,37 @@ def show_certificate(name=None, pem=False, fingerprint_hash=None): cert_type = 'Server' elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value: cert_type = 'Client' - except: + except Exception: pass revoked = 'Yes' if 'revoke' in cert_dict else 'No' - have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' + have_private = ( + 'Yes' + if 'private' in cert_dict and 'key' in cert_dict['private'] + else 'No' + ) have_ca = f'Yes ({ca_name})' if ca_name else 'No' - data.append([ - cert_name, cert_type, cert_subject_cn, cert_issuer_cn, - cert.not_valid_before, cert.not_valid_after, - revoked, have_private, have_ca]) - - print("Certificates:") + data.append( + [ + cert_name, + cert_type, + cert_subject_cn, + cert_issuer_cn, + cert.not_valid_before, + cert.not_valid_after, + revoked, + have_private, + have_ca, + ] + ) + + print('Certificates:') print(tabulate.tabulate(data, headers)) -def show_crl(name=None, pem=False): + +def show_crl( + raw: bool, name: typing.Optional[str] = None, pem: typing.Optional[bool] = False +): headers = ['CA Name', 'Updated', 'Revokes'] data = [] certs = get_config_ca_certificate() @@ -971,141 +1354,31 @@ def show_crl(name=None, pem=False): print(encode_certificate(crl)) continue - certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl]) - data.append([cert_name, crl.last_update, ", ".join(certs)]) + certs = get_revoked_by_serial_numbers( + [revoked.serial_number for revoked in crl] + ) + data.append([cert_name, crl.last_update, ', '.join(certs)]) if name and pem: return - print("Certificate Revocation Lists:") + print('Certificate Revocation Lists:') print(tabulate.tabulate(data, headers)) -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--action', help='PKI action', required=True) - - # X509 - parser.add_argument('--ca', help='Certificate Authority', required=False) - parser.add_argument('--certificate', help='Certificate', required=False) - parser.add_argument('--crl', help='Certificate Revocation List', required=False) - parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) - parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') - parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') - parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') - # SSH - parser.add_argument('--ssh', help='SSH Key', required=False) +def show_all(raw: bool): + show_certificate_authority(raw) + print('\n') + show_certificate(raw) + print('\n') + show_crl(raw) - # DH - parser.add_argument('--dh', help='DH Parameters', required=False) - - # Key pair - parser.add_argument('--keypair', help='Key pair', required=False) - - # OpenVPN - parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) - - # WireGuard - parser.add_argument('--wireguard', help='Wireguard', action='store_true') - group = parser.add_mutually_exclusive_group() - group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) - group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) - parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') - parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') - - # Global - parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') - parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') - - parser.add_argument('--filename', help='Write certificate into specified filename', action='store') - parser.add_argument('--key-filename', help='Write key into specified filename', action='store') - - parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') - parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') - - args = parser.parse_args() +if __name__ == '__main__': try: - if args.action == 'generate': - if args.ca: - if args.sign: - generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) - else: - generate_ca_certificate(args.ca, install=args.install, file=args.file) - elif args.certificate: - if args.sign: - generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) - elif args.self_sign: - generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) - else: - generate_certificate_request(name=args.certificate, install=args.install, file=args.file) - - elif args.crl: - generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) - - elif args.ssh: - generate_ssh_keypair(args.ssh, install=args.install, file=args.file) - - elif args.dh: - generate_dh_parameters(args.dh, install=args.install, file=args.file) - - elif args.keypair: - generate_keypair(args.keypair, install=args.install, file=args.file) - - elif args.openvpn: - generate_openvpn_key(args.openvpn, install=args.install, file=args.file) - - elif args.wireguard: - # WireGuard supports writing key directly into the CLI, but this - # requires the vyos_libexec_dir environment variable to be set - os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" - - if args.key: - generate_wireguard_key(args.interface, install=args.install) - if args.psk: - generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) - elif args.action == 'import': - if args.ca: - import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.certificate: - import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.crl: - import_crl(args.crl, args.filename) - elif args.dh: - import_dh_parameters(args.dh, args.filename) - elif args.keypair: - import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, - no_prompt=args.no_prompt, passphrase=args.passphrase) - elif args.openvpn: - import_openvpn_secret(args.openvpn, args.filename) - elif args.action == 'show': - if args.ca: - ca_name = None if args.ca == 'all' else args.ca - if ca_name: - if not conf.exists(['pki', 'ca', ca_name]): - print(f'CA "{ca_name}" does not exist!') - exit(1) - show_certificate_authority(ca_name, args.pem) - elif args.certificate: - cert_name = None if args.certificate == 'all' else args.certificate - if cert_name: - if not conf.exists(['pki', 'certificate', cert_name]): - print(f'Certificate "{cert_name}" does not exist!') - exit(1) - if args.fingerprint is None: - show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) - else: - show_certificate(args.certificate, fingerprint_hash=args.fingerprint) - elif args.crl: - show_crl(None if args.crl == 'all' else args.crl, args.pem) - else: - show_certificate_authority() - print('\n') - show_certificate() - print('\n') - show_crl() - except KeyboardInterrupt: - print("Aborted") - sys.exit(0) + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/qos.py b/src/op_mode/qos.py index b8ca149a0..464b552ee 100755 --- a/src/op_mode/qos.py +++ b/src/op_mode/qos.py @@ -38,7 +38,7 @@ def get_tc_info(interface_dict, interface_name, policy_type): if not policy_name: return None, None - class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], key_mangling=('-', '_'), + class_dict = op_mode_config_dict(['qos', 'policy', policy_type, policy_name], get_first_key=True) if not class_dict: return None, None diff --git a/src/op_mode/reset_wireguard.py b/src/op_mode/reset_wireguard.py new file mode 100755 index 000000000..1fcfb31b5 --- /dev/null +++ b/src/op_mode/reset_wireguard.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import typing + +import vyos.opmode + +from vyos.ifconfig import WireGuardIf +from vyos.configquery import ConfigTreeQuery + + +def _verify(func): + """Decorator checks if WireGuard interface config exists""" + from functools import wraps + + @wraps(func) + def _wrapper(*args, **kwargs): + config = ConfigTreeQuery() + interface = kwargs.get('interface') + if not config.exists(['interfaces', 'wireguard', interface]): + unconf_message = f'WireGuard interface {interface} is not configured' + raise vyos.opmode.UnconfiguredSubsystem(unconf_message) + return func(*args, **kwargs) + + return _wrapper + + +@_verify +def reset_peer(interface: str, peer: typing.Optional[str] = None): + intf = WireGuardIf(interface, create=False, debug=False) + return intf.operational.reset_peer(peer) + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/restart.py b/src/op_mode/restart.py index a83c8b9d8..efa835485 100755 --- a/src/op_mode/restart.py +++ b/src/op_mode/restart.py @@ -41,6 +41,10 @@ service_map = { 'systemd_service': 'pdns-recursor', 'path': ['service', 'dns', 'forwarding'], }, + 'haproxy': { + 'systemd_service': 'haproxy', + 'path': ['load-balancing', 'haproxy'], + }, 'igmp_proxy': { 'systemd_service': 'igmpproxy', 'path': ['protocols', 'igmp-proxy'], @@ -49,14 +53,14 @@ service_map = { 'systemd_service': 'strongswan', 'path': ['vpn', 'ipsec'], }, + 'load-balancing_wan': { + 'systemd_service': 'vyos-wan-load-balance', + 'path': ['load-balancing', 'wan'], + }, 'mdns_repeater': { 'systemd_service': 'avahi-daemon', 'path': ['service', 'mdns', 'repeater'], }, - 'reverse_proxy': { - 'systemd_service': 'haproxy', - 'path': ['load-balancing', 'reverse-proxy'], - }, 'router_advert': { 'systemd_service': 'radvd', 'path': ['service', 'router-advert'], @@ -83,10 +87,11 @@ services = typing.Literal[ 'dhcpv6', 'dns_dynamic', 'dns_forwarding', + 'haproxy', 'igmp_proxy', 'ipsec', + 'load-balancing_wan', 'mdns_repeater', - 'reverse_proxy', 'router_advert', 'snmp', 'ssh', diff --git a/src/op_mode/show_configuration_files.sh b/src/op_mode/show_configuration_files.sh deleted file mode 100755 index ad8e0747c..000000000 --- a/src/op_mode/show_configuration_files.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -# Wrapper script for the show configuration files command -find ${vyatta_sysconfdir}/config/ \ - -type f \ - -not -name ".*" \ - -not -name "config.boot.*" \ - -printf "%f\t(%Tc)\t%T@\n" \ - | sort -r -k3 \ - | awk -F"\t" '{printf ("%-20s\t%s\n", $1,$2) ;}' diff --git a/src/op_mode/stp.py b/src/op_mode/stp.py new file mode 100755 index 000000000..fb57bd7ee --- /dev/null +++ b/src/op_mode/stp.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import typing +import json +from tabulate import tabulate + +import vyos.opmode +from vyos.utils.process import cmd +from vyos.utils.network import interface_exists + +def detailed_output(dataset, headers): + for data in dataset: + adjusted_rule = data + [""] * (len(headers) - len(data)) # account for different header length, like default-action + transformed_rule = [[header, adjusted_rule[i]] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char + + print(tabulate(transformed_rule, tablefmt="presto")) + print() + +def _get_bridge_vlan_data(iface): + allowed_vlans = [] + native_vlan = None + vlanData = json.loads(cmd(f"bridge -j -d vlan show")) + for vlans in vlanData: + if vlans['ifname'] == iface: + for allowed in vlans['vlans']: + if "flags" in allowed and "PVID" in allowed["flags"]: + native_vlan = allowed['vlan'] + elif allowed.get('vlanEnd', None): + allowed_vlans.append(f"{allowed['vlan']}-{allowed['vlanEnd']}") + else: + allowed_vlans.append(str(allowed['vlan'])) + + if not allowed_vlans: + allowed_vlans = ["none"] + if not native_vlan: + native_vlan = "none" + + return ",".join(allowed_vlans), native_vlan + +def _get_stp_data(ifname, brInfo, brStatus): + tmpInfo = {} + + tmpInfo['bridge_name'] = brInfo.get('ifname') + tmpInfo['up_state'] = brInfo.get('operstate') + tmpInfo['priority'] = brInfo.get('linkinfo').get('info_data').get('priority') + tmpInfo['vlan_filtering'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('vlan_filtering') == 1 else "Disabled" + tmpInfo['vlan_protocol'] = brInfo.get('linkinfo').get('info_data').get('vlan_protocol') + + # The version of VyOS I tested had am issue with the "ip -d link show type bridge" + # output. The root_id was always the local bridge, even though the underlying system + # understood when it wasn't. Could be an upstream Bug. I pull from the "/sys/class/net" + # structure instead. This can be changed later if the "ip link" behavior is corrected. + + #tmpInfo['bridge_id'] = brInfo.get('linkinfo').get('info_data').get('bridge_id') + #tmpInfo['root_id'] = brInfo.get('linkinfo').get('info_data').get('root_id') + + tmpInfo['bridge_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/bridge_id").split('.') + tmpInfo['root_id'] = cmd(f"cat /sys/class/net/{brInfo.get('ifname')}/bridge/root_id").split('.') + + # The "/sys/class/net" structure stores the IDs without seperators like ':' or '.' + # This adds a ':' after every 2 characters to make it resemble a MAC Address + tmpInfo['bridge_id'][1] = ':'.join(tmpInfo['bridge_id'][1][i:i+2] for i in range(0, len(tmpInfo['bridge_id'][1]), 2)) + tmpInfo['root_id'][1] = ':'.join(tmpInfo['root_id'][1][i:i+2] for i in range(0, len(tmpInfo['root_id'][1]), 2)) + + tmpInfo['stp_state'] = "Enabled" if brInfo.get('linkinfo', {}).get('info_data', {}).get('stp_state') == 1 else "Disabled" + + # I don't call any of these values, but I created them to be called within raw output if desired + + tmpInfo['mcast_snooping'] = "Enabled" if brInfo.get('linkinfo').get('info_data').get('mcast_snooping') == 1 else "Disabled" + tmpInfo['rxbytes'] = brInfo.get('stats64').get('rx').get('bytes') + tmpInfo['rxpackets'] = brInfo.get('stats64').get('rx').get('packets') + tmpInfo['rxerrors'] = brInfo.get('stats64').get('rx').get('errors') + tmpInfo['rxdropped'] = brInfo.get('stats64').get('rx').get('dropped') + tmpInfo['rxover_errors'] = brInfo.get('stats64').get('rx').get('over_errors') + tmpInfo['rxmulticast'] = brInfo.get('stats64').get('rx').get('multicast') + tmpInfo['txbytes'] = brInfo.get('stats64').get('tx').get('bytes') + tmpInfo['txpackets'] = brInfo.get('stats64').get('tx').get('packets') + tmpInfo['txerrors'] = brInfo.get('stats64').get('tx').get('errors') + tmpInfo['txdropped'] = brInfo.get('stats64').get('tx').get('dropped') + tmpInfo['txcarrier_errors'] = brInfo.get('stats64').get('tx').get('carrier_errors') + tmpInfo['txcollosions'] = brInfo.get('stats64').get('tx').get('collisions') + + tmpStatus = [] + for members in brStatus: + if members.get('master') == brInfo.get('ifname'): + allowed_vlans, native_vlan = _get_bridge_vlan_data(members['ifname']) + tmpStatus.append({'interface': members.get('ifname'), + 'state': members.get('state').capitalize(), + 'mtu': members.get('mtu'), + 'pathcost': members.get('cost'), + 'bpduguard': "Enabled" if members.get('guard') == True else "Disabled", + 'rootguard': "Enabled" if members.get('root_block') == True else "Disabled", + 'mac_learning': "Enabled" if members.get('learning') == True else "Disabled", + 'neigh_suppress': "Enabled" if members.get('neigh_suppress') == True else "Disabled", + 'vlan_tunnel': "Enabled" if members.get('vlan_tunnel') == True else "Disabled", + 'isolated': "Enabled" if members.get('isolated') == True else "Disabled", + **({'allowed_vlans': allowed_vlans} if allowed_vlans else {}), + **({'native_vlan': native_vlan} if native_vlan else {})}) + + tmpInfo['members'] = tmpStatus + return tmpInfo + +def show_stp(raw: bool, ifname: typing.Optional[str], detail: bool): + rawList = [] + rawDict = {'stp': []} + + if ifname: + if not interface_exists(ifname): + raise vyos.opmode.Error(f"{ifname} does not exist!") + else: + ifname = "" + + bridgeInfo = json.loads(cmd(f"ip -j -d -s link show type bridge {ifname}")) + + if not bridgeInfo: + raise vyos.opmode.Error(f"No Bridges configured!") + + bridgeStatus = json.loads(cmd(f"bridge -j -s -d link show")) + + for bridges in bridgeInfo: + output_list = [] + amRoot = "" + bridgeDict = _get_stp_data(ifname, bridges, bridgeStatus) + + if bridgeDict['bridge_id'][1] == bridgeDict['root_id'][1]: + amRoot = " (This bridge is the root)" + + print('-' * 80) + print(f"Bridge interface {bridgeDict['bridge_name']} ({bridgeDict['up_state']}):\n") + print(f"Spanning Tree is {bridgeDict['stp_state']}") + print(f"Bridge ID {bridgeDict['bridge_id'][1]}, Priority {int(bridgeDict['bridge_id'][0], 16)}") + print(f"Root ID {bridgeDict['root_id'][1]}, Priority {int(bridgeDict['root_id'][0], 16)}{amRoot}") + print(f"VLANs {bridgeDict['vlan_filtering'].capitalize()}, Protocol {bridgeDict['vlan_protocol']}") + print() + + for members in bridgeDict['members']: + output_list.append([members['interface'], + members['state'], + *([members['pathcost']] if detail else []), + members['bpduguard'], + members['rootguard'], + members['mac_learning'], + *([members['neigh_suppress']] if detail else []), + *([members['vlan_tunnel']] if detail else []), + *([members['isolated']] if detail else []), + *([members['allowed_vlans']] if detail else []), + *([members['native_vlan']] if detail else [])]) + + if raw: + rawList.append(bridgeDict) + elif detail: + headers = ['Interface', 'State', 'Pathcost', 'BPDU_Guard', 'Root_Guard', 'Learning', 'Neighbor_Suppression', 'Q-in-Q', 'Port_Isolation', 'Allowed VLANs', 'Native VLAN'] + detailed_output(output_list, headers) + else: + headers = ['Interface', 'State', 'BPDU_Guard', 'Root_Guard', 'Learning'] + print(tabulate(output_list, headers)) + print() + + if raw: + rawDict['stp'] = rawList + return rawDict + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/tech_support.py b/src/op_mode/tech_support.py index f60bb87ff..c4496dfa3 100644 --- a/src/op_mode/tech_support.py +++ b/src/op_mode/tech_support.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2024 VyOS maintainers and contributors +# Copyright (C) 2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -20,6 +20,7 @@ import json import vyos.opmode from vyos.utils.process import cmd +from vyos.base import Warning def _get_version_data(): from vyos.version import get_version_data @@ -51,7 +52,12 @@ def _get_storage(): def _get_devices(): devices = {} devices["pci"] = cmd("lspci") - devices["usb"] = cmd("lsusb") + + try: + devices["usb"] = cmd("lsusb") + except OSError: + Warning("Could not retrieve information about USB devices") + devices["usb"] = {} return devices @@ -97,21 +103,22 @@ def _get_boot_config(): return strip_config_source(config) def _get_config_scripts(): - from os import listdir + from os import walk from os.path import join from vyos.utils.file import read_file scripts = [] dir = '/config/scripts' - for f in listdir(dir): - script = {} - path = join(dir, f) - data = read_file(path) - script["path"] = path - script["data"] = data - - scripts.append(script) + for dirpath, _, filenames in walk(dir): + for filename in filenames: + script = {} + path = join(dirpath, filename) + data = read_file(path) + script["path"] = path + script["data"] = data + + scripts.append(script) return scripts diff --git a/src/op_mode/vrrp.py b/src/op_mode/vrrp.py index 60be86065..ef1338e23 100755 --- a/src/op_mode/vrrp.py +++ b/src/op_mode/vrrp.py @@ -13,47 +13,324 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - +import json import sys -import argparse +import typing + +from jinja2 import Template -from vyos.configquery import ConfigTreeQuery -from vyos.ifconfig.vrrp import VRRP +import vyos.opmode +from vyos.ifconfig import VRRP from vyos.ifconfig.vrrp import VRRPNoData -parser = argparse.ArgumentParser() -group = parser.add_mutually_exclusive_group() -group.add_argument("-s", "--summary", action="store_true", help="Print VRRP summary") -group.add_argument("-t", "--statistics", action="store_true", help="Print VRRP statistics") -group.add_argument("-d", "--data", action="store_true", help="Print detailed VRRP data") - -args = parser.parse_args() - -def is_configured(): - """ Check if VRRP is configured """ - config = ConfigTreeQuery() - if not config.exists(['high-availability', 'vrrp', 'group']): - return False - return True - -# Exit early if VRRP is dead or not configured -if is_configured() == False: - print('VRRP not configured!') - exit(0) -if not VRRP.is_running(): - print('VRRP is not running') - sys.exit(0) - -try: - if args.summary: - print(VRRP.format(VRRP.collect('json'))) - elif args.statistics: - print(VRRP.collect('stats')) - elif args.data: - print(VRRP.collect('state')) - else: - parser.print_help() + +stat_template = Template(""" +{% for rec in instances %} +VRRP Instance: {{rec.instance}} + Advertisements: + Received: {{rec.advert_rcvd}} + Sent: {{rec.advert_sent}} + Became master: {{rec.become_master}} + Released master: {{rec.release_master}} + Packet Errors: + Length: {{rec.packet_len_err}} + TTL: {{rec.ip_ttl_err}} + Invalid Type: {{rec.invalid_type_rcvd}} + Advertisement Interval: {{rec.advert_interval_err}} + Address List: {{rec.addr_list_err}} + Authentication Errors: + Invalid Type: {{rec.invalid_authtype}} + Type Mismatch: {{rec.authtype_mismatch}} + Failure: {{rec.auth_failure}} + Priority Zero: + Received: {{rec.pri_zero_rcvd}} + Sent: {{rec.pri_zero_sent}} +{% endfor %} +""") + +detail_template = Template(""" +{%- for rec in instances %} + VRRP Instance: {{rec.iname}} + VRRP Version: {{rec.version}} + State: {{rec.state}} + {% if rec.state == 'BACKUP' -%} + Master priority: {{ rec.master_priority }} + {% if rec.version == 3 -%} + Master advert interval: {{ rec.master_adver_int }} + {% endif -%} + {% endif -%} + Wantstate: {{rec.wantstate}} + Last transition: {{rec.last_transition}} + Interface: {{rec.ifp_ifname}} + {% if rec.dont_track_primary > 0 -%} + VRRP interface tracking disabled + {% endif -%} + {% if rec.skip_check_adv_addr > 0 -%} + Skip checking advert IP addresses + {% endif -%} + {% if rec.strict_mode > 0 -%} + Enforcing strict VRRP compliance + {% endif -%} + Gratuitous ARP delay: {{rec.garp_delay}} + Gratuitous ARP repeat: {{rec.garp_rep}} + Gratuitous ARP refresh: {{rec.garp_refresh}} + Gratuitous ARP refresh repeat: {{rec.garp_refresh_rep}} + Gratuitous ARP lower priority delay: {{rec.garp_lower_prio_delay}} + Gratuitous ARP lower priority repeat: {{rec.garp_lower_prio_rep}} + Send advert after receive lower priority advert: {{rec.lower_prio_no_advert}} + Send advert after receive higher priority advert: {{rec.higher_prio_send_advert}} + Virtual Router ID: {{rec.vrid}} + Priority: {{rec.base_priority}} + Effective priority: {{rec.effective_priority}} + Advert interval: {{rec.adver_int}} sec + Accept: {{rec.accept}} + Preempt: {{rec.nopreempt}} + {% if rec.preempt_delay -%} + Preempt delay: {{rec.preempt_delay}} + {% endif -%} + Promote secondaries: {{rec.promote_secondaries}} + Authentication type: {{rec.auth_type}} + {% if rec.vips %} + Virtual IP ({{ rec.vips | length }}): + {% for ip in rec.vips -%} + {{ip}} + {% endfor -%} + {% endif -%} + {% if rec.evips %} + Virtual IP Excluded: + {% for ip in rec.evips -%} + {{ip}} + {% endfor -%} + {% endif -%} + {% if rec.vroutes %} + Virtual Routes: + {% for route in rec.vroutes -%} + {{route}} + {% endfor -%} + {% endif -%} + {% if rec.vrules %} + Virtual Rules: + {% for rule in rec.vrules -%} + {{rule}} + {% endfor -%} + {% endif -%} + {% if rec.track_ifp %} + Tracked interfaces: + {% for ifp in rec.track_ifp -%} + {{ifp}} + {% endfor -%} + {% endif -%} + {% if rec.track_script %} + Tracked scripts: + {% for script in rec.track_script -%} + {{script}} + {% endfor -%} + {% endif %} + Using smtp notification: {{rec.smtp_alert}} + Notify deleted: {{rec.notify_deleted}} +{% endfor %} +""") + +# https://github.com/acassen/keepalived/blob/59c39afe7410f927c9894a1bafb87e398c6f02be/keepalived/include/vrrp.h#L126 +VRRP_AUTH_NONE = 0 +VRRP_AUTH_PASS = 1 +VRRP_AUTH_AH = 2 + +# https://github.com/acassen/keepalived/blob/59c39afe7410f927c9894a1bafb87e398c6f02be/keepalived/include/vrrp.h#L417 +VRRP_STATE_INIT = 0 +VRRP_STATE_BACK = 1 +VRRP_STATE_MAST = 2 +VRRP_STATE_FAULT = 3 + +VRRP_AUTH_TO_NAME = { + VRRP_AUTH_NONE: 'NONE', + VRRP_AUTH_PASS: 'SIMPLE_PASSWORD', + VRRP_AUTH_AH: 'IPSEC_AH', +} + +VRRP_STATE_TO_NAME = { + VRRP_STATE_INIT: 'INIT', + VRRP_STATE_BACK: 'BACKUP', + VRRP_STATE_MAST: 'MASTER', + VRRP_STATE_FAULT: 'FAULT', +} + + +def _get_raw_data(group_name: str = None) -> list: + """ + Retrieve raw JSON data for all VRRP groups. + + Args: + group_name (str, optional): If provided, filters the data to only + include the specified vrrp group. + + Returns: + list: A list of raw JSON data for VRRP groups, filtered by group_name + if specified. + """ + try: + output = VRRP.collect('json') + except VRRPNoData as e: + raise vyos.opmode.DataUnavailable(f'{e}') + + data = json.loads(output) + + if not data: + return [] + + if group_name is not None: + for rec in data: + if rec['data'].get('iname') == group_name: + return [rec] + return [] + return data + + +def _get_formatted_statistics_output(data: list) -> str: + """ + Prepare formatted statistics output from the given data. + + Args: + data (list): A list of dictionaries containing vrrp grop information + and statistics. + + Returns: + str: Rendered statistics output based on the provided data. + """ + instances = list() + for instance in data: + instances.append( + {'instance': instance['data'].get('iname'), **instance['stats']} + ) + + return stat_template.render(instances=instances) + + +def _process_field(data: dict, field: str, true_value: str, false_value: str): + """ + Updates the given field in the data dictionary with a specified value based + on its truthiness. + + Args: + data (dict): The dictionary containing the field to be processed. + field (str): The key representing the field in the dictionary. + true_value (str): The value to set if the field's value is truthy. + false_value (str): The value to set if the field's value is falsy. + + Returns: + None: The function modifies the dictionary in place. + """ + data[field] = true_value if data.get(field) else false_value + + +def _get_formatted_detail_output(data: list) -> str: + """ + Prepare formatted detail information output from the given data. + + Args: + data (list): A list of dictionaries containing vrrp grop information + and statistics. + + Returns: + str: Rendered detail info output based on the provided data. + """ + instances = list() + for instance in data: + instance['data']['state'] = VRRP_STATE_TO_NAME.get( + instance['data'].get('state'), 'unknown' + ) + instance['data']['wantstate'] = VRRP_STATE_TO_NAME.get( + instance['data'].get('wantstate'), 'unknown' + ) + instance['data']['auth_type'] = VRRP_AUTH_TO_NAME.get( + instance['data'].get('auth_type'), 'unknown' + ) + _process_field(instance['data'], 'lower_prio_no_advert', 'false', 'true') + _process_field(instance['data'], 'higher_prio_send_advert', 'true', 'false') + _process_field(instance['data'], 'accept', 'Enabled', 'Disabled') + _process_field(instance['data'], 'notify_deleted', 'Deleted', 'Fault') + _process_field(instance['data'], 'smtp_alert', 'yes', 'no') + _process_field(instance['data'], 'nopreempt', 'Disabled', 'Enabled') + _process_field(instance['data'], 'promote_secondaries', 'Enabled', 'Disabled') + instance['data']['vips'] = instance['data'].get('vips', False) + instance['data']['evips'] = instance['data'].get('evips', False) + instance['data']['vroutes'] = instance['data'].get('vroutes', False) + instance['data']['vrules'] = instance['data'].get('vrules', False) + + instances.append(instance['data']) + + return detail_template.render(instances=instances) + + +def show_detail( + raw: bool, group_name: typing.Optional[str] = None +) -> typing.Union[list, str]: + """ + Display detailed information about the VRRP group. + + Args: + raw (bool): If True, return raw data instead of formatted output. + group_name (str, optional): Filter the data by a specific group name, + if provided. + + Returns: + list or str: Raw data if `raw` is True, otherwise a formatted detail + output. + """ + data = _get_raw_data(group_name) + + if raw: + return data + + return _get_formatted_detail_output(data) + + +def show_statistics( + raw: bool, group_name: typing.Optional[str] = None +) -> typing.Union[list, str]: + """ + Display VRRP group statistics. + + Args: + raw (bool): If True, return raw data instead of formatted output. + group_name (str, optional): Filter the data by a specific group name, + if provided. + + Returns: + list or str: Raw data if `raw` is True, otherwise a formatted statistic + output. + """ + data = _get_raw_data(group_name) + + if raw: + return data + + return _get_formatted_statistics_output(data) + + +def show_summary(raw: bool) -> typing.Union[list, str]: + """ + Display a summary of VRRP group. + + Args: + raw (bool): If True, return raw data instead of formatted output. + + Returns: + list or str: Raw data if `raw` is True, otherwise a formatted summary output. + """ + data = _get_raw_data() + + if raw: + return data + + return VRRP.format(data) + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) sys.exit(1) -except VRRPNoData as e: - print(e) - sys.exit(1) diff --git a/src/op_mode/vtysh_wrapper.sh b/src/op_mode/vtysh_wrapper.sh index 25d09ce77..bc472f7bb 100755 --- a/src/op_mode/vtysh_wrapper.sh +++ b/src/op_mode/vtysh_wrapper.sh @@ -2,5 +2,5 @@ declare -a tmp # FRR uses ospf6 where we use ospfv3, and we use reset over clear for BGP, # thus alter the commands -tmp=$(echo $@ | sed -e "s/ospfv3/ospf6/" | sed -e "s/^reset bgp/clear bgp/" | sed -e "s/^reset ip bgp/clear ip bgp/") +tmp=$(echo $@ | sed -e "s/ospfv3/ospf6/" | sed -e "s/^reset bgp/clear bgp/" | sed -e "s/^reset ip bgp/clear ip bgp/"| sed -e "s/^reset ip nhrp/clear ip nhrp/") vtysh -c "$tmp" diff --git a/src/op_mode/zone.py b/src/op_mode/zone.py index 49fecdf28..df39549d2 100644 --- a/src/op_mode/zone.py +++ b/src/op_mode/zone.py @@ -56,10 +56,15 @@ def _convert_one_zone_data(zone: str, zone_config: dict) -> dict: from_zone_dict['firewall_v6'] = dict_search( 'firewall.ipv6_name', from_zone_config) list_of_rules.append(from_zone_dict) + zone_members =[] + interface_members = dict_search('member.interface', zone_config) + vrf_members = dict_search('member.vrf', zone_config) + zone_members += interface_members if interface_members is not None else [] + zone_members += vrf_members if vrf_members is not None else [] zone_dict = { 'name': zone, - 'interface': dict_search('interface', zone_config), + 'members': zone_members, 'type': 'LOCAL' if dict_search('local_zone', zone_config) is not None else None, } @@ -126,7 +131,7 @@ def output_zone_list(zone_conf: dict) -> list: if zone_conf['type'] == 'LOCAL': zone_info.append('LOCAL') else: - zone_info.append("\n".join(zone_conf['interface'])) + zone_info.append("\n".join(zone_conf['members'])) from_zone = [] firewall = [] @@ -175,7 +180,7 @@ def get_formatted_output(zone_policy: list) -> str: :rtype: str """ headers = ["Zone", - "Interfaces", + "Members", "From Zone", "Firewall IPv4", "Firewall IPv6" |