summaryrefslogtreecommitdiff
path: root/python/vyos
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos')
-rw-r--r--python/vyos/component_version.py2
-rw-r--r--python/vyos/config.py102
-rw-r--r--python/vyos/configdict.py65
-rw-r--r--python/vyos/configdiff.py18
-rw-r--r--python/vyos/configtree.py18
-rw-r--r--python/vyos/defaults.py4
-rw-r--r--python/vyos/firewall.py58
-rw-r--r--python/vyos/ifconfig/bond.py4
-rw-r--r--python/vyos/ifconfig/bridge.py4
-rw-r--r--python/vyos/ifconfig/ethernet.py2
-rw-r--r--python/vyos/ifconfig/interface.py114
-rw-r--r--python/vyos/ifconfig/pppoe.py2
-rw-r--r--python/vyos/ifconfig/tunnel.py2
-rw-r--r--python/vyos/nat.py33
-rw-r--r--python/vyos/pki.py14
-rw-r--r--python/vyos/qos/base.py24
-rw-r--r--python/vyos/template.py9
-rw-r--r--python/vyos/utils/__init__.py1
-rw-r--r--python/vyos/utils/assertion.py81
-rw-r--r--python/vyos/utils/convert.py46
-rw-r--r--python/vyos/utils/network.py186
-rw-r--r--python/vyos/validate.py321
-rw-r--r--python/vyos/xml_ref/__init__.py28
-rw-r--r--python/vyos/xml_ref/definition.py101
24 files changed, 700 insertions, 539 deletions
diff --git a/python/vyos/component_version.py b/python/vyos/component_version.py
index a4e318d08..84e0ae51a 100644
--- a/python/vyos/component_version.py
+++ b/python/vyos/component_version.py
@@ -37,7 +37,7 @@ import re
import sys
import fileinput
-from vyos.xml import component_version
+from vyos.xml_ref import component_version
from vyos.version import get_version
from vyos.defaults import directories
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 179f60c43..6fececd76 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -66,17 +66,31 @@ In operational mode, all functions return values from the running config.
import re
import json
from copy import deepcopy
+from typing import Union
import vyos.configtree
-from vyos.xml_ref import multi_to_list, from_source
-from vyos.xml_ref import merge_defaults, relative_defaults
-from vyos.utils.dict import get_sub_dict, mangle_dict_keys
-from vyos.configsource import ConfigSource, ConfigSourceSession
+from vyos.xml_ref import multi_to_list
+from vyos.xml_ref import from_source
+from vyos.xml_ref import ext_dict_merge
+from vyos.xml_ref import relative_defaults
+from vyos.utils.dict import get_sub_dict
+from vyos.utils.dict import mangle_dict_keys
+from vyos.configsource import ConfigSource
+from vyos.configsource import ConfigSourceSession
class ConfigDict(dict):
_from_defaults = {}
- def from_defaults(self, path: list[str]):
+ _dict_kwargs = {}
+ def from_defaults(self, path: list[str]) -> bool:
return from_source(self._from_defaults, path)
+ @property
+ def kwargs(self) -> dict:
+ return self._dict_kwargs
+
+def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict:
+ if not isinstance(dest, ConfigDict):
+ dest = ConfigDict(dest)
+ return ext_dict_merge(src, dest)
class Config(object):
"""
@@ -229,6 +243,13 @@ class Config(object):
return config_dict
+ def verify_mangling(self, key_mangling):
+ if not (isinstance(key_mangling, tuple) and \
+ (len(key_mangling) == 2) and \
+ isinstance(key_mangling[0], str) and \
+ isinstance(key_mangling[1], str)):
+ raise ValueError("key_mangling must be a tuple of two strings")
+
def get_config_dict(self, path=[], effective=False, key_mangling=None,
get_first_key=False, no_multi_convert=False,
no_tag_node_value_mangle=False,
@@ -243,44 +264,37 @@ class Config(object):
Returns: a dict representation of the config under path
"""
+ kwargs = locals().copy()
+ del kwargs['self']
+ del kwargs['no_multi_convert']
+ del kwargs['with_defaults']
+ del kwargs['with_recursive_defaults']
+
lpath = self._make_path(path)
root_dict = self.get_cached_root_dict(effective)
conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key)
- if key_mangling is None and no_multi_convert and not (with_defaults or with_recursive_defaults):
- return deepcopy(conf_dict)
-
rpath = lpath if get_first_key else lpath[:-1]
if not no_multi_convert:
conf_dict = multi_to_list(rpath, conf_dict)
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ conf_dict = mangle_dict_keys(conf_dict,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
+
if with_defaults or with_recursive_defaults:
+ defaults = self.get_config_defaults(**kwargs,
+ recursive=with_recursive_defaults)
+ conf_dict = config_dict_merge(defaults, conf_dict)
+ else:
conf_dict = ConfigDict(conf_dict)
- conf_dict = merge_defaults(lpath, conf_dict,
- get_first_key=get_first_key,
- recursive=with_recursive_defaults)
- if key_mangling is None:
- return conf_dict
-
- if not (isinstance(key_mangling, tuple) and \
- (len(key_mangling) == 2) and \
- isinstance(key_mangling[0], str) and \
- isinstance(key_mangling[1], str)):
- raise ValueError("key_mangling must be a tuple of two strings")
-
- def mangle(obj):
- return mangle_dict_keys(obj, key_mangling[0], key_mangling[1],
- abs_path=rpath,
- no_tag_node_value_mangle=no_tag_node_value_mangle)
-
- if isinstance(conf_dict, ConfigDict):
- from_defaults = mangle(conf_dict._from_defaults)
- conf_dict = mangle(conf_dict)
- conf_dict._from_defaults = from_defaults
- else:
- conf_dict = mangle(conf_dict)
+ # save optional args for a call to get_config_defaults
+ setattr(conf_dict, '_dict_kwargs', kwargs)
return conf_dict
@@ -294,21 +308,29 @@ class Config(object):
defaults = relative_defaults(lpath, conf_dict,
get_first_key=get_first_key,
recursive=recursive)
- if key_mangling is None:
- return defaults
rpath = lpath if get_first_key else lpath[:-1]
- if not (isinstance(key_mangling, tuple) and \
- (len(key_mangling) == 2) and \
- isinstance(key_mangling[0], str) and \
- isinstance(key_mangling[1], str)):
- raise ValueError("key_mangling must be a tuple of two strings")
-
- defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ defaults = mangle_dict_keys(defaults,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
return defaults
+ def merge_defaults(self, config_dict: ConfigDict, recursive=False):
+ if not isinstance(config_dict, ConfigDict):
+ raise TypeError('argument is not of type ConfigDict')
+ if not config_dict.kwargs:
+ raise ValueError('argument missing metadata')
+
+ args = config_dict.kwargs
+ d = self.get_config_defaults(**args, recursive=recursive)
+ config_dict = config_dict_merge(d, config_dict)
+ return config_dict
+
def is_multi(self, path):
"""
Args:
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index f642d38f2..71a06b625 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -20,7 +20,6 @@ import os
import json
from vyos.utils.dict import dict_search
-from vyos.xml import defaults
from vyos.utils.process import cmd
def retrieve_config(path_hash, base_path, config):
@@ -177,24 +176,6 @@ def get_removed_vlans(conf, path, dict):
return dict
-def T2665_set_dhcpv6pd_defaults(config_dict):
- """ Properly configure DHCPv6 default options in the dictionary. If there is
- no DHCPv6 configured at all, it is safe to remove the entire configuration.
- """
- # As this is the same for every interface type it is safe to assume this
- # for ethernet
- pd_defaults = defaults(['interfaces', 'ethernet', 'dhcpv6-options', 'pd'])
-
- # Implant default dictionary for DHCPv6-PD instances
- if dict_search('dhcpv6_options.pd.length', config_dict):
- del config_dict['dhcpv6_options']['pd']['length']
-
- for pd in (dict_search('dhcpv6_options.pd', config_dict) or []):
- config_dict['dhcpv6_options']['pd'][pd] = dict_merge(pd_defaults,
- config_dict['dhcpv6_options']['pd'][pd])
-
- return config_dict
-
def is_member(conf, interface, intftype=None):
"""
Checks if passed interface is member of other interface of specified type.
@@ -263,6 +244,48 @@ def is_mirror_intf(conf, interface, direction=None):
return ret_val
+def has_address_configured(conf, intf):
+ """
+ Checks if interface has an address configured.
+ Checks the following config nodes:
+ 'address', 'ipv6 address eui64', 'ipv6 address autoconf'
+
+ Returns True if interface has address configured, False if it doesn't.
+ """
+ from vyos.ifconfig import Section
+ ret = False
+
+ old_level = conf.get_level()
+ conf.set_level([])
+
+ intfpath = 'interfaces ' + Section.get_config_path(intf)
+ if ( conf.exists(f'{intfpath} address') or
+ conf.exists(f'{intfpath} ipv6 address autoconf') or
+ conf.exists(f'{intfpath} ipv6 address eui64') ):
+ ret = True
+
+ conf.set_level(old_level)
+ return ret
+
+def has_vrf_configured(conf, intf):
+ """
+ Checks if interface has a VRF configured.
+
+ Returns True if interface has VRF configured, False if it doesn't.
+ """
+ from vyos.ifconfig import Section
+ ret = False
+
+ old_level = conf.get_level()
+ conf.set_level([])
+
+ tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
+ if conf.exists(tmp):
+ ret = True
+
+ conf.set_level(old_level)
+ return ret
+
def has_vlan_subinterface_configured(conf, intf):
"""
Checks if interface has an VLAN subinterface configured.
@@ -455,6 +478,10 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True):
dhcp = is_node_changed(config, base + [ifname, 'dhcp-options'])
if dhcp: dict.update({'dhcp_options_changed' : {}})
+ # Changine interface VRF assignemnts require a DHCP restart, too
+ dhcp = is_node_changed(config, base + [ifname, 'vrf'])
+ if dhcp: dict.update({'dhcp_options_changed' : {}})
+
# Some interfaces come with a source_interface which must also not be part
# of any other bond or bridge interface as it is exclusivly assigned as the
# Kernels "lower" interface to this new "virtual/upper" interface.
diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py
index 0caa204c3..1ec2dfafe 100644
--- a/python/vyos/configdiff.py
+++ b/python/vyos/configdiff.py
@@ -22,7 +22,7 @@ from vyos.configdict import list_diff
from vyos.utils.dict import get_sub_dict
from vyos.utils.dict import mangle_dict_keys
from vyos.utils.dict import dict_search_args
-from vyos.xml import defaults
+from vyos.xml_ref import get_defaults
class ConfigDiffError(Exception):
"""
@@ -240,7 +240,9 @@ class ConfigDiff(object):
if self._key_mangling:
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -264,7 +266,9 @@ class ConfigDiff(object):
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -312,7 +316,9 @@ class ConfigDiff(object):
if self._key_mangling:
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -335,7 +341,9 @@ class ConfigDiff(object):
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index e18d9817d..09cfd43d3 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -383,14 +383,16 @@ def union(left, right, libpath=LIBPATH):
return tree
def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH):
- __lib = cdll.LoadLibrary(libpath)
- __reference_tree_to_json = __lib.reference_tree_to_json
- __reference_tree_to_json.argtypes = [c_char_p, c_char_p]
- __get_error = __lib.get_error
- __get_error.argtypes = []
- __get_error.restype = c_char_p
-
- res = __reference_tree_to_json(from_dir.encode(), to_file.encode())
+ try:
+ __lib = cdll.LoadLibrary(libpath)
+ __reference_tree_to_json = __lib.reference_tree_to_json
+ __reference_tree_to_json.argtypes = [c_char_p, c_char_p]
+ __get_error = __lib.get_error
+ __get_error.argtypes = []
+ __get_error.restype = c_char_p
+ res = __reference_tree_to_json(from_dir.encode(), to_file.encode())
+ except Exception as e:
+ raise ConfigTreeError(e)
if res == 1:
msg = __get_error().decode()
raise ConfigTreeError(msg)
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index d4ffc249e..a5314790d 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -32,7 +32,9 @@ directories = {
'api_schema': f'{base_dir}/services/api/graphql/graphql/schema/',
'api_client_op': f'{base_dir}/services/api/graphql/graphql/client_op/',
'api_templates': f'{base_dir}/services/api/graphql/session/templates/',
- 'vyos_udev_dir' : '/run/udev/vyos'
+ 'vyos_udev_dir' : '/run/udev/vyos',
+ 'isc_dhclient_dir' : '/run/dhclient',
+ 'dhcp6_client_dir' : '/run/dhcp6c',
}
config_status = '/tmp/vyos-config-status'
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 903cc8535..4aa509fe2 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -41,14 +41,19 @@ def fqdn_config_parse(firewall):
firewall['ip6_fqdn'] = {}
for domain, path in dict_search_recursive(firewall, 'fqdn'):
- fw_name = path[1] # name/ipv6-name
- rule = path[3] # rule id
- suffix = path[4][0] # source/destination (1 char)
- set_name = f'{fw_name}_{rule}_{suffix}'
-
- if path[0] == 'name':
+ hook_name = path[1]
+ priority = path[2]
+
+ fw_name = path[2]
+ rule = path[4]
+ suffix = path[5][0]
+ set_name = f'{hook_name}_{priority}_{rule}_{suffix}'
+
+ if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
firewall['ip_fqdn'][set_name] = domain
- elif path[0] == 'ipv6_name':
+ elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
+ if path[1] == 'name':
+ set_name = f'name6_{priority}_{rule}_{suffix}'
firewall['ip6_fqdn'][set_name] = domain
def fqdn_resolve(fqdn, ipv6=False):
@@ -80,7 +85,7 @@ def nft_action(vyos_action):
return 'return'
return vyos_action
-def parse_rule(rule_conf, fw_name, rule_id, ip_name):
+def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output = []
def_suffix = '6' if ip_name == 'ip6' else ''
@@ -129,16 +134,34 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'fqdn' in side_conf:
fqdn = side_conf['fqdn']
+ hook_name = ''
operator = ''
if fqdn[0] == '!':
operator = '!='
- output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{fw_name}_{rule_id}_{prefix}')
+ if hook == 'FWD':
+ hook_name = 'forward'
+ if hook == 'INP':
+ hook_name = 'input'
+ if hook == 'OUT':
+ hook_name = 'output'
+ if hook == 'NAM':
+ hook_name = f'name{def_suffix}'
+ output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{hook_name}_{fw_name}_{rule_id}_{prefix}')
if dict_search_args(side_conf, 'geoip', 'country_code'):
operator = ''
+ hook_name = ''
if dict_search_args(side_conf, 'geoip', 'inverse_match') != None:
operator = '!='
- output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}')
+ if hook == 'FWD':
+ hook_name = 'forward'
+ if hook == 'INP':
+ hook_name = 'input'
+ if hook == 'OUT':
+ hook_name = 'output'
+ if hook == 'NAM':
+ hook_name = f'name'
+ output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}')
if 'mac_address' in side_conf:
suffix = side_conf["mac_address"]
@@ -324,7 +347,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'recent' in rule_conf:
count = rule_conf['recent']['count']
time = rule_conf['recent']['time']
- output.append(f'add @RECENT{def_suffix}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}')
+ output.append(f'add @RECENT{def_suffix}_{hook}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}')
if 'time' in rule_conf:
output.append(parse_time(rule_conf['time']))
@@ -348,7 +371,9 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
output.append(parse_policy_set(rule_conf['set'], def_suffix))
if 'action' in rule_conf:
- output.append(nft_action(rule_conf['action']))
+ # Change action=return to action=action
+ # #output.append(nft_action(rule_conf['action']))
+ output.append(f'{rule_conf["action"]}')
if 'jump' in rule_conf['action']:
target = rule_conf['jump_target']
output.append(f'NAME{def_suffix}_{target}')
@@ -365,7 +390,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
else:
output.append('return')
- output.append(f'comment "{fw_name}-{rule_id}"')
+ output.append(f'comment "{hook}-{fw_name}-{rule_id}"')
return " ".join(output)
def parse_tcp_flags(flags):
@@ -493,11 +518,12 @@ def geoip_update(firewall, force=False):
# Map country codes to set names
for codes, path in dict_search_recursive(firewall, 'country_code'):
- set_name = f'GEOIP_CC_{path[1]}_{path[3]}'
- if path[0] == 'name':
+ set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}'
+ if ( path[0] == 'ipv4'):
for code in codes:
ipv4_codes.setdefault(code, []).append(set_name)
- elif path[0] == 'ipv6_name':
+ elif ( path[0] == 'ipv6' ):
+ set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}'
for code in codes:
ipv6_codes.setdefault(code, []).append(set_name)
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index e88f860be..d1d7d48c4 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -18,8 +18,8 @@ import os
from vyos.ifconfig.interface import Interface
from vyos.utils.process import cmd
from vyos.utils.dict import dict_search
-from vyos.validate import assert_list
-from vyos.validate import assert_positive
+from vyos.utils.assertion import assert_list
+from vyos.utils.assertion import assert_positive
@Interface.register
class BondIf(Interface):
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index b103b49d8..b29e71394 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -17,8 +17,8 @@ from netifaces import interfaces
import json
from vyos.ifconfig.interface import Interface
-from vyos.validate import assert_boolean
-from vyos.validate import assert_positive
+from vyos.utils.assertion import assert_boolean
+from vyos.utils.assertion import assert_positive
from vyos.utils.process import cmd
from vyos.utils.dict import dict_search
from vyos.configdict import get_vlan_ids
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 4ff044c23..24ce3a803 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -23,7 +23,7 @@ from vyos.ifconfig.interface import Interface
from vyos.utils.dict import dict_search
from vyos.utils.file import read_file
from vyos.utils.process import run
-from vyos.validate import assert_list
+from vyos.utils.assertion import assert_list
@Interface.register
class EthernetIf(Interface):
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index efacad902..ddac387e7 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -31,6 +31,7 @@ from vyos import ConfigError
from vyos.configdict import list_diff
from vyos.configdict import dict_merge
from vyos.configdict import get_vlan_ids
+from vyos.defaults import directories
from vyos.template import render
from vyos.utils.network import mac2eui64
from vyos.utils.dict import dict_search
@@ -40,14 +41,14 @@ from vyos.utils.network import get_interface_namespace
from vyos.utils.process import is_systemd_service_active
from vyos.template import is_ipv4
from vyos.template import is_ipv6
-from vyos.validate import is_intf_addr_assigned
-from vyos.validate import is_ipv6_link_local
-from vyos.validate import assert_boolean
-from vyos.validate import assert_list
-from vyos.validate import assert_mac
-from vyos.validate import assert_mtu
-from vyos.validate import assert_positive
-from vyos.validate import assert_range
+from vyos.utils.network import is_intf_addr_assigned
+from vyos.utils.network import is_ipv6_link_local
+from vyos.utils.assertion import assert_boolean
+from vyos.utils.assertion import assert_list
+from vyos.utils.assertion import assert_mac
+from vyos.utils.assertion import assert_mtu
+from vyos.utils.assertion import assert_positive
+from vyos.utils.assertion import assert_range
from vyos.ifconfig.control import Control
from vyos.ifconfig.vrrp import VRRP
@@ -190,6 +191,10 @@ class Interface(Control):
'validate': lambda fwd: assert_range(fwd,0,2),
'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',
},
+ 'ipv6_accept_dad': {
+ 'validate': lambda dad: assert_range(dad,0,3),
+ 'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad',
+ },
'ipv6_dad_transmits': {
'validate': assert_positive,
'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits',
@@ -259,6 +264,9 @@ class Interface(Control):
'ipv6_forwarding': {
'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',
},
+ 'ipv6_accept_dad': {
+ 'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad',
+ },
'ipv6_dad_transmits': {
'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits',
},
@@ -853,6 +861,13 @@ class Interface(Control):
return None
return self.set_interface('ipv6_forwarding', forwarding)
+ def set_ipv6_dad_accept(self, dad):
+ """Whether to accept DAD (Duplicate Address Detection)"""
+ tmp = self.get_interface('ipv6_accept_dad')
+ if tmp == dad:
+ return None
+ return self.set_interface('ipv6_accept_dad', dad)
+
def set_ipv6_dad_messages(self, dad):
"""
The amount of Duplicate Address Detection probes to send.
@@ -1248,44 +1263,49 @@ class Interface(Control):
raise ValueError()
ifname = self.ifname
- config_base = r'/var/lib/dhcp/dhclient'
- config_file = f'{config_base}_{ifname}.conf'
- options_file = f'{config_base}_{ifname}.options'
- pid_file = f'{config_base}_{ifname}.pid'
- lease_file = f'{config_base}_{ifname}.leases'
+ config_base = directories['isc_dhclient_dir'] + '/dhclient'
+ dhclient_config_file = f'{config_base}_{ifname}.conf'
+ dhclient_lease_file = f'{config_base}_{ifname}.leases'
+ systemd_override_file = f'/run/systemd/system/dhclient@{ifname}.service.d/10-override.conf'
systemd_service = f'dhclient@{ifname}.service'
+ # Rendered client configuration files require the apsolute config path
+ self.config['isc_dhclient_dir'] = directories['isc_dhclient_dir']
+
# 'up' check is mandatory b/c even if the interface is A/D, as soon as
# the DHCP client is started the interface will be placed in u/u state.
# This is not what we intended to do when disabling an interface.
- if enable and 'disable' not in self._config:
- if dict_search('dhcp_options.host_name', self._config) == None:
+ if enable and 'disable' not in self.config:
+ if dict_search('dhcp_options.host_name', self.config) == None:
# read configured system hostname.
# maybe change to vyos hostd client ???
hostname = 'vyos'
with open('/etc/hostname', 'r') as f:
hostname = f.read().rstrip('\n')
tmp = {'dhcp_options' : { 'host_name' : hostname}}
- self._config = dict_merge(tmp, self._config)
+ self.config = dict_merge(tmp, self.config)
- render(options_file, 'dhcp-client/daemon-options.j2', self._config)
- render(config_file, 'dhcp-client/ipv4.j2', self._config)
+ render(systemd_override_file, 'dhcp-client/override.conf.j2', self.config)
+ render(dhclient_config_file, 'dhcp-client/ipv4.j2', self.config)
+
+ # Reload systemd unit definitons as some options are dynamically generated
+ self._cmd('systemctl daemon-reload')
# When the DHCP client is restarted a brief outage will occur, as
# the old lease is released a new one is acquired (T4203). We will
# only restart DHCP client if it's option changed, or if it's not
# running, but it should be running (e.g. on system startup)
- if 'dhcp_options_changed' in self._config or not is_systemd_service_active(systemd_service):
+ if 'dhcp_options_changed' in self.config or not is_systemd_service_active(systemd_service):
return self._cmd(f'systemctl restart {systemd_service}')
- return None
else:
if is_systemd_service_active(systemd_service):
self._cmd(f'systemctl stop {systemd_service}')
# cleanup old config files
- for file in [config_file, options_file, pid_file, lease_file]:
+ for file in [dhclient_config_file, systemd_override_file, dhclient_lease_file]:
if os.path.isfile(file):
os.remove(file)
+ return None
def set_dhcpv6(self, enable):
"""
@@ -1295,13 +1315,20 @@ class Interface(Control):
raise ValueError()
ifname = self.ifname
- config_file = f'/run/dhcp6c/dhcp6c.{ifname}.conf'
- options_file = f'/run/dhcp6c/dhcp6c.{ifname}.options'
+ config_base = directories['dhcp6_client_dir']
+ config_file = f'{config_base}/dhcp6c.{ifname}.conf'
+ systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf'
systemd_service = f'dhcp6c@{ifname}.service'
- if enable and 'disable' not in self._config:
- render(options_file, 'dhcp-client/dhcp6c_daemon-options.j2', self._config)
- render(config_file, 'dhcp-client/ipv6.j2', self._config)
+ # Rendered client configuration files require the apsolute config path
+ self.config['dhcp6_client_dir'] = directories['dhcp6_client_dir']
+
+ if enable and 'disable' not in self.config:
+ render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', self.config)
+ render(config_file, 'dhcp-client/ipv6.j2', self.config)
+
+ # Reload systemd unit definitons as some options are dynamically generated
+ self._cmd('systemctl daemon-reload')
# We must ignore any return codes. This is required to enable
# DHCPv6-PD for interfaces which are yet not up and running.
@@ -1312,26 +1339,28 @@ class Interface(Control):
if os.path.isfile(config_file):
os.remove(config_file)
+ return None
+
def set_mirror_redirect(self):
# Please refer to the document for details
# - https://man7.org/linux/man-pages/man8/tc.8.html
# - https://man7.org/linux/man-pages/man8/tc-mirred.8.html
# Depening if we are the source or the target interface of the port
# mirror we need to setup some variables.
- source_if = self._config['ifname']
+ source_if = self.config['ifname']
mirror_config = None
- if 'mirror' in self._config:
- mirror_config = self._config['mirror']
- if 'is_mirror_intf' in self._config:
- source_if = next(iter(self._config['is_mirror_intf']))
- mirror_config = self._config['is_mirror_intf'][source_if].get('mirror', None)
+ if 'mirror' in self.config:
+ mirror_config = self.config['mirror']
+ if 'is_mirror_intf' in self.config:
+ source_if = next(iter(self.config['is_mirror_intf']))
+ mirror_config = self.config['is_mirror_intf'][source_if].get('mirror', None)
redirect_config = None
# clear existing ingess - ignore errors (e.g. "Error: Cannot find specified
# qdisc on specified device") - we simply cleanup all stuff here
- if not 'traffic_policy' in self._config:
+ if not 'traffic_policy' in self.config:
self._popen(f'tc qdisc del dev {source_if} parent ffff: 2>/dev/null');
self._popen(f'tc qdisc del dev {source_if} parent 1: 2>/dev/null');
@@ -1355,11 +1384,11 @@ class Interface(Control):
if err: print('tc qdisc(filter for mirror port failed')
# Apply interface traffic redirection policy
- elif 'redirect' in self._config:
+ elif 'redirect' in self.config:
_, err = self._popen(f'tc qdisc add dev {source_if} handle ffff: ingress')
if err: print(f'tc qdisc add for redirect failed!')
- target_if = self._config['redirect']
+ target_if = self.config['redirect']
_, err = self._popen(f'tc filter add dev {source_if} parent ffff: protocol '\
f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\
f'egress redirect dev {target_if}')
@@ -1402,7 +1431,7 @@ class Interface(Control):
# Cache the configuration - it will be reused inside e.g. DHCP handler
# XXX: maybe pass the option via __init__ in the future and rename this
# method to apply()?
- self._config = config
+ self.config = config
# Change interface MAC address - re-set to real hardware address (hw-id)
# if custom mac is removed. Skip if bond member.
@@ -1568,10 +1597,17 @@ class Interface(Control):
value = '1' if (tmp != None) else '0'
self.set_ipv6_autoconf(value)
- # IPv6 Duplicate Address Detection (DAD) tries
+ # Whether to accept IPv6 DAD (Duplicate Address Detection) packets
+ tmp = dict_search('ipv6.accept_dad', config)
+ # Not all interface types got this CLI option, but if they do, there
+ # is an XML defaultValue available
+ if (tmp != None): self.set_ipv6_dad_accept(tmp)
+
+ # IPv6 DAD tries
tmp = dict_search('ipv6.dup_addr_detect_transmits', config)
- value = tmp if (tmp != None) else '1'
- self.set_ipv6_dad_messages(value)
+ # Not all interface types got this CLI option, but if they do, there
+ # is an XML defaultValue available
+ if (tmp != None): self.set_ipv6_dad_messages(tmp)
# Delete old IPv6 EUI64 addresses before changing MAC
for addr in (dict_search('ipv6.address.eui64_old', config) or []):
diff --git a/python/vyos/ifconfig/pppoe.py b/python/vyos/ifconfig/pppoe.py
index fd4590beb..febf1452d 100644
--- a/python/vyos/ifconfig/pppoe.py
+++ b/python/vyos/ifconfig/pppoe.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
-from vyos.validate import assert_range
+from vyos.utils.assertion import assert_range
from vyos.utils.network import get_interface_config
@Interface.register
diff --git a/python/vyos/ifconfig/tunnel.py b/python/vyos/ifconfig/tunnel.py
index fb2f38e2b..9ba7b31a6 100644
--- a/python/vyos/ifconfig/tunnel.py
+++ b/python/vyos/ifconfig/tunnel.py
@@ -18,7 +18,7 @@
from vyos.ifconfig.interface import Interface
from vyos.utils.dict import dict_search
-from vyos.validate import assert_list
+from vyos.utils.assertion import assert_list
def enable_to_on(value):
if value == 'enable':
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 603fedb9b..b6702f7e2 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -94,6 +94,39 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if options:
translation_str += f' {",".join(options)}'
+ if not ipv6 and 'backend' in rule_conf['load_balance']:
+ hash_input_items = []
+ current_prob = 0
+ nat_map = []
+
+ for trans_addr, addr in rule_conf['load_balance']['backend'].items():
+ item_prob = int(addr['weight'])
+ upper_limit = current_prob + item_prob - 1
+ hash_val = str(current_prob) + '-' + str(upper_limit)
+ element = hash_val + " : " + trans_addr
+ nat_map.append(element)
+ current_prob = current_prob + item_prob
+
+ elements = ' , '.join(nat_map)
+
+ if 'hash' in rule_conf['load_balance'] and 'random' in rule_conf['load_balance']['hash']:
+ translation_str += ' numgen random mod 100 map ' + '{ ' + f'{elements}' + ' }'
+ else:
+ for input_param in rule_conf['load_balance']['hash']:
+ if input_param == 'source-address':
+ param = 'ip saddr'
+ elif input_param == 'destination-address':
+ param = 'ip daddr'
+ elif input_param == 'source-port':
+ prot = rule_conf['protocol']
+ param = f'{prot} sport'
+ elif input_param == 'destination-port':
+ prot = rule_conf['protocol']
+ param = f'{prot} dport'
+ hash_input_items.append(param)
+ hash_input = ' . '.join(hash_input_items)
+ translation_str += f' jhash ' + f'{hash_input}' + ' mod 100 map ' + '{ ' + f'{elements}' + ' }'
+
for target in ['source', 'destination']:
if target not in rule_conf:
continue
diff --git a/python/vyos/pki.py b/python/vyos/pki.py
index cd15e3878..792e24b76 100644
--- a/python/vyos/pki.py
+++ b/python/vyos/pki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -63,6 +63,18 @@ private_format_map = {
'OpenSSH': serialization.PrivateFormat.OpenSSH
}
+hash_map = {
+ 'sha256': hashes.SHA256,
+ 'sha384': hashes.SHA384,
+ 'sha512': hashes.SHA512,
+}
+
+def get_certificate_fingerprint(cert, hash):
+ hash_algorithm = hash_map[hash]()
+ fp = cert.fingerprint(hash_algorithm)
+
+ return fp.hex(':').upper()
+
def encode_certificate(cert):
return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index 6c5a3d79c..d8bbfe970 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -107,7 +107,8 @@ class QoSBase:
queue_limit = dict_search('queue_limit', config)
for ii in range(1, 4):
- tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo limit {queue_limit}'
+ tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo'
+ if queue_limit: tmp += f' limit {queue_limit}'
self._cmd(tmp)
elif queue_type == 'fair-queue':
@@ -297,6 +298,27 @@ class QoSBase:
filter_cmd += f' flowid {self._parent:x}:{cls:x}'
self._cmd(filter_cmd)
+ if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config):
+ filter_cmd += f' action police'
+
+ if 'exceed' in cls_config:
+ action = cls_config['exceed']
+ filter_cmd += f' conform-exceed {action}'
+ if 'not_exceed' in cls_config:
+ action = cls_config['not_exceed']
+ filter_cmd += f'/{action}'
+
+ if 'bandwidth' in cls_config:
+ rate = self._rate_convert(cls_config['bandwidth'])
+ filter_cmd += f' rate {rate}'
+
+ if 'burst' in cls_config:
+ burst = cls_config['burst']
+ filter_cmd += f' burst {burst}'
+ cls = int(cls)
+ filter_cmd += f' flowid {self._parent:x}:{cls:x}'
+ self._cmd(filter_cmd)
+
else:
filter_cmd += ' basic'
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 7d1c3970f..e167488c6 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -420,7 +420,7 @@ def get_dhcp_router(interface):
Returns False of no router is found, returns the IP address as string if
a router is found.
"""
- lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases'
+ lease_file = directories['isc_dhclient_dir'] + f'/dhclient_{interface}.leases'
if not os.path.exists(lease_file):
return None
@@ -574,9 +574,9 @@ def nft_action(vyos_action):
return vyos_action
@register_filter('nft_rule')
-def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'):
+def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):
from vyos.firewall import parse_rule
- return parse_rule(rule_conf, fw_name, rule_id, ip_name)
+ return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name)
@register_filter('nft_default_rule')
def nft_default_rule(fw_conf, fw_name, ipv6=False):
@@ -587,7 +587,8 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False):
action_suffix = default_action[:1].upper()
output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"')
- output.append(nft_action(default_action))
+ #output.append(nft_action(default_action))
+ output.append(f'{default_action}')
if 'default_jump_target' in fw_conf:
target = fw_conf['default_jump_target']
def_suffix = '6' if ipv6 else ''
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py
index f2783113a..12ef2d3b8 100644
--- a/python/vyos/utils/__init__.py
+++ b/python/vyos/utils/__init__.py
@@ -13,6 +13,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+from vyos.utils import assertion
from vyos.utils import auth
from vyos.utils import boot
from vyos.utils import commit
diff --git a/python/vyos/utils/assertion.py b/python/vyos/utils/assertion.py
new file mode 100644
index 000000000..1aaa54dff
--- /dev/null
+++ b/python/vyos/utils/assertion.py
@@ -0,0 +1,81 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def assert_boolean(b):
+ if int(b) not in (0, 1):
+ raise ValueError(f'Value {b} out of range')
+
+def assert_range(value, lower=0, count=3):
+ if int(value, 16) not in range(lower, lower+count):
+ raise ValueError("Value out of range")
+
+def assert_list(s, l):
+ if s not in l:
+ o = ' or '.join([f'"{n}"' for n in l])
+ raise ValueError(f'state must be {o}, got {s}')
+
+def assert_number(n):
+ if not str(n).isnumeric():
+ raise ValueError(f'{n} must be a number')
+
+def assert_positive(n, smaller=0):
+ assert_number(n)
+ if int(n) < smaller:
+ raise ValueError(f'{n} is smaller than {smaller}')
+
+def assert_mtu(mtu, ifname):
+ assert_number(mtu)
+
+ import json
+ from vyos.utils.process import cmd
+ out = cmd(f'ip -j -d link show dev {ifname}')
+ # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
+ parsed = json.loads(out)[0]
+ min_mtu = int(parsed.get('min_mtu', '0'))
+ # cur_mtu = parsed.get('mtu',0),
+ max_mtu = int(parsed.get('max_mtu', '0'))
+ cur_mtu = int(mtu)
+
+ if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68:
+ raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}')
+ if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536:
+ raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}')
+
+def assert_mac(m):
+ split = m.split(':')
+ size = len(split)
+
+ # a mac address consits out of 6 octets
+ if size != 6:
+ raise ValueError(f'wrong number of MAC octets ({size}): {m}')
+
+ octets = []
+ try:
+ for octet in split:
+ octets.append(int(octet, 16))
+ except ValueError:
+ raise ValueError(f'invalid hex number "{octet}" in : {m}')
+
+ # validate against the first mac address byte if it's a multicast
+ # address
+ if octets[0] & 1:
+ raise ValueError(f'{m} is a multicast MAC address')
+
+ # overall mac address is not allowed to be 00:00:00:00:00:00
+ if sum(octets) == 0:
+ raise ValueError('00:00:00:00:00:00 is not a valid MAC address')
+
+ if octets[:5] == (0, 0, 94, 0, 1):
+ raise ValueError(f'{m} is a VRRP MAC address')
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index ec2333ef0..9a8a1ff7d 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -144,32 +144,54 @@ def mac_to_eui64(mac, prefix=None):
except: # pylint: disable=bare-except
return
-def convert_data(data):
- """Convert multiple types of data to types usable in CLI
+
+def convert_data(data) -> dict | list | tuple | str | int | float | bool | None:
+ """Filter and convert multiple types of data to types usable in CLI/API
+
+ WARNING: Must not be used for anything except formatting output for API or CLI
+
+ On the output allowed everything supported in JSON.
Args:
- data (str | bytes | list | OrderedDict): input data
+ data (Any): input data
Returns:
- str | list | dict: converted data
+ dict | list | tuple | str | int | float | bool | None: converted data
"""
from base64 import b64encode
- from collections import OrderedDict
- if isinstance(data, str):
+ # return original data for types which do not require conversion
+ if isinstance(data, str | int | float | bool | None):
return data
- if isinstance(data, bytes):
- try:
- return data.decode()
- except UnicodeDecodeError:
- return b64encode(data).decode()
+
if isinstance(data, list):
list_tmp = []
for item in data:
list_tmp.append(convert_data(item))
return list_tmp
- if isinstance(data, OrderedDict):
+
+ if isinstance(data, tuple):
+ list_tmp = list(data)
+ tuple_tmp = tuple(convert_data(list_tmp))
+ return tuple_tmp
+
+ if isinstance(data, bytes | bytearray):
+ try:
+ return data.decode()
+ except UnicodeDecodeError:
+ return b64encode(data).decode()
+
+ if isinstance(data, set | frozenset):
+ list_tmp = convert_data(list(data))
+ return list_tmp
+
+ if isinstance(data, dict):
dict_tmp = {}
for key, value in data.items():
dict_tmp[key] = convert_data(value)
return dict_tmp
+
+ # do not return anything for other types
+ # which cannot be converted to JSON
+ # for example: complex | range | memoryview
+ return
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 3786caf26..3f9a3ef4b 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -13,7 +13,15 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-import os
+def _are_same_ip(one, two):
+ from socket import AF_INET
+ from socket import AF_INET6
+ from socket import inet_pton
+ from vyos.template import is_ipv4
+ # compare the binary representation of the IP
+ f_one = AF_INET if is_ipv4(one) else AF_INET6
+ s_two = AF_INET if is_ipv4(two) else AF_INET6
+ return inet_pton(f_one, one) == inet_pton(f_one, two)
def get_protocol_by_name(protocol_name):
"""Get protocol number by protocol name
@@ -48,6 +56,7 @@ def get_interface_config(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
"""
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -59,6 +68,7 @@ def get_interface_address(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
"""
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -85,7 +95,6 @@ def get_interface_namespace(iface):
if iface == tmp["ifname"]:
return netns
-
def is_wwan_connected(interface):
""" Determine if a given WWAN interface, e.g. wwan0 is connected to the
carrier network or not """
@@ -110,6 +119,7 @@ def is_wwan_connected(interface):
def get_bridge_fdb(interface):
""" Returns the forwarding database entries for a given interface """
+ import os
if not os.path.exists(f'/sys/class/net/{interface}'):
return None
from json import loads
@@ -211,3 +221,175 @@ def is_listen_port_bind_service(port: int, service: str) -> bool:
if service == pid_name and port == pid_port:
return True
return False
+
+def is_ipv6_link_local(addr):
+ """ Check if addrsss is an IPv6 link-local address. Returns True/False """
+ from ipaddress import ip_interface
+ from vyos.template import is_ipv6
+ addr = addr.split('%')[0]
+ if is_ipv6(addr):
+ if ip_interface(addr).is_link_local:
+ return True
+
+ return False
+
+def is_addr_assigned(ip_address, vrf=None) -> bool:
+ """ Verify if the given IPv4/IPv6 address is assigned to any interface """
+ from netifaces import interfaces
+ from vyos.utils.network import get_interface_config
+ from vyos.utils.dict import dict_search
+
+ for interface in interfaces():
+ # Check if interface belongs to the requested VRF, if this is not the
+ # case there is no need to proceed with this data set - continue loop
+ # with next element
+ tmp = get_interface_config(interface)
+ if dict_search('master', tmp) != vrf:
+ continue
+
+ if is_intf_addr_assigned(interface, ip_address):
+ return True
+
+ return False
+
+def is_intf_addr_assigned(intf, address) -> bool:
+ """
+ Verify if the given IPv4/IPv6 address is assigned to specific interface.
+ It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
+ address 192.0.2.1/24.
+ """
+ from vyos.template import is_ipv4
+
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ # check if the requested address type is configured at all
+ # {
+ # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}],
+ # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}],
+ # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}]
+ # }
+ try:
+ addresses = ifaddresses(intf)
+ except ValueError as e:
+ print(e)
+ return False
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET if is_ipv4(address) else AF_INET6
+
+ # Check every IP address on this interface for a match
+ netmask = None
+ if '/' in address:
+ address, netmask = address.split('/')
+ for ip in addresses.get(addr_type, []):
+ # ip can have the interface name in the 'addr' field, we need to remove it
+ # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'}
+ ip_addr = ip['addr'].split('%')[0]
+
+ if not _are_same_ip(address, ip_addr):
+ continue
+
+ # we do not have a netmask to compare against, they are the same
+ if not netmask:
+ return True
+
+ prefixlen = ''
+ if is_ipv4(ip_addr):
+ prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')])
+ else:
+ prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _])
+
+ if str(prefixlen) == netmask:
+ return True
+
+ return False
+
+def is_loopback_addr(addr):
+ """ Check if supplied IPv4/IPv6 address is a loopback address """
+ from ipaddress import ip_address
+ return ip_address(addr).is_loopback
+
+def is_wireguard_key_pair(private_key: str, public_key:str) -> bool:
+ """
+ Checks if public/private keys are keypair
+ :param private_key: Wireguard private key
+ :type private_key: str
+ :param public_key: Wireguard public key
+ :type public_key: str
+ :return: If public/private keys are keypair returns True else False
+ :rtype: bool
+ """
+ from vyos.utils.process import cmd
+ gen_public_key = cmd('wg pubkey', input=private_key)
+ if gen_public_key == public_key:
+ return True
+ else:
+ return False
+
+def is_subnet_connected(subnet, primary=False):
+ """
+ Verify is the given IPv4/IPv6 subnet is connected to any interface on this
+ system.
+
+ primary check if the subnet is reachable via the primary IP address of this
+ interface, or in other words has a broadcast address configured. ISC DHCP
+ for instance will complain if it should listen on non broadcast interfaces.
+
+ Return True/False
+ """
+ from ipaddress import ip_address
+ from ipaddress import ip_network
+
+ from netifaces import ifaddresses
+ from netifaces import interfaces
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ from vyos.template import is_ipv6
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET
+ if is_ipv6(subnet):
+ addr_type = AF_INET6
+
+ for interface in interfaces():
+ # check if the requested address type is configured at all
+ if addr_type not in ifaddresses(interface).keys():
+ continue
+
+ # An interface can have multiple addresses, but some software components
+ # only support the primary address :(
+ if primary:
+ ip = ifaddresses(interface)[addr_type][0]['addr']
+ if ip_address(ip) in ip_network(subnet):
+ return True
+ else:
+ # Check every assigned IP address if it is connected to the subnet
+ # in question
+ for ip in ifaddresses(interface)[addr_type]:
+ # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
+ addr = ip['addr'].split('%')[0]
+ if ip_address(addr) in ip_network(subnet):
+ return True
+
+ return False
+
+def is_afi_configured(interface, afi):
+ """ Check if given address family is configured, or in other words - an IP
+ address is assigned to the interface. """
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ if afi not in [AF_INET, AF_INET6]:
+ raise ValueError('Address family must be in [AF_INET, AF_INET6]')
+
+ try:
+ addresses = ifaddresses(interface)
+ except ValueError as e:
+ print(e)
+ return False
+
+ return afi in addresses
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
deleted file mode 100644
index b149b258f..000000000
--- a/python/vyos/validate.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# Copyright 2018-2023 VyOS maintainers and contributors <maintainers@vyos.io>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
-# Important note when you are adding new validation functions:
-#
-# The Control class will analyse the signature of the function in this file
-# and will build the parameters to be passed to it.
-#
-# The parameter names "ifname" and "self" will get the Interface name and class
-# parameters with default will be left unset
-# all other paramters will receive the value to check
-
-def is_ipv6_link_local(addr):
- """ Check if addrsss is an IPv6 link-local address. Returns True/False """
- from ipaddress import ip_interface
- from vyos.template import is_ipv6
- addr = addr.split('%')[0]
- if is_ipv6(addr):
- if ip_interface(addr).is_link_local:
- return True
-
- return False
-
-def _are_same_ip(one, two):
- from socket import AF_INET
- from socket import AF_INET6
- from socket import inet_pton
- from vyos.template import is_ipv4
- # compare the binary representation of the IP
- f_one = AF_INET if is_ipv4(one) else AF_INET6
- s_two = AF_INET if is_ipv4(two) else AF_INET6
- return inet_pton(f_one, one) == inet_pton(f_one, two)
-
-def is_intf_addr_assigned(intf, address) -> bool:
- """
- Verify if the given IPv4/IPv6 address is assigned to specific interface.
- It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
- address 192.0.2.1/24.
- """
- from vyos.template import is_ipv4
-
- from netifaces import ifaddresses
- from netifaces import AF_INET
- from netifaces import AF_INET6
-
- # check if the requested address type is configured at all
- # {
- # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}],
- # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}],
- # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}]
- # }
- try:
- addresses = ifaddresses(intf)
- except ValueError as e:
- print(e)
- return False
-
- # determine IP version (AF_INET or AF_INET6) depending on passed address
- addr_type = AF_INET if is_ipv4(address) else AF_INET6
-
- # Check every IP address on this interface for a match
- netmask = None
- if '/' in address:
- address, netmask = address.split('/')
- for ip in addresses.get(addr_type, []):
- # ip can have the interface name in the 'addr' field, we need to remove it
- # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'}
- ip_addr = ip['addr'].split('%')[0]
-
- if not _are_same_ip(address, ip_addr):
- continue
-
- # we do not have a netmask to compare against, they are the same
- if not netmask:
- return True
-
- prefixlen = ''
- if is_ipv4(ip_addr):
- prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')])
- else:
- prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _])
-
- if str(prefixlen) == netmask:
- return True
-
- return False
-
-def is_addr_assigned(ip_address, vrf=None) -> bool:
- """ Verify if the given IPv4/IPv6 address is assigned to any interface """
- from netifaces import interfaces
- from vyos.utils.network import get_interface_config
- from vyos.utils.dict import dict_search
-
- for interface in interfaces():
- # Check if interface belongs to the requested VRF, if this is not the
- # case there is no need to proceed with this data set - continue loop
- # with next element
- tmp = get_interface_config(interface)
- if dict_search('master', tmp) != vrf:
- continue
-
- if is_intf_addr_assigned(interface, ip_address):
- return True
-
- return False
-
-def is_afi_configured(interface, afi):
- """ Check if given address family is configured, or in other words - an IP
- address is assigned to the interface. """
- from netifaces import ifaddresses
- from netifaces import AF_INET
- from netifaces import AF_INET6
-
- if afi not in [AF_INET, AF_INET6]:
- raise ValueError('Address family must be in [AF_INET, AF_INET6]')
-
- try:
- addresses = ifaddresses(interface)
- except ValueError as e:
- print(e)
- return False
-
- return afi in addresses
-
-def is_loopback_addr(addr):
- """ Check if supplied IPv4/IPv6 address is a loopback address """
- from ipaddress import ip_address
- return ip_address(addr).is_loopback
-
-def is_subnet_connected(subnet, primary=False):
- """
- Verify is the given IPv4/IPv6 subnet is connected to any interface on this
- system.
-
- primary check if the subnet is reachable via the primary IP address of this
- interface, or in other words has a broadcast address configured. ISC DHCP
- for instance will complain if it should listen on non broadcast interfaces.
-
- Return True/False
- """
- from ipaddress import ip_address
- from ipaddress import ip_network
-
- from netifaces import ifaddresses
- from netifaces import interfaces
- from netifaces import AF_INET
- from netifaces import AF_INET6
-
- from vyos.template import is_ipv6
-
- # determine IP version (AF_INET or AF_INET6) depending on passed address
- addr_type = AF_INET
- if is_ipv6(subnet):
- addr_type = AF_INET6
-
- for interface in interfaces():
- # check if the requested address type is configured at all
- if addr_type not in ifaddresses(interface).keys():
- continue
-
- # An interface can have multiple addresses, but some software components
- # only support the primary address :(
- if primary:
- ip = ifaddresses(interface)[addr_type][0]['addr']
- if ip_address(ip) in ip_network(subnet):
- return True
- else:
- # Check every assigned IP address if it is connected to the subnet
- # in question
- for ip in ifaddresses(interface)[addr_type]:
- # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
- addr = ip['addr'].split('%')[0]
- if ip_address(addr) in ip_network(subnet):
- return True
-
- return False
-
-
-def assert_boolean(b):
- if int(b) not in (0, 1):
- raise ValueError(f'Value {b} out of range')
-
-
-def assert_range(value, lower=0, count=3):
- if int(value, 16) not in range(lower, lower+count):
- raise ValueError("Value out of range")
-
-
-def assert_list(s, l):
- if s not in l:
- o = ' or '.join([f'"{n}"' for n in l])
- raise ValueError(f'state must be {o}, got {s}')
-
-
-def assert_number(n):
- if not str(n).isnumeric():
- raise ValueError(f'{n} must be a number')
-
-
-def assert_positive(n, smaller=0):
- assert_number(n)
- if int(n) < smaller:
- raise ValueError(f'{n} is smaller than {smaller}')
-
-
-def assert_mtu(mtu, ifname):
- assert_number(mtu)
-
- import json
- from vyos.utils.process import cmd
- out = cmd(f'ip -j -d link show dev {ifname}')
- # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
- parsed = json.loads(out)[0]
- min_mtu = int(parsed.get('min_mtu', '0'))
- # cur_mtu = parsed.get('mtu',0),
- max_mtu = int(parsed.get('max_mtu', '0'))
- cur_mtu = int(mtu)
-
- if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68:
- raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}')
- if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536:
- raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}')
-
-
-def assert_mac(m):
- split = m.split(':')
- size = len(split)
-
- # a mac address consits out of 6 octets
- if size != 6:
- raise ValueError(f'wrong number of MAC octets ({size}): {m}')
-
- octets = []
- try:
- for octet in split:
- octets.append(int(octet, 16))
- except ValueError:
- raise ValueError(f'invalid hex number "{octet}" in : {m}')
-
- # validate against the first mac address byte if it's a multicast
- # address
- if octets[0] & 1:
- raise ValueError(f'{m} is a multicast MAC address')
-
- # overall mac address is not allowed to be 00:00:00:00:00:00
- if sum(octets) == 0:
- raise ValueError('00:00:00:00:00:00 is not a valid MAC address')
-
- if octets[:5] == (0, 0, 94, 0, 1):
- raise ValueError(f'{m} is a VRRP MAC address')
-
-def has_address_configured(conf, intf):
- """
- Checks if interface has an address configured.
- Checks the following config nodes:
- 'address', 'ipv6 address eui64', 'ipv6 address autoconf'
-
- Returns True if interface has address configured, False if it doesn't.
- """
- from vyos.ifconfig import Section
- ret = False
-
- old_level = conf.get_level()
- conf.set_level([])
-
- intfpath = 'interfaces ' + Section.get_config_path(intf)
- if ( conf.exists(f'{intfpath} address') or
- conf.exists(f'{intfpath} ipv6 address autoconf') or
- conf.exists(f'{intfpath} ipv6 address eui64') ):
- ret = True
-
- conf.set_level(old_level)
- return ret
-
-def has_vrf_configured(conf, intf):
- """
- Checks if interface has a VRF configured.
-
- Returns True if interface has VRF configured, False if it doesn't.
- """
- from vyos.ifconfig import Section
- ret = False
-
- old_level = conf.get_level()
- conf.set_level([])
-
- tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
- if conf.exists(tmp):
- ret = True
-
- conf.set_level(old_level)
- return ret
-
-def is_wireguard_key_pair(private_key: str, public_key:str) -> bool:
- """
- Checks if public/private keys are keypair
- :param private_key: Wireguard private key
- :type private_key: str
- :param public_key: Wireguard public key
- :type public_key: str
- :return: If public/private keys are keypair returns True else False
- :rtype: bool
- """
- from vyos.utils.process import cmd
- gen_public_key = cmd('wg pubkey', input=private_key)
- if gen_public_key == public_key:
- return True
- else:
- return False
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index ad2130dca..bf434865d 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -13,8 +13,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Optional, Union, TYPE_CHECKING
from vyos.xml_ref import definition
+if TYPE_CHECKING:
+ from vyos.config import ConfigDict
+
def load_reference(cache=[]):
if cache:
return cache[0]
@@ -23,11 +27,15 @@ def load_reference(cache=[]):
try:
from vyos.xml_ref.cache import reference
- xml.define(reference)
- cache.append(xml)
except Exception:
raise ImportError('no xml reference cache !!')
+ if not reference:
+ raise ValueError('empty xml reference cache !!')
+
+ xml.define(reference)
+ cache.append(xml)
+
return xml
def is_tag(path: list) -> bool:
@@ -48,12 +56,12 @@ def is_leaf(path: list) -> bool:
def cli_defined(path: list, node: str, non_local=False) -> bool:
return load_reference().cli_defined(path, node, non_local=non_local)
-def from_source(d: dict, path: list) -> bool:
- return load_reference().from_source(d, path)
-
def component_version() -> dict:
return load_reference().component_version()
+def default_value(path: list) -> Optional[Union[str, list]]:
+ return load_reference().default_value(path)
+
def multi_to_list(rpath: list, conf: dict) -> dict:
return load_reference().multi_to_list(rpath, conf)
@@ -68,8 +76,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False,
get_first_key=get_first_key,
recursive=recursive)
-def merge_defaults(path: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
- return load_reference().merge_defaults(path, conf,
- get_first_key=get_first_key,
- recursive=recursive)
+def from_source(d: dict, path: list) -> bool:
+ return definition.from_source(d, path)
+
+def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
+ return definition.ext_dict_merge(source, destination)
diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py
index d95d580e2..c90c5ddbc 100644
--- a/python/vyos/xml_ref/definition.py
+++ b/python/vyos/xml_ref/definition.py
@@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING
if TYPE_CHECKING:
from vyos.config import ConfigDict
+def set_source_recursive(o: Union[dict, str, list], b: bool):
+ d = {}
+ if not isinstance(o, dict):
+ d = {'_source': b}
+ else:
+ for k, v in o.items():
+ d[k] = set_source_recursive(v, b)
+ d |= {'_source': b}
+ return d
+
+def source_dict_merge(src: dict, dest: dict):
+ from copy import deepcopy
+ dst = deepcopy(dest)
+ from_src = {}
+
+ for key, value in src.items():
+ if key not in dst:
+ dst[key] = value
+ from_src[key] = set_source_recursive(value, True)
+ elif isinstance(src[key], dict):
+ dst[key], f = source_dict_merge(src[key], dst[key])
+ f |= {'_source': False}
+ from_src[key] = f
+
+ return dst, from_src
+
+def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']):
+ d, f = source_dict_merge(src, dest)
+ if hasattr(d, '_from_defaults'):
+ setattr(d, '_from_defaults', f)
+ return d
+
+def from_source(d: dict, path: list) -> bool:
+ for key in path:
+ d = d[key] if key in d else {}
+ if not d or not isinstance(d, dict):
+ return False
+ return d.get('_source', False)
+
class Xml:
def __init__(self):
self.ref = {}
@@ -123,7 +162,7 @@ class Xml:
def component_version(self) -> dict:
d = {}
- for k, v in self.ref['component_version']:
+ for k, v in self.ref['component_version'].items():
d[k] = int(v)
return d
@@ -153,6 +192,15 @@ class Xml:
return default.split()
return default
+ def default_value(self, path: list) -> Optional[Union[str, list]]:
+ d = self._get_ref_path(path)
+ default = self._get_default_value(d)
+ if default is None:
+ return None
+ if self._is_multi_node(d) or self._is_tag_node(d):
+ return default.split()
+ return default
+
def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:
"""Return dict containing default values below path
@@ -212,43 +260,6 @@ class Xml:
return False
return True
- def _set_source_recursive(self, o: Union[dict, str, list], b: bool):
- d = {}
- if not isinstance(o, dict):
- d = {'_source': b}
- else:
- for k, v in o.items():
- d[k] = self._set_source_recursive(v, b)
- d |= {'_source': b}
- return d
-
- # use local copy of function in module configdict, to avoid circular
- # import
- #
- # extend dict_merge to keep track of keys only in source
- def _dict_merge(self, source, destination):
- from copy import deepcopy
- dest = deepcopy(destination)
- from_source = {}
-
- for key, value in source.items():
- if key not in dest:
- dest[key] = value
- from_source[key] = self._set_source_recursive(value, True)
- elif isinstance(source[key], dict):
- dest[key], f = self._dict_merge(source[key], dest[key])
- f |= {'_source': False}
- from_source[key] = f
-
- return dest, from_source
-
- def from_source(self, d: dict, path: list) -> bool:
- for key in path:
- d = d[key] if key in d else {}
- if not d or not isinstance(d, dict):
- return False
- return d.get('_source', False)
-
def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:
res: dict = {}
res = self.get_defaults(rpath, recursive=recursive,
@@ -289,17 +300,3 @@ class Xml:
res = {}
return res
-
- def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'],
- get_first_key=False, recursive=False) -> dict:
- """Return config dict with defaults non-destructively merged
-
- This merges non-recursive defaults relative to the config dict.
- """
- d = self.relative_defaults(path, conf, get_first_key=get_first_key,
- recursive=recursive)
- d, f = self._dict_merge(d, conf)
- d = type(conf)(d)
- if hasattr(d, '_from_defaults'):
- setattr(d, '_from_defaults', f)
- return d