diff options
Diffstat (limited to 'python/vyos')
-rw-r--r-- | python/vyos/config.py | 56 | ||||
-rw-r--r-- | python/vyos/configdict.py | 99 | ||||
-rw-r--r-- | python/vyos/configverify.py | 2 | ||||
-rw-r--r-- | python/vyos/util.py | 13 | ||||
-rw-r--r-- | python/vyos/utils/dict.py | 43 | ||||
-rw-r--r-- | python/vyos/validate.py | 20 | ||||
-rw-r--r-- | python/vyos/xml_ref/__init__.py | 13 | ||||
-rw-r--r-- | python/vyos/xml_ref/definition.py | 120 |
8 files changed, 190 insertions, 176 deletions
diff --git a/python/vyos/config.py b/python/vyos/config.py index 287fd2ed1..c3bb68373 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -1,4 +1,4 @@ -# Copyright 2017, 2019 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2017, 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -67,9 +67,9 @@ import re import json from copy import deepcopy -import vyos.xml -import vyos.util import vyos.configtree +from vyos.xml_ref import multi_to_list, merge_defaults, relative_defaults +from vyos.utils.dict import get_sub_dict, mangle_dict_keys from vyos.configsource import ConfigSource, ConfigSourceSession class Config(object): @@ -225,7 +225,8 @@ class Config(object): def get_config_dict(self, path=[], effective=False, key_mangling=None, get_first_key=False, no_multi_convert=False, - no_tag_node_value_mangle=False): + no_tag_node_value_mangle=False, + with_defaults=False, with_recursive_defaults=False): """ Args: path (str list): Configuration tree path, can be empty @@ -238,19 +239,23 @@ class Config(object): """ lpath = self._make_path(path) root_dict = self.get_cached_root_dict(effective) - conf_dict = vyos.util.get_sub_dict(root_dict, lpath, get_first_key) + conf_dict = get_sub_dict(root_dict, lpath, get_first_key) - if not key_mangling and no_multi_convert: + if key_mangling is None and no_multi_convert and not with_defaults: return deepcopy(conf_dict) - xmlpath = lpath if get_first_key else lpath[:-1] + rpath = lpath if get_first_key else lpath[:-1] - if not key_mangling: - conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict) - return conf_dict + if not no_multi_convert: + conf_dict = multi_to_list(rpath, conf_dict) + + if with_defaults or with_recursive_defaults: + conf_dict = merge_defaults(lpath, conf_dict, + get_first_key=get_first_key, + recursive=with_recursive_defaults) - if no_multi_convert is False: - conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict) + if key_mangling is None: + return conf_dict if not (isinstance(key_mangling, tuple) and \ (len(key_mangling) == 2) and \ @@ -258,10 +263,35 @@ class Config(object): isinstance(key_mangling[1], str)): raise ValueError("key_mangling must be a tuple of two strings") - conf_dict = vyos.util.mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=xmlpath, no_tag_node_value_mangle=no_tag_node_value_mangle) + conf_dict = mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) return conf_dict + def get_config_defaults(self, path=[], effective=False, key_mangling=None, + no_tag_node_value_mangle=False, get_first_key=False, + recursive=False) -> dict: + lpath = self._make_path(path) + root_dict = self.get_cached_root_dict(effective) + conf_dict = get_sub_dict(root_dict, lpath, get_first_key) + + defaults = relative_defaults(lpath, conf_dict, + get_first_key=get_first_key, + recursive=recursive) + if key_mangling is None: + return defaults + + rpath = lpath if get_first_key else lpath[:-1] + + if not (isinstance(key_mangling, tuple) and \ + (len(key_mangling) == 2) and \ + isinstance(key_mangling[0], str) and \ + isinstance(key_mangling[1], str)): + raise ValueError("key_mangling must be a tuple of two strings") + + defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) + + return defaults + def is_multi(self, path): """ Args: diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index 6ab5c252c..9618ec93e 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -389,7 +389,7 @@ def get_pppoe_interfaces(conf, vrf=None): return pppoe_interfaces -def get_interface_dict(config, base, ifname=''): +def get_interface_dict(config, base, ifname='', recursive_defaults=True): """ Common utility function to retrieve and mangle the interfaces configuration from the CLI input nodes. All interfaces have a common base where value @@ -405,46 +405,23 @@ def get_interface_dict(config, base, ifname=''): raise ConfigError('Interface (VYOS_TAGNODE_VALUE) not specified') ifname = os.environ['VYOS_TAGNODE_VALUE'] - # retrieve interface default values - default_values = defaults(base) - - # We take care about VLAN (vif, vif-s, vif-c) default values later on when - # parsing vlans in default dict and merge the "proper" values in correctly, - # see T2665. - for vif in ['vif', 'vif_s']: - if vif in default_values: del default_values[vif] - - dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'), - get_first_key=True, - no_tag_node_value_mangle=True) - # Check if interface has been removed. We must use exists() as # get_config_dict() will always return {} - even when an empty interface # node like the following exists. # +macsec macsec1 { # +} if not config.exists(base + [ifname]): + dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True) dict.update({'deleted' : {}}) - - # Add interface instance name into dictionary - dict.update({'ifname': ifname}) - - # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect() - if config.exists(['qos', 'interface', ifname]): - dict.update({'traffic_policy': {}}) - - # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely - # remove the default values from the dict. - if 'dhcpv6_options' not in dict: - if 'dhcpv6_options' in default_values: - del default_values['dhcpv6_options'] - - # We have gathered the dict representation of the CLI, but there are - # default options which we need to update into the dictionary retrived. - # But we should only add them when interface is not deleted - as this might - # confuse parsers - if 'deleted' not in dict: - dict = dict_merge(default_values, dict) + else: + # Get config_dict with default values + dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'), + get_first_key=True, + no_tag_node_value_mangle=True, + with_defaults=True, + with_recursive_defaults=recursive_defaults) # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key @@ -452,8 +429,12 @@ def get_interface_dict(config, base, ifname=''): if 'dhcp_options' in dict: del dict['dhcp_options'] - # XXX: T2665: blend in proper DHCPv6-PD default values - dict = T2665_set_dhcpv6pd_defaults(dict) + # Add interface instance name into dictionary + dict.update({'ifname': ifname}) + + # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect() + if config.exists(['qos', 'interface', ifname]): + dict.update({'traffic_policy': {}}) address = leaf_node_changed(config, base + [ifname, 'address']) if address: dict.update({'address_old' : address}) @@ -497,9 +478,6 @@ def get_interface_dict(config, base, ifname=''): else: dict['ipv6']['address'].update({'eui64_old': eui64}) - # Implant default dictionary in vif/vif-s VLAN interfaces. Values are - # identical for all types of VLAN interfaces as they all include the same - # XML definitions which hold the defaults. for vif, vif_config in dict.get('vif', {}).items(): # Add subinterface name to dictionary dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'}) @@ -507,22 +485,10 @@ def get_interface_dict(config, base, ifname=''): if config.exists(['qos', 'interface', f'{ifname}.{vif}']): dict['vif'][vif].update({'traffic_policy': {}}) - default_vif_values = defaults(base + ['vif']) - # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely - # remove the default values from the dict. - if not 'dhcpv6_options' in vif_config: - del default_vif_values['dhcpv6_options'] - - # Only add defaults if interface is not about to be deleted - this is - # to keep a cleaner config dict. if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif', vif, 'address']) if address: dict['vif'][vif].update({'address_old' : address}) - dict['vif'][vif] = dict_merge(default_vif_values, dict['vif'][vif]) - # XXX: T2665: blend in proper DHCPv6-PD default values - dict['vif'][vif] = T2665_set_dhcpv6pd_defaults(dict['vif'][vif]) - # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif'][vif] or 'dhcp' not in dict['vif'][vif]['address']: @@ -544,26 +510,10 @@ def get_interface_dict(config, base, ifname=''): if config.exists(['qos', 'interface', f'{ifname}.{vif_s}']): dict['vif_s'][vif_s].update({'traffic_policy': {}}) - default_vif_s_values = defaults(base + ['vif-s']) - # XXX: T2665: we only wan't the vif-s defaults - do not care about vif-c - if 'vif_c' in default_vif_s_values: del default_vif_s_values['vif_c'] - - # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely - # remove the default values from the dict. - if not 'dhcpv6_options' in vif_s_config: - del default_vif_s_values['dhcpv6_options'] - - # Only add defaults if interface is not about to be deleted - this is - # to keep a cleaner config dict. if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'address']) if address: dict['vif_s'][vif_s].update({'address_old' : address}) - dict['vif_s'][vif_s] = dict_merge(default_vif_s_values, - dict['vif_s'][vif_s]) - # XXX: T2665: blend in proper DHCPv6-PD default values - dict['vif_s'][vif_s] = T2665_set_dhcpv6pd_defaults(dict['vif_s'][vif_s]) - # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif_s'][vif_s] or 'dhcp' not in \ @@ -586,26 +536,11 @@ def get_interface_dict(config, base, ifname=''): if config.exists(['qos', 'interface', f'{ifname}.{vif_s}.{vif_c}']): dict['vif_s'][vif_s]['vif_c'][vif_c].update({'traffic_policy': {}}) - default_vif_c_values = defaults(base + ['vif-s', 'vif-c']) - - # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely - # remove the default values from the dict. - if not 'dhcpv6_options' in vif_c_config: - del default_vif_c_values['dhcpv6_options'] - - # Only add defaults if interface is not about to be deleted - this is - # to keep a cleaner config dict. if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'vif-c', vif_c, 'address']) if address: dict['vif_s'][vif_s]['vif_c'][vif_c].update( {'address_old' : address}) - dict['vif_s'][vif_s]['vif_c'][vif_c] = dict_merge( - default_vif_c_values, dict['vif_s'][vif_s]['vif_c'][vif_c]) - # XXX: T2665: blend in proper DHCPv6-PD default values - dict['vif_s'][vif_s]['vif_c'][vif_c] = T2665_set_dhcpv6pd_defaults( - dict['vif_s'][vif_s]['vif_c'][vif_c]) - # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif_s'][vif_s]['vif_c'][vif_c] or 'dhcp' \ diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index 8fddd91d0..94dcdf4d9 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -322,7 +322,7 @@ def verify_dhcpv6(config): # It is not allowed to have duplicate SLA-IDs as those identify an # assigned IPv6 subnet from a delegated prefix - for pd in dict_search('dhcpv6_options.pd', config): + for pd in (dict_search('dhcpv6_options.pd', config) or []): sla_ids = [] interfaces = dict_search(f'dhcpv6_options.pd.{pd}.interface', config) diff --git a/python/vyos/util.py b/python/vyos/util.py index e62f9d5cf..33da5da40 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -1139,6 +1139,19 @@ def boot_configuration_complete() -> bool: return True return False +def boot_configuration_success() -> bool: + from vyos.defaults import config_status + + try: + with open(config_status) as f: + res = f.read().strip() + except FileNotFoundError: + return False + + if int(res) == 0: + return True + return False + def sysctl_read(name): """ Read and return current value of sysctl() option """ tmp = cmd(f'sysctl {name}') diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py index 3faf5c596..28d32bb8d 100644 --- a/python/vyos/utils/dict.py +++ b/python/vyos/utils/dict.py @@ -65,7 +65,7 @@ def colon_separated_to_dict(data_string, uniquekeys=False): return data -def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0): +def mangle_dict_keys(data, regex, replacement, abs_path=None, no_tag_node_value_mangle=False): """ Mangles dict keys according to a regex and replacement character. Some libraries like Jinja2 do not like certain characters in dict keys. This function can be used for replacing all offending characters @@ -73,44 +73,39 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m Args: data (dict): Original dict to mangle + regex, replacement (str): arguments to re.sub(regex, replacement, ...) + abs_path (list): if data is a config dict and no_tag_node_value_mangle is True + then abs_path should be the absolute config path to the first + keys of data, non-inclusive + no_tag_node_value_mangle (bool): do not mangle keys of tag node values Returns: dict """ - from vyos.xml import is_tag - - new_dict = {} + import re + from vyos.xml_ref import is_tag_value - for key in data.keys(): - save_mod = mod - save_path = abs_path[:] + if abs_path is None: + abs_path = [] - abs_path.append(key) + new_dict = {} - if not is_tag(abs_path): - new_key = re.sub(regex, replacement, key) + for k in data.keys(): + if no_tag_node_value_mangle and is_tag_value(abs_path + [k]): + new_key = k else: - if mod%2: - new_key = key - else: - new_key = re.sub(regex, replacement, key) - if no_tag_node_value_mangle: - mod += 1 + new_key = re.sub(regex, replacement, k) - value = data[key] + value = data[k] if isinstance(value, dict): - new_dict[new_key] = _mangle_dict_keys(value, regex, replacement, abs_path=abs_path, mod=mod, no_tag_node_value_mangle=no_tag_node_value_mangle) + new_dict[new_key] = mangle_dict_keys(value, regex, replacement, + abs_path=abs_path + [k], + no_tag_node_value_mangle=no_tag_node_value_mangle) else: new_dict[new_key] = value - mod = save_mod - abs_path = save_path[:] - return new_dict -def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False): - return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0) - def _get_sub_dict(d, lpath): k = lpath[0] if k not in d.keys(): diff --git a/python/vyos/validate.py b/python/vyos/validate.py index d18785aaf..e5d8c6043 100644 --- a/python/vyos/validate.py +++ b/python/vyos/validate.py @@ -98,7 +98,7 @@ def is_intf_addr_assigned(intf, address) -> bool: return False def is_addr_assigned(ip_address, vrf=None) -> bool: - """ Verify if the given IPv4/IPv6 address is assigned to any interfac """ + """ Verify if the given IPv4/IPv6 address is assigned to any interface """ from netifaces import interfaces from vyos.util import get_interface_config from vyos.util import dict_search @@ -115,6 +115,24 @@ def is_addr_assigned(ip_address, vrf=None) -> bool: return False +def is_afi_configured(interface, afi): + """ Check if given address family is configured, or in other words - an IP + address is assigned to the interface. """ + from netifaces import ifaddresses + from netifaces import AF_INET + from netifaces import AF_INET6 + + if afi not in [AF_INET, AF_INET6]: + raise ValueError('Address family must be in [AF_INET, AF_INET6]') + + try: + addresses = ifaddresses(interface) + except ValueError as e: + print(e) + return False + + return afi in addresses + def is_loopback_addr(addr): """ Check if supplied IPv4/IPv6 address is a loopback address """ from ipaddress import ip_address diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index 2e144ef10..62d3680a1 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -58,12 +58,15 @@ def get_defaults(path: list, get_first_key=False, recursive=False) -> dict: return load_reference().get_defaults(path, get_first_key=get_first_key, recursive=recursive) -def get_config_defaults(rpath: list, conf: dict, get_first_key=False, - recursive=False) -> dict: +def relative_defaults(rpath: list, conf: dict, get_first_key=False, + recursive=False) -> dict: - return load_reference().relative_defaults(rpath, conf=conf, + return load_reference().relative_defaults(rpath, conf, get_first_key=get_first_key, recursive=recursive) -def merge_defaults(path: list, conf: dict) -> dict: - return load_reference().merge_defaults(path, conf) +def merge_defaults(path: list, conf: dict, get_first_key=False, + recursive=False) -> dict: + return load_reference().merge_defaults(path, conf, + get_first_key=get_first_key, + recursive=recursive) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py index 95ecc01a6..7fd7a7b77 100644 --- a/python/vyos/xml_ref/definition.py +++ b/python/vyos/xml_ref/definition.py @@ -13,8 +13,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. -from typing import Union, Any -from vyos.configdict import dict_merge +from typing import Optional, Union, Any class Xml: def __init__(self): @@ -141,9 +140,17 @@ class Xml: return res - def _get_default_value(self, node: dict): + def _get_default_value(self, node: dict) -> Optional[str]: return self._get_ref_node_data(node, "default_value") + def _get_default(self, node: dict) -> Optional[Union[str, list]]: + default = self._get_default_value(node) + if default is None: + return None + if self._is_multi_node(node) and not isinstance(default, list): + return [default] + return default + def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict: """Return dict containing default values below path @@ -153,18 +160,23 @@ class Xml: 'relative_defaults' """ res: dict = {} + if self.is_tag(path): + return res + d = self._get_ref_path(path) + + if self._is_leaf_node(d): + default_value = self._get_default(d) + if default_value is not None: + return {path[-1]: default_value} if path else {} + for k in list(d): if k in ('node_data', 'component_version') : continue - d_k = d[k] - if self._is_leaf_node(d_k): - default_value = self._get_default_value(d_k) + if self._is_leaf_node(d[k]): + default_value = self._get_default(d[k]) if default_value is not None: - pos = default_value - if self._is_multi_node(d_k) and not isinstance(pos, list): - pos = [pos] - res |= {k: pos} + res |= {k: default_value} elif self.is_tag(path + [k]): # tag node defaults are used as suggestion, not default value; # should this change, append to path and continue if recursive @@ -175,8 +187,6 @@ class Xml: res |= pos if res: if get_first_key or not path: - if not isinstance(res, dict): - raise TypeError("Cannot get_first_key as data under node is not of type dict") return res return {path[-1]: res} @@ -188,7 +198,7 @@ class Xml: return [next(iter(c.keys()))] if c else [] try: tmp = step(conf) - if self.is_tag_value(path + tmp): + if tmp and self.is_tag_value(path + tmp): c = conf[tmp[0]] if not isinstance(c, dict): raise ValueError @@ -200,57 +210,67 @@ class Xml: return False return True - def relative_defaults(self, rpath: list, conf: dict, get_first_key=False, - recursive=False) -> dict: - """Return dict containing defaults along paths of a config dict - """ - if not conf: - return self.get_defaults(rpath, get_first_key=get_first_key, - recursive=recursive) - if rpath and rpath[-1] in list(conf): - conf = conf[rpath[-1]] - if not isinstance(conf, dict): - raise TypeError('conf at path is not of type dict') + # use local copy of function in module configdict, to avoid circular + # import + def _dict_merge(self, source, destination): + from copy import deepcopy + tmp = deepcopy(destination) - if not self._well_defined(rpath, conf): - print('path to config dict does not define full config paths') - return {} + for key, value in source.items(): + if key not in tmp: + tmp[key] = value + elif isinstance(source[key], dict): + tmp[key] = self._dict_merge(source[key], tmp[key]) + return tmp + + def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict: res: dict = {} + res = self.get_defaults(rpath, recursive=recursive, + get_first_key=True) for k in list(conf): - pos = self.get_defaults(rpath + [k], recursive=recursive) - res |= pos - if isinstance(conf[k], dict): - step = self.relative_defaults(rpath + [k], conf=conf[k], - recursive=recursive) + step = self._relative_defaults(rpath + [k], conf=conf[k], + recursive=recursive) res |= step if res: - if get_first_key: - return res return {rpath[-1]: res} if rpath else res return {} - def merge_defaults(self, path: list, conf: dict) -> dict: + def relative_defaults(self, path: list, conf: dict, get_first_key=False, + recursive=False) -> dict: + """Return dict containing defaults along paths of a config dict + """ + if not conf: + return self.get_defaults(path, get_first_key=get_first_key, + recursive=recursive) + if path and path[-1] in list(conf): + conf = conf[path[-1]] + conf = {} if not isinstance(conf, dict) else conf + + if not self._well_defined(path, conf): + print('path to config dict does not define full config paths') + return {} + + res = self._relative_defaults(path, conf, recursive=recursive) + + if get_first_key and path: + if res.values(): + res = next(iter(res.values())) + else: + res = {} + + return res + + def merge_defaults(self, path: list, conf: dict, get_first_key=False, + recursive=False) -> dict: """Return config dict with defaults non-destructively merged This merges non-recursive defaults relative to the config dict. """ - if path[-1] in list(conf): - config = conf[path[-1]] - if not isinstance(config, dict): - raise TypeError('conf at path is not of type dict') - shift = False - else: - config = conf - shift = True - - if not self._well_defined(path, config): - print('path to config dict does not define config paths; conf returned unchanged') - return conf - - d = self.relative_defaults(path, conf=config, get_first_key=shift) - d = dict_merge(d, conf) + d = self.relative_defaults(path, conf, get_first_key=get_first_key, + recursive=recursive) + d = self._dict_merge(d, conf) return d |