summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/config.py56
-rw-r--r--python/vyos/configdict.py99
-rw-r--r--python/vyos/configverify.py2
-rw-r--r--python/vyos/utils/dict.py43
-rw-r--r--python/vyos/validate.py20
-rw-r--r--python/vyos/xml_ref/__init__.py13
-rw-r--r--python/vyos/xml_ref/definition.py120
7 files changed, 177 insertions, 176 deletions
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 287fd2ed1..c3bb68373 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -1,4 +1,4 @@
-# Copyright 2017, 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2017, 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -67,9 +67,9 @@ import re
import json
from copy import deepcopy
-import vyos.xml
-import vyos.util
import vyos.configtree
+from vyos.xml_ref import multi_to_list, merge_defaults, relative_defaults
+from vyos.utils.dict import get_sub_dict, mangle_dict_keys
from vyos.configsource import ConfigSource, ConfigSourceSession
class Config(object):
@@ -225,7 +225,8 @@ class Config(object):
def get_config_dict(self, path=[], effective=False, key_mangling=None,
get_first_key=False, no_multi_convert=False,
- no_tag_node_value_mangle=False):
+ no_tag_node_value_mangle=False,
+ with_defaults=False, with_recursive_defaults=False):
"""
Args:
path (str list): Configuration tree path, can be empty
@@ -238,19 +239,23 @@ class Config(object):
"""
lpath = self._make_path(path)
root_dict = self.get_cached_root_dict(effective)
- conf_dict = vyos.util.get_sub_dict(root_dict, lpath, get_first_key)
+ conf_dict = get_sub_dict(root_dict, lpath, get_first_key)
- if not key_mangling and no_multi_convert:
+ if key_mangling is None and no_multi_convert and not with_defaults:
return deepcopy(conf_dict)
- xmlpath = lpath if get_first_key else lpath[:-1]
+ rpath = lpath if get_first_key else lpath[:-1]
- if not key_mangling:
- conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict)
- return conf_dict
+ if not no_multi_convert:
+ conf_dict = multi_to_list(rpath, conf_dict)
+
+ if with_defaults or with_recursive_defaults:
+ conf_dict = merge_defaults(lpath, conf_dict,
+ get_first_key=get_first_key,
+ recursive=with_recursive_defaults)
- if no_multi_convert is False:
- conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict)
+ if key_mangling is None:
+ return conf_dict
if not (isinstance(key_mangling, tuple) and \
(len(key_mangling) == 2) and \
@@ -258,10 +263,35 @@ class Config(object):
isinstance(key_mangling[1], str)):
raise ValueError("key_mangling must be a tuple of two strings")
- conf_dict = vyos.util.mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=xmlpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ conf_dict = mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
return conf_dict
+ def get_config_defaults(self, path=[], effective=False, key_mangling=None,
+ no_tag_node_value_mangle=False, get_first_key=False,
+ recursive=False) -> dict:
+ lpath = self._make_path(path)
+ root_dict = self.get_cached_root_dict(effective)
+ conf_dict = get_sub_dict(root_dict, lpath, get_first_key)
+
+ defaults = relative_defaults(lpath, conf_dict,
+ get_first_key=get_first_key,
+ recursive=recursive)
+ if key_mangling is None:
+ return defaults
+
+ rpath = lpath if get_first_key else lpath[:-1]
+
+ if not (isinstance(key_mangling, tuple) and \
+ (len(key_mangling) == 2) and \
+ isinstance(key_mangling[0], str) and \
+ isinstance(key_mangling[1], str)):
+ raise ValueError("key_mangling must be a tuple of two strings")
+
+ defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
+
+ return defaults
+
def is_multi(self, path):
"""
Args:
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 6ab5c252c..9618ec93e 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -389,7 +389,7 @@ def get_pppoe_interfaces(conf, vrf=None):
return pppoe_interfaces
-def get_interface_dict(config, base, ifname=''):
+def get_interface_dict(config, base, ifname='', recursive_defaults=True):
"""
Common utility function to retrieve and mangle the interfaces configuration
from the CLI input nodes. All interfaces have a common base where value
@@ -405,46 +405,23 @@ def get_interface_dict(config, base, ifname=''):
raise ConfigError('Interface (VYOS_TAGNODE_VALUE) not specified')
ifname = os.environ['VYOS_TAGNODE_VALUE']
- # retrieve interface default values
- default_values = defaults(base)
-
- # We take care about VLAN (vif, vif-s, vif-c) default values later on when
- # parsing vlans in default dict and merge the "proper" values in correctly,
- # see T2665.
- for vif in ['vif', 'vif_s']:
- if vif in default_values: del default_values[vif]
-
- dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
-
# Check if interface has been removed. We must use exists() as
# get_config_dict() will always return {} - even when an empty interface
# node like the following exists.
# +macsec macsec1 {
# +}
if not config.exists(base + [ifname]):
+ dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
dict.update({'deleted' : {}})
-
- # Add interface instance name into dictionary
- dict.update({'ifname': ifname})
-
- # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect()
- if config.exists(['qos', 'interface', ifname]):
- dict.update({'traffic_policy': {}})
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if 'dhcpv6_options' not in dict:
- if 'dhcpv6_options' in default_values:
- del default_values['dhcpv6_options']
-
- # We have gathered the dict representation of the CLI, but there are
- # default options which we need to update into the dictionary retrived.
- # But we should only add them when interface is not deleted - as this might
- # confuse parsers
- if 'deleted' not in dict:
- dict = dict_merge(default_values, dict)
+ else:
+ # Get config_dict with default values
+ dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ with_defaults=True,
+ with_recursive_defaults=recursive_defaults)
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
@@ -452,8 +429,12 @@ def get_interface_dict(config, base, ifname=''):
if 'dhcp_options' in dict:
del dict['dhcp_options']
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict = T2665_set_dhcpv6pd_defaults(dict)
+ # Add interface instance name into dictionary
+ dict.update({'ifname': ifname})
+
+ # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect()
+ if config.exists(['qos', 'interface', ifname]):
+ dict.update({'traffic_policy': {}})
address = leaf_node_changed(config, base + [ifname, 'address'])
if address: dict.update({'address_old' : address})
@@ -497,9 +478,6 @@ def get_interface_dict(config, base, ifname=''):
else:
dict['ipv6']['address'].update({'eui64_old': eui64})
- # Implant default dictionary in vif/vif-s VLAN interfaces. Values are
- # identical for all types of VLAN interfaces as they all include the same
- # XML definitions which hold the defaults.
for vif, vif_config in dict.get('vif', {}).items():
# Add subinterface name to dictionary
dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'})
@@ -507,22 +485,10 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif}']):
dict['vif'][vif].update({'traffic_policy': {}})
- default_vif_values = defaults(base + ['vif'])
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_config:
- del default_vif_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif', vif, 'address'])
if address: dict['vif'][vif].update({'address_old' : address})
- dict['vif'][vif] = dict_merge(default_vif_values, dict['vif'][vif])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif'][vif] = T2665_set_dhcpv6pd_defaults(dict['vif'][vif])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif'][vif] or 'dhcp' not in dict['vif'][vif]['address']:
@@ -544,26 +510,10 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif_s}']):
dict['vif_s'][vif_s].update({'traffic_policy': {}})
- default_vif_s_values = defaults(base + ['vif-s'])
- # XXX: T2665: we only wan't the vif-s defaults - do not care about vif-c
- if 'vif_c' in default_vif_s_values: del default_vif_s_values['vif_c']
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_s_config:
- del default_vif_s_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'address'])
if address: dict['vif_s'][vif_s].update({'address_old' : address})
- dict['vif_s'][vif_s] = dict_merge(default_vif_s_values,
- dict['vif_s'][vif_s])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif_s'][vif_s] = T2665_set_dhcpv6pd_defaults(dict['vif_s'][vif_s])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif_s'][vif_s] or 'dhcp' not in \
@@ -586,26 +536,11 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif_s}.{vif_c}']):
dict['vif_s'][vif_s]['vif_c'][vif_c].update({'traffic_policy': {}})
- default_vif_c_values = defaults(base + ['vif-s', 'vif-c'])
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_c_config:
- del default_vif_c_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'vif-c', vif_c, 'address'])
if address: dict['vif_s'][vif_s]['vif_c'][vif_c].update(
{'address_old' : address})
- dict['vif_s'][vif_s]['vif_c'][vif_c] = dict_merge(
- default_vif_c_values, dict['vif_s'][vif_s]['vif_c'][vif_c])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif_s'][vif_s]['vif_c'][vif_c] = T2665_set_dhcpv6pd_defaults(
- dict['vif_s'][vif_s]['vif_c'][vif_c])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif_s'][vif_s]['vif_c'][vif_c] or 'dhcp' \
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 8fddd91d0..94dcdf4d9 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -322,7 +322,7 @@ def verify_dhcpv6(config):
# It is not allowed to have duplicate SLA-IDs as those identify an
# assigned IPv6 subnet from a delegated prefix
- for pd in dict_search('dhcpv6_options.pd', config):
+ for pd in (dict_search('dhcpv6_options.pd', config) or []):
sla_ids = []
interfaces = dict_search(f'dhcpv6_options.pd.{pd}.interface', config)
diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py
index 3faf5c596..28d32bb8d 100644
--- a/python/vyos/utils/dict.py
+++ b/python/vyos/utils/dict.py
@@ -65,7 +65,7 @@ def colon_separated_to_dict(data_string, uniquekeys=False):
return data
-def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
+def mangle_dict_keys(data, regex, replacement, abs_path=None, no_tag_node_value_mangle=False):
""" Mangles dict keys according to a regex and replacement character.
Some libraries like Jinja2 do not like certain characters in dict keys.
This function can be used for replacing all offending characters
@@ -73,44 +73,39 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
Args:
data (dict): Original dict to mangle
+ regex, replacement (str): arguments to re.sub(regex, replacement, ...)
+ abs_path (list): if data is a config dict and no_tag_node_value_mangle is True
+ then abs_path should be the absolute config path to the first
+ keys of data, non-inclusive
+ no_tag_node_value_mangle (bool): do not mangle keys of tag node values
Returns: dict
"""
- from vyos.xml import is_tag
-
- new_dict = {}
+ import re
+ from vyos.xml_ref import is_tag_value
- for key in data.keys():
- save_mod = mod
- save_path = abs_path[:]
+ if abs_path is None:
+ abs_path = []
- abs_path.append(key)
+ new_dict = {}
- if not is_tag(abs_path):
- new_key = re.sub(regex, replacement, key)
+ for k in data.keys():
+ if no_tag_node_value_mangle and is_tag_value(abs_path + [k]):
+ new_key = k
else:
- if mod%2:
- new_key = key
- else:
- new_key = re.sub(regex, replacement, key)
- if no_tag_node_value_mangle:
- mod += 1
+ new_key = re.sub(regex, replacement, k)
- value = data[key]
+ value = data[k]
if isinstance(value, dict):
- new_dict[new_key] = _mangle_dict_keys(value, regex, replacement, abs_path=abs_path, mod=mod, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ new_dict[new_key] = mangle_dict_keys(value, regex, replacement,
+ abs_path=abs_path + [k],
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
else:
new_dict[new_key] = value
- mod = save_mod
- abs_path = save_path[:]
-
return new_dict
-def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
- return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-
def _get_sub_dict(d, lpath):
k = lpath[0]
if k not in d.keys():
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index d18785aaf..e5d8c6043 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -98,7 +98,7 @@ def is_intf_addr_assigned(intf, address) -> bool:
return False
def is_addr_assigned(ip_address, vrf=None) -> bool:
- """ Verify if the given IPv4/IPv6 address is assigned to any interfac """
+ """ Verify if the given IPv4/IPv6 address is assigned to any interface """
from netifaces import interfaces
from vyos.util import get_interface_config
from vyos.util import dict_search
@@ -115,6 +115,24 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
return False
+def is_afi_configured(interface, afi):
+ """ Check if given address family is configured, or in other words - an IP
+ address is assigned to the interface. """
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ if afi not in [AF_INET, AF_INET6]:
+ raise ValueError('Address family must be in [AF_INET, AF_INET6]')
+
+ try:
+ addresses = ifaddresses(interface)
+ except ValueError as e:
+ print(e)
+ return False
+
+ return afi in addresses
+
def is_loopback_addr(addr):
""" Check if supplied IPv4/IPv6 address is a loopback address """
from ipaddress import ip_address
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index 2e144ef10..62d3680a1 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -58,12 +58,15 @@ def get_defaults(path: list, get_first_key=False, recursive=False) -> dict:
return load_reference().get_defaults(path, get_first_key=get_first_key,
recursive=recursive)
-def get_config_defaults(rpath: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
+def relative_defaults(rpath: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
- return load_reference().relative_defaults(rpath, conf=conf,
+ return load_reference().relative_defaults(rpath, conf,
get_first_key=get_first_key,
recursive=recursive)
-def merge_defaults(path: list, conf: dict) -> dict:
- return load_reference().merge_defaults(path, conf)
+def merge_defaults(path: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
+ return load_reference().merge_defaults(path, conf,
+ get_first_key=get_first_key,
+ recursive=recursive)
diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py
index 95ecc01a6..7fd7a7b77 100644
--- a/python/vyos/xml_ref/definition.py
+++ b/python/vyos/xml_ref/definition.py
@@ -13,8 +13,7 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
-from typing import Union, Any
-from vyos.configdict import dict_merge
+from typing import Optional, Union, Any
class Xml:
def __init__(self):
@@ -141,9 +140,17 @@ class Xml:
return res
- def _get_default_value(self, node: dict):
+ def _get_default_value(self, node: dict) -> Optional[str]:
return self._get_ref_node_data(node, "default_value")
+ def _get_default(self, node: dict) -> Optional[Union[str, list]]:
+ default = self._get_default_value(node)
+ if default is None:
+ return None
+ if self._is_multi_node(node) and not isinstance(default, list):
+ return [default]
+ return default
+
def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:
"""Return dict containing default values below path
@@ -153,18 +160,23 @@ class Xml:
'relative_defaults'
"""
res: dict = {}
+ if self.is_tag(path):
+ return res
+
d = self._get_ref_path(path)
+
+ if self._is_leaf_node(d):
+ default_value = self._get_default(d)
+ if default_value is not None:
+ return {path[-1]: default_value} if path else {}
+
for k in list(d):
if k in ('node_data', 'component_version') :
continue
- d_k = d[k]
- if self._is_leaf_node(d_k):
- default_value = self._get_default_value(d_k)
+ if self._is_leaf_node(d[k]):
+ default_value = self._get_default(d[k])
if default_value is not None:
- pos = default_value
- if self._is_multi_node(d_k) and not isinstance(pos, list):
- pos = [pos]
- res |= {k: pos}
+ res |= {k: default_value}
elif self.is_tag(path + [k]):
# tag node defaults are used as suggestion, not default value;
# should this change, append to path and continue if recursive
@@ -175,8 +187,6 @@ class Xml:
res |= pos
if res:
if get_first_key or not path:
- if not isinstance(res, dict):
- raise TypeError("Cannot get_first_key as data under node is not of type dict")
return res
return {path[-1]: res}
@@ -188,7 +198,7 @@ class Xml:
return [next(iter(c.keys()))] if c else []
try:
tmp = step(conf)
- if self.is_tag_value(path + tmp):
+ if tmp and self.is_tag_value(path + tmp):
c = conf[tmp[0]]
if not isinstance(c, dict):
raise ValueError
@@ -200,57 +210,67 @@ class Xml:
return False
return True
- def relative_defaults(self, rpath: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
- """Return dict containing defaults along paths of a config dict
- """
- if not conf:
- return self.get_defaults(rpath, get_first_key=get_first_key,
- recursive=recursive)
- if rpath and rpath[-1] in list(conf):
- conf = conf[rpath[-1]]
- if not isinstance(conf, dict):
- raise TypeError('conf at path is not of type dict')
+ # use local copy of function in module configdict, to avoid circular
+ # import
+ def _dict_merge(self, source, destination):
+ from copy import deepcopy
+ tmp = deepcopy(destination)
- if not self._well_defined(rpath, conf):
- print('path to config dict does not define full config paths')
- return {}
+ for key, value in source.items():
+ if key not in tmp:
+ tmp[key] = value
+ elif isinstance(source[key], dict):
+ tmp[key] = self._dict_merge(source[key], tmp[key])
+ return tmp
+
+ def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:
res: dict = {}
+ res = self.get_defaults(rpath, recursive=recursive,
+ get_first_key=True)
for k in list(conf):
- pos = self.get_defaults(rpath + [k], recursive=recursive)
- res |= pos
-
if isinstance(conf[k], dict):
- step = self.relative_defaults(rpath + [k], conf=conf[k],
- recursive=recursive)
+ step = self._relative_defaults(rpath + [k], conf=conf[k],
+ recursive=recursive)
res |= step
if res:
- if get_first_key:
- return res
return {rpath[-1]: res} if rpath else res
return {}
- def merge_defaults(self, path: list, conf: dict) -> dict:
+ def relative_defaults(self, path: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
+ """Return dict containing defaults along paths of a config dict
+ """
+ if not conf:
+ return self.get_defaults(path, get_first_key=get_first_key,
+ recursive=recursive)
+ if path and path[-1] in list(conf):
+ conf = conf[path[-1]]
+ conf = {} if not isinstance(conf, dict) else conf
+
+ if not self._well_defined(path, conf):
+ print('path to config dict does not define full config paths')
+ return {}
+
+ res = self._relative_defaults(path, conf, recursive=recursive)
+
+ if get_first_key and path:
+ if res.values():
+ res = next(iter(res.values()))
+ else:
+ res = {}
+
+ return res
+
+ def merge_defaults(self, path: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
"""Return config dict with defaults non-destructively merged
This merges non-recursive defaults relative to the config dict.
"""
- if path[-1] in list(conf):
- config = conf[path[-1]]
- if not isinstance(config, dict):
- raise TypeError('conf at path is not of type dict')
- shift = False
- else:
- config = conf
- shift = True
-
- if not self._well_defined(path, config):
- print('path to config dict does not define config paths; conf returned unchanged')
- return conf
-
- d = self.relative_defaults(path, conf=config, get_first_key=shift)
- d = dict_merge(d, conf)
+ d = self.relative_defaults(path, conf, get_first_key=get_first_key,
+ recursive=recursive)
+ d = self._dict_merge(d, conf)
return d