diff options
Diffstat (limited to 'python')
46 files changed, 310 insertions, 1352 deletions
diff --git a/python/vyos/accel_ppp.py b/python/vyos/accel_ppp.py index 0b4f8a9fe..bae695fc3 100644 --- a/python/vyos/accel_ppp.py +++ b/python/vyos/accel_ppp.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2022 VyOS maintainers and contributors +# Copyright (C) 2022-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -13,14 +11,9 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -# -import sys - -import vyos.opmode from vyos.utils.process import rc_cmd - def get_server_statistics(accel_statistics, pattern, sep=':') -> dict: import re diff --git a/python/vyos/accel_ppp_util.py b/python/vyos/accel_ppp_util.py index 845b2f5f0..ae75e6654 100644 --- a/python/vyos/accel_ppp_util.py +++ b/python/vyos/accel_ppp_util.py @@ -163,13 +163,23 @@ def verify_accel_ppp_authentication(config, local_users=True): if "key" not in radius_config: raise ConfigError(f'Missing RADIUS secret key for server "{server}"') + if dict_search("server_type", config) == 'ipoe' and dict_search( + "authentication.mode", config) == "local": + if not dict_search("authentication.interface", config): + raise ConfigError( + "Authentication mode local requires authentication interface to be configured!" + ) + for interface in dict_search("authentication.interface", config): + user_config = config["authentication"]["interface"][interface] + if "mac" not in user_config: + raise ConfigError( + f'Users MAC addreses are not configured for interface "{interface}"') + if dict_search('authentication.radius.dynamic_author.server', config): if not dict_search('authentication.radius.dynamic_author.key', config): raise ConfigError('DAE/CoA server key required!') - - def verify_accel_ppp_ip_pool(vpn_config): """ Common helper function which must be used by Accel-PPP @@ -192,7 +202,9 @@ def verify_accel_ppp_ip_pool(vpn_config): default_pool = dict_search("default_pool", vpn_config) if default_pool: - if default_pool not in dict_search("client_ip_pool", vpn_config): + if not dict_search('client_ip_pool', + vpn_config) or default_pool not in dict_search( + 'client_ip_pool', vpn_config): raise ConfigError(f'Default pool "{default_pool}" does not exists') if 'client_ipv6_pool' in vpn_config: @@ -204,8 +216,20 @@ def verify_accel_ppp_ip_pool(vpn_config): if dict_search('authentication.mode', vpn_config) in ['local', 'noauth']: if not dict_search('client_ip_pool', vpn_config) and not dict_search( 'client_ipv6_pool', vpn_config): - raise ConfigError( - "Local auth mode requires local client-ip-pool or client-ipv6-pool to be configured!") + if dict_search('server_type', vpn_config) == 'ipoe': + if 'interface' in vpn_config: + for interface, interface_config in vpn_config['interface'].items(): + if dict_search('client_subnet', interface_config): + break + else: + raise ConfigError( + 'Local auth and noauth mode requires local client-ip-pool \ + or client-ipv6-pool or client-subnet to be configured!') + else: + raise ConfigError( + "Local auth mode requires local client-ip-pool \ + or client-ipv6-pool to be configured!") + if dict_search('client_ip_pool', vpn_config) and not dict_search( 'default_pool', vpn_config): Warning("'default-pool' is not defined") diff --git a/python/vyos/config.py b/python/vyos/config.py index 7619ad367..cca65f0eb 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -1,4 +1,4 @@ -# Copyright 2017, 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2017-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -60,12 +60,10 @@ In configuration mode, "base" functions like `exists`, `return_value` return val while functions prefixed "effective" return values from the running config. In operational mode, all functions return values from the running config. - """ import re import json -from copy import deepcopy from typing import Union import vyos.configtree diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py index 28ccee769..fc51d781c 100644 --- a/python/vyos/config_mgmt.py +++ b/python/vyos/config_mgmt.py @@ -1,4 +1,4 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -19,18 +19,23 @@ import sys import gzip import logging -from typing import Optional, Tuple, Union +from typing import Optional +from typing import Tuple from filecmp import cmp from datetime import datetime -from textwrap import dedent, indent +from textwrap import dedent from pathlib import Path from tabulate import tabulate from shutil import copy, chown -from urllib.parse import urlsplit, urlunsplit +from urllib.parse import urlsplit +from urllib.parse import urlunsplit from vyos.config import Config -from vyos.configtree import ConfigTree, ConfigTreeError, show_diff -from vyos.load_config import load, LoadConfigError +from vyos.configtree import ConfigTree +from vyos.configtree import ConfigTreeError +from vyos.configtree import show_diff +from vyos.load_config import load +from vyos.load_config import LoadConfigError from vyos.defaults import directories from vyos.version import get_full_version_data from vyos.utils.io import ask_yes_no diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index 4111d7271..870d7cfda 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -1,4 +1,4 @@ -# Copyright 2019-2022 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -203,8 +203,6 @@ def is_member(conf, interface, intftype=None): empty -> Interface is not a member key -> Interface is a member of this interface """ - from vyos.ifconfig import Section - ret_val = {} intftypes = ['bonding', 'bridge'] @@ -633,7 +631,7 @@ def get_accel_dict(config, base, chap_secrets, with_pki=False): Return a dictionary with the necessary interface config keys. """ - from vyos.utils.system import get_half_cpus + from vyos.cpu import get_core_count from vyos.template import is_ipv4 dict = config.get_config_dict(base, key_mangling=('-', '_'), @@ -643,7 +641,7 @@ def get_accel_dict(config, base, chap_secrets, with_pki=False): with_pki=with_pki) # set CPUs cores to process requests - dict.update({'thread_count' : get_half_cpus()}) + dict.update({'thread_count' : get_core_count()}) # we need to store the path to the secrets file dict.update({'chap_secrets_file' : chap_secrets}) diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py index 03b06c6d9..f975df45d 100644 --- a/python/vyos/configdiff.py +++ b/python/vyos/configdiff.py @@ -1,4 +1,4 @@ -# Copyright 2020 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -13,12 +13,12 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. -from enum import IntFlag, auto +from enum import IntFlag +from enum import auto from vyos.config import Config from vyos.configtree import DiffTree from vyos.configdict import dict_merge -from vyos.configdict import list_diff from vyos.utils.dict import get_sub_dict from vyos.utils.dict import mangle_dict_keys from vyos.utils.dict import dict_search_args diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 90842b749..ab7a631bb 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -176,6 +176,25 @@ class ConfigSession(object): except (ValueError, ConfigSessionError) as e: raise ConfigSessionError(e) + def set_section_tree(self, d: dict): + try: + if d: + for p in dict_to_paths(d): + self.set(p) + except (ValueError, ConfigSessionError) as e: + raise ConfigSessionError(e) + + def load_section_tree(self, mask: dict, d: dict): + try: + if mask: + for p in dict_to_paths(mask): + self.delete(p) + if d: + for p in dict_to_paths(d): + self.set(p) + except (ValueError, ConfigSessionError) as e: + raise ConfigSessionError(e) + def comment(self, path, value=None): if not value: value = [""] diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py index 423fe01ed..e4b282d72 100644 --- a/python/vyos/configtree.py +++ b/python/vyos/configtree.py @@ -401,6 +401,30 @@ def union(left, right, libpath=LIBPATH): return tree +def mask_inclusive(left, right, libpath=LIBPATH): + if not (isinstance(left, ConfigTree) and isinstance(right, ConfigTree)): + raise TypeError("Arguments must be instances of ConfigTree") + + try: + __lib = cdll.LoadLibrary(libpath) + __mask_tree = __lib.mask_tree + __mask_tree.argtypes = [c_void_p, c_void_p] + __mask_tree.restype = c_void_p + __get_error = __lib.get_error + __get_error.argtypes = [] + __get_error.restype = c_char_p + + res = __mask_tree(left._get_config(), right._get_config()) + except Exception as e: + raise ConfigTreeError(e) + if not res: + msg = __get_error().decode() + raise ConfigTreeError(msg) + + tree = ConfigTree(address=res) + + return tree + def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH): try: __lib = cdll.LoadLibrary(libpath) diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index 6508ccdd9..4cb84194a 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -23,8 +23,6 @@ from vyos import ConfigError from vyos.utils.dict import dict_search -from vyos.utils.dict import dict_search_recursive - # pattern re-used in ipsec migration script dynamic_interface_pattern = r'(ppp|pppoe|sstpc|l2tp|ipoe)[0-9]+' @@ -62,8 +60,8 @@ def verify_mtu_parent(config, parent): mtu = int(config['mtu']) parent_mtu = int(parent['mtu']) if mtu > parent_mtu: - raise ConfigError(f'Interface MTU ({mtu}) too high, ' \ - f'parent interface MTU is {parent_mtu}!') + raise ConfigError(f'Interface MTU "{mtu}" too high, ' \ + f'parent interface MTU is "{parent_mtu}"!') def verify_mtu_ipv6(config): """ @@ -78,7 +76,7 @@ def verify_mtu_ipv6(config): if int(config['mtu']) < min_mtu: interface = config['ifname'] error_msg = f'IPv6 address will be configured on interface "{interface}",\n' \ - f'the required minimum MTU is {min_mtu}!' + f'the required minimum MTU is "{min_mtu}"!' if 'address' in config: for address in config['address']: @@ -99,10 +97,17 @@ def verify_vrf(config): Common helper function used by interface implementations to perform recurring validation of VRF configuration. """ - from netifaces import interfaces - if 'vrf' in config and config['vrf'] != 'default': - if config['vrf'] not in interfaces(): - raise ConfigError('VRF "{vrf}" does not exist'.format(**config)) + from vyos.utils.network import interface_exists + if 'vrf' in config: + vrfs = config['vrf'] + if isinstance(vrfs, str): + vrfs = [vrfs] + + for vrf in vrfs: + if vrf == 'default': + continue + if not interface_exists(vrf): + raise ConfigError(f'VRF "{vrf}" does not exist!') if 'is_bridge_member' in config: raise ConfigError( @@ -162,43 +167,6 @@ def verify_tunnel(config): if 'source_address' in config and is_ipv6(config['source_address']): raise ConfigError('Can not use local IPv6 address is for mGRE tunnels') -def verify_eapol(config): - """ - Common helper function used by interface implementations to perform - recurring validation of EAPoL configuration. - """ - if 'eapol' in config: - if 'certificate' not in config['eapol']: - raise ConfigError('Certificate must be specified when using EAPoL!') - - if 'pki' not in config or 'certificate' not in config['pki']: - raise ConfigError('Invalid certificate specified for EAPoL') - - cert_name = config['eapol']['certificate'] - if cert_name not in config['pki']['certificate']: - raise ConfigError('Invalid certificate specified for EAPoL') - - cert = config['pki']['certificate'][cert_name] - - if 'certificate' not in cert or 'private' not in cert or 'key' not in cert['private']: - raise ConfigError('Invalid certificate/private key specified for EAPoL') - - if 'password_protected' in cert['private']: - raise ConfigError('Encrypted private key cannot be used for EAPoL') - - if 'ca_certificate' in config['eapol']: - if 'ca' not in config['pki']: - raise ConfigError('Invalid CA certificate specified for EAPoL') - - for ca_cert_name in config['eapol']['ca_certificate']: - if ca_cert_name not in config['pki']['ca']: - raise ConfigError('Invalid CA certificate specified for EAPoL') - - ca_cert = config['pki']['ca'][ca_cert_name] - - if 'certificate' not in ca_cert: - raise ConfigError('Invalid CA certificate specified for EAPoL') - def verify_mirror_redirect(config): """ Common helper function used by interface implementations to perform @@ -206,13 +174,13 @@ def verify_mirror_redirect(config): It makes no sense to mirror traffic back at yourself! """ - import os + from vyos.utils.network import interface_exists if {'mirror', 'redirect'} <= set(config): raise ConfigError('Mirror and redirect can not be enabled at the same time!') if 'mirror' in config: for direction, mirror_interface in config['mirror'].items(): - if not os.path.exists(f'/sys/class/net/{mirror_interface}'): + if not interface_exists(mirror_interface): raise ConfigError(f'Requested mirror interface "{mirror_interface}" '\ 'does not exist!') @@ -222,7 +190,7 @@ def verify_mirror_redirect(config): if 'redirect' in config: redirect_ifname = config['redirect'] - if not os.path.exists(f'/sys/class/net/{redirect_ifname}'): + if not interface_exists(redirect_ifname): raise ConfigError(f'Requested redirect interface "{redirect_ifname}" '\ 'does not exist!') @@ -276,10 +244,10 @@ def verify_interface_exists(ifname, warning_only=False): if the interface is defined on the CLI, if it's not found we try if it exists at the OS level. """ - import os from vyos.base import Warning from vyos.configquery import ConfigTreeQuery from vyos.utils.dict import dict_search_recursive + from vyos.utils.network import interface_exists # Check if interface is present in CLI config config = ConfigTreeQuery() @@ -288,7 +256,7 @@ def verify_interface_exists(ifname, warning_only=False): return True # Interface not found on CLI, try Linux Kernel - if os.path.exists(f'/sys/class/net/{ifname}'): + if interface_exists(ifname): return True message = f'Interface "{ifname}" does not exist!' @@ -304,7 +272,7 @@ def verify_source_interface(config): required by e.g. peth/MACvlan, MACsec ... """ import re - from netifaces import interfaces + from vyos.utils.network import interface_exists ifname = config['ifname'] if 'source_interface' not in config: @@ -316,7 +284,7 @@ def verify_source_interface(config): if tmp.match(src_ifname): raise ConfigError(f'Can not source "{ifname}" from dynamic interface "{src_ifname}"!') - if src_ifname not in interfaces(): + if not interface_exists(src_ifname): raise ConfigError(f'Specified source-interface {src_ifname} does not exist') if 'source_interface_is_bridge_member' in config: @@ -487,3 +455,69 @@ def verify_access_list(access_list, config, version=''): # Check if the specified ACL exists, if not error out if dict_search(f'policy.access-list{version}.{access_list}', config) == None: raise ConfigError(f'Specified access-list{version} "{access_list}" does not exist!') + +def verify_pki_certificate(config: dict, cert_name: str, no_password_protected: bool=False): + """ + Common helper function user by PKI consumers to perform recurring + validation functions for PEM based certificates + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'certificate' not in config['pki']: + raise ConfigError('PKI does not contain any certificates!') + + if cert_name not in config['pki']['certificate']: + raise ConfigError(f'Certificate "{cert_name}" not found in configuration!') + + pki_cert = config['pki']['certificate'][cert_name] + if 'certificate' not in pki_cert: + raise ConfigError(f'PEM certificate for "{cert_name}" missing in configuration!') + + if 'private' not in pki_cert or 'key' not in pki_cert['private']: + raise ConfigError(f'PEM private key for "{cert_name}" missing in configuration!') + + if no_password_protected and 'password_protected' in pki_cert['private']: + raise ConfigError('Password protected PEM private key is not supported!') + +def verify_pki_ca_certificate(config: dict, ca_name: str): + """ + Common helper function user by PKI consumers to perform recurring + validation functions for PEM based CA certificates + """ + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'ca' not in config['pki']: + raise ConfigError('PKI does not contain any CA certificates!') + + if ca_name not in config['pki']['ca']: + raise ConfigError(f'CA Certificate "{ca_name}" not found in configuration!') + + pki_cert = config['pki']['ca'][ca_name] + if 'certificate' not in pki_cert: + raise ConfigError(f'PEM CA certificate for "{cert_name}" missing in configuration!') + +def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0): + """ + Common helper function user by PKI consumers to perform recurring + validation functions on DH parameters + """ + from vyos.pki import load_dh_parameters + + if 'pki' not in config: + raise ConfigError('PKI is not configured!') + + if 'dh' not in config['pki']: + raise ConfigError('PKI does not contain any DH parameters!') + + if dh_name not in config['pki']['dh']: + raise ConfigError(f'DH parameter "{dh_name}" not found in configuration!') + + if min_key_size: + pki_dh = config['pki']['dh'][dh_name] + dh_params = load_dh_parameters(pki_dh['parameters']) + dh_numbers = dh_params.parameter_numbers() + dh_bits = dh_numbers.p.bit_length() + if dh_bits < min_key_size: + raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!') diff --git a/python/vyos/cpu.py b/python/vyos/cpu.py index d2e5f6504..cae5f5f4d 100644 --- a/python/vyos/cpu.py +++ b/python/vyos/cpu.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 -# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright (C) 2022-2024 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py index 5e241fc08..d45c9c272 100644 --- a/python/vyos/ethtool.py +++ b/python/vyos/ethtool.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -import os import re from json import loads diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index e70b4f0d9..946050a82 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2021-2023 VyOS maintainers and contributors +# Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -30,7 +28,6 @@ from vyos.template import is_ipv4 from vyos.template import render from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive -from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import run @@ -66,7 +63,7 @@ def fqdn_config_parse(firewall): rule = path[4] suffix = path[5][0] set_name = f'{hook_name}_{priority}_{rule}_{suffix}' - + if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): firewall['ip_fqdn'][set_name] = domain elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): @@ -85,7 +82,7 @@ def fqdn_resolve(fqdn, ipv6=False): def find_nftables_rule(table, chain, rule_matches=[]): # Find rule in table/chain that matches all criteria and return the handle - results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n") + results = cmd(f'sudo nft --handle list chain {table} {chain}').split("\n") for line in results: if all(rule_match in line for rule_match in rule_matches): handle_search = re.search('handle (\d+)', line) @@ -655,7 +652,7 @@ def geoip_update(firewall, force=False): 'ipv6_sets': ipv6_sets }) - result = run(f'nft -f {nftables_geoip_conf}') + result = run(f'nft --file {nftables_geoip_conf}') if result != 0: print('Error: GeoIP failed to update firewall') return False diff --git a/python/vyos/frr.py b/python/vyos/frr.py index c3703cbb4..e7743e9d5 100644 --- a/python/vyos/frr.py +++ b/python/vyos/frr.py @@ -1,4 +1,4 @@ -# Copyright 2020 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -69,7 +69,6 @@ import tempfile import re from vyos import ConfigError -from vyos.utils.permission import chown from vyos.utils.process import cmd from vyos.utils.process import popen from vyos.utils.process import STDOUT diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py index 45e6e4c16..c6d0f1cff 100644 --- a/python/vyos/ifconfig/bond.py +++ b/python/vyos/ifconfig/bond.py @@ -1,4 +1,4 @@ -# Copyright 2019-2022 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -16,7 +16,6 @@ import os from vyos.ifconfig.interface import Interface -from vyos.utils.process import cmd from vyos.utils.dict import dict_search from vyos.utils.assertion import assert_list from vyos.utils.assertion import assert_positive diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py index 7936e3da5..917f962b7 100644 --- a/python/vyos/ifconfig/bridge.py +++ b/python/vyos/ifconfig/bridge.py @@ -13,13 +13,12 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -from netifaces import interfaces - from vyos.ifconfig.interface import Interface from vyos.utils.assertion import assert_boolean from vyos.utils.assertion import assert_list from vyos.utils.assertion import assert_positive from vyos.utils.dict import dict_search +from vyos.utils.network import interface_exists from vyos.configdict import get_vlan_ids from vyos.configdict import list_diff @@ -314,7 +313,7 @@ class BridgeIf(Interface): # remove interface from bridge tmp = dict_search('member.interface_remove', config) for member in (tmp or []): - if member in interfaces(): + if interface_exists(member): self.del_port(member) # enable/disable VLAN Filter @@ -345,7 +344,7 @@ class BridgeIf(Interface): for interface, interface_config in tmp.items(): # if interface does yet not exist bail out early and # add it later - if interface not in interfaces(): + if not interface_exists(interface): continue # Bridge lower "physical" interface diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index 56dcde214..1b86982c4 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -35,7 +35,6 @@ from vyos.defaults import directories from vyos.template import render from vyos.utils.network import mac2eui64 from vyos.utils.dict import dict_search -from vyos.utils.file import read_file from vyos.utils.network import get_interface_config from vyos.utils.network import get_interface_namespace from vyos.utils.network import is_netns_interface @@ -415,7 +414,7 @@ class Interface(Control): else: nft_del_element = f'delete element inet vrf_zones ct_iface_map {{ "{self.ifname}" }}' # Check if deleting is possible first to avoid raising errors - _, err = self._popen(f'nft -c {nft_del_element}') + _, err = self._popen(f'nft --check {nft_del_element}') if not err: # Remove map element self._cmd(f'nft {nft_del_element}') @@ -1375,15 +1374,19 @@ class Interface(Control): ifname = self.ifname config_base = directories['dhcp6_client_dir'] config_file = f'{config_base}/dhcp6c.{ifname}.conf' + script_file = f'/etc/wide-dhcpv6/dhcp6c.{ifname}.script' # can not live under /run b/c of noexec mount option systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf' systemd_service = f'dhcp6c@{ifname}.service' - # Rendered client configuration files require the apsolute config path - self.config['dhcp6_client_dir'] = directories['dhcp6_client_dir'] + # Rendered client configuration files require additional settings + config = deepcopy(self.config) + config['dhcp6_client_dir'] = directories['dhcp6_client_dir'] + config['dhcp6_script_file'] = script_file - if enable and 'disable' not in self.config: - render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', self.config) - render(config_file, 'dhcp-client/ipv6.j2', self.config) + if enable and 'disable' not in config: + render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', config) + render(config_file, 'dhcp-client/ipv6.j2', config) + render(script_file, 'dhcp-client/dhcp6c-script.j2', config, permission=0o755) # Reload systemd unit definitons as some options are dynamically generated self._cmd('systemctl daemon-reload') @@ -1396,6 +1399,8 @@ class Interface(Control): self._cmd(f'systemctl stop {systemd_service}') if os.path.isfile(config_file): os.remove(config_file) + if os.path.isfile(script_file): + os.remove(script_file) return None diff --git a/python/vyos/ifconfig/section.py b/python/vyos/ifconfig/section.py index 5e98cd510..50273cf67 100644 --- a/python/vyos/ifconfig/section.py +++ b/python/vyos/ifconfig/section.py @@ -97,7 +97,7 @@ class Section: for ifname in interfaces: ifsection = cls.section(ifname) - if not ifsection: + if not ifsection and not ifname.startswith('vrrp'): continue if section and ifsection != section: diff --git a/python/vyos/ifconfig/vrrp.py b/python/vyos/ifconfig/vrrp.py index fde903a53..ee9336d1a 100644 --- a/python/vyos/ifconfig/vrrp.py +++ b/python/vyos/ifconfig/vrrp.py @@ -1,4 +1,4 @@ -# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -18,7 +18,6 @@ import json import signal from time import time -from time import sleep from tabulate import tabulate from vyos.configquery import ConfigTreeQuery @@ -155,4 +154,3 @@ class VRRP(object): # add to the active list disabled instances groups.extend(cls.disabled()) return(tabulate(groups, headers)) - diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py index a2c4aad50..bdb48e303 100644 --- a/python/vyos/ifconfig/vxlan.py +++ b/python/vyos/ifconfig/vxlan.py @@ -1,4 +1,4 @@ -# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -13,9 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -from json import loads - -from vyos import ConfigError from vyos.configdict import list_diff from vyos.ifconfig import Interface from vyos.utils.assertion import assert_list diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py index 5704f8b64..5b5f25323 100644 --- a/python/vyos/ifconfig/wireguard.py +++ b/python/vyos/ifconfig/wireguard.py @@ -1,4 +1,4 @@ -# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -25,7 +25,6 @@ from hurry.filesize import alternative from vyos.ifconfig import Interface from vyos.ifconfig import Operational from vyos.template import is_ipv6 -from vyos.base import Warning class WireGuardOperational(Operational): def _dump(self): diff --git a/python/vyos/iflag.py b/python/vyos/iflag.py index 7ff8e5623..3ce73c1bf 100644 --- a/python/vyos/iflag.py +++ b/python/vyos/iflag.py @@ -1,4 +1,4 @@ -# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -13,26 +13,24 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -from enum import Enum, unique, IntEnum - +from enum import IntEnum class IFlag(IntEnum): """ net/if.h interface flags """ - IFF_UP = 0x1 #: Interface up/down status - IFF_BROADCAST = 0x2 #: Broadcast address valid - IFF_DEBUG = 0x4, #: Debugging - IFF_LOOPBACK = 0x8 #: Is loopback network - IFF_POINTOPOINT = 0x10 #: Is point-to-point link - IFF_NOTRAILERS = 0x20 #: Avoid use of trailers - IFF_RUNNING = 0x40 #: Resources allocated - IFF_NOARP = 0x80 #: No address resolution protocol - IFF_PROMISC = 0x100 #: Promiscuous mode - IFF_ALLMULTI = 0x200 #: Receive all multicast - IFF_MASTER = 0x400 #: Load balancer master - IFF_SLAVE = 0x800 #: Load balancer slave - IFF_MULTICAST = 0x1000 #: Supports multicast - IFF_PORTSEL = 0x2000 #: Media type adjustable - IFF_AUTOMEDIA = 0x4000 #: Automatic media type enabled - IFF_DYNAMIC = 0x8000 #: Is a dial-up device with dynamic address - + IFF_UP = 0x1 #: Interface up/down status + IFF_BROADCAST = 0x2 #: Broadcast address valid + IFF_DEBUG = 0x4, #: Debugging + IFF_LOOPBACK = 0x8 #: Is loopback network + IFF_POINTOPOINT = 0x10 #: Is point-to-point link + IFF_NOTRAILERS = 0x20 #: Avoid use of trailers + IFF_RUNNING = 0x40 #: Resources allocated + IFF_NOARP = 0x80 #: No address resolution protocol + IFF_PROMISC = 0x100 #: Promiscuous mode + IFF_ALLMULTI = 0x200 #: Receive all multicast + IFF_MASTER = 0x400 #: Load balancer master + IFF_SLAVE = 0x800 #: Load balancer slave + IFF_MULTICAST = 0x1000 #: Supports multicast + IFF_PORTSEL = 0x2000 #: Media type adjustable + IFF_AUTOMEDIA = 0x4000 #: Automatic media type enabled + IFF_DYNAMIC = 0x8000 #: Is a dial-up device with dynamic address diff --git a/python/vyos/initialsetup.py b/python/vyos/initialsetup.py index 3b280dc6b..cb6b9e459 100644 --- a/python/vyos/initialsetup.py +++ b/python/vyos/initialsetup.py @@ -1,7 +1,7 @@ # initialsetup -- functions for setting common values in config file, # for use in installation and first boot scripts # -# Copyright (C) 2018-2023 VyOS maintainers and contributors +# Copyright (C) 2018-2024 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or modify it under the terms of # the GNU Lesser General Public License as published by the Free Software Foundation; @@ -14,8 +14,6 @@ # You should have received a copy of the GNU Lesser General Public License along with this library; # if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import vyos.configtree - from vyos.utils.auth import make_password_hash from vyos.utils.auth import split_ssh_public_key diff --git a/python/vyos/ioctl.py b/python/vyos/ioctl.py index cfa75aac6..51574c1db 100644 --- a/python/vyos/ioctl.py +++ b/python/vyos/ioctl.py @@ -1,4 +1,4 @@ -# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -import sys import os import socket import fcntl @@ -29,7 +28,7 @@ def get_terminal_size(): def get_interface_flags(intf): """ Pull the SIOCGIFFLAGS """ - nullif = '\0'*256 + nullif = '\0'*256 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) raw = fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, intf + nullif) flags, = struct.unpack('H', raw[16:18]) diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 89ae7ca81..addfdba49 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -1,4 +1,4 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -22,7 +22,6 @@ from vyos.template import isc_static_route from vyos.template import netmask_from_cidr from vyos.utils.dict import dict_search_args from vyos.utils.file import file_permissions -from vyos.utils.file import read_file from vyos.utils.process import run kea4_options = { diff --git a/python/vyos/nat.py b/python/vyos/nat.py index da2613b16..2ada29add 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 -# # Copyright (C) 2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify diff --git a/python/vyos/pki.py b/python/vyos/pki.py index 02dece471..3c577db4d 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 -# # Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify diff --git a/python/vyos/progressbar.py b/python/vyos/progressbar.py index 7bc9d9856..8d1042672 100644 --- a/python/vyos/progressbar.py +++ b/python/vyos/progressbar.py @@ -1,4 +1,4 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,12 +17,10 @@ import math import os import signal import subprocess -import sys from vyos.utils.io import is_dumb_terminal from vyos.utils.io import print_error - class Progressbar: def __init__(self, step=None): self.total = 0.0 diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py index c8e881ee2..f9366c6b1 100644 --- a/python/vyos/qos/base.py +++ b/python/vyos/qos/base.py @@ -90,6 +90,23 @@ class QoSBase: else: return value + def _calc_random_detect_queue_params(self, avg_pkt, max_thr, limit=None, min_thr=None, mark_probability=None): + params = dict() + avg_pkt = int(avg_pkt) + max_thr = int(max_thr) + mark_probability = int(mark_probability) + limit = int(limit) if limit else 4 * max_thr + min_thr = int(min_thr) if min_thr else (9 * max_thr) // 18 + + params['avg_pkt'] = avg_pkt + params['limit'] = limit * avg_pkt + params['min_val'] = min_thr * avg_pkt + params['max_val'] = max_thr * avg_pkt + params['burst'] = (2 * min_thr + max_thr) // 3 + params['probability'] = 1 / mark_probability + + return params + def _build_base_qdisc(self, config : dict, cls_id : int): """ Add/replace qdisc for every class (also default is a class). This is @@ -144,6 +161,18 @@ class QoSBase: elif queue_type == 'random-detect': default_tc += f' red' + qparams = self._calc_random_detect_queue_params( + avg_pkt=dict_search('average_packet', config), + max_thr=dict_search('maximum_threshold', config), + limit=dict_search('queue_limit', config), + min_thr=dict_search('minimum_threshold', config), + mark_probability=dict_search('mark_probability', config) + ) + + default_tc += f' limit {qparams["limit"]} avpkt {qparams["avg_pkt"]}' + default_tc += f' max {qparams["max_val"]} min {qparams["min_val"]}' + default_tc += f' burst {qparams["burst"]} probability {qparams["probability"]}' + self._cmd(default_tc) elif queue_type == 'drop-tail': diff --git a/python/vyos/qos/priority.py b/python/vyos/qos/priority.py index 8182400f9..7f0a67032 100644 --- a/python/vyos/qos/priority.py +++ b/python/vyos/qos/priority.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2022-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,7 +14,6 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. from vyos.qos.base import QoSBase -from vyos.utils.dict import dict_search class Priority(QoSBase): _parent = 1 diff --git a/python/vyos/range_regex.py b/python/vyos/range_regex.py index a8190d140..81e9d2e7e 100644 --- a/python/vyos/range_regex.py +++ b/python/vyos/range_regex.py @@ -22,7 +22,6 @@ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ''' -import math # coding=utf8 @@ -67,7 +66,7 @@ def regex_for_range(min_, max_): min_ = 0 if max_ >= 0: - positive_subpatterns = split_to_patterns(min_, max_) + positive_subpatterns = split_to_patterns(min_, max_) negative_only_subpatterns = ['-' + val for val in negative_subpatterns if val not in positive_subpatterns] positive_only_subpatterns = [val for val in positive_subpatterns if val not in negative_subpatterns] @@ -139,4 +138,4 @@ def range_to_pattern(start, stop): if any_digit_count > 1: pattern += '{{{}}}'.format(any_digit_count) - return pattern
\ No newline at end of file + return pattern diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index e56f0bec8..faf68c2d1 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -18,7 +18,6 @@ import platform from pathlib import Path from re import MULTILINE, compile as re_compile from shutil import copy2 -from typing import Union from uuid import uuid5, NAMESPACE_URL, UUID from vyos.template import render @@ -56,7 +55,7 @@ REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_v REGEX_GRUB_BOOT_OPTS: str = r'^\s*set boot_opts="(?P<boot_opts>[^$]+)"$' -def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None: +def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS', chroot : str = "") -> None: """Install GRUB for both BIOS and EFI modes (hybrid boot) Args: @@ -65,17 +64,22 @@ def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> N efi_dir (str): a path to '/boot/efi' directory """ + if chroot: + chroot_cmd = f"chroot {chroot}" + else: + chroot_cmd = "" + efi_installation_arch = "x86_64" if platform.machine() == "aarch64": efi_installation_arch = "arm64" elif platform.machine() == "x86_64": cmd( - f'grub-install --no-floppy --target=i386-pc \ + f'{chroot_cmd} grub-install --no-floppy --target=i386-pc \ --boot-directory={boot_dir} {drive_path} --force' ) cmd( - f'grub-install --no-floppy --recheck --target={efi_installation_arch}-efi \ + f'{chroot_cmd} grub-install --no-floppy --recheck --target={efi_installation_arch}-efi \ --force-extra-removable --boot-directory={boot_dir} \ --efi-directory={efi_dir} --bootloader-id="{id}" \ --no-uefi-secure-boot' diff --git a/python/vyos/template.py b/python/vyos/template.py index bde8e3554..ac77e8a3d 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -1,4 +1,4 @@ -# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -32,8 +32,21 @@ _TESTS = {} # reuse Environments with identical settings to improve performance @functools.lru_cache(maxsize=2) def _get_environment(location=None): + from os import getenv + if location is None: - loc_loader=FileSystemLoader(directories["templates"]) + # Sometimes functions that rely on templates need to be executed outside of VyOS installations: + # for example, installer functions are executed for image builds, + # and anything may be invoked for testing from a developer's machine. + # This environment variable allows running any unmodified code + # with a custom template location. + location_env_var = getenv("VYOS_TEMPLATE_DIR") + if location_env_var: + print(f"Using environment variable {location_env_var}") + template_dir = location_env_var + else: + template_dir = directories["templates"] + loc_loader=FileSystemLoader(template_dir) else: loc_loader=FileSystemLoader(location) env = Environment( @@ -294,7 +307,8 @@ def network_from_ipv4(address): @register_filter('is_interface') def is_interface(interface): """ Check if parameter is a valid local interface name """ - return os.path.exists(f'/sys/class/net/{interface}') + from vyos.utils.network import interface_exists + return interface_exists(interface) @register_filter('is_ip') def is_ip(addr): @@ -794,7 +808,7 @@ def kea_address_json(addresses): out = [] for address in addresses: - ifname = is_addr_assigned(address, return_ifname=True) + ifname = is_addr_assigned(address, return_ifname=True, include_vrf=True) if not ifname: continue @@ -809,10 +823,19 @@ def kea_high_availability_json(config): source_addr = config['source_address'] remote_addr = config['remote'] + ha_mode = 'hot-standby' if config['mode'] == 'active-passive' else 'load-balancing' + ha_role = config['status'] + + if ha_role == 'primary': + peer1_role = 'primary' + peer2_role = 'standby' if ha_mode == 'hot-standby' else 'secondary' + else: + peer1_role = 'standby' if ha_mode == 'hot-standby' else 'secondary' + peer2_role = 'primary' data = { 'this-server-name': os.uname()[1], - 'mode': 'hot-standby', + 'mode': ha_mode, 'heartbeat-delay': 10000, 'max-response-delay': 10000, 'max-ack-delay': 5000, @@ -821,13 +844,13 @@ def kea_high_availability_json(config): { 'name': os.uname()[1], 'url': f'http://{source_addr}:647/', - 'role': 'standby' if config['status'] == 'secondary' else 'primary', + 'role': peer1_role, 'auto-failover': True }, { 'name': config['name'], 'url': f'http://{remote_addr}:647/', - 'role': 'primary' if config['status'] == 'secondary' else 'standby', + 'role': peer2_role, 'auto-failover': True }] } diff --git a/python/vyos/tpm.py b/python/vyos/tpm.py index f120e10c4..b9f28546f 100644 --- a/python/vyos/tpm.py +++ b/python/vyos/tpm.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 -# # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 0afaf695c..7e6045291 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -from typing import Callable +from typing import Callable, Optional def print_error(str='', end='\n'): """ @@ -81,7 +81,8 @@ def is_dumb_terminal(): return os.getenv('TERM') in ['vt100', 'dumb'] def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', - list_format: Callable = None,) -> str: + list_format: Optional[Callable] = None, + default_entry: Optional[int] = None) -> str: """Select an entry from a list Args: @@ -99,6 +100,9 @@ def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', print(f'\t{i}: {list_format(e)}') else: print(f'\t{i}: {e}') - select = ask_input(prompt_msg, numeric_only=True, - valid_responses=range(1, len(l)+1)) + valid_entry = range(1, len(l)+1) + if default_entry and default_entry not in valid_entry: + default_entry = None + select = ask_input(prompt_msg, default=default_entry, numeric_only=True, + valid_responses=valid_entry) return next(filter(lambda x: x[0] == select, en))[1] diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index cac59475d..829124b57 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -310,7 +310,7 @@ def is_ipv6_link_local(addr): return False -def is_addr_assigned(ip_address, vrf=None, return_ifname=False) -> bool | str: +def is_addr_assigned(ip_address, vrf=None, return_ifname=False, include_vrf=False) -> bool | str: """ Verify if the given IPv4/IPv6 address is assigned to any interface """ from netifaces import interfaces from vyos.utils.network import get_interface_config @@ -321,7 +321,7 @@ def is_addr_assigned(ip_address, vrf=None, return_ifname=False) -> bool | str: # case there is no need to proceed with this data set - continue loop # with next element tmp = get_interface_config(interface) - if dict_search('master', tmp) != vrf: + if dict_search('master', tmp) != vrf and not include_vrf: continue if is_intf_addr_assigned(interface, ip_address): diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py index 5d41c0c05..55813a5f7 100644 --- a/python/vyos/utils/system.py +++ b/python/vyos/utils/system.py @@ -79,13 +79,6 @@ def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: # everything applied return True -def get_half_cpus(): - """ return 1/2 of the numbers of available CPUs """ - cpu = os.cpu_count() - if cpu > 1: - cpu /= 2 - return int(cpu) - def find_device_file(device): """ Recurively search /dev for the given device file and return its full path. If no device file was found 'None' is returned """ diff --git a/python/vyos/version.py b/python/vyos/version.py index 1c5651c83..b5ed2705b 100644 --- a/python/vyos/version.py +++ b/python/vyos/version.py @@ -1,4 +1,4 @@ -# Copyright 2017-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2017-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -30,7 +30,7 @@ Example of the version data dict:: """ import os -import json + import requests import vyos.defaults @@ -40,10 +40,8 @@ from vyos.utils.process import popen from vyos.utils.process import run from vyos.utils.process import DEVNULL - version_file = os.path.join(vyos.defaults.directories['data'], 'version.json') - def get_version_data(fname=version_file): """ Get complete version data diff --git a/python/vyos/xml/.gitignore b/python/vyos/xml/.gitignore deleted file mode 100644 index e934adfd1..000000000 --- a/python/vyos/xml/.gitignore +++ /dev/null @@ -1 +0,0 @@ -cache/ diff --git a/python/vyos/xml/__init__.py b/python/vyos/xml/__init__.py deleted file mode 100644 index 6db446a40..000000000 --- a/python/vyos/xml/__init__.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -from vyos.xml import definition -from vyos.xml import load -from vyos.xml import kw - - -def load_configuration(cache=[]): - if cache: - return cache[0] - - xml = definition.XML() - - try: - from vyos.xml.cache import configuration - xml.update(configuration.definition) - cache.append(xml) - except Exception: - xml = definition.XML() - print('no xml configuration cache') - xml.update(load.xml(load.configuration_definition)) - - return xml - - -# def is_multi(lpath): -# return load_configuration().is_multi(lpath) - - -def is_tag(lpath): - return load_configuration().is_tag(lpath) - - -def is_leaf(lpath, flat=True): - return load_configuration().is_leaf(lpath, flat) - -def component_version(): - return load_configuration().component_version() - -def defaults(lpath, flat=False): - return load_configuration().defaults(lpath, flat) - - -def multi_to_list(lpath, conf): - return load_configuration().multi_to_list(lpath, conf) - - -if __name__ == '__main__': - print(defaults(['service'], flat=True)) - print(defaults(['service'], flat=False)) - - print(is_tag(["system", "login", "user", "vyos", "authentication", "public-keys"])) - print(is_tag(['protocols', 'static', 'multicast', 'route', '0.0.0.0/0', 'next-hop'])) diff --git a/python/vyos/xml/cache/__init__.py b/python/vyos/xml/cache/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/python/vyos/xml/cache/__init__.py +++ /dev/null diff --git a/python/vyos/xml/definition.py b/python/vyos/xml/definition.py deleted file mode 100644 index bc3892b42..000000000 --- a/python/vyos/xml/definition.py +++ /dev/null @@ -1,360 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -from vyos.xml import kw - -# As we index by key, the name is first and then the data: -# {'dummy': { -# '[node]': '[tagNode]', -# 'address': { ... } -# } } - -# so when we encounter a tagNode, we are really encountering -# the tagNode data. - - -class XML(dict): - def __init__(self): - self[kw.tree] = {} - self[kw.priorities] = {} - self[kw.owners] = {} - self[kw.default] = {} - self[kw.tags] = [] - self[kw.component_version] = {} - - dict.__init__(self) - - self.tree = self[kw.tree] - # the options which matched the last incomplete world we had - # or the last word in a list - self.options = [] - # store all the part of the command we processed - self.inside = [] - # should we check the data pass with the constraints - self.check = False - # are we still typing a word - self.filling = False - # do what have the tagNode value ? - self.filled = False - # last word seen - self.word = '' - # do we have all the data we want ? - self.final = False - # do we have too much data ? - self.extra = False - # what kind of node are we in plain vs data not - self.plain = True - - def reset(self): - self.tree = self[kw.tree] - self.options = [] - self.inside = [] - self.check = False - self.filling = False - self.filled = False - self.word = '' - self.final = False - self.extra = False - self.plain = True - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def traverse(self, cmd): - self.reset() - - # using split() intead of split(' ') eats the final ' ' - words = cmd.split(' ') - passed = [] - word = '' - data_node = False - space = False - - while words: - word = words.pop(0) - space = word == '' - perfect = False - if word in self.tree: - passed = [] - perfect = True - self.tree = self.tree[word] - data_node = self.tree[kw.node] - self.inside.append(word) - word = '' - continue - if word and data_node: - passed.append(word) - - is_valueless = self.tree.get(kw.valueless, False) - is_leafNode = data_node == kw.leafNode - is_dataNode = data_node in (kw.leafNode, kw.tagNode) - named_options = [_ for _ in self.tree if not kw.found(_)] - - if is_leafNode: - self.final = is_valueless or len(passed) > 0 - self.extra = is_valueless and len(passed) > 0 - self.check = len(passed) >= 1 - else: - self.final = False - self.extra = False - self.check = len(passed) == 1 and not space - - if self.final: - self.word = ' '.join(passed) - else: - self.word = word - - if self.final: - self.filling = True - else: - self.filling = not perfect and bool(cmd and word != '') - - self.filled = self.final or (is_dataNode and len(passed) > 0 and word == '') - - if is_dataNode and len(passed) == 0: - self.options = [] - elif word: - if data_node != kw.plainNode or len(passed) == 1: - self.options = [_ for _ in self.tree if _.startswith(word)] - self.options.sort() - else: - self.options = [] - else: - self.options = named_options - self.options.sort() - - self.plain = not is_dataNode - - # self.debug() - - return self.word - - def speculate(self): - if len(self.options) == 1: - self.tree = self.tree[self.options[0]] - self.word = '' - if self.tree.get(kw.node,'') not in (kw.tagNode, kw.leafNode): - self.options = [_ for _ in self.tree if not kw.found(_)] - self.options.sort() - - def checks(self, cmd): - # as we move thought the named node twice - # the first time we get the data with the node - # and the second with the pass parameters - xml = self[kw.tree] - - words = cmd.split(' ') - send = True - last = [] - while words: - word = words.pop(0) - if word in xml: - xml = xml[word] - send = True - last = [] - continue - if xml[kw.node] in (kw.tagNode, kw.leafNode): - if kw.constraint in xml: - if send: - yield (word, xml[kw.constraint]) - send = False - else: - last.append((word, None)) - if len(last) >= 2: - yield last[0] - - def summary(self): - yield ('enter', '[ summary ]', str(self.inside)) - - if kw.help not in self.tree: - yield ('skip', '[ summary ]', str(self.inside)) - return - - if self.filled: - return - - yield('', '', '\nHelp:') - - if kw.help in self.tree: - summary = self.tree[kw.help].get(kw.summary) - values = self.tree[kw.help].get(kw.valuehelp, []) - if summary: - yield(summary, '', '') - for value in values: - yield(value[kw.format], value[kw.description], '') - - def constraint(self): - yield ('enter', '[ constraint ]', str(self.inside)) - - if kw.help in self.tree: - yield ('skip', '[ constraint ]', str(self.inside)) - return - if kw.error not in self.tree: - yield ('skip', '[ constraint ]', str(self.inside)) - return - if not self.word or self.filling: - yield ('skip', '[ constraint ]', str(self.inside)) - return - - yield('', '', '\nData Constraint:') - - yield('', 'constraint', str(self.tree[kw.error])) - - def listing(self): - yield ('enter', '[ listing ]', str(self.inside)) - - # only show the details when we passed the tagNode data - if not self.plain and not self.filled: - yield ('skip', '[ listing ]', str(self.inside)) - return - - yield('', '', '\nPossible completions:') - - options = list(self.tree.keys()) - options.sort() - for option in options: - if kw.found(option): - continue - if not option.startswith(self.word): - continue - inner = self.tree[option] - prefix = '+> ' if inner.get(kw.node, '') != kw.leafNode else ' ' - if kw.help in inner: - yield (prefix + option, inner[kw.help].get(kw.summary), '') - else: - yield (prefix + option, '(no help available)', '') - - def debug(self): - print('------') - print("word '%s'" % self.word) - print("filling " + str(self.filling)) - print("filled " + str(self.filled)) - print("final " + str(self.final)) - print("extra " + str(self.extra)) - print("plain " + str(self.plain)) - print("options " + str(self.options)) - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def component_version(self) -> dict: - d = {} - for k in sorted(self[kw.component_version]): - d[k] = int(self[kw.component_version][k]) - return d - - def defaults(self, lpath, flat): - d = self[kw.default] - for k in lpath: - d = d.get(k, {}) - - if not flat: - # _flatten will make this conversion - d = self.multi_to_list(lpath, d, defaults=True) - - r = {} - for k in d: - under = k.replace('-','_') - if isinstance(d[k],dict): - r[under] = self.defaults(lpath + [k], flat) - continue - r[under] = d[k] - return r - - def _flatten(inside, index, d): - r = {} - local = inside[index:] - prefix = '_'.join(_.replace('-','_') for _ in local) + '_' if local else '' - for k in d: - under = prefix + k.replace('-','_') - level = inside + [k] - if isinstance(d[k],dict): - r.update(_flatten(level, index, d[k])) - continue - if self.is_multi(level, with_tag=False): - r[under] = [_.strip() for _ in d[k].split(',')] - continue - r[under] = d[k] - return r - - return _flatten(lpath, len(lpath), d) - - def multi_to_list(self, lpath, conf, defaults=False): - r = {} - for k in conf: - # key mangling could also be done here - # it would prevent two parsing of the config tree - # under = k.replace('-','_') - under = k - fpath = lpath + [k] - if isinstance(conf[k],dict): - r[under] = self.multi_to_list(fpath, conf[k], defaults) - continue - value = conf[k] - if self.is_multi(fpath) and not isinstance(value, list): - if not defaults: - value = [value] - else: - value = value.split(' ') - r[under] = value - return r - - # from functools import lru_cache - # @lru_cache(maxsize=100) - # XXX: need to use cachetool instead - for later - - def _tree(self, lpath, with_tag=True): - """ - returns the part of the tree searched or None if it does not exists - if with_tag is set, this is a configuration path (with tagNode names) - and tag name will be removed from the path when traversing the tree - """ - tree = self[kw.tree] - spath = lpath.copy() - while spath: - p = spath.pop(0) - if p not in tree: - return None - tree = tree[p] - if with_tag and spath and tree[kw.node] == kw.tagNode: - spath.pop(0) - return tree - - def _get(self, lpath, tag, with_tag=True): - tree = self._tree(lpath, with_tag) - if tree is None: - return None - return tree.get(tag, None) - - def is_multi(self, lpath, with_tag=True): - tree = self._get(lpath, kw.multi, with_tag) - if tree is None: - return None - return tree is True - - def is_tag(self, lpath, with_tag=True): - tree = self._get(lpath, kw.node, with_tag) - if tree is None: - return None - return tree == kw.tagNode - - def is_leaf(self, lpath, with_tag=True): - tree = self._get(lpath, kw.node, with_tag) - if tree is None: - return None - return tree == kw.leafNode - - def exists(self, lpath, with_tag=True): - return self._get(lpath, kw.node, with_tag) is not None diff --git a/python/vyos/xml/generate.py b/python/vyos/xml/generate.py deleted file mode 100755 index dfbbadd74..000000000 --- a/python/vyos/xml/generate.py +++ /dev/null @@ -1,70 +0,0 @@ - -#!/usr/bin/env python3 - -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import os -import sys -import pprint -import argparse - -from vyos.xml import kw -from vyos.xml import load - - -# import json -# def save_json(fname, loaded): -# with open(fname, 'w') as w: -# print(f'saving {fname}') -# w.write(json.dumps(loaded)) - - -def save_dict(fname, loaded): - with open(fname, 'w') as w: - print(f'saving {fname}') - w.write(f'# generated by {__file__}\n\n') - w.write('definition = ') - w.write(str(loaded)) - - -def main(): - parser = argparse.ArgumentParser(description='generate python file from xml defintions') - parser.add_argument('--conf-folder', type=str, default=load.configuration_definition, help='XML interface definition folder') - parser.add_argument('--conf-cache', type=str, default=load.configuration_cache, help='python file with the conf mode dict') - - # parser.add_argument('--op-folder', type=str, default=load.operational_definition, help='XML interface definition folder') - # parser.add_argument('--op-cache', type=str, default=load.operational_cache, help='python file with the conf mode dict') - - parser.add_argument('--dry', action='store_true', help='dry run, print to screen') - - args = parser.parse_args() - - if os.path.exists(load.configuration_cache): - os.remove(load.configuration_cache) - # if os.path.exists(load.operational_cache): - # os.remove(load.operational_cache) - - conf = load.xml(args.conf_folder) - # op = load.xml(args.op_folder) - - if args.dry: - pprint.pprint(conf) - return - - save_dict(args.conf_cache, conf) - # save_dict(args.op_cache, op) - - -if __name__ == '__main__': - main() diff --git a/python/vyos/xml/kw.py b/python/vyos/xml/kw.py deleted file mode 100644 index 48226ce96..000000000 --- a/python/vyos/xml/kw.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -# all named used as key (keywords) in this module are defined here. -# using variable name will allow the linter to warn on typos -# it separates our dict syntax from the xmldict one, making it easy to change - -# we are redefining a python keyword "list" for ease - - -def found(word): - """ - is the word following the format for a keyword - """ - return word and word[0] == '[' and word[-1] == ']' - - -# root - -tree = '[tree]' -priorities = '[priorities]' -owners = '[owners]' -tags = '[tags]' -default = '[default]' -component_version = '[component_version]' - -# nodes - -node = '[node]' - -plainNode = '[plainNode]' -leafNode = '[leafNode]' -tagNode = '[tagNode]' - -owner = '[owner]' - -valueless = '[valueless]' -multi = '[multi]' -hidden = '[hidden]' - -# properties - -priority = '[priority]' - -completion = '[completion]' -list = '[list]' -script = '[script]' -path = '[path]' - -# help - -help = '[help]' - -summary = '[summary]' - -valuehelp = '[valuehelp]' -format = 'format' -description = 'description' - -# constraint - -constraint = '[constraint]' -name = '[name]' - -regex = '[regex]' -validator = '[validator]' -argument = '[argument]' - -error = '[error]' - -# created - -node = '[node]' diff --git a/python/vyos/xml/load.py b/python/vyos/xml/load.py deleted file mode 100644 index f842ff9ce..000000000 --- a/python/vyos/xml/load.py +++ /dev/null @@ -1,300 +0,0 @@ -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This library is free software; you can redistribute it and/or modify it under the terms of -# the GNU Lesser General Public License as published by the Free Software Foundation; -# either version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License along with this library; -# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import glob - -from os.path import join -from os.path import abspath -from os.path import dirname - -import xmltodict - -from vyos import debug -from vyos.xml import kw -from vyos.xml import definition - - -# where the files are located - -_here = dirname(__file__) - -configuration_definition = abspath(join(_here, '..', '..' ,'..', 'interface-definitions')) -configuration_cache = abspath(join(_here, 'cache', 'configuration.py')) - -operational_definition = abspath(join(_here, '..', '..' ,'..', 'op-mode-definitions')) -operational_cache = abspath(join(_here, 'cache', 'operational.py')) - - -# This code is only ran during the creation of the debian package -# therefore we accept that failure can be fatal and not handled -# gracefully. - - -def _fatal(debug_info=''): - """ - raise a RuntimeError or if in developer mode stop the code - """ - if not debug.enabled('developer'): - raise RuntimeError(str(debug_info)) - - if debug_info: - print(debug_info) - breakpoint() - - -def _safe_update(dict1, dict2): - """ - return a dict made of two, raise if any root key would be overwritten - """ - if set(dict1).intersection(dict2): - raise RuntimeError('overlapping configuration') - return {**dict1, **dict2} - - -def _merge(dict1, dict2): - """ - merge dict2 in to dict1 and return it - """ - for k in list(dict2): - if k not in dict1: - dict1[k] = dict2[k] - continue - if isinstance(dict1[k], dict) and isinstance(dict2[k], dict): - dict1[k] = _merge(dict1[k], dict2[k]) - elif isinstance(dict1[k], list) and isinstance(dict2[k], list): - dict1[k].extend(dict2[k]) - elif dict1[k] == dict2[k]: - continue - else: - dict1[k] = dict2[k] - return dict1 - - -def _include(fname, folder=''): - """ - return the content of a file, including any file referenced with a #include - """ - if not folder: - folder = dirname(fname) - content = '' - with open(fname, 'r') as r: - for line in r.readlines(): - if '#include' in line: - content += _include(join(folder,line.strip()[10:-1]), folder) - continue - content += line - return content - - -def _format_nodes(inside, conf, xml): - r = {} - while conf: - nodetype = '' - nodename = '' - if 'node' in conf.keys(): - nodetype = 'node' - nodename = kw.plainNode - elif 'leafNode' in conf.keys(): - nodetype = 'leafNode' - nodename = kw.leafNode - elif 'tagNode' in conf.keys(): - nodetype = 'tagNode' - nodename = kw.tagNode - elif 'syntaxVersion' in conf.keys(): - sv = conf.pop('syntaxVersion') - if isinstance(sv, list): - for v in sv: - xml[kw.component_version][v['@component']] = v['@version'] - else: - xml[kw.component_version][sv['@component']] = sv['@version'] - continue - else: - _fatal(conf.keys()) - - nodes = conf.pop(nodetype) - if isinstance(nodes, list): - for node in nodes: - name = node.pop('@name') - into = inside + [name] - if name in r: - _merge(r[name], _format_node(into, node, xml)) - else: - r[name] = _format_node(into, node, xml) - r[name][kw.node] = nodename - xml[kw.tags].append(' '.join(into)) - else: - node = nodes - name = node.pop('@name') - into = inside + [name] - if name in r: - _merge(r[name], _format_node(inside + [name], node, xml)) - else: - r[name] = _format_node(inside + [name], node, xml) - r[name][kw.node] = nodename - xml[kw.tags].append(' '.join(into)) - return r - - -def _set_validator(r, validator): - v = {} - while validator: - if '@name' in validator: - v[kw.name] = validator.pop('@name') - elif '@argument' in validator: - v[kw.argument] = validator.pop('@argument') - else: - _fatal(validator) - r[kw.constraint][kw.validator].append(v) - - -def _format_node(inside, conf, xml): - r = { - kw.valueless: False, - kw.multi: False, - kw.hidden: False, - } - - if '@owner' in conf: - owner = conf.pop('@owner', '') - r[kw.owner] = owner - xml[kw.owners][' '.join(inside)] = owner - - while conf: - keys = conf.keys() - if 'children' in keys: - children = conf.pop('children') - - if isinstance(conf, list): - for child in children: - _merge(r, _format_nodes(inside, child, xml)) - else: - child = children - _merge(r, _format_nodes(inside, child, xml)) - - elif 'properties' in keys: - properties = conf.pop('properties') - - while properties: - if 'help' in properties: - helpname = properties.pop('help') - r[kw.help] = {} - r[kw.help][kw.summary] = helpname - - elif 'valueHelp' in properties: - valuehelps = properties.pop('valueHelp') - if kw.valuehelp in r[kw.help]: - _fatal(valuehelps) - r[kw.help][kw.valuehelp] = [] - if isinstance(valuehelps, list): - for valuehelp in valuehelps: - r[kw.help][kw.valuehelp].append(dict(valuehelp)) - else: - valuehelp = valuehelps - r[kw.help][kw.valuehelp].append(dict(valuehelp)) - - elif 'constraint' in properties: - constraint = properties.pop('constraint') - r[kw.constraint] = {} - while constraint: - if 'regex' in constraint: - regexes = constraint.pop('regex') - if kw.regex in kw.constraint: - _fatal(regexes) - r[kw.constraint][kw.regex] = [] - if isinstance(regexes, list): - r[kw.constraint][kw.regex] = [] - for regex in regexes: - r[kw.constraint][kw.regex].append(regex) - else: - regex = regexes - r[kw.constraint][kw.regex].append(regex) - elif 'validator' in constraint: - validators = constraint.pop('validator') - if kw.validator in r[kw.constraint]: - _fatal(validators) - r[kw.constraint][kw.validator] = [] - if isinstance(validators, list): - for validator in validators: - _set_validator(r, validator) - else: - validator = validators - _set_validator(r, validator) - else: - _fatal(constraint) - - elif 'constraintGroup' in properties: - properties.pop('constraintGroup') - - elif 'constraintErrorMessage' in properties: - r[kw.error] = properties.pop('constraintErrorMessage') - - elif 'valueless' in properties: - properties.pop('valueless') - r[kw.valueless] = True - - elif 'multi' in properties: - properties.pop('multi') - r[kw.multi] = True - - elif 'hidden' in properties: - properties.pop('hidden') - r[kw.hidden] = True - - elif 'completionHelp' in properties: - completionHelp = properties.pop('completionHelp') - r[kw.completion] = {} - while completionHelp: - if 'list' in completionHelp: - r[kw.completion][kw.list] = completionHelp.pop('list') - elif 'script' in completionHelp: - r[kw.completion][kw.script] = completionHelp.pop('script') - elif 'path' in completionHelp: - r[kw.completion][kw.path] = completionHelp.pop('path') - else: - _fatal(completionHelp.keys()) - - elif 'priority' in properties: - priority = int(properties.pop('priority')) - r[kw.priority] = priority - xml[kw.priorities].setdefault(priority, []).append(' '.join(inside)) - - else: - _fatal(properties.keys()) - - elif 'defaultValue' in keys: - default = conf.pop('defaultValue') - x = xml[kw.default] - for k in inside[:-1]: - x = x.setdefault(k,{}) - x[inside[-1]] = '' if default is None else default - - else: - _fatal(conf) - - return r - - -def xml(folder): - """ - read all the xml in the folder - """ - xml = definition.XML() - for fname in glob.glob(f'{folder}/*.xml.in'): - parsed = xmltodict.parse(_include(fname)) - formated = _format_nodes([], parsed['interfaceDefinition'], xml) - _merge(xml[kw.tree], formated) - # fix the configuration root node for completion - # as we moved all the name "up" the chain to use them as index. - xml[kw.tree][kw.node] = kw.plainNode - # XXX: do the others - return xml diff --git a/python/vyos/xml/test_xml.py b/python/vyos/xml/test_xml.py deleted file mode 100644 index 3a6f0132d..000000000 --- a/python/vyos/xml/test_xml.py +++ /dev/null @@ -1,279 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# - -import os -import unittest -from unittest import TestCase, mock - -from vyos.xml import load_configuration - -import sys - - -class TestSearch(TestCase): - def setUp(self): - self.xml = load_configuration() - - def test_(self): - last = self.xml.traverse("") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, []) - self.assertEqual(self.xml.options, ['firewall', 'high-availability', 'interfaces', 'nat', 'protocols', 'service', 'system', 'vpn', 'vrf']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_i(self): - last = self.xml.traverse("i") - self.assertEqual(last, 'i') - self.assertEqual(self.xml.inside, []) - self.assertEqual(self.xml.options, ['interfaces']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces(self): - last = self.xml.traverse("interfaces") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_space(self): - last = self.xml.traverse("interfaces ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['bonding', 'bridge', 'dummy', 'ethernet', 'geneve', 'l2tpv3', 'loopback', 'macsec', 'openvpn', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan', 'wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_w(self): - last = self.xml.traverse("interfaces w") - self.assertEqual(last, 'w') - self.assertEqual(self.xml.inside, ['interfaces']) - self.assertEqual(self.xml.options, ['wireguard', 'wireless', 'wwan']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, True) - - def test_interfaces_ethernet(self): - last = self.xml.traverse("interfaces ethernet") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_space(self): - last = self.xml.traverse("interfaces ethernet ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, '') - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_e(self): - last = self.xml.traverse("interfaces ethernet e") - self.assertEqual(last, 'e') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_la(self): - last = self.xml.traverse("interfaces ethernet la") - self.assertEqual(last, 'la') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0(self): - last = self.xml.traverse("interfaces ethernet lan0") - self.assertEqual(last, 'lan0') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_space(self): - last = self.xml.traverse("interfaces ethernet lan0 ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(len(self.xml.options), 19) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_ad(self): - last = self.xml.traverse("interfaces ethernet lan0 ad") - self.assertEqual(last, 'ad') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet']) - self.assertEqual(self.xml.options, ['address']) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address(self): - last = self.xml.traverse("interfaces ethernet lan0 address") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address ") - self.assertEqual(last, '') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, False) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, False) - self.assertEqual(self.xml.final, False) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, False) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_11(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1") - self.assertEqual(last, '1.1') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32") - self.assertEqual(last, '1.1.1.1/32') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 ") - self.assertEqual(last, '1.1.1.1/32') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space_text(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text") - self.assertEqual(last, '1.1.1.1/32 text') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - def test_interfaces_ethernet_lan0_address_space_1111_32_space_text_space(self): - last = self.xml.traverse("interfaces ethernet lan0 address 1.1.1.1/32 text ") - self.assertEqual(last, '1.1.1.1/32 text') - self.assertEqual(self.xml.inside, ['interfaces', 'ethernet', 'address']) - self.assertEqual(self.xml.options, []) - self.assertEqual(self.xml.filling, True) - self.assertEqual(self.xml.word, last) - self.assertEqual(self.xml.check, True) - self.assertEqual(self.xml.final, True) - self.assertEqual(self.xml.extra, False) - self.assertEqual(self.xml.filled, True) - self.assertEqual(self.xml.plain, False) - - # Need to add a check for a valuless leafNode diff --git a/python/vyos/xml_ref/generate_cache.py b/python/vyos/xml_ref/generate_cache.py index d1ccb0f81..5f3f84dee 100755 --- a/python/vyos/xml_ref/generate_cache.py +++ b/python/vyos/xml_ref/generate_cache.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 VyOS maintainers and contributors +# Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -13,19 +13,14 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# import sys import json from argparse import ArgumentParser from argparse import ArgumentTypeError -from os import getcwd -from os import makedirs from os.path import join from os.path import abspath from os.path import dirname -from os.path import basename from xmltodict import parse _here = dirname(__file__) |