summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/accel_ppp.py2
-rw-r--r--python/vyos/component_version.py2
-rw-r--r--python/vyos/config.py116
-rw-r--r--python/vyos/config_mgmt.py91
-rw-r--r--python/vyos/configdep.py2
-rw-r--r--python/vyos/configdict.py248
-rw-r--r--python/vyos/configdiff.py23
-rw-r--r--python/vyos/configquery.py8
-rw-r--r--python/vyos/configsession.py23
-rw-r--r--python/vyos/configsource.py4
-rw-r--r--python/vyos/configtree.py41
-rw-r--r--python/vyos/configverify.py23
-rw-r--r--python/vyos/defaults.py4
-rw-r--r--python/vyos/dicts.py53
-rw-r--r--python/vyos/ethtool.py5
-rw-r--r--python/vyos/firewall.py92
-rw-r--r--python/vyos/frr.py19
-rw-r--r--python/vyos/ifconfig/bond.py8
-rw-r--r--python/vyos/ifconfig/bridge.py8
-rw-r--r--python/vyos/ifconfig/control.py8
-rw-r--r--python/vyos/ifconfig/ethernet.py8
-rw-r--r--python/vyos/ifconfig/geneve.py3
-rw-r--r--python/vyos/ifconfig/interface.py211
-rw-r--r--python/vyos/ifconfig/l2tpv3.py5
-rw-r--r--python/vyos/ifconfig/macsec.py22
-rw-r--r--python/vyos/ifconfig/pppoe.py4
-rw-r--r--python/vyos/ifconfig/tunnel.py4
-rw-r--r--python/vyos/ifconfig/vrrp.py19
-rw-r--r--python/vyos/ifconfig/vti.py2
-rw-r--r--python/vyos/ifconfig/vxlan.py2
-rw-r--r--python/vyos/ifconfig/wireguard.py4
-rw-r--r--python/vyos/initialsetup.py14
-rw-r--r--python/vyos/ipsec.py16
-rw-r--r--python/vyos/migrator.py2
-rw-r--r--python/vyos/nat.py78
-rw-r--r--python/vyos/pki.py14
-rw-r--r--python/vyos/qos/base.py201
-rw-r--r--python/vyos/qos/limiter.py1
-rw-r--r--python/vyos/qos/priority.py2
-rw-r--r--python/vyos/qos/trafficshaper.py13
-rw-r--r--python/vyos/remote.py85
-rw-r--r--python/vyos/template.py25
-rw-r--r--python/vyos/util.py1160
-rw-r--r--python/vyos/utils/__init__.py30
-rw-r--r--python/vyos/utils/assertion.py81
-rw-r--r--python/vyos/utils/auth.py (renamed from python/vyos/authutils.py)2
-rw-r--r--python/vyos/utils/boot.py35
-rw-r--r--python/vyos/utils/commit.py60
-rw-r--r--python/vyos/utils/convert.py52
-rw-r--r--python/vyos/utils/dict.py101
-rw-r--r--python/vyos/utils/file.py12
-rw-r--r--python/vyos/utils/kernel.py38
-rw-r--r--python/vyos/utils/list.py20
-rw-r--r--python/vyos/utils/misc.py66
-rw-r--r--python/vyos/utils/network.py417
-rw-r--r--python/vyos/utils/permission.py78
-rw-r--r--python/vyos/utils/process.py232
-rw-r--r--python/vyos/utils/system.py107
-rw-r--r--python/vyos/validate.py259
-rw-r--r--python/vyos/version.py12
-rw-r--r--python/vyos/vpp.py315
-rw-r--r--python/vyos/xml_ref/__init__.py28
-rw-r--r--python/vyos/xml_ref/definition.py166
-rwxr-xr-xpython/vyos/xml_ref/generate_cache.py68
-rw-r--r--python/vyos/xml_ref/pkg_cache/__init__.py0
-rwxr-xr-xpython/vyos/xml_ref/update_cache.py51
66 files changed, 2769 insertions, 2136 deletions
diff --git a/python/vyos/accel_ppp.py b/python/vyos/accel_ppp.py
index 0af311e57..0b4f8a9fe 100644
--- a/python/vyos/accel_ppp.py
+++ b/python/vyos/accel_ppp.py
@@ -18,7 +18,7 @@
import sys
import vyos.opmode
-from vyos.util import rc_cmd
+from vyos.utils.process import rc_cmd
def get_server_statistics(accel_statistics, pattern, sep=':') -> dict:
diff --git a/python/vyos/component_version.py b/python/vyos/component_version.py
index a4e318d08..84e0ae51a 100644
--- a/python/vyos/component_version.py
+++ b/python/vyos/component_version.py
@@ -37,7 +37,7 @@ import re
import sys
import fileinput
-from vyos.xml import component_version
+from vyos.xml_ref import component_version
from vyos.version import get_version
from vyos.defaults import directories
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 287fd2ed1..0ca41718f 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -1,4 +1,4 @@
-# Copyright 2017, 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2017, 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -66,11 +66,31 @@ In operational mode, all functions return values from the running config.
import re
import json
from copy import deepcopy
+from typing import Union
-import vyos.xml
-import vyos.util
import vyos.configtree
-from vyos.configsource import ConfigSource, ConfigSourceSession
+from vyos.xml_ref import multi_to_list
+from vyos.xml_ref import from_source
+from vyos.xml_ref import ext_dict_merge
+from vyos.xml_ref import relative_defaults
+from vyos.utils.dict import get_sub_dict
+from vyos.utils.dict import mangle_dict_keys
+from vyos.configsource import ConfigSource
+from vyos.configsource import ConfigSourceSession
+
+class ConfigDict(dict):
+ _from_defaults = {}
+ _dict_kwargs = {}
+ def from_defaults(self, path: list[str]) -> bool:
+ return from_source(self._from_defaults, path)
+ @property
+ def kwargs(self) -> dict:
+ return self._dict_kwargs
+
+def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict:
+ if not isinstance(dest, ConfigDict):
+ dest = ConfigDict(dest)
+ return ext_dict_merge(src, dest)
class Config(object):
"""
@@ -93,6 +113,11 @@ class Config(object):
(self._running_config,
self._session_config) = self._config_source.get_configtree_tuple()
+ def get_config_tree(self, effective=False):
+ if effective:
+ return self._running_config
+ return self._session_config
+
def _make_path(self, path):
# Backwards-compatibility stuff: original implementation used string paths
# libvyosconfig paths are lists, but since node names cannot contain whitespace,
@@ -223,9 +248,17 @@ class Config(object):
return config_dict
+ def verify_mangling(self, key_mangling):
+ if not (isinstance(key_mangling, tuple) and \
+ (len(key_mangling) == 2) and \
+ isinstance(key_mangling[0], str) and \
+ isinstance(key_mangling[1], str)):
+ raise ValueError("key_mangling must be a tuple of two strings")
+
def get_config_dict(self, path=[], effective=False, key_mangling=None,
get_first_key=False, no_multi_convert=False,
- no_tag_node_value_mangle=False):
+ no_tag_node_value_mangle=False,
+ with_defaults=False, with_recursive_defaults=False):
"""
Args:
path (str list): Configuration tree path, can be empty
@@ -236,32 +269,73 @@ class Config(object):
Returns: a dict representation of the config under path
"""
+ kwargs = locals().copy()
+ del kwargs['self']
+ del kwargs['no_multi_convert']
+ del kwargs['with_defaults']
+ del kwargs['with_recursive_defaults']
+
lpath = self._make_path(path)
root_dict = self.get_cached_root_dict(effective)
- conf_dict = vyos.util.get_sub_dict(root_dict, lpath, get_first_key)
-
- if not key_mangling and no_multi_convert:
- return deepcopy(conf_dict)
+ conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key)
- xmlpath = lpath if get_first_key else lpath[:-1]
+ rpath = lpath if get_first_key else lpath[:-1]
- if not key_mangling:
- conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict)
- return conf_dict
+ if not no_multi_convert:
+ conf_dict = multi_to_list(rpath, conf_dict)
- if no_multi_convert is False:
- conf_dict = vyos.xml.multi_to_list(xmlpath, conf_dict)
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ conf_dict = mangle_dict_keys(conf_dict,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
- if not (isinstance(key_mangling, tuple) and \
- (len(key_mangling) == 2) and \
- isinstance(key_mangling[0], str) and \
- isinstance(key_mangling[1], str)):
- raise ValueError("key_mangling must be a tuple of two strings")
+ if with_defaults or with_recursive_defaults:
+ defaults = self.get_config_defaults(**kwargs,
+ recursive=with_recursive_defaults)
+ conf_dict = config_dict_merge(defaults, conf_dict)
+ else:
+ conf_dict = ConfigDict(conf_dict)
- conf_dict = vyos.util.mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=xmlpath, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ # save optional args for a call to get_config_defaults
+ setattr(conf_dict, '_dict_kwargs', kwargs)
return conf_dict
+ def get_config_defaults(self, path=[], effective=False, key_mangling=None,
+ no_tag_node_value_mangle=False, get_first_key=False,
+ recursive=False) -> dict:
+ lpath = self._make_path(path)
+ root_dict = self.get_cached_root_dict(effective)
+ conf_dict = get_sub_dict(root_dict, lpath, get_first_key)
+
+ defaults = relative_defaults(lpath, conf_dict,
+ get_first_key=get_first_key,
+ recursive=recursive)
+
+ rpath = lpath if get_first_key else lpath[:-1]
+
+ if key_mangling is not None:
+ self.verify_mangling(key_mangling)
+ defaults = mangle_dict_keys(defaults,
+ key_mangling[0], key_mangling[1],
+ abs_path=rpath,
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
+
+ return defaults
+
+ def merge_defaults(self, config_dict: ConfigDict, recursive=False):
+ if not isinstance(config_dict, ConfigDict):
+ raise TypeError('argument is not of type ConfigDict')
+ if not config_dict.kwargs:
+ raise ValueError('argument missing metadata')
+
+ args = config_dict.kwargs
+ d = self.get_config_defaults(**args, recursive=recursive)
+ config_dict = config_dict_merge(d, config_dict)
+ return config_dict
+
def is_multi(self, path):
"""
Args:
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index fade3081c..0fc72e660 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -18,17 +18,23 @@ import re
import sys
import gzip
import logging
+
from typing import Optional, Tuple, Union
from filecmp import cmp
from datetime import datetime
+from textwrap import dedent
+from pathlib import Path
from tabulate import tabulate
from vyos.config import Config
from vyos.configtree import ConfigTree, ConfigTreeError, show_diff
from vyos.defaults import directories
-from vyos.util import is_systemd_service_active, ask_yes_no, rc_cmd
+from vyos.version import get_full_version_data
+from vyos.utils.io import ask_yes_no
+from vyos.utils.process import is_systemd_service_active
+from vyos.utils.process import rc_cmd
-SAVE_CONFIG = '/opt/vyatta/sbin/vyatta-save-config.pl'
+SAVE_CONFIG = '/usr/libexec/vyos/vyos-save-config.py'
# created by vyatta-cfg-postinst
commit_post_hook_dir = '/etc/commit/post-hooks.d'
@@ -56,6 +62,44 @@ formatter = logging.Formatter('%(funcName)s: %(levelname)s:%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
+def save_config(target):
+ cmd = f'{SAVE_CONFIG} {target}'
+ rc, out = rc_cmd(cmd)
+ if rc != 0:
+ logger.critical(f'save config failed: {out}')
+
+def unsaved_commits() -> bool:
+ if get_full_version_data()['boot_via'] == 'livecd':
+ return False
+ tmp_save = '/tmp/config.running'
+ save_config(tmp_save)
+ ret = not cmp(tmp_save, config_file, shallow=False)
+ os.unlink(tmp_save)
+ return ret
+
+def get_file_revision(rev: int):
+ revision = os.path.join(archive_dir, f'config.boot.{rev}.gz')
+ try:
+ with gzip.open(revision) as f:
+ r = f.read().decode()
+ except FileNotFoundError:
+ logger.warning(f'commit revision {rev} not available')
+ return ''
+ return r
+
+def get_config_tree_revision(rev: int):
+ c = get_file_revision(rev)
+ return ConfigTree(c)
+
+def is_node_revised(path: list = [], rev1: int = 1, rev2: int = 0) -> bool:
+ from vyos.configtree import DiffTree
+ left = get_config_tree_revision(rev1)
+ right = get_config_tree_revision(rev2)
+ diff_tree = DiffTree(left, right)
+ if diff_tree.add.exists(path) or diff_tree.sub.exists(path):
+ return True
+ return False
+
class ConfigMgmtError(Exception):
pass
@@ -98,20 +142,6 @@ class ConfigMgmt:
self.active_config = config._running_config
self.working_config = config._session_config
- @staticmethod
- def save_config(target):
- cmd = f'{SAVE_CONFIG} {target}'
- rc, out = rc_cmd(cmd)
- if rc != 0:
- logger.critical(f'save config failed: {out}')
-
- def _unsaved_commits(self) -> bool:
- tmp_save = '/tmp/config.boot.check-save'
- self.save_config(tmp_save)
- ret = not cmp(tmp_save, config_file, shallow=False)
- os.unlink(tmp_save)
- return ret
-
# Console script functions
#
def commit_confirm(self, minutes: int=DEFAULT_TIME_MINUTES,
@@ -123,7 +153,7 @@ class ConfigMgmt:
msg = 'Another confirm is pending'
return msg, 1
- if self._unsaved_commits():
+ if unsaved_commits():
W = '\nYou should save previous commits before commit-confirm !\n'
else:
W = ''
@@ -431,26 +461,25 @@ Proceed ?'''
return ConfigTree(c)
def _add_logrotate_conf(self):
- conf = f"""{archive_config_file} {{
- su root vyattacfg
- rotate {self.max_revisions}
- start 0
- compress
- copy
-}}"""
- mask = os.umask(0o133)
-
- with open(logrotate_conf, 'w') as f:
- f.write(conf)
-
- os.umask(mask)
+ conf: str = dedent(f"""\
+ {archive_config_file} {{
+ su root vyattacfg
+ rotate {self.max_revisions}
+ start 0
+ compress
+ copy
+ }}
+ """)
+ conf_file = Path(logrotate_conf)
+ conf_file.write_text(conf)
+ conf_file.chmod(0o644)
def _archive_active_config(self) -> bool:
mask = os.umask(0o113)
ext = os.getpid()
tmp_save = f'/tmp/config.boot.{ext}'
- self.save_config(tmp_save)
+ save_config(tmp_save)
try:
if cmp(tmp_save, archive_config_file, shallow=False):
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index d4b2cc78f..7a8559839 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -18,7 +18,7 @@ import json
import typing
from inspect import stack
-from vyos.util import load_as_module
+from vyos.utils.system import load_as_module
from vyos.defaults import directories
from vyos.configsource import VyOSError
from vyos import ConfigError
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 6ab5c252c..71a06b625 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -19,9 +19,8 @@ A library for retrieving value dicts from VyOS configs in a declarative fashion.
import os
import json
-from vyos.util import dict_search
-from vyos.xml import defaults
-from vyos.util import cmd
+from vyos.utils.dict import dict_search
+from vyos.utils.process import cmd
def retrieve_config(path_hash, base_path, config):
"""
@@ -177,24 +176,6 @@ def get_removed_vlans(conf, path, dict):
return dict
-def T2665_set_dhcpv6pd_defaults(config_dict):
- """ Properly configure DHCPv6 default options in the dictionary. If there is
- no DHCPv6 configured at all, it is safe to remove the entire configuration.
- """
- # As this is the same for every interface type it is safe to assume this
- # for ethernet
- pd_defaults = defaults(['interfaces', 'ethernet', 'dhcpv6-options', 'pd'])
-
- # Implant default dictionary for DHCPv6-PD instances
- if dict_search('dhcpv6_options.pd.length', config_dict):
- del config_dict['dhcpv6_options']['pd']['length']
-
- for pd in (dict_search('dhcpv6_options.pd', config_dict) or []):
- config_dict['dhcpv6_options']['pd'][pd] = dict_merge(pd_defaults,
- config_dict['dhcpv6_options']['pd'][pd])
-
- return config_dict
-
def is_member(conf, interface, intftype=None):
"""
Checks if passed interface is member of other interface of specified type.
@@ -263,6 +244,48 @@ def is_mirror_intf(conf, interface, direction=None):
return ret_val
+def has_address_configured(conf, intf):
+ """
+ Checks if interface has an address configured.
+ Checks the following config nodes:
+ 'address', 'ipv6 address eui64', 'ipv6 address autoconf'
+
+ Returns True if interface has address configured, False if it doesn't.
+ """
+ from vyos.ifconfig import Section
+ ret = False
+
+ old_level = conf.get_level()
+ conf.set_level([])
+
+ intfpath = 'interfaces ' + Section.get_config_path(intf)
+ if ( conf.exists(f'{intfpath} address') or
+ conf.exists(f'{intfpath} ipv6 address autoconf') or
+ conf.exists(f'{intfpath} ipv6 address eui64') ):
+ ret = True
+
+ conf.set_level(old_level)
+ return ret
+
+def has_vrf_configured(conf, intf):
+ """
+ Checks if interface has a VRF configured.
+
+ Returns True if interface has VRF configured, False if it doesn't.
+ """
+ from vyos.ifconfig import Section
+ ret = False
+
+ old_level = conf.get_level()
+ conf.set_level([])
+
+ tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
+ if conf.exists(tmp):
+ ret = True
+
+ conf.set_level(old_level)
+ return ret
+
def has_vlan_subinterface_configured(conf, intf):
"""
Checks if interface has an VLAN subinterface configured.
@@ -389,7 +412,7 @@ def get_pppoe_interfaces(conf, vrf=None):
return pppoe_interfaces
-def get_interface_dict(config, base, ifname=''):
+def get_interface_dict(config, base, ifname='', recursive_defaults=True):
"""
Common utility function to retrieve and mangle the interfaces configuration
from the CLI input nodes. All interfaces have a common base where value
@@ -405,46 +428,23 @@ def get_interface_dict(config, base, ifname=''):
raise ConfigError('Interface (VYOS_TAGNODE_VALUE) not specified')
ifname = os.environ['VYOS_TAGNODE_VALUE']
- # retrieve interface default values
- default_values = defaults(base)
-
- # We take care about VLAN (vif, vif-s, vif-c) default values later on when
- # parsing vlans in default dict and merge the "proper" values in correctly,
- # see T2665.
- for vif in ['vif', 'vif_s']:
- if vif in default_values: del default_values[vif]
-
- dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
- get_first_key=True,
- no_tag_node_value_mangle=True)
-
# Check if interface has been removed. We must use exists() as
# get_config_dict() will always return {} - even when an empty interface
# node like the following exists.
# +macsec macsec1 {
# +}
if not config.exists(base + [ifname]):
+ dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
dict.update({'deleted' : {}})
-
- # Add interface instance name into dictionary
- dict.update({'ifname': ifname})
-
- # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect()
- if config.exists(['qos', 'interface', ifname]):
- dict.update({'traffic_policy': {}})
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if 'dhcpv6_options' not in dict:
- if 'dhcpv6_options' in default_values:
- del default_values['dhcpv6_options']
-
- # We have gathered the dict representation of the CLI, but there are
- # default options which we need to update into the dictionary retrived.
- # But we should only add them when interface is not deleted - as this might
- # confuse parsers
- if 'deleted' not in dict:
- dict = dict_merge(default_values, dict)
+ else:
+ # Get config_dict with default values
+ dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True,
+ with_defaults=True,
+ with_recursive_defaults=recursive_defaults)
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
@@ -452,8 +452,12 @@ def get_interface_dict(config, base, ifname=''):
if 'dhcp_options' in dict:
del dict['dhcp_options']
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict = T2665_set_dhcpv6pd_defaults(dict)
+ # Add interface instance name into dictionary
+ dict.update({'ifname': ifname})
+
+ # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect()
+ if config.exists(['qos', 'interface', ifname]):
+ dict.update({'traffic_policy': {}})
address = leaf_node_changed(config, base + [ifname, 'address'])
if address: dict.update({'address_old' : address})
@@ -474,6 +478,10 @@ def get_interface_dict(config, base, ifname=''):
dhcp = is_node_changed(config, base + [ifname, 'dhcp-options'])
if dhcp: dict.update({'dhcp_options_changed' : {}})
+ # Changine interface VRF assignemnts require a DHCP restart, too
+ dhcp = is_node_changed(config, base + [ifname, 'vrf'])
+ if dhcp: dict.update({'dhcp_options_changed' : {}})
+
# Some interfaces come with a source_interface which must also not be part
# of any other bond or bridge interface as it is exclusivly assigned as the
# Kernels "lower" interface to this new "virtual/upper" interface.
@@ -497,9 +505,6 @@ def get_interface_dict(config, base, ifname=''):
else:
dict['ipv6']['address'].update({'eui64_old': eui64})
- # Implant default dictionary in vif/vif-s VLAN interfaces. Values are
- # identical for all types of VLAN interfaces as they all include the same
- # XML definitions which hold the defaults.
for vif, vif_config in dict.get('vif', {}).items():
# Add subinterface name to dictionary
dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'})
@@ -507,22 +512,10 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif}']):
dict['vif'][vif].update({'traffic_policy': {}})
- default_vif_values = defaults(base + ['vif'])
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_config:
- del default_vif_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif', vif, 'address'])
if address: dict['vif'][vif].update({'address_old' : address})
- dict['vif'][vif] = dict_merge(default_vif_values, dict['vif'][vif])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif'][vif] = T2665_set_dhcpv6pd_defaults(dict['vif'][vif])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif'][vif] or 'dhcp' not in dict['vif'][vif]['address']:
@@ -544,26 +537,10 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif_s}']):
dict['vif_s'][vif_s].update({'traffic_policy': {}})
- default_vif_s_values = defaults(base + ['vif-s'])
- # XXX: T2665: we only wan't the vif-s defaults - do not care about vif-c
- if 'vif_c' in default_vif_s_values: del default_vif_s_values['vif_c']
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_s_config:
- del default_vif_s_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'address'])
if address: dict['vif_s'][vif_s].update({'address_old' : address})
- dict['vif_s'][vif_s] = dict_merge(default_vif_s_values,
- dict['vif_s'][vif_s])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif_s'][vif_s] = T2665_set_dhcpv6pd_defaults(dict['vif_s'][vif_s])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif_s'][vif_s] or 'dhcp' not in \
@@ -586,26 +563,11 @@ def get_interface_dict(config, base, ifname=''):
if config.exists(['qos', 'interface', f'{ifname}.{vif_s}.{vif_c}']):
dict['vif_s'][vif_s]['vif_c'][vif_c].update({'traffic_policy': {}})
- default_vif_c_values = defaults(base + ['vif-s', 'vif-c'])
-
- # XXX: T2665: When there is no DHCPv6-PD configuration given, we can safely
- # remove the default values from the dict.
- if not 'dhcpv6_options' in vif_c_config:
- del default_vif_c_values['dhcpv6_options']
-
- # Only add defaults if interface is not about to be deleted - this is
- # to keep a cleaner config dict.
if 'deleted' not in dict:
address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'vif-c', vif_c, 'address'])
if address: dict['vif_s'][vif_s]['vif_c'][vif_c].update(
{'address_old' : address})
- dict['vif_s'][vif_s]['vif_c'][vif_c] = dict_merge(
- default_vif_c_values, dict['vif_s'][vif_s]['vif_c'][vif_c])
- # XXX: T2665: blend in proper DHCPv6-PD default values
- dict['vif_s'][vif_s]['vif_c'][vif_c] = T2665_set_dhcpv6pd_defaults(
- dict['vif_s'][vif_s]['vif_c'][vif_c])
-
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
if 'address' not in dict['vif_s'][vif_s]['vif_c'][vif_c] or 'dhcp' \
@@ -655,45 +617,13 @@ def get_accel_dict(config, base, chap_secrets):
Return a dictionary with the necessary interface config keys.
"""
- from vyos.util import get_half_cpus
+ from vyos.utils.system import get_half_cpus
from vyos.template import is_ipv4
dict = config.get_config_dict(base, key_mangling=('-', '_'),
get_first_key=True,
- no_tag_node_value_mangle=True)
-
- # We have gathered the dict representation of the CLI, but there are default
- # options which we need to update into the dictionary retrived.
- default_values = defaults(base)
-
- # T2665: defaults include RADIUS server specifics per TAG node which need to
- # be added to individual RADIUS servers instead - so we can simply delete them
- if dict_search('authentication.radius.server', default_values):
- del default_values['authentication']['radius']['server']
-
- # T2665: defaults include static-ip address per TAG node which need to be
- # added to individual local users instead - so we can simply delete them
- if dict_search('authentication.local_users.username', default_values):
- del default_values['authentication']['local_users']['username']
-
- # T2665: defaults include IPv6 client-pool mask per TAG node which need to be
- # added to individual local users instead - so we can simply delete them
- if dict_search('client_ipv6_pool.prefix.mask', default_values):
- del default_values['client_ipv6_pool']['prefix']['mask']
- # delete empty dicts
- if len (default_values['client_ipv6_pool']['prefix']) == 0:
- del default_values['client_ipv6_pool']['prefix']
- if len (default_values['client_ipv6_pool']) == 0:
- del default_values['client_ipv6_pool']
-
- # T2665: IPoE only - it has an interface tag node
- # added to individual local users instead - so we can simply delete them
- if dict_search('authentication.interface', default_values):
- del default_values['authentication']['interface']
- if dict_search('interface', default_values):
- del default_values['interface']
-
- dict = dict_merge(default_values, dict)
+ no_tag_node_value_mangle=True,
+ with_recursive_defaults=True)
# set CPUs cores to process requests
dict.update({'thread_count' : get_half_cpus()})
@@ -713,43 +643,9 @@ def get_accel_dict(config, base, chap_secrets):
dict.update({'name_server_ipv4' : ns_v4, 'name_server_ipv6' : ns_v6})
del dict['name_server']
- # T2665: Add individual RADIUS server default values
- if dict_search('authentication.radius.server', dict):
- default_values = defaults(base + ['authentication', 'radius', 'server'])
- for server in dict_search('authentication.radius.server', dict):
- dict['authentication']['radius']['server'][server] = dict_merge(
- default_values, dict['authentication']['radius']['server'][server])
-
- # Check option "disable-accounting" per server and replace default value from '1813' to '0'
- # set vpn sstp authentication radius server x.x.x.x disable-accounting
- if 'disable_accounting' in dict['authentication']['radius']['server'][server]:
- dict['authentication']['radius']['server'][server]['acct_port'] = '0'
-
- # T2665: Add individual local-user default values
- if dict_search('authentication.local_users.username', dict):
- default_values = defaults(base + ['authentication', 'local-users', 'username'])
- for username in dict_search('authentication.local_users.username', dict):
- dict['authentication']['local_users']['username'][username] = dict_merge(
- default_values, dict['authentication']['local_users']['username'][username])
-
- # T2665: Add individual IPv6 client-pool default mask if required
- if dict_search('client_ipv6_pool.prefix', dict):
- default_values = defaults(base + ['client-ipv6-pool', 'prefix'])
- for prefix in dict_search('client_ipv6_pool.prefix', dict):
- dict['client_ipv6_pool']['prefix'][prefix] = dict_merge(
- default_values, dict['client_ipv6_pool']['prefix'][prefix])
-
- # T2665: IPoE only - add individual local-user default values
- if dict_search('authentication.interface', dict):
- default_values = defaults(base + ['authentication', 'interface'])
- for interface in dict_search('authentication.interface', dict):
- dict['authentication']['interface'][interface] = dict_merge(
- default_values, dict['authentication']['interface'][interface])
-
- if dict_search('interface', dict):
- default_values = defaults(base + ['interface'])
- for interface in dict_search('interface', dict):
- dict['interface'][interface] = dict_merge(default_values,
- dict['interface'][interface])
+ # Check option "disable-accounting" per server and replace default value from '1813' to '0'
+ for server in (dict_search('authentication.radius.server', dict) or []):
+ if 'disable_accounting' in dict['authentication']['radius']['server'][server]:
+ dict['authentication']['radius']['server'][server]['acct_port'] = '0'
return dict
diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py
index ac86af09c..1ec2dfafe 100644
--- a/python/vyos/configdiff.py
+++ b/python/vyos/configdiff.py
@@ -19,9 +19,10 @@ from vyos.config import Config
from vyos.configtree import DiffTree
from vyos.configdict import dict_merge
from vyos.configdict import list_diff
-from vyos.util import get_sub_dict, mangle_dict_keys
-from vyos.util import dict_search_args
-from vyos.xml import defaults
+from vyos.utils.dict import get_sub_dict
+from vyos.utils.dict import mangle_dict_keys
+from vyos.utils.dict import dict_search_args
+from vyos.xml_ref import get_defaults
class ConfigDiffError(Exception):
"""
@@ -239,7 +240,9 @@ class ConfigDiff(object):
if self._key_mangling:
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -263,7 +266,9 @@ class ConfigDiff(object):
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -311,7 +316,9 @@ class ConfigDiff(object):
if self._key_mangling:
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
@@ -334,7 +341,9 @@ class ConfigDiff(object):
ret[k] = self._mangle_dict_keys(ret[k])
if k in target_defaults and not no_defaults:
- default_values = defaults(self._make_path(path))
+ default_values = get_defaults(self._make_path(path),
+ get_first_key=True,
+ recursive=True)
ret[k] = dict_merge(default_values, ret[k])
return ret
diff --git a/python/vyos/configquery.py b/python/vyos/configquery.py
index 85fef8777..71ad5b4f0 100644
--- a/python/vyos/configquery.py
+++ b/python/vyos/configquery.py
@@ -1,4 +1,4 @@
-# Copyright 2021 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2021-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -19,9 +19,11 @@ settings from op mode, and execution of arbitrary op mode commands.
'''
import os
-from subprocess import STDOUT
-from vyos.util import popen, boot_configuration_complete
+from vyos.utils.process import STDOUT
+from vyos.utils.process import popen
+
+from vyos.utils.boot import boot_configuration_complete
from vyos.config import Config
from vyos.configsource import ConfigSourceSession, ConfigSourceString
from vyos.defaults import directories
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index df44fd8d6..6d4b2af59 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -1,5 +1,5 @@
# configsession -- the write API for the VyOS running config
-# Copyright (C) 2019 VyOS maintainers and contributors
+# Copyright (C) 2019-2023 VyOS maintainers and contributors
#
# This library is free software; you can redistribute it and/or modify it under the terms of
# the GNU Lesser General Public License as published by the Free Software Foundation;
@@ -17,7 +17,8 @@ import re
import sys
import subprocess
-from vyos.util import is_systemd_service_running
+from vyos.utils.process import is_systemd_service_running
+from vyos.utils.dict import dict_to_paths
CLI_SHELL_API = '/bin/cli-shell-api'
SET = '/opt/vyatta/sbin/my_set'
@@ -28,7 +29,7 @@ DISCARD = '/opt/vyatta/sbin/my_discard'
SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig']
LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile']
MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py']
-SAVE_CONFIG = ['/opt/vyatta/sbin/vyatta-save-config.pl']
+SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py']
INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url']
REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del']
GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate']
@@ -148,6 +149,13 @@ class ConfigSession(object):
value = [value]
self.__run_command([SET] + path + value)
+ def set_section(self, path: list, d: dict):
+ try:
+ for p in dict_to_paths(d):
+ self.set(path + p)
+ except (ValueError, ConfigSessionError) as e:
+ raise ConfigSessionError(e)
+
def delete(self, path, value=None):
if not value:
value = []
@@ -155,6 +163,15 @@ class ConfigSession(object):
value = [value]
self.__run_command([DELETE] + path + value)
+ def load_section(self, path: list, d: dict):
+ try:
+ self.delete(path)
+ if d:
+ for p in dict_to_paths(d):
+ self.set(path + p)
+ except (ValueError, ConfigSessionError) as e:
+ raise ConfigSessionError(e)
+
def comment(self, path, value=None):
if not value:
value = [""]
diff --git a/python/vyos/configsource.py b/python/vyos/configsource.py
index 510b5b65a..f582bdfab 100644
--- a/python/vyos/configsource.py
+++ b/python/vyos/configsource.py
@@ -1,5 +1,5 @@
-# Copyright 2020 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -19,7 +19,7 @@ import re
import subprocess
from vyos.configtree import ConfigTree
-from vyos.util import boot_configuration_complete
+from vyos.utils.boot import boot_configuration_complete
class VyOSError(Exception):
"""
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index 19b9838d4..09cfd43d3 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -201,7 +201,9 @@ class ConfigTree(object):
check_path(path)
path_str = " ".join(map(str, path)).encode()
- self.__delete(self.__config, path_str)
+ res = self.__delete(self.__config, path_str)
+ if (res != 0):
+ raise ConfigTreeError(f"Path doesn't exist: {path}")
if self.__migration:
print(f"- op: delete path: {path}")
@@ -210,7 +212,14 @@ class ConfigTree(object):
check_path(path)
path_str = " ".join(map(str, path)).encode()
- self.__delete_value(self.__config, path_str, value.encode())
+ res = self.__delete_value(self.__config, path_str, value.encode())
+ if (res != 0):
+ if res == 1:
+ raise ConfigTreeError(f"Path doesn't exist: {path}")
+ elif res == 2:
+ raise ConfigTreeError(f"Value doesn't exist: '{value}'")
+ else:
+ raise ConfigTreeError()
if self.__migration:
print(f"- op: delete_value path: {path} value: {value}")
@@ -374,14 +383,16 @@ def union(left, right, libpath=LIBPATH):
return tree
def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH):
- __lib = cdll.LoadLibrary(libpath)
- __reference_tree_to_json = __lib.reference_tree_to_json
- __reference_tree_to_json.argtypes = [c_char_p, c_char_p]
- __get_error = __lib.get_error
- __get_error.argtypes = []
- __get_error.restype = c_char_p
-
- res = __reference_tree_to_json(from_dir.encode(), to_file.encode())
+ try:
+ __lib = cdll.LoadLibrary(libpath)
+ __reference_tree_to_json = __lib.reference_tree_to_json
+ __reference_tree_to_json.argtypes = [c_char_p, c_char_p]
+ __get_error = __lib.get_error
+ __get_error.argtypes = []
+ __get_error.restype = c_char_p
+ res = __reference_tree_to_json(from_dir.encode(), to_file.encode())
+ except Exception as e:
+ raise ConfigTreeError(e)
if res == 1:
msg = __get_error().decode()
raise ConfigTreeError(msg)
@@ -409,10 +420,6 @@ class DiffTree:
self.__diff_tree.argtypes = [c_char_p, c_void_p, c_void_p]
self.__diff_tree.restype = c_void_p
- self.__trim_tree = self.__lib.trim_tree
- self.__trim_tree.argtypes = [c_void_p, c_void_p]
- self.__trim_tree.restype = c_void_p
-
check_path(path)
path_str = " ".join(map(str, path)).encode()
@@ -426,11 +433,7 @@ class DiffTree:
self.add = self.full.get_subtree(['add'])
self.sub = self.full.get_subtree(['sub'])
self.inter = self.full.get_subtree(['inter'])
-
- # trim sub(-tract) tree to get delete tree for commands
- ref = self.right.get_subtree(path, with_node=True) if path else self.right
- res = self.__trim_tree(self.sub._get_config(), ref._get_config())
- self.delete = ConfigTree(address=res)
+ self.delete = self.full.get_subtree(['del'])
def to_commands(self):
add = self.add.to_commands()
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 8fddd91d0..52f9238b8 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -22,8 +22,8 @@
# makes use of it!
from vyos import ConfigError
-from vyos.util import dict_search
-from vyos.util import dict_search_recursive
+from vyos.utils.dict import dict_search
+from vyos.utils.dict import dict_search_recursive
def verify_mtu(config):
"""
@@ -187,15 +187,14 @@ def verify_eapol(config):
if 'ca' not in config['pki']:
raise ConfigError('Invalid CA certificate specified for EAPoL')
- ca_cert_name = config['eapol']['ca_certificate']
+ for ca_cert_name in config['eapol']['ca_certificate']:
+ if ca_cert_name not in config['pki']['ca']:
+ raise ConfigError('Invalid CA certificate specified for EAPoL')
- if ca_cert_name not in config['pki']['ca']:
- raise ConfigError('Invalid CA certificate specified for EAPoL')
-
- ca_cert = config['pki']['ca'][ca_cert_name]
+ ca_cert = config['pki']['ca'][ca_cert_name]
- if 'certificate' not in ca_cert:
- raise ConfigError('Invalid CA certificate specified for EAPoL')
+ if 'certificate' not in ca_cert:
+ raise ConfigError('Invalid CA certificate specified for EAPoL')
def verify_mirror_redirect(config):
"""
@@ -314,15 +313,13 @@ def verify_dhcpv6(config):
recurring validation of DHCPv6 options which are mutually exclusive.
"""
if 'dhcpv6_options' in config:
- from vyos.util import dict_search
-
if {'parameters_only', 'temporary'} <= set(config['dhcpv6_options']):
raise ConfigError('DHCPv6 temporary and parameters-only options '
'are mutually exclusive!')
# It is not allowed to have duplicate SLA-IDs as those identify an
# assigned IPv6 subnet from a delegated prefix
- for pd in dict_search('dhcpv6_options.pd', config):
+ for pd in (dict_search('dhcpv6_options.pd', config) or []):
sla_ids = []
interfaces = dict_search(f'dhcpv6_options.pd.{pd}.interface', config)
@@ -460,7 +457,7 @@ def verify_diffie_hellman_length(file, min_keysize):
then or equal to min_keysize """
import os
import re
- from vyos.util import cmd
+ from vyos.utils.process import cmd
try:
keysize = str(min_keysize)
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index d4ffc249e..a5314790d 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -32,7 +32,9 @@ directories = {
'api_schema': f'{base_dir}/services/api/graphql/graphql/schema/',
'api_client_op': f'{base_dir}/services/api/graphql/graphql/client_op/',
'api_templates': f'{base_dir}/services/api/graphql/session/templates/',
- 'vyos_udev_dir' : '/run/udev/vyos'
+ 'vyos_udev_dir' : '/run/udev/vyos',
+ 'isc_dhclient_dir' : '/run/dhclient',
+ 'dhcp6_client_dir' : '/run/dhcp6c',
}
config_status = '/tmp/vyos-config-status'
diff --git a/python/vyos/dicts.py b/python/vyos/dicts.py
deleted file mode 100644
index b12cda40f..000000000
--- a/python/vyos/dicts.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2019 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-from vyos import ConfigError
-
-
-class FixedDict(dict):
- """
- FixedDict: A dictionnary not allowing new keys to be created after initialisation.
-
- >>> f = FixedDict(**{'count':1})
- >>> f['count'] = 2
- >>> f['king'] = 3
- File "...", line ..., in __setitem__
- raise ConfigError(f'Option "{k}" has no defined default')
- """
-
- def __init__(self, **options):
- self._allowed = options.keys()
- super().__init__(**options)
-
- def __setitem__(self, k, v):
- """
- __setitem__ is a builtin which is called by python when setting dict values:
- >>> d = dict()
- >>> d['key'] = 'value'
- >>> d
- {'key': 'value'}
-
- is syntaxic sugar for
-
- >>> d = dict()
- >>> d.__setitem__('key','value')
- >>> d
- {'key': 'value'}
- """
- if k not in self._allowed:
- raise ConfigError(f'Option "{k}" has no defined default')
- super().__setitem__(k, v)
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index 1b1e54dfb..ca3bcfc3d 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -16,12 +16,13 @@
import os
import re
-from vyos.util import popen
+from vyos.utils.process import popen
# These drivers do not support using ethtool to change the speed, duplex, or
# flow control settings
_drivers_without_speed_duplex_flow = ['vmxnet3', 'virtio_net', 'xen_netfront',
- 'iavf', 'ice', 'i40e', 'hv_netvsc', 'veth']
+ 'iavf', 'ice', 'i40e', 'hv_netvsc', 'veth', 'ixgbevf',
+ 'tun']
class Ethtool:
"""
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 919032a41..53ff8259e 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2022 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -28,11 +28,11 @@ from time import strftime
from vyos.remote import download
from vyos.template import is_ipv4
from vyos.template import render
-from vyos.util import call
-from vyos.util import cmd
-from vyos.util import dict_search_args
-from vyos.util import dict_search_recursive
-from vyos.util import run
+from vyos.utils.dict import dict_search_args
+from vyos.utils.dict import dict_search_recursive
+from vyos.utils.process import call
+from vyos.utils.process import cmd
+from vyos.utils.process import run
# Domain Resolver
@@ -41,14 +41,19 @@ def fqdn_config_parse(firewall):
firewall['ip6_fqdn'] = {}
for domain, path in dict_search_recursive(firewall, 'fqdn'):
- fw_name = path[1] # name/ipv6-name
- rule = path[3] # rule id
- suffix = path[4][0] # source/destination (1 char)
- set_name = f'{fw_name}_{rule}_{suffix}'
+ hook_name = path[1]
+ priority = path[2]
+
+ fw_name = path[2]
+ rule = path[4]
+ suffix = path[5][0]
+ set_name = f'{hook_name}_{priority}_{rule}_{suffix}'
- if path[0] == 'name':
+ if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
firewall['ip_fqdn'][set_name] = domain
- elif path[0] == 'ipv6_name':
+ elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):
+ if path[1] == 'name':
+ set_name = f'name6_{priority}_{rule}_{suffix}'
firewall['ip6_fqdn'][set_name] = domain
def fqdn_resolve(fqdn, ipv6=False):
@@ -80,7 +85,7 @@ def nft_action(vyos_action):
return 'return'
return vyos_action
-def parse_rule(rule_conf, fw_name, rule_id, ip_name):
+def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output = []
def_suffix = '6' if ip_name == 'ip6' else ''
@@ -129,16 +134,34 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'fqdn' in side_conf:
fqdn = side_conf['fqdn']
+ hook_name = ''
operator = ''
if fqdn[0] == '!':
operator = '!='
- output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{fw_name}_{rule_id}_{prefix}')
+ if hook == 'FWD':
+ hook_name = 'forward'
+ if hook == 'INP':
+ hook_name = 'input'
+ if hook == 'OUT':
+ hook_name = 'output'
+ if hook == 'NAM':
+ hook_name = f'name{def_suffix}'
+ output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{hook_name}_{fw_name}_{rule_id}_{prefix}')
if dict_search_args(side_conf, 'geoip', 'country_code'):
operator = ''
+ hook_name = ''
if dict_search_args(side_conf, 'geoip', 'inverse_match') != None:
operator = '!='
- output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}')
+ if hook == 'FWD':
+ hook_name = 'forward'
+ if hook == 'INP':
+ hook_name = 'input'
+ if hook == 'OUT':
+ hook_name = 'output'
+ if hook == 'NAM':
+ hook_name = f'name'
+ output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}')
if 'mac_address' in side_conf:
suffix = side_conf["mac_address"]
@@ -249,20 +272,34 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
output.append(f'ip6 hoplimit {operator} {value}')
if 'inbound_interface' in rule_conf:
+ operator = ''
if 'interface_name' in rule_conf['inbound_interface']:
iiface = rule_conf['inbound_interface']['interface_name']
- output.append(f'iifname {{{iiface}}}')
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} {{{iiface}}}')
else:
iiface = rule_conf['inbound_interface']['interface_group']
- output.append(f'iifname @I_{iiface}')
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} @I_{iiface}')
if 'outbound_interface' in rule_conf:
+ operator = ''
if 'interface_name' in rule_conf['outbound_interface']:
oiface = rule_conf['outbound_interface']['interface_name']
- output.append(f'oifname {{{oiface}}}')
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} {{{oiface}}}')
else:
oiface = rule_conf['outbound_interface']['interface_group']
- output.append(f'oifname @I_{oiface}')
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} @I_{oiface}')
if 'ttl' in rule_conf:
operators = {'eq': '==', 'gt': '>', 'lt': '<'}
@@ -304,7 +341,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'ipsec' in rule_conf:
if 'match_ipsec' in rule_conf['ipsec']:
output.append('meta ipsec == 1')
- if 'match_non_ipsec' in rule_conf['ipsec']:
+ if 'match_none' in rule_conf['ipsec']:
output.append('meta ipsec == 0')
if 'fragment' in rule_conf:
@@ -324,7 +361,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
if 'recent' in rule_conf:
count = rule_conf['recent']['count']
time = rule_conf['recent']['time']
- output.append(f'add @RECENT{def_suffix}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}')
+ output.append(f'add @RECENT{def_suffix}_{hook}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}')
if 'time' in rule_conf:
output.append(parse_time(rule_conf['time']))
@@ -348,7 +385,9 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
output.append(parse_policy_set(rule_conf['set'], def_suffix))
if 'action' in rule_conf:
- output.append(nft_action(rule_conf['action']))
+ # Change action=return to action=action
+ # #output.append(nft_action(rule_conf['action']))
+ output.append(f'{rule_conf["action"]}')
if 'jump' in rule_conf['action']:
target = rule_conf['jump_target']
output.append(f'NAME{def_suffix}_{target}')
@@ -365,7 +404,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
else:
output.append('return')
- output.append(f'comment "{fw_name}-{rule_id}"')
+ output.append(f'comment "{hook}-{fw_name}-{rule_id}"')
return " ".join(output)
def parse_tcp_flags(flags):
@@ -493,11 +532,12 @@ def geoip_update(firewall, force=False):
# Map country codes to set names
for codes, path in dict_search_recursive(firewall, 'country_code'):
- set_name = f'GEOIP_CC_{path[1]}_{path[3]}'
- if path[0] == 'name':
+ set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}'
+ if ( path[0] == 'ipv4'):
for code in codes:
ipv4_codes.setdefault(code, []).append(set_name)
- elif path[0] == 'ipv6_name':
+ elif ( path[0] == 'ipv6' ):
+ set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}'
for code in codes:
ipv6_codes.setdefault(code, []).append(set_name)
diff --git a/python/vyos/frr.py b/python/vyos/frr.py
index a84f183ef..2e3c8a271 100644
--- a/python/vyos/frr.py
+++ b/python/vyos/frr.py
@@ -67,9 +67,12 @@ Apply the new configuration:
import tempfile
import re
-from vyos import util
-from vyos.util import chown
-from vyos.util import cmd
+
+from vyos.utils.permission import chown
+from vyos.utils.process import cmd
+from vyos.utils.process import popen
+from vyos.utils.process import STDOUT
+
import logging
from logging.handlers import SysLogHandler
import os
@@ -144,7 +147,7 @@ def get_configuration(daemon=None, marked=False):
if daemon:
cmd += f' -d {daemon}'
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code:
raise OSError(code, output)
@@ -166,7 +169,7 @@ def mark_configuration(config):
config: The configuration string to mark/test
return: The marked configuration from FRR
"""
- output, code = util.popen(f"{path_vtysh} -m -f -", stderr=util.STDOUT, input=config)
+ output, code = popen(f"{path_vtysh} -m -f -", stderr=STDOUT, input=config)
if code == 2:
raise ConfigurationNotValid(str(output))
@@ -206,7 +209,7 @@ def reload_configuration(config, daemon=None):
cmd += f' {f.name}'
LOG.debug(f'reload_configuration: Executing command against frr-reload: "{cmd}"')
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
f.close()
for i, e in enumerate(output.split('\n')):
LOG.debug(f'frr-reload output: {i:3} {e}')
@@ -235,7 +238,7 @@ def execute(command):
cmd = f"{path_vtysh} -c '{command}'"
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code:
raise OSError(code, output)
@@ -267,7 +270,7 @@ def configure(lines, daemon=False):
for x in lines:
cmd += f" -c '{x}'"
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code == 1:
raise ConfigurationNotValid(f'Configuration FRR failed: {repr(output)}')
elif code:
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index 0edd17055..d1d7d48c4 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -16,10 +16,10 @@
import os
from vyos.ifconfig.interface import Interface
-from vyos.util import cmd
-from vyos.util import dict_search
-from vyos.validate import assert_list
-from vyos.validate import assert_positive
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
+from vyos.utils.assertion import assert_list
+from vyos.utils.assertion import assert_positive
@Interface.register
class BondIf(Interface):
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index aa818bc5f..b29e71394 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -17,10 +17,10 @@ from netifaces import interfaces
import json
from vyos.ifconfig.interface import Interface
-from vyos.validate import assert_boolean
-from vyos.validate import assert_positive
-from vyos.util import cmd
-from vyos.util import dict_search
+from vyos.utils.assertion import assert_boolean
+from vyos.utils.assertion import assert_positive
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
from vyos.configdict import get_vlan_ids
from vyos.configdict import list_diff
diff --git a/python/vyos/ifconfig/control.py b/python/vyos/ifconfig/control.py
index 915c1d2f9..7402da55a 100644
--- a/python/vyos/ifconfig/control.py
+++ b/python/vyos/ifconfig/control.py
@@ -19,10 +19,10 @@ from inspect import signature
from inspect import _empty
from vyos.ifconfig.section import Section
-from vyos.util import popen
-from vyos.util import cmd
-from vyos.util import read_file
-from vyos.util import write_file
+from vyos.utils.process import popen
+from vyos.utils.process import cmd
+from vyos.utils.file import read_file
+from vyos.utils.file import write_file
from vyos import debug
class Control(Section):
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 30bea3b86..24ce3a803 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -20,10 +20,10 @@ from glob import glob
from vyos.base import Warning
from vyos.ethtool import Ethtool
from vyos.ifconfig.interface import Interface
-from vyos.util import run
-from vyos.util import dict_search
-from vyos.util import read_file
-from vyos.validate import assert_list
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+from vyos.utils.process import run
+from vyos.utils.assertion import assert_list
@Interface.register
class EthernetIf(Interface):
diff --git a/python/vyos/ifconfig/geneve.py b/python/vyos/ifconfig/geneve.py
index 276c34cd7..fbb261a35 100644
--- a/python/vyos/ifconfig/geneve.py
+++ b/python/vyos/ifconfig/geneve.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class GeneveIf(Interface):
@@ -45,6 +45,7 @@ class GeneveIf(Interface):
'parameters.ip.df' : 'df',
'parameters.ip.tos' : 'tos',
'parameters.ip.ttl' : 'ttl',
+ 'parameters.ip.innerproto' : 'innerprotoinherit',
'parameters.ipv6.flowlabel' : 'flowlabel',
}
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 85fa90653..20ea66953 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -31,24 +31,25 @@ from vyos import ConfigError
from vyos.configdict import list_diff
from vyos.configdict import dict_merge
from vyos.configdict import get_vlan_ids
+from vyos.defaults import directories
from vyos.template import render
-from vyos.util import mac2eui64
-from vyos.util import dict_search
-from vyos.util import read_file
-from vyos.util import run
-from vyos.util import get_interface_config
-from vyos.util import get_interface_namespace
-from vyos.util import is_systemd_service_active
+from vyos.utils.network import mac2eui64
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+from vyos.utils.network import get_interface_config
+from vyos.utils.network import get_interface_namespace
+from vyos.utils.process import is_systemd_service_active
+from vyos.utils.process import run
from vyos.template import is_ipv4
from vyos.template import is_ipv6
-from vyos.validate import is_intf_addr_assigned
-from vyos.validate import is_ipv6_link_local
-from vyos.validate import assert_boolean
-from vyos.validate import assert_list
-from vyos.validate import assert_mac
-from vyos.validate import assert_mtu
-from vyos.validate import assert_positive
-from vyos.validate import assert_range
+from vyos.utils.network import is_intf_addr_assigned
+from vyos.utils.network import is_ipv6_link_local
+from vyos.utils.assertion import assert_boolean
+from vyos.utils.assertion import assert_list
+from vyos.utils.assertion import assert_mac
+from vyos.utils.assertion import assert_mtu
+from vyos.utils.assertion import assert_positive
+from vyos.utils.assertion import assert_range
from vyos.ifconfig.control import Control
from vyos.ifconfig.vrrp import VRRP
@@ -197,6 +198,10 @@ class Interface(Control):
'validate': lambda fwd: assert_range(fwd,0,2),
'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',
},
+ 'ipv6_accept_dad': {
+ 'validate': lambda dad: assert_range(dad,0,3),
+ 'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad',
+ },
'ipv6_dad_transmits': {
'validate': assert_positive,
'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits',
@@ -226,6 +231,10 @@ class Interface(Control):
'validate': lambda link: assert_range(link,0,3),
'location': '/proc/sys/net/ipv4/conf/{ifname}/link_filter',
},
+ 'per_client_thread': {
+ 'validate': assert_boolean,
+ 'location': '/sys/class/net/{ifname}/threaded',
+ },
}
_sysfs_get = {
@@ -262,6 +271,9 @@ class Interface(Control):
'ipv6_forwarding': {
'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',
},
+ 'ipv6_accept_dad': {
+ 'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad',
+ },
'ipv6_dad_transmits': {
'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits',
},
@@ -274,6 +286,10 @@ class Interface(Control):
'link_detect': {
'location': '/proc/sys/net/ipv4/conf/{ifname}/link_filter',
},
+ 'per_client_thread': {
+ 'validate': assert_boolean,
+ 'location': '/sys/class/net/{ifname}/threaded',
+ },
}
@classmethod
@@ -793,6 +809,30 @@ class Interface(Control):
return None
return self.set_interface('rp_filter', value)
+ def _cleanup_ipv6_source_validation_rules(self, ifname):
+ commands = []
+ results = self._cmd(f'nft -a list chain ip6 raw vyos_rpfilter').split("\n")
+ for line in results:
+ if f'iifname "{ifname}"' in line:
+ handle_search = re.search('handle (\d+)', line)
+ if handle_search:
+ self._cmd(f'nft delete rule ip6 raw vyos_rpfilter handle {handle_search[1]}')
+
+ def set_ipv6_source_validation(self, mode):
+ """
+ Set IPv6 reverse path validation
+
+ Example:
+ >>> from vyos.ifconfig import Interface
+ >>> Interface('eth0').set_ipv6_source_validation('strict')
+ """
+ self._cleanup_ipv6_source_validation_rules(self.ifname)
+ nft_prefix = f'nft add rule ip6 raw vyos_rpfilter iifname "{self.ifname}"'
+ if mode == 'strict':
+ self._cmd(f"{nft_prefix} fib saddr . iif oif 0 counter drop")
+ elif mode == 'loose':
+ self._cmd(f"{nft_prefix} fib saddr oif 0 counter drop")
+
def set_ipv6_accept_ra(self, accept_ra):
"""
Accept Router Advertisements; autoconfigure using them.
@@ -877,6 +917,13 @@ class Interface(Control):
return None
return self.set_interface('ipv6_forwarding', forwarding)
+ def set_ipv6_dad_accept(self, dad):
+ """Whether to accept DAD (Duplicate Address Detection)"""
+ tmp = self.get_interface('ipv6_accept_dad')
+ if tmp == dad:
+ return None
+ return self.set_interface('ipv6_accept_dad', dad)
+
def set_ipv6_dad_messages(self, dad):
"""
The amount of Duplicate Address Detection probes to send.
@@ -1278,44 +1325,49 @@ class Interface(Control):
raise ValueError()
ifname = self.ifname
- config_base = r'/var/lib/dhcp/dhclient'
- config_file = f'{config_base}_{ifname}.conf'
- options_file = f'{config_base}_{ifname}.options'
- pid_file = f'{config_base}_{ifname}.pid'
- lease_file = f'{config_base}_{ifname}.leases'
+ config_base = directories['isc_dhclient_dir'] + '/dhclient'
+ dhclient_config_file = f'{config_base}_{ifname}.conf'
+ dhclient_lease_file = f'{config_base}_{ifname}.leases'
+ systemd_override_file = f'/run/systemd/system/dhclient@{ifname}.service.d/10-override.conf'
systemd_service = f'dhclient@{ifname}.service'
+ # Rendered client configuration files require the apsolute config path
+ self.config['isc_dhclient_dir'] = directories['isc_dhclient_dir']
+
# 'up' check is mandatory b/c even if the interface is A/D, as soon as
# the DHCP client is started the interface will be placed in u/u state.
# This is not what we intended to do when disabling an interface.
- if enable and 'disable' not in self._config:
- if dict_search('dhcp_options.host_name', self._config) == None:
+ if enable and 'disable' not in self.config:
+ if dict_search('dhcp_options.host_name', self.config) == None:
# read configured system hostname.
# maybe change to vyos hostd client ???
hostname = 'vyos'
with open('/etc/hostname', 'r') as f:
hostname = f.read().rstrip('\n')
tmp = {'dhcp_options' : { 'host_name' : hostname}}
- self._config = dict_merge(tmp, self._config)
+ self.config = dict_merge(tmp, self.config)
- render(options_file, 'dhcp-client/daemon-options.j2', self._config)
- render(config_file, 'dhcp-client/ipv4.j2', self._config)
+ render(systemd_override_file, 'dhcp-client/override.conf.j2', self.config)
+ render(dhclient_config_file, 'dhcp-client/ipv4.j2', self.config)
+
+ # Reload systemd unit definitons as some options are dynamically generated
+ self._cmd('systemctl daemon-reload')
# When the DHCP client is restarted a brief outage will occur, as
# the old lease is released a new one is acquired (T4203). We will
# only restart DHCP client if it's option changed, or if it's not
# running, but it should be running (e.g. on system startup)
- if 'dhcp_options_changed' in self._config or not is_systemd_service_active(systemd_service):
+ if 'dhcp_options_changed' in self.config or not is_systemd_service_active(systemd_service):
return self._cmd(f'systemctl restart {systemd_service}')
- return None
else:
if is_systemd_service_active(systemd_service):
self._cmd(f'systemctl stop {systemd_service}')
# cleanup old config files
- for file in [config_file, options_file, pid_file, lease_file]:
+ for file in [dhclient_config_file, systemd_override_file, dhclient_lease_file]:
if os.path.isfile(file):
os.remove(file)
+ return None
def set_dhcpv6(self, enable):
"""
@@ -1325,11 +1377,20 @@ class Interface(Control):
raise ValueError()
ifname = self.ifname
- config_file = f'/run/dhcp6c/dhcp6c.{ifname}.conf'
+ config_base = directories['dhcp6_client_dir']
+ config_file = f'{config_base}/dhcp6c.{ifname}.conf'
+ systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf'
systemd_service = f'dhcp6c@{ifname}.service'
- if enable and 'disable' not in self._config:
- render(config_file, 'dhcp-client/ipv6.j2', self._config)
+ # Rendered client configuration files require the apsolute config path
+ self.config['dhcp6_client_dir'] = directories['dhcp6_client_dir']
+
+ if enable and 'disable' not in self.config:
+ render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', self.config)
+ render(config_file, 'dhcp-client/ipv6.j2', self.config)
+
+ # Reload systemd unit definitons as some options are dynamically generated
+ self._cmd('systemctl daemon-reload')
# We must ignore any return codes. This is required to enable
# DHCPv6-PD for interfaces which are yet not up and running.
@@ -1340,6 +1401,8 @@ class Interface(Control):
if os.path.isfile(config_file):
os.remove(config_file)
+ return None
+
def set_mirror_redirect(self):
# Please refer to the document for details
# - https://man7.org/linux/man-pages/man8/tc.8.html
@@ -1351,20 +1414,20 @@ class Interface(Control):
if 'netns' in self.config:
return None
- source_if = self._config['ifname']
+ source_if = self.config['ifname']
mirror_config = None
- if 'mirror' in self._config:
- mirror_config = self._config['mirror']
- if 'is_mirror_intf' in self._config:
- source_if = next(iter(self._config['is_mirror_intf']))
- mirror_config = self._config['is_mirror_intf'][source_if].get('mirror', None)
+ if 'mirror' in self.config:
+ mirror_config = self.config['mirror']
+ if 'is_mirror_intf' in self.config:
+ source_if = next(iter(self.config['is_mirror_intf']))
+ mirror_config = self.config['is_mirror_intf'][source_if].get('mirror', None)
redirect_config = None
# clear existing ingess - ignore errors (e.g. "Error: Cannot find specified
# qdisc on specified device") - we simply cleanup all stuff here
- if not 'traffic_policy' in self._config:
+ if not 'traffic_policy' in self.config:
self._popen(f'tc qdisc del dev {source_if} parent ffff: 2>/dev/null');
self._popen(f'tc qdisc del dev {source_if} parent 1: 2>/dev/null');
@@ -1388,43 +1451,39 @@ class Interface(Control):
if err: print('tc qdisc(filter for mirror port failed')
# Apply interface traffic redirection policy
- elif 'redirect' in self._config:
+ elif 'redirect' in self.config:
_, err = self._popen(f'tc qdisc add dev {source_if} handle ffff: ingress')
if err: print(f'tc qdisc add for redirect failed!')
- target_if = self._config['redirect']
+ target_if = self.config['redirect']
_, err = self._popen(f'tc filter add dev {source_if} parent ffff: protocol '\
f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\
f'egress redirect dev {target_if}')
if err: print('tc filter add for redirect failed')
- def set_xdp(self, state):
+ def set_per_client_thread(self, enable):
"""
- Enable Kernel XDP support. State can be either True or False.
+ Per-device control to enable/disable the threaded mode for all the napi
+ instances of the given network device, without the need for a device up/down.
+
+ User sets it to 1 or 0 to enable or disable threaded mode.
Example:
>>> from vyos.ifconfig import Interface
- >>> i = Interface('eth0')
- >>> i.set_xdp(True)
- """
- if not isinstance(state, bool):
- raise ValueError("Value out of range")
-
- # https://vyos.dev/T3448 - there is (yet) no RPI support for XDP
- if not os.path.exists('/usr/sbin/xdp_loader'):
- return
-
- ifname = self.config['ifname']
- cmd = f'xdp_loader -d {ifname} -U --auto-mode'
- if state:
- # Using 'xdp' will automatically decide if the driver supports
- # 'xdpdrv' or only 'xdpgeneric'. A user later sees which driver is
- # actually in use by calling 'ip a' or 'show interfaces ethernet'
- cmd = f'xdp_loader -d {ifname} --auto-mode -F --progsec xdp_router ' \
- f'--filename /usr/share/vyos/xdp/xdp_prog_kern.o && ' \
- f'xdp_prog_user -d {ifname}'
+ >>> Interface('wg1').set_per_client_thread(1)
+ """
+ # In the case of a "virtual" interface like wireguard, the sysfs
+ # node is only created once there is a peer configured. We can now
+ # add a verify() code-path for this or make this dynamic without
+ # nagging the user
+ tmp = self._sysfs_get['per_client_thread']['location']
+ if not os.path.exists(tmp):
+ return None
- return self._cmd(cmd)
+ tmp = self.get_interface('per_client_thread')
+ if tmp == enable:
+ return None
+ self.set_interface('per_client_thread', enable)
def update(self, config):
""" General helper function which works on a dictionary retrived by
@@ -1439,7 +1498,7 @@ class Interface(Control):
# Cache the configuration - it will be reused inside e.g. DHCP handler
# XXX: maybe pass the option via __init__ in the future and rename this
# method to apply()?
- self._config = config
+ self.config = config
# Change interface MAC address - re-set to real hardware address (hw-id)
# if custom mac is removed. Skip if bond member.
@@ -1576,6 +1635,11 @@ class Interface(Control):
value = tmp if (tmp != None) else '0'
self.set_ipv4_source_validation(value)
+ # IPv6 source-validation
+ tmp = dict_search('ipv6.source_validation', config)
+ value = tmp if (tmp != None) else '0'
+ self.set_ipv6_source_validation(value)
+
# MTU - Maximum Transfer Unit has a default value. It must ALWAYS be set
# before mangling any IPv6 option. If MTU is less then 1280 IPv6 will be
# automatically disabled by the kernel. Also MTU must be increased before
@@ -1605,10 +1669,17 @@ class Interface(Control):
value = '1' if (tmp != None) else '0'
self.set_ipv6_autoconf(value)
- # IPv6 Duplicate Address Detection (DAD) tries
+ # Whether to accept IPv6 DAD (Duplicate Address Detection) packets
+ tmp = dict_search('ipv6.accept_dad', config)
+ # Not all interface types got this CLI option, but if they do, there
+ # is an XML defaultValue available
+ if (tmp != None): self.set_ipv6_dad_accept(tmp)
+
+ # IPv6 DAD tries
tmp = dict_search('ipv6.dup_addr_detect_transmits', config)
- value = tmp if (tmp != None) else '1'
- self.set_ipv6_dad_messages(value)
+ # Not all interface types got this CLI option, but if they do, there
+ # is an XML defaultValue available
+ if (tmp != None): self.set_ipv6_dad_messages(tmp)
# Delete old IPv6 EUI64 addresses before changing MAC
for addr in (dict_search('ipv6.address.eui64_old', config) or []):
@@ -1631,12 +1702,14 @@ class Interface(Control):
tmp = config.get('is_bridge_member')
self.add_to_bridge(tmp)
- # eXpress Data Path - highly experimental
- self.set_xdp('xdp' in config)
-
# configure interface mirror or redirection target
self.set_mirror_redirect()
+ # enable/disable NAPI threading mode
+ tmp = dict_search('per_client_thread', config)
+ value = '1' if (tmp != None) else '0'
+ self.set_per_client_thread(value)
+
# Enable/Disable of an interface must always be done at the end of the
# derived class to make use of the ref-counting set_admin_state()
# function. We will only enable the interface if 'up' was called as
diff --git a/python/vyos/ifconfig/l2tpv3.py b/python/vyos/ifconfig/l2tpv3.py
index fcd1fbf81..85a89ef8b 100644
--- a/python/vyos/ifconfig/l2tpv3.py
+++ b/python/vyos/ifconfig/l2tpv3.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -15,7 +15,8 @@
from time import sleep
from time import time
-from vyos.util import run
+
+from vyos.utils.process import run
from vyos.ifconfig.interface import Interface
def wait_for_add_l2tpv3(timeout=10, sleep_interval=1, cmd=None):
diff --git a/python/vyos/ifconfig/macsec.py b/python/vyos/ifconfig/macsec.py
index 1a78d18d8..9329c5ee7 100644
--- a/python/vyos/ifconfig/macsec.py
+++ b/python/vyos/ifconfig/macsec.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2021 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -41,10 +41,30 @@ class MACsecIf(Interface):
Create MACsec interface in OS kernel. Interface is administrative
down by default.
"""
+
# create tunnel interface
cmd = 'ip link add link {source_interface} {ifname} type {type}'.format(**self.config)
cmd += f' cipher {self.config["security"]["cipher"]}'
self._cmd(cmd)
+ # Check if using static keys
+ if 'static' in self.config["security"]:
+ # Set static TX key
+ cmd = 'ip macsec add {ifname} tx sa 0 pn 1 on key 00'.format(**self.config)
+ cmd += f' {self.config["security"]["static"]["key"]}'
+ self._cmd(cmd)
+
+ for peer, peer_config in self.config["security"]["static"]["peer"].items():
+ if 'disable' in peer_config:
+ continue
+
+ # Create the address
+ cmd = 'ip macsec add {ifname} rx port 1 address'.format(**self.config)
+ cmd += f' {peer_config["mac"]}'
+ self._cmd(cmd)
+ # Add the rx-key to the address
+ cmd += f' sa 0 pn 1 on key 01 {peer_config["key"]}'
+ self._cmd(cmd)
+
# interface is always A/D down. It needs to be enabled explicitly
self.set_admin_state('down')
diff --git a/python/vyos/ifconfig/pppoe.py b/python/vyos/ifconfig/pppoe.py
index 437fe0cae..febf1452d 100644
--- a/python/vyos/ifconfig/pppoe.py
+++ b/python/vyos/ifconfig/pppoe.py
@@ -14,8 +14,8 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
-from vyos.validate import assert_range
-from vyos.util import get_interface_config
+from vyos.utils.assertion import assert_range
+from vyos.utils.network import get_interface_config
@Interface.register
class PPPoEIf(Interface):
diff --git a/python/vyos/ifconfig/tunnel.py b/python/vyos/ifconfig/tunnel.py
index b7bf7d982..9ba7b31a6 100644
--- a/python/vyos/ifconfig/tunnel.py
+++ b/python/vyos/ifconfig/tunnel.py
@@ -17,8 +17,8 @@
# https://community.hetzner.com/tutorials/linux-setup-gre-tunnel
from vyos.ifconfig.interface import Interface
-from vyos.util import dict_search
-from vyos.validate import assert_list
+from vyos.utils.dict import dict_search
+from vyos.utils.assertion import assert_list
def enable_to_on(value):
if value == 'enable':
diff --git a/python/vyos/ifconfig/vrrp.py b/python/vyos/ifconfig/vrrp.py
index 47aaadecd..fde903a53 100644
--- a/python/vyos/ifconfig/vrrp.py
+++ b/python/vyos/ifconfig/vrrp.py
@@ -1,4 +1,4 @@
-# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -21,8 +21,11 @@ from time import time
from time import sleep
from tabulate import tabulate
-from vyos import util
from vyos.configquery import ConfigTreeQuery
+from vyos.utils.convert import seconds_to_human
+from vyos.utils.file import read_file
+from vyos.utils.file import wait_for_file_write_complete
+from vyos.utils.process import process_running
class VRRPError(Exception):
pass
@@ -84,21 +87,21 @@ class VRRP(object):
def is_running(cls):
if not os.path.exists(cls.location['pid']):
return False
- return util.process_running(cls.location['pid'])
+ return process_running(cls.location['pid'])
@classmethod
def collect(cls, what):
fname = cls.location[what]
try:
# send signal to generate the configuration file
- pid = util.read_file(cls.location['pid'])
- util.wait_for_file_write_complete(fname,
+ pid = read_file(cls.location['pid'])
+ wait_for_file_write_complete(fname,
pre_hook=(lambda: os.kill(int(pid), cls._signal[what])),
timeout=30)
- return util.read_file(fname)
+ return read_file(fname)
except OSError:
- # raised by vyos.util.read_file
+ # raised by vyos.utils.file.read_file
raise VRRPNoData("VRRP data is not available (wait time exceeded)")
except FileNotFoundError:
raise VRRPNoData("VRRP data is not available (process not running or no active groups)")
@@ -145,7 +148,7 @@ class VRRP(object):
priority = data['effective_priority']
since = int(time() - float(data['last_transition']))
- last = util.seconds_to_human(since)
+ last = seconds_to_human(since)
groups.append([name, intf, vrid, state, priority, last])
diff --git a/python/vyos/ifconfig/vti.py b/python/vyos/ifconfig/vti.py
index dc99d365a..9ebbeb9ed 100644
--- a/python/vyos/ifconfig/vti.py
+++ b/python/vyos/ifconfig/vti.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class VTIIf(Interface):
diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py
index 5baff10a9..6a9911588 100644
--- a/python/vyos/ifconfig/vxlan.py
+++ b/python/vyos/ifconfig/vxlan.py
@@ -15,7 +15,7 @@
from vyos import ConfigError
from vyos.ifconfig import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class VXLANIf(Interface):
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index fe5e9c519..4aac103ec 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -25,6 +25,7 @@ from hurry.filesize import alternative
from vyos.ifconfig import Interface
from vyos.ifconfig import Operational
from vyos.template import is_ipv6
+from vyos.base import Warning
class WireGuardOperational(Operational):
def _dump(self):
@@ -184,7 +185,6 @@ class WireGuardIf(Interface):
base_cmd += f' private-key {tmp_file.name}'
base_cmd = base_cmd.format(**config)
-
if 'peer' in config:
for peer, peer_config in config['peer'].items():
# T4702: No need to configure this peer when it was explicitly
diff --git a/python/vyos/initialsetup.py b/python/vyos/initialsetup.py
index 574e7892d..3b280dc6b 100644
--- a/python/vyos/initialsetup.py
+++ b/python/vyos/initialsetup.py
@@ -1,7 +1,7 @@
# initialsetup -- functions for setting common values in config file,
# for use in installation and first boot scripts
#
-# Copyright (C) 2018 VyOS maintainers and contributors
+# Copyright (C) 2018-2023 VyOS maintainers and contributors
#
# This library is free software; you can redistribute it and/or modify it under the terms of
# the GNU Lesser General Public License as published by the Free Software Foundation;
@@ -12,10 +12,12 @@
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along with this library;
-# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import vyos.configtree
-import vyos.authutils
+
+from vyos.utils.auth import make_password_hash
+from vyos.utils.auth import split_ssh_public_key
def set_interface_address(config, intf, addr, intf_type="ethernet"):
config.set(["interfaces", intf_type, intf, "address"], value=addr)
@@ -35,8 +37,8 @@ def set_default_gateway(config, gateway):
def set_user_password(config, user, password):
# Make a password hash
- hash = vyos.authutils.make_password_hash(password)
-
+ hash = make_password_hash(password)
+
config.set(["system", "login", "user", user, "authentication", "encrypted-password"], value=hash)
config.set(["system", "login", "user", user, "authentication", "plaintext-password"], value="")
@@ -48,7 +50,7 @@ def set_user_level(config, user, level):
config.set(["system", "login", "user", user, "level"], value=level)
def set_user_ssh_key(config, user, key_string):
- key = vyos.authutils.split_ssh_public_key(key_string, defaultname=user)
+ key = split_ssh_public_key(key_string, defaultname=user)
config.set(["system", "login", "user", user, "authentication", "public-keys", key["name"], "key"], value=key["data"])
config.set(["system", "login", "user", user, "authentication", "public-keys", key["name"], "type"], value=key["type"])
diff --git a/python/vyos/ipsec.py b/python/vyos/ipsec.py
index bb5611025..4603aab22 100644
--- a/python/vyos/ipsec.py
+++ b/python/vyos/ipsec.py
@@ -33,9 +33,11 @@ def get_vici_sas():
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
- sas = list(session.list_sas())
- return sas
-
+ try:
+ sas = list(session.list_sas())
+ return sas
+ except Exception:
+ raise ViciCommandError(f'Failed to get SAs')
def get_vici_connections():
from vici import Session as vici_session
@@ -44,9 +46,11 @@ def get_vici_connections():
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
- connections = list(session.list_conns())
- return connections
-
+ try:
+ connections = list(session.list_conns())
+ return connections
+ except Exception:
+ raise ViciCommandError(f'Failed to get connections')
def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list:
"""
diff --git a/python/vyos/migrator.py b/python/vyos/migrator.py
index 87c74e1ea..872682bc0 100644
--- a/python/vyos/migrator.py
+++ b/python/vyos/migrator.py
@@ -20,7 +20,7 @@ import logging
import vyos.defaults
import vyos.component_version as component_version
-from vyos.util import cmd
+from vyos.utils.process import cmd
log_file = os.path.join(vyos.defaults.directories['config'], 'vyos-migrate.log')
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 53fd7fb33..9cbc2b96e 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from vyos.template import is_ip_network
-from vyos.util import dict_search_args
+from vyos.utils.dict import dict_search_args
from vyos.template import bracketize_ipv6
@@ -54,28 +54,35 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
translation_str = 'return'
log_suffix = '-EXCL'
elif 'translation' in rule_conf:
- translation_prefix = nat_type[:1]
- translation_output = [f'{translation_prefix}nat']
addr = dict_search_args(rule_conf, 'translation', 'address')
port = dict_search_args(rule_conf, 'translation', 'port')
+ if 'redirect' in rule_conf['translation']:
+ translation_output = [f'redirect']
+ redirect_port = dict_search_args(rule_conf, 'translation', 'redirect', 'port')
+ if redirect_port:
+ translation_output.append(f'to {redirect_port}')
+ else:
- if addr and is_ip_network(addr):
- if not ipv6:
- map_addr = dict_search_args(rule_conf, nat_type, 'address')
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
- ignore_type_addr = True
+ translation_prefix = nat_type[:1]
+ translation_output = [f'{translation_prefix}nat']
+
+ if addr and is_ip_network(addr):
+ if not ipv6:
+ map_addr = dict_search_args(rule_conf, nat_type, 'address')
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
+ ignore_type_addr = True
+ else:
+ translation_output.append(f'prefix to {addr}')
+ elif addr == 'masquerade':
+ if port:
+ addr = f'{addr} to '
+ translation_output = [addr]
+ log_suffix = '-MASQ'
else:
- translation_output.append(f'prefix to {addr}')
- elif addr == 'masquerade':
- if port:
- addr = f'{addr} to '
- translation_output = [addr]
- log_suffix = '-MASQ'
- else:
- translation_output.append('to')
- if addr:
- addr = bracketize_ipv6(addr)
- translation_output.append(addr)
+ translation_output.append('to')
+ if addr:
+ addr = bracketize_ipv6(addr)
+ translation_output.append(addr)
options = []
addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping')
@@ -90,6 +97,39 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if options:
translation_str += f' {",".join(options)}'
+ if not ipv6 and 'backend' in rule_conf['load_balance']:
+ hash_input_items = []
+ current_prob = 0
+ nat_map = []
+
+ for trans_addr, addr in rule_conf['load_balance']['backend'].items():
+ item_prob = int(addr['weight'])
+ upper_limit = current_prob + item_prob - 1
+ hash_val = str(current_prob) + '-' + str(upper_limit)
+ element = hash_val + " : " + trans_addr
+ nat_map.append(element)
+ current_prob = current_prob + item_prob
+
+ elements = ' , '.join(nat_map)
+
+ if 'hash' in rule_conf['load_balance'] and 'random' in rule_conf['load_balance']['hash']:
+ translation_str += ' numgen random mod 100 map ' + '{ ' + f'{elements}' + ' }'
+ else:
+ for input_param in rule_conf['load_balance']['hash']:
+ if input_param == 'source-address':
+ param = 'ip saddr'
+ elif input_param == 'destination-address':
+ param = 'ip daddr'
+ elif input_param == 'source-port':
+ prot = rule_conf['protocol']
+ param = f'{prot} sport'
+ elif input_param == 'destination-port':
+ prot = rule_conf['protocol']
+ param = f'{prot} dport'
+ hash_input_items.append(param)
+ hash_input = ' . '.join(hash_input_items)
+ translation_str += f' jhash ' + f'{hash_input}' + ' mod 100 map ' + '{ ' + f'{elements}' + ' }'
+
for target in ['source', 'destination']:
if target not in rule_conf:
continue
diff --git a/python/vyos/pki.py b/python/vyos/pki.py
index cd15e3878..792e24b76 100644
--- a/python/vyos/pki.py
+++ b/python/vyos/pki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -63,6 +63,18 @@ private_format_map = {
'OpenSSH': serialization.PrivateFormat.OpenSSH
}
+hash_map = {
+ 'sha256': hashes.SHA256,
+ 'sha384': hashes.SHA384,
+ 'sha512': hashes.SHA512,
+}
+
+def get_certificate_fingerprint(cert, hash):
+ hash_algorithm = hash_map[hash]()
+ fp = cert.fingerprint(hash_algorithm)
+
+ return fp.hex(':').upper()
+
def encode_certificate(cert):
return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index 33bb8ae28..d8bbfe970 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2022-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -16,14 +16,52 @@
import os
from vyos.base import Warning
-from vyos.util import cmd
-from vyos.util import dict_search
-from vyos.util import read_file
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+
+from vyos.utils.network import get_protocol_by_name
+
class QoSBase:
_debug = False
_direction = ['egress']
_parent = 0xffff
+ _dsfields = {
+ "default": 0x0,
+ "lowdelay": 0x10,
+ "throughput": 0x08,
+ "reliability": 0x04,
+ "mincost": 0x02,
+ "priority": 0x20,
+ "immediate": 0x40,
+ "flash": 0x60,
+ "flash-override": 0x80,
+ "critical": 0x0A,
+ "internet": 0xC0,
+ "network": 0xE0,
+ "AF11": 0x28,
+ "AF12": 0x30,
+ "AF13": 0x38,
+ "AF21": 0x48,
+ "AF22": 0x50,
+ "AF23": 0x58,
+ "AF31": 0x68,
+ "AF32": 0x70,
+ "AF33": 0x78,
+ "AF41": 0x88,
+ "AF42": 0x90,
+ "AF43": 0x98,
+ "CS1": 0x20,
+ "CS2": 0x40,
+ "CS3": 0x60,
+ "CS4": 0x80,
+ "CS5": 0xA0,
+ "CS6": 0xC0,
+ "CS7": 0xE0,
+ "EF": 0xB8
+ }
+ qostype = None
def __init__(self, interface):
if os.path.exists('/tmp/vyos.qos.debug'):
@@ -45,6 +83,12 @@ class QoSBase:
return tmp[-1]
return None
+ def _get_dsfield(self, value):
+ if value in self._dsfields:
+ return self._dsfields[value]
+ else:
+ return value
+
def _build_base_qdisc(self, config : dict, cls_id : int):
"""
Add/replace qdisc for every class (also default is a class). This is
@@ -63,7 +107,8 @@ class QoSBase:
queue_limit = dict_search('queue_limit', config)
for ii in range(1, 4):
- tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo limit {queue_limit}'
+ tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo'
+ if queue_limit: tmp += f' limit {queue_limit}'
self._cmd(tmp)
elif queue_type == 'fair-queue':
@@ -160,18 +205,21 @@ class QoSBase:
self._build_base_qdisc(cls_config, int(cls))
# every match criteria has it's tc instance
- filter_cmd = f'tc filter replace dev {self._interface} parent {self._parent:x}:'
+ filter_cmd_base = f'tc filter add dev {self._interface} parent {self._parent:x}:'
if priority:
- filter_cmd += f' prio {cls}'
+ filter_cmd_base += f' prio {cls}'
elif 'priority' in cls_config:
prio = cls_config['priority']
- filter_cmd += f' prio {prio}'
+ filter_cmd_base += f' prio {prio}'
- filter_cmd += ' protocol all'
+ filter_cmd_base += ' protocol all'
if 'match' in cls_config:
- for match, match_config in cls_config['match'].items():
+ for index, (match, match_config) in enumerate(cls_config['match'].items(), start=1):
+ filter_cmd = filter_cmd_base
+ if self.qostype == 'shaper' and 'prio ' not in filter_cmd:
+ filter_cmd += f' prio {index}'
if 'mark' in match_config:
mark = match_config['mark']
filter_cmd += f' handle {mark} fw'
@@ -197,7 +245,17 @@ class QoSBase:
if tmp: filter_cmd += f' match {tc_af} dport {tmp} 0xffff'
tmp = dict_search(f'{af}.protocol', match_config)
- if tmp: filter_cmd += f' match {tc_af} protocol {tmp} 0xff'
+ if tmp:
+ tmp = get_protocol_by_name(tmp)
+ filter_cmd += f' match {tc_af} protocol {tmp} 0xff'
+
+ tmp = dict_search(f'{af}.dscp', match_config)
+ if tmp:
+ tmp = self._get_dsfield(tmp)
+ if af == 'ip':
+ filter_cmd += f' match {tc_af} dsfield {tmp} 0xff'
+ elif af == 'ipv6':
+ filter_cmd += f' match u16 {tmp} 0x0ff0 at 0'
# Will match against total length of an IPv4 packet and
# payload length of an IPv6 packet.
@@ -236,67 +294,100 @@ class QoSBase:
elif af == 'ipv6':
filter_cmd += f' match u8 {mask} {mask} at 53'
+ cls = int(cls)
+ filter_cmd += f' flowid {self._parent:x}:{cls:x}'
+ self._cmd(filter_cmd)
+
+ if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config):
+ filter_cmd += f' action police'
+
+ if 'exceed' in cls_config:
+ action = cls_config['exceed']
+ filter_cmd += f' conform-exceed {action}'
+ if 'not_exceed' in cls_config:
+ action = cls_config['not_exceed']
+ filter_cmd += f'/{action}'
+
+ if 'bandwidth' in cls_config:
+ rate = self._rate_convert(cls_config['bandwidth'])
+ filter_cmd += f' rate {rate}'
+
+ if 'burst' in cls_config:
+ burst = cls_config['burst']
+ filter_cmd += f' burst {burst}'
+ cls = int(cls)
+ filter_cmd += f' flowid {self._parent:x}:{cls:x}'
+ self._cmd(filter_cmd)
+
else:
filter_cmd += ' basic'
+ cls = int(cls)
+ filter_cmd += f' flowid {self._parent:x}:{cls:x}'
+ self._cmd(filter_cmd)
+
+
# The police block allows limiting of the byte or packet rate of
# traffic matched by the filter it is attached to.
# https://man7.org/linux/man-pages/man8/tc-police.8.html
- if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config):
- filter_cmd += f' action police'
-
- if 'exceed' in cls_config:
- action = cls_config['exceed']
- filter_cmd += f' conform-exceed {action}'
- if 'not_exceed' in cls_config:
- action = cls_config['not_exceed']
- filter_cmd += f'/{action}'
- if 'bandwidth' in cls_config:
- rate = self._rate_convert(cls_config['bandwidth'])
- filter_cmd += f' rate {rate}'
-
- if 'burst' in cls_config:
- burst = cls_config['burst']
- filter_cmd += f' burst {burst}'
-
- cls = int(cls)
- filter_cmd += f' flowid {self._parent:x}:{cls:x}'
- self._cmd(filter_cmd)
+ # T5295: We do not handle rate via tc filter directly,
+ # but rather set the tc filter to direct traffic to the correct tc class flow.
+ #
+ # if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config):
+ # filter_cmd += f' action police'
+ #
+ # if 'exceed' in cls_config:
+ # action = cls_config['exceed']
+ # filter_cmd += f' conform-exceed {action}'
+ # if 'not_exceed' in cls_config:
+ # action = cls_config['not_exceed']
+ # filter_cmd += f'/{action}'
+ #
+ # if 'bandwidth' in cls_config:
+ # rate = self._rate_convert(cls_config['bandwidth'])
+ # filter_cmd += f' rate {rate}'
+ #
+ # if 'burst' in cls_config:
+ # burst = cls_config['burst']
+ # filter_cmd += f' burst {burst}'
if 'default' in config:
+ default_cls_id = 1
if 'class' in config:
class_id_max = self._get_class_max_id(config)
default_cls_id = int(class_id_max) +1
- self._build_base_qdisc(config['default'], default_cls_id)
+ self._build_base_qdisc(config['default'], default_cls_id)
- filter_cmd = f'tc filter replace dev {self._interface} parent {self._parent:x}: '
- filter_cmd += 'prio 255 protocol all basic'
+ if self.qostype == 'limiter':
+ if 'default' in config:
+ filter_cmd = f'tc filter replace dev {self._interface} parent {self._parent:x}: '
+ filter_cmd += 'prio 255 protocol all basic'
- # The police block allows limiting of the byte or packet rate of
- # traffic matched by the filter it is attached to.
- # https://man7.org/linux/man-pages/man8/tc-police.8.html
- if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in config['default']):
- filter_cmd += f' action police'
-
- if 'exceed' in config['default']:
- action = config['default']['exceed']
- filter_cmd += f' conform-exceed {action}'
- if 'not_exceed' in config['default']:
- action = config['default']['not_exceed']
- filter_cmd += f'/{action}'
+ # The police block allows limiting of the byte or packet rate of
+ # traffic matched by the filter it is attached to.
+ # https://man7.org/linux/man-pages/man8/tc-police.8.html
+ if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in
+ config['default']):
+ filter_cmd += f' action police'
- if 'bandwidth' in config['default']:
- rate = self._rate_convert(config['default']['bandwidth'])
- filter_cmd += f' rate {rate}'
+ if 'exceed' in config['default']:
+ action = config['default']['exceed']
+ filter_cmd += f' conform-exceed {action}'
+ if 'not_exceed' in config['default']:
+ action = config['default']['not_exceed']
+ filter_cmd += f'/{action}'
- if 'burst' in config['default']:
- burst = config['default']['burst']
- filter_cmd += f' burst {burst}'
+ if 'bandwidth' in config['default']:
+ rate = self._rate_convert(config['default']['bandwidth'])
+ filter_cmd += f' rate {rate}'
- if 'class' in config:
- filter_cmd += f' flowid {self._parent:x}:{default_cls_id:x}'
+ if 'burst' in config['default']:
+ burst = config['default']['burst']
+ filter_cmd += f' burst {burst}'
- self._cmd(filter_cmd)
+ if 'class' in config:
+ filter_cmd += f' flowid {self._parent:x}:{default_cls_id:x}'
+ self._cmd(filter_cmd)
diff --git a/python/vyos/qos/limiter.py b/python/vyos/qos/limiter.py
index ace0c0b6c..3f5c11112 100644
--- a/python/vyos/qos/limiter.py
+++ b/python/vyos/qos/limiter.py
@@ -17,6 +17,7 @@ from vyos.qos.base import QoSBase
class Limiter(QoSBase):
_direction = ['ingress']
+ qostype = 'limiter'
def update(self, config, direction):
tmp = f'tc qdisc add dev {self._interface} handle {self._parent:x}: {direction}'
diff --git a/python/vyos/qos/priority.py b/python/vyos/qos/priority.py
index 6d4a60a43..8182400f9 100644
--- a/python/vyos/qos/priority.py
+++ b/python/vyos/qos/priority.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.qos.base import QoSBase
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
class Priority(QoSBase):
_parent = 1
diff --git a/python/vyos/qos/trafficshaper.py b/python/vyos/qos/trafficshaper.py
index f42f4d022..c63c7cf39 100644
--- a/python/vyos/qos/trafficshaper.py
+++ b/python/vyos/qos/trafficshaper.py
@@ -22,6 +22,7 @@ MINQUANTUM = 1000
class TrafficShaper(QoSBase):
_parent = 1
+ qostype = 'shaper'
# https://man7.org/linux/man-pages/man8/tc-htb.8.html
def update(self, config, direction):
@@ -70,7 +71,17 @@ class TrafficShaper(QoSBase):
cls = int(cls)
# bandwidth is a mandatory CLI node
- rate = self._rate_convert(cls_config['bandwidth'])
+ # T5296 if bandwidth 'auto' or 'xx%' get value from config shaper total "bandwidth"
+ # i.e from set shaper test bandwidth '300mbit'
+ # without it, it tries to get value from qos.base /sys/class/net/{self._interface}/speed
+ if cls_config['bandwidth'] == 'auto':
+ rate = self._rate_convert(config['bandwidth'])
+ elif cls_config['bandwidth'].endswith('%'):
+ percent = cls_config['bandwidth'].rstrip('%')
+ rate = self._rate_convert(config['bandwidth']) * int(percent) // 100
+ else:
+ rate = self._rate_convert(cls_config['bandwidth'])
+
burst = cls_config['burst']
quantum = cls_config['codel_quantum']
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 66044fa52..cf731c881 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -25,22 +25,21 @@ import urllib.parse
from ftplib import FTP
from ftplib import FTP_TLS
-from paramiko import SSHClient
+from paramiko import SSHClient, SSHException
from paramiko import MissingHostKeyPolicy
from requests import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import PoolManager
-from vyos.util import ask_yes_no
-from vyos.util import begin
-from vyos.util import cmd
-from vyos.util import make_incremental_progressbar
-from vyos.util import make_progressbar
-from vyos.util import print_error
+from vyos.utils.io import ask_yes_no
+from vyos.utils.io import make_incremental_progressbar
+from vyos.utils.io import make_progressbar
+from vyos.utils.io import print_error
+from vyos.utils.misc import begin
+from vyos.utils.process import cmd
from vyos.version import get_version
-
CHUNK_SIZE = 8192
class InteractivePolicy(MissingHostKeyPolicy):
@@ -51,7 +50,7 @@ class InteractivePolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
print_error(f"Host '{hostname}' not found in known hosts.")
print_error('Fingerprint: ' + key.get_fingerprint().hex())
- if ask_yes_no('Do you wish to continue?'):
+ if sys.stdout.isatty() and ask_yes_no('Do you wish to continue?'):
if client._host_keys_filename\
and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'):
client._host_keys.add(hostname, key.get_name(), key)
@@ -97,7 +96,13 @@ def check_storage(path, size):
class FtpC:
- def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0):
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host='',
+ source_port=0,
+ timeout=10):
self.secure = url.scheme == 'ftps'
self.hostname = url.hostname
self.path = url.path
@@ -107,12 +112,15 @@ class FtpC:
self.source = (source_host, source_port)
self.progressbar = progressbar
self.check_space = check_space
+ self.timeout = timeout
def _establish(self):
if self.secure:
- return FTP_TLS(source_address=self.source, context=ssl.create_default_context())
+ return FTP_TLS(source_address=self.source,
+ context=ssl.create_default_context(),
+ timeout=self.timeout)
else:
- return FTP(source_address=self.source)
+ return FTP(source_address=self.source, timeout=self.timeout)
def download(self, location: str):
# Open the file upfront before establishing connection.
@@ -151,7 +159,13 @@ class FtpC:
class SshC:
known_hosts = os.path.expanduser('~/.ssh/known_hosts')
- def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0):
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host='',
+ source_port=0,
+ timeout=10.0):
self.hostname = url.hostname
self.path = url.path
self.username = url.username or os.getenv('REMOTE_USERNAME')
@@ -160,6 +174,7 @@ class SshC:
self.source = (source_host, source_port)
self.progressbar = progressbar
self.check_space = check_space
+ self.timeout = timeout
def _establish(self):
ssh = SSHClient()
@@ -170,7 +185,7 @@ class SshC:
ssh.set_missing_host_key_policy(InteractivePolicy())
# `socket.create_connection()` automatically picks a NIC and an IPv4/IPv6 address family
# for us on dual-stack systems.
- sock = socket.create_connection((self.hostname, self.port), socket.getdefaulttimeout(), self.source)
+ sock = socket.create_connection((self.hostname, self.port), self.timeout, self.source)
ssh.connect(self.hostname, self.port, self.username, self.password, sock=sock)
return ssh
@@ -199,13 +214,20 @@ class SshC:
class HttpC:
- def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0):
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host='',
+ source_port=0,
+ timeout=10.0):
self.urlstring = urllib.parse.urlunsplit(url)
self.progressbar = progressbar
self.check_space = check_space
self.source_pair = (source_host, source_port)
self.username = url.username or os.getenv('REMOTE_USERNAME')
self.password = url.password or os.getenv('REMOTE_PASSWORD')
+ self.timeout = timeout
def _establish(self):
session = Session()
@@ -221,8 +243,11 @@ class HttpC:
# Not only would it potentially mess up with the progress bar but
# `shutil.copyfileobj(request.raw, file)` does not handle automatic decoding.
s.headers.update({'Accept-Encoding': 'identity'})
- with s.head(self.urlstring, allow_redirects=True) as r:
+ with s.head(self.urlstring,
+ allow_redirects=True,
+ timeout=self.timeout) as r:
# Abort early if the destination is inaccessible.
+ print('pre-3')
r.raise_for_status()
# If the request got redirected, keep the last URL we ended up with.
final_urlstring = r.url
@@ -236,7 +261,8 @@ class HttpC:
size = None
if self.check_space:
check_storage(location, size)
- with s.get(final_urlstring, stream=True) as r, open(location, 'wb') as f:
+ with s.get(final_urlstring, stream=True,
+ timeout=self.timeout) as r, open(location, 'wb') as f:
if self.progressbar and size:
progress = make_incremental_progressbar(CHUNK_SIZE / size)
next(progress)
@@ -250,7 +276,10 @@ class HttpC:
def upload(self, location: str):
# Does not yet support progressbars.
with self._establish() as s, open(location, 'rb') as f:
- s.post(self.urlstring, data=f, allow_redirects=True)
+ s.post(self.urlstring,
+ data=f,
+ allow_redirects=True,
+ timeout=self.timeout)
class TftpC:
@@ -259,10 +288,16 @@ class TftpC:
# 2. Since there's no concept authentication, we don't need to deal with keys/passwords.
# 3. It would be a waste to import, audit and maintain a third-party library for TFTP.
# 4. I'd rather not implement the entire protocol here, no matter how simple it is.
- def __init__(self, url, progressbar=False, check_space=False, source_host=None, source_port=0):
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host=None,
+ source_port=0,
+ timeout=10):
source_option = f'--interface {source_host} --local-port {source_port}' if source_host else ''
progress_flag = '--progress-bar' if progressbar else '-s'
- self.command = f'curl {source_option} {progress_flag}'
+ self.command = f'curl {source_option} {progress_flag} --connect-timeout {timeout}'
self.urlstring = urllib.parse.urlunsplit(url)
def download(self, location: str):
@@ -287,10 +322,16 @@ def urlc(urlstring, *args, **kwargs):
raise ValueError(f'Unsupported URL scheme: "{url.scheme}"')
def download(local_path, urlstring, *args, **kwargs):
- urlc(urlstring, *args, **kwargs).download(local_path)
+ try:
+ urlc(urlstring, *args, **kwargs).download(local_path)
+ except Exception as err:
+ print_error(f'Unable to download "{urlstring}": {err}')
def upload(local_path, urlstring, *args, **kwargs):
- urlc(urlstring, *args, **kwargs).upload(local_path)
+ try:
+ urlc(urlstring, *args, **kwargs).upload(local_path)
+ except Exception as err:
+ print_error(f'Unable to upload "{urlstring}": {err}')
def get_remote_config(urlstring, source_host='', source_port=0):
"""
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 254a15e3a..e167488c6 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -20,10 +20,10 @@ from jinja2 import Environment
from jinja2 import FileSystemLoader
from jinja2 import ChainableUndefined
from vyos.defaults import directories
-from vyos.util import chmod
-from vyos.util import chown
-from vyos.util import dict_search_args
-from vyos.util import makedir
+from vyos.utils.dict import dict_search_args
+from vyos.utils.file import makedir
+from vyos.utils.permission import chmod
+from vyos.utils.permission import chown
# Holds template filters registered via register_filter()
_FILTERS = {}
@@ -162,19 +162,19 @@ def force_to_list(value):
@register_filter('seconds_to_human')
def seconds_to_human(seconds, separator=""):
""" Convert seconds to human-readable values like 1d6h15m23s """
- from vyos.util import seconds_to_human
+ from vyos.utils.convert import seconds_to_human
return seconds_to_human(seconds, separator=separator)
@register_filter('bytes_to_human')
def bytes_to_human(bytes, initial_exponent=0, precision=2):
""" Convert bytes to human-readable values like 1.44M """
- from vyos.util import bytes_to_human
+ from vyos.utils.convert import bytes_to_human
return bytes_to_human(bytes, initial_exponent=initial_exponent, precision=precision)
@register_filter('human_to_bytes')
def human_to_bytes(value):
""" Convert a data amount with a unit suffix to bytes, like 2K to 2048 """
- from vyos.util import human_to_bytes
+ from vyos.utils.convert import human_to_bytes
return human_to_bytes(value)
@register_filter('ip_from_cidr')
@@ -420,11 +420,11 @@ def get_dhcp_router(interface):
Returns False of no router is found, returns the IP address as string if
a router is found.
"""
- lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases'
+ lease_file = directories['isc_dhclient_dir'] + f'/dhclient_{interface}.leases'
if not os.path.exists(lease_file):
return None
- from vyos.util import read_file
+ from vyos.utils.file import read_file
for line in read_file(lease_file).splitlines():
if 'option routers' in line:
(_, _, address) = line.split()
@@ -574,9 +574,9 @@ def nft_action(vyos_action):
return vyos_action
@register_filter('nft_rule')
-def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'):
+def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):
from vyos.firewall import parse_rule
- return parse_rule(rule_conf, fw_name, rule_id, ip_name)
+ return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name)
@register_filter('nft_default_rule')
def nft_default_rule(fw_conf, fw_name, ipv6=False):
@@ -587,7 +587,8 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False):
action_suffix = default_action[:1].upper()
output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"')
- output.append(nft_action(default_action))
+ #output.append(nft_action(default_action))
+ output.append(f'{default_action}')
if 'default_jump_target' in fw_conf:
target = fw_conf['default_jump_target']
def_suffix = '6' if ipv6 else ''
diff --git a/python/vyos/util.py b/python/vyos/util.py
deleted file mode 100644
index d83287fd2..000000000
--- a/python/vyos/util.py
+++ /dev/null
@@ -1,1160 +0,0 @@
-# Copyright 2020-2022 VyOS maintainers and contributors <maintainers@vyos.io>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import re
-import sys
-
-#
-# NOTE: Do not import full classes here, move your import to the function
-# where it is used so it is as local as possible to the execution
-#
-
-from subprocess import Popen
-from subprocess import PIPE
-from subprocess import STDOUT
-from subprocess import DEVNULL
-
-def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8'):
- """
- popen is a wrapper helper aound subprocess.Popen
- with it default setting it will return a tuple (out, err)
- out: the output of the program run
- err: the error code returned by the program
-
- it can be affected by the following flags:
- shell: do not try to auto-detect if a shell is required
- for example if a pipe (|) or redirection (>, >>) is used
- input: data to sent to the child process via STDIN
- the data should be bytes but string will be converted
- timeout: time after which the command will be considered to have failed
- env: mapping that defines the environment variables for the new process
- stdout: define how the output of the program should be handled
- - PIPE (default), sends stdout to the output
- - DEVNULL, discard the output
- stderr: define how the output of the program should be handled
- - None (default), send/merge the data to/with stderr
- - PIPE, popen will append it to output
- - STDOUT, send the data to be merged with stdout
- - DEVNULL, discard the output
- decode: specify the expected text encoding (utf-8, ascii, ...)
- the default is explicitely utf-8 which is python's own default
-
- usage:
- get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT)
- discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE)
- """
-
- # airbag must be left as an import in the function as otherwise we have a
- # a circual import dependency
- from vyos import debug
- from vyos import airbag
-
- # log if the flag is set, otherwise log if command is set
- if not debug.enabled(flag):
- flag = 'command'
-
- cmd_msg = f"cmd '{command}'"
- debug.message(cmd_msg, flag)
-
- use_shell = shell
- stdin = None
- if shell is None:
- use_shell = False
- if ' ' in command:
- use_shell = True
- if env:
- use_shell = True
-
- if input:
- stdin = PIPE
- input = input.encode() if type(input) is str else input
-
- p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
- env=env, shell=use_shell)
-
- pipe = p.communicate(input, timeout)
-
- pipe_out = b''
- if stdout == PIPE:
- pipe_out = pipe[0]
-
- pipe_err = b''
- if stderr == PIPE:
- pipe_err = pipe[1]
-
- str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip()
- str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip()
-
- out_msg = f"returned (out):\n{str_out}"
- if str_out:
- debug.message(out_msg, flag)
-
- if str_err:
- err_msg = f"returned (err):\n{str_err}"
- # this message will also be send to syslog via airbag
- debug.message(err_msg, flag, destination=sys.stderr)
-
- # should something go wrong, report this too via airbag
- airbag.noteworthy(cmd_msg)
- airbag.noteworthy(out_msg)
- airbag.noteworthy(err_msg)
-
- return str_out, p.returncode
-
-
-def run(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=DEVNULL, stderr=PIPE, decode='utf-8'):
- """
- A wrapper around popen, which discard the stdout and
- will return the error code of a command
- """
- _, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- return code
-
-
-def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
- expect=[0]):
- """
- A wrapper around popen, which returns the stdout and
- will raise the error code of a command
-
- raising: specify which call should be used when raising
- the class should only require a string as parameter
- (default is OSError) with the error code
- expect: a list of error codes to consider as normal
- """
- decoded, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- if code not in expect:
- feedback = message + '\n' if message else ''
- feedback += f'failed to run command: {command}\n'
- feedback += f'returned: {decoded}\n'
- feedback += f'exit code: {code}'
- if raising is None:
- # error code can be recovered with .errno
- raise OSError(code, feedback)
- else:
- raise raising(feedback)
- return decoded
-
-
-def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=STDOUT, decode='utf-8'):
- """
- A wrapper around popen, which returns the return code
- of a command and stdout
-
- % rc_cmd('uname')
- (0, 'Linux')
- % rc_cmd('ip link show dev eth99')
- (1, 'Device "eth99" does not exist.')
- """
- out, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- return code, out
-
-
-def call(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8'):
- """
- A wrapper around popen, which print the stdout and
- will return the error code of a command
- """
- out, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- if out:
- print(out)
- return code
-
-
-def read_file(fname, defaultonfailure=None):
- """
- read the content of a file, stripping any end characters (space, newlines)
- should defaultonfailure be not None, it is returned on failure to read
- """
- try:
- """ Read a file to string """
- with open(fname, 'r') as f:
- data = f.read().strip()
- return data
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False):
- """
- Write content of data to given fname, should defaultonfailure be not None,
- it is returned on failure to read.
-
- If directory of file is not present, it is auto-created.
- """
- dirname = os.path.dirname(fname)
- if not os.path.isdir(dirname):
- os.makedirs(dirname, mode=0o755, exist_ok=False)
- chown(dirname, user, group)
-
- try:
- """ Write a file to string """
- bytes = 0
- with open(fname, 'w' if not append else 'a') as f:
- bytes = f.write(data)
- chown(fname, user, group)
- chmod(fname, mode)
- return bytes
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-def read_json(fname, defaultonfailure=None):
- """
- read and json decode the content of a file
- should defaultonfailure be not None, it is returned on failure to read
- """
- import json
- try:
- with open(fname, 'r') as f:
- data = json.load(f)
- return data
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-
-def chown(path, user, group):
- """ change file/directory owner """
- from pwd import getpwnam
- from grp import getgrnam
-
- if user is None or group is None:
- return False
-
- # path may also be an open file descriptor
- if not isinstance(path, int) and not os.path.exists(path):
- return False
-
- uid = getpwnam(user).pw_uid
- gid = getgrnam(group).gr_gid
- os.chown(path, uid, gid)
- return True
-
-
-def chmod(path, bitmask):
- # path may also be an open file descriptor
- if not isinstance(path, int) and not os.path.exists(path):
- return
- if bitmask is None:
- return
- os.chmod(path, bitmask)
-
-
-def chmod_600(path):
- """ make file only read/writable by owner """
- from stat import S_IRUSR, S_IWUSR
-
- bitmask = S_IRUSR | S_IWUSR
- chmod(path, bitmask)
-
-
-def chmod_750(path):
- """ make file/directory only executable to user and group """
- from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP
-
- bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP
- chmod(path, bitmask)
-
-
-def chmod_755(path):
- """ make file executable by all """
- from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH
-
- bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
- S_IROTH | S_IXOTH
- chmod(path, bitmask)
-
-
-def makedir(path, user=None, group=None):
- if os.path.exists(path):
- return
- os.makedirs(path, mode=0o755)
- chown(path, user, group)
-
-def colon_separated_to_dict(data_string, uniquekeys=False):
- """ Converts a string containing newline-separated entries
- of colon-separated key-value pairs into a dict.
-
- Such files are common in Linux /proc filesystem
-
- Args:
- data_string (str): data string
- uniquekeys (bool): whether to insist that keys are unique or not
-
- Returns: dict
-
- Raises:
- ValueError: if uniquekeys=True and the data string has
- duplicate keys.
-
- Note:
- If uniquekeys=True, then dict entries are always strings,
- otherwise they are always lists of strings.
- """
- import re
- key_value_re = re.compile('([^:]+)\s*\:\s*(.*)')
-
- data_raw = re.split('\n', data_string)
-
- data = {}
-
- for l in data_raw:
- l = l.strip()
- if l:
- match = re.match(key_value_re, l)
- if match and (len(match.groups()) == 2):
- key = match.groups()[0].strip()
- value = match.groups()[1].strip()
- else:
- raise ValueError(f"""Line "{l}" could not be parsed a colon-separated pair """, l)
- if key in data.keys():
- if uniquekeys:
- raise ValueError("Data string has duplicate keys: {0}".format(key))
- else:
- data[key].append(value)
- else:
- if uniquekeys:
- data[key] = value
- else:
- data[key] = [value]
- else:
- pass
-
- return data
-
-def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
- """ Mangles dict keys according to a regex and replacement character.
- Some libraries like Jinja2 do not like certain characters in dict keys.
- This function can be used for replacing all offending characters
- with something acceptable.
-
- Args:
- data (dict): Original dict to mangle
-
- Returns: dict
- """
- from vyos.xml import is_tag
-
- new_dict = {}
-
- for key in data.keys():
- save_mod = mod
- save_path = abs_path[:]
-
- abs_path.append(key)
-
- if not is_tag(abs_path):
- new_key = re.sub(regex, replacement, key)
- else:
- if mod%2:
- new_key = key
- else:
- new_key = re.sub(regex, replacement, key)
- if no_tag_node_value_mangle:
- mod += 1
-
- value = data[key]
-
- if isinstance(value, dict):
- new_dict[new_key] = _mangle_dict_keys(value, regex, replacement, abs_path=abs_path, mod=mod, no_tag_node_value_mangle=no_tag_node_value_mangle)
- else:
- new_dict[new_key] = value
-
- mod = save_mod
- abs_path = save_path[:]
-
- return new_dict
-
-def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
- return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-
-def _get_sub_dict(d, lpath):
- k = lpath[0]
- if k not in d.keys():
- return {}
- c = {k: d[k]}
- lpath = lpath[1:]
- if not lpath:
- return c
- elif not isinstance(c[k], dict):
- return {}
- return _get_sub_dict(c[k], lpath)
-
-def get_sub_dict(source, lpath, get_first_key=False):
- """ Returns the sub-dict of a nested dict, defined by path of keys.
-
- Args:
- source (dict): Source dict to extract from
- lpath (list[str]): sequence of keys
-
- Returns: source, if lpath is empty, else
- {key : source[..]..[key]} for key the last element of lpath, if exists
- {} otherwise
- """
- if not isinstance(source, dict):
- raise TypeError("source must be of type dict")
- if not isinstance(lpath, list):
- raise TypeError("path must be of type list")
- if not lpath:
- return source
-
- ret = _get_sub_dict(source, lpath)
-
- if get_first_key and lpath and ret:
- tmp = next(iter(ret.values()))
- if not isinstance(tmp, dict):
- raise TypeError("Data under node is not of type dict")
- ret = tmp
-
- return ret
-
-def process_running(pid_file):
- """ Checks if a process with PID in pid_file is running """
- from psutil import pid_exists
- if not os.path.isfile(pid_file):
- return False
- with open(pid_file, 'r') as f:
- pid = f.read().strip()
- return pid_exists(int(pid))
-
-def process_named_running(name, cmdline: str=None):
- """ Checks if process with given name is running and returns its PID.
- If Process is not running, return None
- """
- from psutil import process_iter
- for p in process_iter(['name', 'pid', 'cmdline']):
- if cmdline:
- if p.info['name'] == name and cmdline in p.info['cmdline']:
- return p.info['pid']
- elif p.info['name'] == name:
- return p.info['pid']
- return None
-
-def is_list_equal(first: list, second: list) -> bool:
- """ Check if 2 lists are equal and list not empty """
- if len(first) != len(second) or len(first) == 0:
- return False
- return sorted(first) == sorted(second)
-
-def is_listen_port_bind_service(port: int, service: str) -> bool:
- """Check if listen port bound to expected program name
- :param port: Bind port
- :param service: Program name
- :return: bool
-
- Example:
- % is_listen_port_bind_service(443, 'nginx')
- True
- % is_listen_port_bind_service(443, 'ocserv-main')
- False
- """
- from psutil import net_connections as connections
- from psutil import Process as process
- for connection in connections():
- addr = connection.laddr
- pid = connection.pid
- pid_name = process(pid).name()
- pid_port = addr.port
- if service == pid_name and port == pid_port:
- return True
- return False
-
-def seconds_to_human(s, separator=""):
- """ Converts number of seconds passed to a human-readable
- interval such as 1w4d18h35m59s
- """
- s = int(s)
-
- week = 60 * 60 * 24 * 7
- day = 60 * 60 * 24
- hour = 60 * 60
-
- remainder = 0
- result = ""
-
- weeks = s // week
- if weeks > 0:
- result = "{0}w".format(weeks)
- s = s % week
-
- days = s // day
- if days > 0:
- result = "{0}{1}{2}d".format(result, separator, days)
- s = s % day
-
- hours = s // hour
- if hours > 0:
- result = "{0}{1}{2}h".format(result, separator, hours)
- s = s % hour
-
- minutes = s // 60
- if minutes > 0:
- result = "{0}{1}{2}m".format(result, separator, minutes)
- s = s % 60
-
- seconds = s
- if seconds > 0:
- result = "{0}{1}{2}s".format(result, separator, seconds)
-
- return result
-
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
- """ Converts a value in bytes to a human-readable size string like 640 KB
-
- The initial_exponent parameter is the exponent of 2,
- e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes.
- """
-
- if bytes == 0:
- return "0 B"
-
- from math import log2
-
- bytes = bytes * (2**initial_exponent)
-
- # log2 is a float, while range checking requires an int
- exponent = int(log2(bytes))
-
- if exponent < 10:
- value = bytes
- suffix = "B"
- elif exponent in range(10, 20):
- value = bytes / 1024
- suffix = "KB"
- elif exponent in range(20, 30):
- value = bytes / 1024**2
- suffix = "MB"
- elif exponent in range(30, 40):
- value = bytes / 1024**3
- suffix = "GB"
- else:
- value = bytes / 1024**4
- suffix = "TB"
- # Add a new case when the first machine with petabyte RAM
- # hits the market.
-
- size_string = "{0:.{1}f} {2}".format(value, precision, suffix)
- return size_string
-
-def human_to_bytes(value):
- """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """
-
- from re import match as re_match
-
- res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value)
-
- if not res:
- raise ValueError(f"'{value}' is not a valid data amount")
- else:
- amount = float(res.group(1))
- unit = res.group(2).lower()
-
- if unit == 'b':
- res = amount
- elif (unit == 'k') or (unit == 'kb'):
- res = amount * 1024
- elif (unit == 'm') or (unit == 'mb'):
- res = amount * 1024**2
- elif (unit == 'g') or (unit == 'gb'):
- res = amount * 1024**3
- elif (unit == 't') or (unit == 'tb'):
- res = amount * 1024**4
- else:
- raise ValueError(f"Unsupported data unit '{unit}'")
-
- # There cannot be fractional bytes, so we convert them to integer.
- # However, truncating causes problems with conversion back to human unit,
- # so we round instead -- that seems to work well enough.
- return round(res)
-
-def get_cfg_group_id():
- from grp import getgrnam
- from vyos.defaults import cfg_group
-
- group_data = getgrnam(cfg_group)
- return group_data.gr_gid
-
-
-def file_is_persistent(path):
- import re
- location = r'^(/config|/opt/vyatta/etc/config)'
- absolute = os.path.abspath(os.path.dirname(path))
- return re.match(location,absolute)
-
-def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1):
- """ Waits for an inotify event to occur """
- if not os.path.dirname(file_path):
- raise ValueError(
- "File path {} does not have a directory part (required for inotify watching)".format(file_path))
- if not os.path.basename(file_path):
- raise ValueError(
- "File path {} does not have a file part, do not know what to watch for".format(file_path))
-
- from inotify.adapters import Inotify
- from time import time
- from time import sleep
-
- time_start = time()
-
- i = Inotify()
- i.add_watch(os.path.dirname(file_path))
-
- if pre_hook:
- pre_hook()
-
- for event in i.event_gen(yield_nones=True):
- if (timeout is not None) and ((time() - time_start) > timeout):
- # If the function didn't return until this point,
- # the file failed to have been written to and closed within the timeout
- raise OSError("Waiting for file {} to be written has failed".format(file_path))
-
- # Most such events don't take much time, so it's better to check right away
- # and sleep later.
- if event is not None:
- (_, type_names, path, filename) = event
- if filename == os.path.basename(file_path):
- if event_type in type_names:
- return
- sleep(sleep_interval)
-
-def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1):
- """ Waits for a process to close a file after opening it in write mode. """
- wait_for_inotify(file_path,
- event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval)
-
-def commit_in_progress():
- """ Not to be used in normal op mode scripts! """
-
- # The CStore backend locks the config by opening a file
- # The file is not removed after commit, so just checking
- # if it exists is insufficient, we need to know if it's open by anyone
-
- # There are two ways to check if any other process keeps a file open.
- # The first one is to try opening it and see if the OS objects.
- # That's faster but prone to race conditions and can be intrusive.
- # The other one is to actually check if any process keeps it open.
- # It's non-intrusive but needs root permissions, else you can't check
- # processes of other users.
- #
- # Since this will be used in scripts that modify the config outside of the CLI
- # framework, those knowingly have root permissions.
- # For everything else, we add a safeguard.
- from psutil import process_iter
- from psutil import NoSuchProcess
- from getpass import getuser
- from vyos.defaults import commit_lock
-
- if getuser() != 'root':
- raise OSError('This functions needs to be run as root to return correct results!')
-
- for proc in process_iter():
- try:
- files = proc.open_files()
- if files:
- for f in files:
- if f.path == commit_lock:
- return True
- except NoSuchProcess as err:
- # Process died before we could examine it
- pass
- # Default case
- return False
-
-
-def wait_for_commit_lock():
- """ Not to be used in normal op mode scripts! """
- from time import sleep
- # Very synchronous approach to multiprocessing
- while commit_in_progress():
- sleep(1)
-
-def ask_input(question, default='', numeric_only=False, valid_responses=[]):
- question_out = question
- if default:
- question_out += f' (Default: {default})'
- response = ''
- while True:
- response = input(question_out + ' ').strip()
- if not response and default:
- return default
- if numeric_only:
- if not response.isnumeric():
- print("Invalid value, try again.")
- continue
- response = int(response)
- if valid_responses and response not in valid_responses:
- print("Invalid value, try again.")
- continue
- break
- return response
-
-def ask_yes_no(question, default=False) -> bool:
- """Ask a yes/no question via input() and return their answer."""
- from sys import stdout
- default_msg = "[Y/n]" if default else "[y/N]"
- while True:
- try:
- stdout.write("%s %s " % (question, default_msg))
- c = input().lower()
- if c == '':
- return default
- elif c in ("y", "ye", "yes"):
- return True
- elif c in ("n", "no"):
- return False
- else:
- stdout.write("Please respond with yes/y or no/n\n")
- except EOFError:
- stdout.write("\nPlease respond with yes/y or no/n\n")
-
-def is_admin() -> bool:
- """Look if current user is in sudo group"""
- from getpass import getuser
- from grp import getgrnam
- current_user = getuser()
- (_, _, _, admin_group_members) = getgrnam('sudo')
- return current_user in admin_group_members
-
-
-def mac2eui64(mac, prefix=None):
- """
- Convert a MAC address to a EUI64 address or, with prefix provided, a full
- IPv6 address.
- Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3
- """
- import re
- from ipaddress import ip_network
- # http://tools.ietf.org/html/rfc4291#section-2.5.1
- eui64 = re.sub(r'[.:-]', '', mac).lower()
- eui64 = eui64[0:6] + 'fffe' + eui64[6:]
- eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:]
-
- if prefix is None:
- return ':'.join(re.findall(r'.{4}', eui64))
- else:
- try:
- net = ip_network(prefix, strict=False)
- euil = int('0x{0}'.format(eui64), 16)
- return str(net[euil])
- except: # pylint: disable=bare-except
- return
-
-def get_half_cpus():
- """ return 1/2 of the numbers of available CPUs """
- cpu = os.cpu_count()
- if cpu > 1:
- cpu /= 2
- return int(cpu)
-
-def check_kmod(k_mod):
- """ Common utility function to load required kernel modules on demand """
- from vyos import ConfigError
- if isinstance(k_mod, str):
- k_mod = k_mod.split()
- for module in k_mod:
- if not os.path.exists(f'/sys/module/{module}'):
- if call(f'modprobe {module}') != 0:
- raise ConfigError(f'Loading Kernel module {module} failed')
-
-def find_device_file(device):
- """ Recurively search /dev for the given device file and return its full path.
- If no device file was found 'None' is returned """
- from fnmatch import fnmatch
-
- for root, dirs, files in os.walk('/dev'):
- for basename in files:
- if fnmatch(basename, device):
- return os.path.join(root, basename)
-
- return None
-
-def dict_search(path, dict_object):
- """ Traverse Python dictionary (dict_object) delimited by dot (.).
- Return value of key if found, None otherwise.
-
- This is faster implementation then jmespath.search('foo.bar', dict_object)"""
- if not isinstance(dict_object, dict) or not path:
- return None
-
- parts = path.split('.')
- inside = parts[:-1]
- if not inside:
- if path not in dict_object:
- return None
- return dict_object[path]
- c = dict_object
- for p in parts[:-1]:
- c = c.get(p, {})
- return c.get(parts[-1], None)
-
-def dict_search_args(dict_object, *path):
- # Traverse dictionary using variable arguments
- # Added due to above function not allowing for '.' in the key names
- # Example: dict_search_args(some_dict, 'key', 'subkey', 'subsubkey', ...)
- if not isinstance(dict_object, dict) or not path:
- return None
-
- for item in path:
- if item not in dict_object:
- return None
- dict_object = dict_object[item]
- return dict_object
-
-def dict_search_recursive(dict_object, key, path=[]):
- """ Traverse a dictionary recurisvely and return the value of the key
- we are looking for.
-
- Thankfully copied from https://stackoverflow.com/a/19871956
-
- Modified to yield optional path to found keys
- """
- if isinstance(dict_object, list):
- for i in dict_object:
- new_path = path + [i]
- for x in dict_search_recursive(i, key, new_path):
- yield x
- elif isinstance(dict_object, dict):
- if key in dict_object:
- new_path = path + [key]
- yield dict_object[key], new_path
- for k, j in dict_object.items():
- new_path = path + [k]
- for x in dict_search_recursive(j, key, new_path):
- yield x
-
-def convert_data(data):
- """Convert multiple types of data to types usable in CLI
-
- Args:
- data (str | bytes | list | OrderedDict): input data
-
- Returns:
- str | list | dict: converted data
- """
- from base64 import b64encode
- from collections import OrderedDict
-
- if isinstance(data, str):
- return data
- if isinstance(data, bytes):
- try:
- return data.decode()
- except UnicodeDecodeError:
- return b64encode(data).decode()
- if isinstance(data, list):
- list_tmp = []
- for item in data:
- list_tmp.append(convert_data(item))
- return list_tmp
- if isinstance(data, OrderedDict):
- dict_tmp = {}
- for key, value in data.items():
- dict_tmp[key] = convert_data(value)
- return dict_tmp
-
-def get_bridge_fdb(interface):
- """ Returns the forwarding database entries for a given interface """
- if not os.path.exists(f'/sys/class/net/{interface}'):
- return None
- from json import loads
- tmp = loads(cmd(f'bridge -j fdb show dev {interface}'))
- return tmp
-
-def get_interface_config(interface, netns=None):
- """ Returns the used encapsulation protocol for given interface.
- If interface does not exist, None is returned.
- """
- from vyos.util import run
- netns_exec = f'ip netns exec {netns}' if netns else ''
- if run(f'{netns_exec} test -e /sys/class/net/{interface}') != 0:
- return None
- from json import loads
- tmp = loads(cmd(f'{netns_exec} ip -d -j link show {interface}'))[0]
- return tmp
-
-def get_interface_address(interface):
- """ Returns the used encapsulation protocol for given interface.
- If interface does not exist, None is returned.
- """
- if not os.path.exists(f'/sys/class/net/{interface}'):
- return None
- from json import loads
- tmp = loads(cmd(f'ip -d -j addr show {interface}'))[0]
- return tmp
-
-def get_interface_namespace(iface):
- """
- Returns wich netns the interface belongs to
- """
- from json import loads
- # Check if netns exist
- tmp = loads(cmd(f'ip --json netns ls'))
- if len(tmp) == 0:
- return None
-
- for ns in tmp:
- namespace = f'{ns["name"]}'
- # Search interface in each netns
- data = loads(cmd(f'ip netns exec {namespace} ip -j link show'))
- for compare in data:
- if iface == compare["ifname"]:
- return namespace
-
-def get_all_vrfs():
- """ Return a dictionary of all system wide known VRF instances """
- from json import loads
- tmp = loads(cmd('ip -j vrf list'))
- # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}]
- # so we will re-arrange it to a more nicer representation:
- # {'red': {'table': 1000}, 'blue': {'table': 2000}}
- data = {}
- for entry in tmp:
- name = entry.pop('name')
- data[name] = entry
- return data
-
-def print_error(str='', end='\n'):
- """
- Print `str` to stderr, terminated with `end`.
- Used for warnings and out-of-band messages to avoid mangling precious
- stdout output.
- """
- sys.stderr.write(str)
- sys.stderr.write(end)
- sys.stderr.flush()
-
-def make_progressbar():
- """
- Make a procedure that takes two arguments `done` and `total` and prints a
- progressbar based on the ratio thereof, whose length is determined by the
- width of the terminal.
- """
- import shutil, math
- col, _ = shutil.get_terminal_size()
- col = max(col - 15, 20)
- def print_progressbar(done, total):
- if done <= total:
- increment = total / col
- length = math.ceil(done / increment)
- percentage = str(math.ceil(100 * done / total)).rjust(3)
- print_error(f'[{length * "#"}{(col - length) * "_"}] {percentage}%', '\r')
- # Print a newline so that the subsequent prints don't overwrite the full bar.
- if done == total:
- print_error()
- return print_progressbar
-
-def make_incremental_progressbar(increment: float):
- """
- Make a generator that displays a progressbar that grows monotonically with
- every iteration.
- First call displays it at 0% and every subsequent iteration displays it
- at `increment` increments where 0.0 < `increment` < 1.0.
- Intended for FTP and HTTP transfers with stateless callbacks.
- """
- print_progressbar = make_progressbar()
- total = 0.0
- while total < 1.0:
- print_progressbar(total, 1.0)
- yield
- total += increment
- print_progressbar(1, 1)
- # Ignore further calls.
- while True:
- yield
-
-def begin(*args):
- """
- Evaluate arguments in order and return the result of the *last* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[-1]
-
-def begin0(*args):
- """
- Evaluate arguments in order and return the result of the *first* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[0]
-
-def is_systemd_service_active(service):
- """ Test is a specified systemd service is activated.
- Returns True if service is active, false otherwise.
- Copied from: https://unix.stackexchange.com/a/435317 """
- tmp = cmd(f'systemctl show --value -p ActiveState {service}')
- return bool((tmp == 'active'))
-
-def is_systemd_service_running(service):
- """ Test is a specified systemd service is actually running.
- Returns True if service is running, false otherwise.
- Copied from: https://unix.stackexchange.com/a/435317 """
- tmp = cmd(f'systemctl show --value -p SubState {service}')
- return bool((tmp == 'running'))
-
-def check_port_availability(ipaddress, port, protocol):
- """
- Check if port is available and not used by any service
- Return False if a port is busy or IP address does not exists
- Should be used carefully for services that can start listening
- dynamically, because IP address may be dynamic too
- """
- from socketserver import TCPServer, UDPServer
- from ipaddress import ip_address
-
- # verify arguments
- try:
- ipaddress = ip_address(ipaddress).compressed
- except:
- raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
- if port not in range(1, 65536):
- raise ValueError(f'The port number {port} is not in the 1-65535 range')
- if protocol not in ['tcp', 'udp']:
- raise ValueError(
- f'The protocol {protocol} is not supported. Only tcp and udp are allowed'
- )
-
- # check port availability
- try:
- if protocol == 'tcp':
- server = TCPServer((ipaddress, port), None, bind_and_activate=True)
- if protocol == 'udp':
- server = UDPServer((ipaddress, port), None, bind_and_activate=True)
- server.server_close()
- return True
- except:
- return False
-
-def install_into_config(conf, config_paths, override_prompt=True):
- # Allows op-mode scripts to install values if called from an active config session
- # config_paths: dict of config paths
- # override_prompt: if True, user will be prompted before existing nodes are overwritten
-
- if not config_paths:
- return None
-
- from vyos.config import Config
-
- if not Config().in_session():
- print('You are not in configure mode, commands to install manually from configure mode:')
- for path in config_paths:
- print(f'set {path}')
- return None
-
- count = 0
- failed = []
-
- for path in config_paths:
- if override_prompt and conf.exists(path) and not conf.is_multi(path):
- if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
- continue
-
- try:
- cmd(f'/opt/vyatta/sbin/my_set {path}')
- count += 1
- except:
- failed.append(path)
-
- if failed:
- print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
- for path in failed:
- print(f'set {path}')
-
- if count > 0:
- print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
-
-def is_wwan_connected(interface):
- """ Determine if a given WWAN interface, e.g. wwan0 is connected to the
- carrier network or not """
- import json
-
- if not interface.startswith('wwan'):
- raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
-
- # ModemManager is required for connection(s) - if service is not running,
- # there won't be any connection at all!
- if not is_systemd_service_active('ModemManager.service'):
- return False
-
- modem = interface.lstrip('wwan')
-
- tmp = cmd(f'mmcli --modem {modem} --output-json')
- tmp = json.loads(tmp)
-
- # return True/False if interface is in connected state
- return dict_search('modem.generic.state', tmp) == 'connected'
-
-def boot_configuration_complete() -> bool:
- """ Check if the boot config loader has completed
- """
- from vyos.defaults import config_status
-
- if os.path.isfile(config_status):
- return True
- return False
-
-def sysctl_read(name):
- """ Read and return current value of sysctl() option """
- tmp = cmd(f'sysctl {name}')
- return tmp.split()[-1]
-
-def sysctl_write(name, value):
- """ Change value via sysctl() - return True if changed, False otherwise """
- tmp = cmd(f'sysctl {name}')
- # last list index contains the actual value - only write if value differs
- if sysctl_read(name) != str(value):
- call(f'sysctl -wq {name}={value}')
- return True
- return False
-
-def load_as_module(name: str, path: str):
- import importlib.util
-
- spec = importlib.util.spec_from_file_location(name, path)
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py
index e69de29bb..12ef2d3b8 100644
--- a/python/vyos/utils/__init__.py
+++ b/python/vyos/utils/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from vyos.utils import assertion
+from vyos.utils import auth
+from vyos.utils import boot
+from vyos.utils import commit
+from vyos.utils import convert
+from vyos.utils import dict
+from vyos.utils import file
+from vyos.utils import io
+from vyos.utils import kernel
+from vyos.utils import list
+from vyos.utils import misc
+from vyos.utils import network
+from vyos.utils import permission
+from vyos.utils import process
+from vyos.utils import system
diff --git a/python/vyos/utils/assertion.py b/python/vyos/utils/assertion.py
new file mode 100644
index 000000000..1aaa54dff
--- /dev/null
+++ b/python/vyos/utils/assertion.py
@@ -0,0 +1,81 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def assert_boolean(b):
+ if int(b) not in (0, 1):
+ raise ValueError(f'Value {b} out of range')
+
+def assert_range(value, lower=0, count=3):
+ if int(value, 16) not in range(lower, lower+count):
+ raise ValueError("Value out of range")
+
+def assert_list(s, l):
+ if s not in l:
+ o = ' or '.join([f'"{n}"' for n in l])
+ raise ValueError(f'state must be {o}, got {s}')
+
+def assert_number(n):
+ if not str(n).isnumeric():
+ raise ValueError(f'{n} must be a number')
+
+def assert_positive(n, smaller=0):
+ assert_number(n)
+ if int(n) < smaller:
+ raise ValueError(f'{n} is smaller than {smaller}')
+
+def assert_mtu(mtu, ifname):
+ assert_number(mtu)
+
+ import json
+ from vyos.utils.process import cmd
+ out = cmd(f'ip -j -d link show dev {ifname}')
+ # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
+ parsed = json.loads(out)[0]
+ min_mtu = int(parsed.get('min_mtu', '0'))
+ # cur_mtu = parsed.get('mtu',0),
+ max_mtu = int(parsed.get('max_mtu', '0'))
+ cur_mtu = int(mtu)
+
+ if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68:
+ raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}')
+ if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536:
+ raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}')
+
+def assert_mac(m):
+ split = m.split(':')
+ size = len(split)
+
+ # a mac address consits out of 6 octets
+ if size != 6:
+ raise ValueError(f'wrong number of MAC octets ({size}): {m}')
+
+ octets = []
+ try:
+ for octet in split:
+ octets.append(int(octet, 16))
+ except ValueError:
+ raise ValueError(f'invalid hex number "{octet}" in : {m}')
+
+ # validate against the first mac address byte if it's a multicast
+ # address
+ if octets[0] & 1:
+ raise ValueError(f'{m} is a multicast MAC address')
+
+ # overall mac address is not allowed to be 00:00:00:00:00:00
+ if sum(octets) == 0:
+ raise ValueError('00:00:00:00:00:00 is not a valid MAC address')
+
+ if octets[:5] == (0, 0, 94, 0, 1):
+ raise ValueError(f'{m} is a VRRP MAC address')
diff --git a/python/vyos/authutils.py b/python/vyos/utils/auth.py
index 66b5f4a74..a59858d72 100644
--- a/python/vyos/authutils.py
+++ b/python/vyos/utils/auth.py
@@ -15,7 +15,7 @@
import re
-from vyos.util import cmd
+from vyos.utils.process import cmd
def make_password_hash(password):
diff --git a/python/vyos/utils/boot.py b/python/vyos/utils/boot.py
new file mode 100644
index 000000000..3aecbec64
--- /dev/null
+++ b/python/vyos/utils/boot.py
@@ -0,0 +1,35 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+def boot_configuration_complete() -> bool:
+ """ Check if the boot config loader has completed
+ """
+ from vyos.defaults import config_status
+ if os.path.isfile(config_status):
+ return True
+ return False
+
+def boot_configuration_success() -> bool:
+ from vyos.defaults import config_status
+ try:
+ with open(config_status) as f:
+ res = f.read().strip()
+ except FileNotFoundError:
+ return False
+ if int(res) == 0:
+ return True
+ return False
diff --git a/python/vyos/utils/commit.py b/python/vyos/utils/commit.py
new file mode 100644
index 000000000..105aed8c2
--- /dev/null
+++ b/python/vyos/utils/commit.py
@@ -0,0 +1,60 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def commit_in_progress():
+ """ Not to be used in normal op mode scripts! """
+
+ # The CStore backend locks the config by opening a file
+ # The file is not removed after commit, so just checking
+ # if it exists is insufficient, we need to know if it's open by anyone
+
+ # There are two ways to check if any other process keeps a file open.
+ # The first one is to try opening it and see if the OS objects.
+ # That's faster but prone to race conditions and can be intrusive.
+ # The other one is to actually check if any process keeps it open.
+ # It's non-intrusive but needs root permissions, else you can't check
+ # processes of other users.
+ #
+ # Since this will be used in scripts that modify the config outside of the CLI
+ # framework, those knowingly have root permissions.
+ # For everything else, we add a safeguard.
+ from psutil import process_iter
+ from psutil import NoSuchProcess
+ from getpass import getuser
+ from vyos.defaults import commit_lock
+
+ if getuser() != 'root':
+ raise OSError('This functions needs to be run as root to return correct results!')
+
+ for proc in process_iter():
+ try:
+ files = proc.open_files()
+ if files:
+ for f in files:
+ if f.path == commit_lock:
+ return True
+ except NoSuchProcess as err:
+ # Process died before we could examine it
+ pass
+ # Default case
+ return False
+
+
+def wait_for_commit_lock():
+ """ Not to be used in normal op mode scripts! """
+ from time import sleep
+ # Very synchronous approach to multiprocessing
+ while commit_in_progress():
+ sleep(1)
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 975c67e0a..9a8a1ff7d 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -143,3 +143,55 @@ def mac_to_eui64(mac, prefix=None):
return str(net[euil])
except: # pylint: disable=bare-except
return
+
+
+def convert_data(data) -> dict | list | tuple | str | int | float | bool | None:
+ """Filter and convert multiple types of data to types usable in CLI/API
+
+ WARNING: Must not be used for anything except formatting output for API or CLI
+
+ On the output allowed everything supported in JSON.
+
+ Args:
+ data (Any): input data
+
+ Returns:
+ dict | list | tuple | str | int | float | bool | None: converted data
+ """
+ from base64 import b64encode
+
+ # return original data for types which do not require conversion
+ if isinstance(data, str | int | float | bool | None):
+ return data
+
+ if isinstance(data, list):
+ list_tmp = []
+ for item in data:
+ list_tmp.append(convert_data(item))
+ return list_tmp
+
+ if isinstance(data, tuple):
+ list_tmp = list(data)
+ tuple_tmp = tuple(convert_data(list_tmp))
+ return tuple_tmp
+
+ if isinstance(data, bytes | bytearray):
+ try:
+ return data.decode()
+ except UnicodeDecodeError:
+ return b64encode(data).decode()
+
+ if isinstance(data, set | frozenset):
+ list_tmp = convert_data(list(data))
+ return list_tmp
+
+ if isinstance(data, dict):
+ dict_tmp = {}
+ for key, value in data.items():
+ dict_tmp[key] = convert_data(value)
+ return dict_tmp
+
+ # do not return anything for other types
+ # which cannot be converted to JSON
+ # for example: complex | range | memoryview
+ return
diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py
index 7c93deef6..9484eacdd 100644
--- a/python/vyos/utils/dict.py
+++ b/python/vyos/utils/dict.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
def colon_separated_to_dict(data_string, uniquekeys=False):
""" Converts a string containing newline-separated entries
of colon-separated key-value pairs into a dict.
@@ -65,7 +64,7 @@ def colon_separated_to_dict(data_string, uniquekeys=False):
return data
-def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
+def mangle_dict_keys(data, regex, replacement, abs_path=None, no_tag_node_value_mangle=False):
""" Mangles dict keys according to a regex and replacement character.
Some libraries like Jinja2 do not like certain characters in dict keys.
This function can be used for replacing all offending characters
@@ -73,44 +72,39 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
Args:
data (dict): Original dict to mangle
+ regex, replacement (str): arguments to re.sub(regex, replacement, ...)
+ abs_path (list): if data is a config dict and no_tag_node_value_mangle is True
+ then abs_path should be the absolute config path to the first
+ keys of data, non-inclusive
+ no_tag_node_value_mangle (bool): do not mangle keys of tag node values
Returns: dict
"""
- from vyos.xml import is_tag
-
- new_dict = {}
+ import re
+ from vyos.xml_ref import is_tag_value
- for key in data.keys():
- save_mod = mod
- save_path = abs_path[:]
+ if abs_path is None:
+ abs_path = []
- abs_path.append(key)
+ new_dict = type(data)()
- if not is_tag(abs_path):
- new_key = re.sub(regex, replacement, key)
+ for k in data.keys():
+ if no_tag_node_value_mangle and is_tag_value(abs_path + [k]):
+ new_key = k
else:
- if mod%2:
- new_key = key
- else:
- new_key = re.sub(regex, replacement, key)
- if no_tag_node_value_mangle:
- mod += 1
+ new_key = re.sub(regex, replacement, k)
- value = data[key]
+ value = data[k]
if isinstance(value, dict):
- new_dict[new_key] = _mangle_dict_keys(value, regex, replacement, abs_path=abs_path, mod=mod, no_tag_node_value_mangle=no_tag_node_value_mangle)
+ new_dict[new_key] = mangle_dict_keys(value, regex, replacement,
+ abs_path=abs_path + [k],
+ no_tag_node_value_mangle=no_tag_node_value_mangle)
else:
new_dict[new_key] = value
- mod = save_mod
- abs_path = save_path[:]
-
return new_dict
-def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
- return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-
def _get_sub_dict(d, lpath):
k = lpath[0]
if k not in d.keys():
@@ -234,6 +228,27 @@ def dict_to_list(d, save_key_to=None):
return collect
+def dict_to_paths(d: dict) -> list:
+ """ Generator to return list of paths from dict of list[str]|str
+ """
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ for r in func(v, path + [k]):
+ yield r
+ elif isinstance(d, list):
+ for i in d:
+ for r in func(i, path):
+ yield r
+ elif isinstance(d, str):
+ yield path + [d]
+ else:
+ raise ValueError('object is not a dict of strings/list of strings')
+ for r in func(d, []):
+ yield r
+
def check_mutually_exclusive_options(d, keys, required=False):
""" Checks if a dict has at most one or only one of
mutually exclusive keys.
@@ -254,3 +269,39 @@ def check_mutually_exclusive_options(d, keys, required=False):
if required and (len(present_keys) < 1):
raise ValueError(f"At least one of the following options is required: {orig_keys}")
+
+class FixedDict(dict):
+ """
+ FixedDict: A dictionnary not allowing new keys to be created after initialisation.
+
+ >>> f = FixedDict(**{'count':1})
+ >>> f['count'] = 2
+ >>> f['king'] = 3
+ File "...", line ..., in __setitem__
+ raise ConfigError(f'Option "{k}" has no defined default')
+ """
+
+ from vyos import ConfigError
+
+ def __init__(self, **options):
+ self._allowed = options.keys()
+ super().__init__(**options)
+
+ def __setitem__(self, k, v):
+ """
+ __setitem__ is a builtin which is called by python when setting dict values:
+ >>> d = dict()
+ >>> d['key'] = 'value'
+ >>> d
+ {'key': 'value'}
+
+ is syntaxic sugar for
+
+ >>> d = dict()
+ >>> d.__setitem__('key','value')
+ >>> d
+ {'key': 'value'}
+ """
+ if k not in self._allowed:
+ raise ConfigError(f'Option "{k}" has no defined default')
+ super().__setitem__(k, v)
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index 2560a35be..667a2464b 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -14,7 +14,19 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+from vyos.utils.permission import chown
+def makedir(path, user=None, group=None):
+ if os.path.exists(path):
+ return
+ os.makedirs(path, mode=0o755)
+ chown(path, user, group)
+
+def file_is_persistent(path):
+ import re
+ location = r'^(/config|/opt/vyatta/etc/config)'
+ absolute = os.path.abspath(os.path.dirname(path))
+ return re.match(location,absolute)
def read_file(fname, defaultonfailure=None):
"""
diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py
new file mode 100644
index 000000000..1f3bbdffe
--- /dev/null
+++ b/python/vyos/utils/kernel.py
@@ -0,0 +1,38 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+def check_kmod(k_mod):
+ """ Common utility function to load required kernel modules on demand """
+ from vyos import ConfigError
+ from vyos.utils.process import call
+ if isinstance(k_mod, str):
+ k_mod = k_mod.split()
+ for module in k_mod:
+ if not os.path.exists(f'/sys/module/{module}'):
+ if call(f'modprobe {module}') != 0:
+ raise ConfigError(f'Loading Kernel module {module} failed')
+
+def unload_kmod(k_mod):
+ """ Common utility function to unload required kernel modules on demand """
+ from vyos import ConfigError
+ from vyos.utils.process import call
+ if isinstance(k_mod, str):
+ k_mod = k_mod.split()
+ for module in k_mod:
+ if os.path.exists(f'/sys/module/{module}'):
+ if call(f'rmmod {module}') != 0:
+ raise ConfigError(f'Unloading Kernel module {module} failed')
diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py
new file mode 100644
index 000000000..63ef720ab
--- /dev/null
+++ b/python/vyos/utils/list.py
@@ -0,0 +1,20 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def is_list_equal(first: list, second: list) -> bool:
+ """ Check if 2 lists are equal and list not empty """
+ if len(first) != len(second) or len(first) == 0:
+ return False
+ return sorted(first) == sorted(second)
diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py
new file mode 100644
index 000000000..d82655914
--- /dev/null
+++ b/python/vyos/utils/misc.py
@@ -0,0 +1,66 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def begin(*args):
+ """
+ Evaluate arguments in order and return the result of the *last* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[-1]
+
+def begin0(*args):
+ """
+ Evaluate arguments in order and return the result of the *first* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[0]
+
+def install_into_config(conf, config_paths, override_prompt=True):
+ # Allows op-mode scripts to install values if called from an active config session
+ # config_paths: dict of config paths
+ # override_prompt: if True, user will be prompted before existing nodes are overwritten
+ if not config_paths:
+ return None
+
+ from vyos.config import Config
+ from vyos.utils.io import ask_yes_no
+ from vyos.utils.process import cmd
+ if not Config().in_session():
+ print('You are not in configure mode, commands to install manually from configure mode:')
+ for path in config_paths:
+ print(f'set {path}')
+ return None
+
+ count = 0
+ failed = []
+
+ for path in config_paths:
+ if override_prompt and conf.exists(path) and not conf.is_multi(path):
+ if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
+ continue
+
+ try:
+ cmd(f'/opt/vyatta/sbin/my_set {path}')
+ count += 1
+ except:
+ failed.append(path)
+
+ if failed:
+ print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
+ for path in failed:
+ print(f'set {path}')
+
+ if count > 0:
+ print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
new file mode 100644
index 000000000..2f181d8d9
--- /dev/null
+++ b/python/vyos/utils/network.py
@@ -0,0 +1,417 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def _are_same_ip(one, two):
+ from socket import AF_INET
+ from socket import AF_INET6
+ from socket import inet_pton
+ from vyos.template import is_ipv4
+ # compare the binary representation of the IP
+ f_one = AF_INET if is_ipv4(one) else AF_INET6
+ s_two = AF_INET if is_ipv4(two) else AF_INET6
+ return inet_pton(f_one, one) == inet_pton(f_one, two)
+
+def get_protocol_by_name(protocol_name):
+ """Get protocol number by protocol name
+
+ % get_protocol_by_name('tcp')
+ % 6
+ """
+ import socket
+ try:
+ protocol_number = socket.getprotobyname(protocol_name)
+ return protocol_number
+ except socket.error:
+ return protocol_name
+
+def interface_exists(interface) -> bool:
+ import os
+ return os.path.exists(f'/sys/class/net/{interface}')
+
+def interface_exists_in_netns(interface_name, netns):
+ from vyos.utils.process import rc_cmd
+ rc, out = rc_cmd(f'ip netns exec {netns} ip link show dev {interface_name}')
+ if rc == 0:
+ return True
+ return False
+
+def get_vrf_members(vrf: str) -> list:
+ """
+ Get list of interface VRF members
+ :param vrf: str
+ :return: list
+ """
+ import json
+ from vyos.utils.process import cmd
+ if not interface_exists(vrf):
+ raise ValueError(f'VRF "{vrf}" does not exist!')
+ output = cmd(f'ip --json --brief link show master {vrf}')
+ answer = json.loads(output)
+ interfaces = []
+ for data in answer:
+ if 'ifname' in data:
+ interfaces.append(data.get('ifname'))
+ return interfaces
+
+def get_interface_vrf(interface):
+ """ Returns VRF of given interface """
+ from vyos.utils.dict import dict_search
+ from vyos.utils.network import get_interface_config
+ tmp = get_interface_config(interface)
+ if dict_search('linkinfo.info_slave_kind', tmp) == 'vrf':
+ return tmp['master']
+ return 'default'
+
+def get_interface_config(interface):
+ """ Returns the used encapsulation protocol for given interface.
+ If interface does not exist, None is returned.
+ """
+ import os
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'ip --detail --json link show dev {interface}'))[0]
+ return tmp
+
+def get_interface_address(interface):
+ """ Returns the used encapsulation protocol for given interface.
+ If interface does not exist, None is returned.
+ """
+ import os
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0]
+ return tmp
+
+def get_interface_namespace(iface):
+ """
+ Returns wich netns the interface belongs to
+ """
+ from json import loads
+ from vyos.utils.process import cmd
+ # Check if netns exist
+ tmp = loads(cmd(f'ip --json netns ls'))
+ if len(tmp) == 0:
+ return None
+
+ for ns in tmp:
+ netns = f'{ns["name"]}'
+ # Search interface in each netns
+ data = loads(cmd(f'ip netns exec {netns} ip --json link show'))
+ for tmp in data:
+ if iface == tmp["ifname"]:
+ return netns
+
+def is_wwan_connected(interface):
+ """ Determine if a given WWAN interface, e.g. wwan0 is connected to the
+ carrier network or not """
+ import json
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('wwan'):
+ raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
+
+ # ModemManager is required for connection(s) - if service is not running,
+ # there won't be any connection at all!
+ if not is_systemd_service_active('ModemManager.service'):
+ return False
+
+ modem = interface.lstrip('wwan')
+
+ tmp = cmd(f'mmcli --modem {modem} --output-json')
+ tmp = json.loads(tmp)
+
+ # return True/False if interface is in connected state
+ return dict_search('modem.generic.state', tmp) == 'connected'
+
+def get_bridge_fdb(interface):
+ """ Returns the forwarding database entries for a given interface """
+ import os
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'bridge -j fdb show dev {interface}'))
+ return tmp
+
+def get_all_vrfs():
+ """ Return a dictionary of all system wide known VRF instances """
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd('ip --json vrf list'))
+ # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}]
+ # so we will re-arrange it to a more nicer representation:
+ # {'red': {'table': 1000}, 'blue': {'table': 2000}}
+ data = {}
+ for entry in tmp:
+ name = entry.pop('name')
+ data[name] = entry
+ return data
+
+def mac2eui64(mac, prefix=None):
+ """
+ Convert a MAC address to a EUI64 address or, with prefix provided, a full
+ IPv6 address.
+ Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3
+ """
+ import re
+ from ipaddress import ip_network
+ # http://tools.ietf.org/html/rfc4291#section-2.5.1
+ eui64 = re.sub(r'[.:-]', '', mac).lower()
+ eui64 = eui64[0:6] + 'fffe' + eui64[6:]
+ eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:]
+
+ if prefix is None:
+ return ':'.join(re.findall(r'.{4}', eui64))
+ else:
+ try:
+ net = ip_network(prefix, strict=False)
+ euil = int('0x{0}'.format(eui64), 16)
+ return str(net[euil])
+ except: # pylint: disable=bare-except
+ return
+
+def check_port_availability(ipaddress, port, protocol):
+ """
+ Check if port is available and not used by any service
+ Return False if a port is busy or IP address does not exists
+ Should be used carefully for services that can start listening
+ dynamically, because IP address may be dynamic too
+ """
+ from socketserver import TCPServer, UDPServer
+ from ipaddress import ip_address
+
+ # verify arguments
+ try:
+ ipaddress = ip_address(ipaddress).compressed
+ except:
+ raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+ if port not in range(1, 65536):
+ raise ValueError(f'The port number {port} is not in the 1-65535 range')
+ if protocol not in ['tcp', 'udp']:
+ raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed')
+
+ # check port availability
+ try:
+ if protocol == 'tcp':
+ server = TCPServer((ipaddress, port), None, bind_and_activate=True)
+ if protocol == 'udp':
+ server = UDPServer((ipaddress, port), None, bind_and_activate=True)
+ server.server_close()
+ except Exception as e:
+ # errno.h:
+ #define EADDRINUSE 98 /* Address already in use */
+ if e.errno == 98:
+ return False
+
+ return True
+
+def is_listen_port_bind_service(port: int, service: str) -> bool:
+ """Check if listen port bound to expected program name
+ :param port: Bind port
+ :param service: Program name
+ :return: bool
+
+ Example:
+ % is_listen_port_bind_service(443, 'nginx')
+ True
+ % is_listen_port_bind_service(443, 'ocserv-main')
+ False
+ """
+ from psutil import net_connections as connections
+ from psutil import Process as process
+ for connection in connections():
+ addr = connection.laddr
+ pid = connection.pid
+ pid_name = process(pid).name()
+ pid_port = addr.port
+ if service == pid_name and port == pid_port:
+ return True
+ return False
+
+def is_ipv6_link_local(addr):
+ """ Check if addrsss is an IPv6 link-local address. Returns True/False """
+ from ipaddress import ip_interface
+ from vyos.template import is_ipv6
+ addr = addr.split('%')[0]
+ if is_ipv6(addr):
+ if ip_interface(addr).is_link_local:
+ return True
+
+ return False
+
+def is_addr_assigned(ip_address, vrf=None) -> bool:
+ """ Verify if the given IPv4/IPv6 address is assigned to any interface """
+ from netifaces import interfaces
+ from vyos.utils.network import get_interface_config
+ from vyos.utils.dict import dict_search
+
+ for interface in interfaces():
+ # Check if interface belongs to the requested VRF, if this is not the
+ # case there is no need to proceed with this data set - continue loop
+ # with next element
+ tmp = get_interface_config(interface)
+ if dict_search('master', tmp) != vrf:
+ continue
+
+ if is_intf_addr_assigned(interface, ip_address):
+ return True
+
+ return False
+
+def is_intf_addr_assigned(intf, address) -> bool:
+ """
+ Verify if the given IPv4/IPv6 address is assigned to specific interface.
+ It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
+ address 192.0.2.1/24.
+ """
+ from vyos.template import is_ipv4
+
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ # check if the requested address type is configured at all
+ # {
+ # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}],
+ # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}],
+ # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}]
+ # }
+ try:
+ addresses = ifaddresses(intf)
+ except ValueError as e:
+ print(e)
+ return False
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET if is_ipv4(address) else AF_INET6
+
+ # Check every IP address on this interface for a match
+ netmask = None
+ if '/' in address:
+ address, netmask = address.split('/')
+ for ip in addresses.get(addr_type, []):
+ # ip can have the interface name in the 'addr' field, we need to remove it
+ # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'}
+ ip_addr = ip['addr'].split('%')[0]
+
+ if not _are_same_ip(address, ip_addr):
+ continue
+
+ # we do not have a netmask to compare against, they are the same
+ if not netmask:
+ return True
+
+ prefixlen = ''
+ if is_ipv4(ip_addr):
+ prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')])
+ else:
+ prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _])
+
+ if str(prefixlen) == netmask:
+ return True
+
+ return False
+
+def is_loopback_addr(addr):
+ """ Check if supplied IPv4/IPv6 address is a loopback address """
+ from ipaddress import ip_address
+ return ip_address(addr).is_loopback
+
+def is_wireguard_key_pair(private_key: str, public_key:str) -> bool:
+ """
+ Checks if public/private keys are keypair
+ :param private_key: Wireguard private key
+ :type private_key: str
+ :param public_key: Wireguard public key
+ :type public_key: str
+ :return: If public/private keys are keypair returns True else False
+ :rtype: bool
+ """
+ from vyos.utils.process import cmd
+ gen_public_key = cmd('wg pubkey', input=private_key)
+ if gen_public_key == public_key:
+ return True
+ else:
+ return False
+
+def is_subnet_connected(subnet, primary=False):
+ """
+ Verify is the given IPv4/IPv6 subnet is connected to any interface on this
+ system.
+
+ primary check if the subnet is reachable via the primary IP address of this
+ interface, or in other words has a broadcast address configured. ISC DHCP
+ for instance will complain if it should listen on non broadcast interfaces.
+
+ Return True/False
+ """
+ from ipaddress import ip_address
+ from ipaddress import ip_network
+
+ from netifaces import ifaddresses
+ from netifaces import interfaces
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ from vyos.template import is_ipv6
+
+ # determine IP version (AF_INET or AF_INET6) depending on passed address
+ addr_type = AF_INET
+ if is_ipv6(subnet):
+ addr_type = AF_INET6
+
+ for interface in interfaces():
+ # check if the requested address type is configured at all
+ if addr_type not in ifaddresses(interface).keys():
+ continue
+
+ # An interface can have multiple addresses, but some software components
+ # only support the primary address :(
+ if primary:
+ ip = ifaddresses(interface)[addr_type][0]['addr']
+ if ip_address(ip) in ip_network(subnet):
+ return True
+ else:
+ # Check every assigned IP address if it is connected to the subnet
+ # in question
+ for ip in ifaddresses(interface)[addr_type]:
+ # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
+ addr = ip['addr'].split('%')[0]
+ if ip_address(addr) in ip_network(subnet):
+ return True
+
+ return False
+
+def is_afi_configured(interface, afi):
+ """ Check if given address family is configured, or in other words - an IP
+ address is assigned to the interface. """
+ from netifaces import ifaddresses
+ from netifaces import AF_INET
+ from netifaces import AF_INET6
+
+ if afi not in [AF_INET, AF_INET6]:
+ raise ValueError('Address family must be in [AF_INET, AF_INET6]')
+
+ try:
+ addresses = ifaddresses(interface)
+ except ValueError as e:
+ print(e)
+ return False
+
+ return afi in addresses
diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py
new file mode 100644
index 000000000..d938b494f
--- /dev/null
+++ b/python/vyos/utils/permission.py
@@ -0,0 +1,78 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+def chown(path, user, group):
+ """ change file/directory owner """
+ from pwd import getpwnam
+ from grp import getgrnam
+
+ if user is None or group is None:
+ return False
+
+ # path may also be an open file descriptor
+ if not isinstance(path, int) and not os.path.exists(path):
+ return False
+
+ uid = getpwnam(user).pw_uid
+ gid = getgrnam(group).gr_gid
+ os.chown(path, uid, gid)
+ return True
+
+def chmod(path, bitmask):
+ # path may also be an open file descriptor
+ if not isinstance(path, int) and not os.path.exists(path):
+ return
+ if bitmask is None:
+ return
+ os.chmod(path, bitmask)
+
+def chmod_600(path):
+ """ make file only read/writable by owner """
+ from stat import S_IRUSR, S_IWUSR
+
+ bitmask = S_IRUSR | S_IWUSR
+ chmod(path, bitmask)
+
+def chmod_750(path):
+ """ make file/directory only executable to user and group """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP
+ chmod(path, bitmask)
+
+def chmod_755(path):
+ """ make file executable by all """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
+ S_IROTH | S_IXOTH
+ chmod(path, bitmask)
+
+def is_admin() -> bool:
+ """Look if current user is in sudo group"""
+ from getpass import getuser
+ from grp import getgrnam
+ current_user = getuser()
+ (_, _, _, admin_group_members) = getgrnam('sudo')
+ return current_user in admin_group_members
+
+def get_cfg_group_id():
+ from grp import getgrnam
+ from vyos.defaults import cfg_group
+
+ group_data = getgrnam(cfg_group)
+ return group_data.gr_gid
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
new file mode 100644
index 000000000..e09c7d86d
--- /dev/null
+++ b/python/vyos/utils/process.py
@@ -0,0 +1,232 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+from subprocess import Popen
+from subprocess import PIPE
+from subprocess import STDOUT
+from subprocess import DEVNULL
+
+def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=PIPE, decode='utf-8'):
+ """
+ popen is a wrapper helper aound subprocess.Popen
+ with it default setting it will return a tuple (out, err)
+ out: the output of the program run
+ err: the error code returned by the program
+
+ it can be affected by the following flags:
+ shell: do not try to auto-detect if a shell is required
+ for example if a pipe (|) or redirection (>, >>) is used
+ input: data to sent to the child process via STDIN
+ the data should be bytes but string will be converted
+ timeout: time after which the command will be considered to have failed
+ env: mapping that defines the environment variables for the new process
+ stdout: define how the output of the program should be handled
+ - PIPE (default), sends stdout to the output
+ - DEVNULL, discard the output
+ stderr: define how the output of the program should be handled
+ - None (default), send/merge the data to/with stderr
+ - PIPE, popen will append it to output
+ - STDOUT, send the data to be merged with stdout
+ - DEVNULL, discard the output
+ decode: specify the expected text encoding (utf-8, ascii, ...)
+ the default is explicitely utf-8 which is python's own default
+
+ usage:
+ get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT)
+ discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE)
+ """
+
+ # airbag must be left as an import in the function as otherwise we have a
+ # a circual import dependency
+ from vyos import debug
+ from vyos import airbag
+
+ # log if the flag is set, otherwise log if command is set
+ if not debug.enabled(flag):
+ flag = 'command'
+
+ cmd_msg = f"cmd '{command}'"
+ debug.message(cmd_msg, flag)
+
+ use_shell = shell
+ stdin = None
+ if shell is None:
+ use_shell = False
+ if ' ' in command:
+ use_shell = True
+ if env:
+ use_shell = True
+
+ if input:
+ stdin = PIPE
+ input = input.encode() if type(input) is str else input
+
+ p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+ env=env, shell=use_shell)
+
+ pipe = p.communicate(input, timeout)
+
+ pipe_out = b''
+ if stdout == PIPE:
+ pipe_out = pipe[0]
+
+ pipe_err = b''
+ if stderr == PIPE:
+ pipe_err = pipe[1]
+
+ str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip()
+ str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip()
+
+ out_msg = f"returned (out):\n{str_out}"
+ if str_out:
+ debug.message(out_msg, flag)
+
+ if str_err:
+ from sys import stderr
+ err_msg = f"returned (err):\n{str_err}"
+ # this message will also be send to syslog via airbag
+ debug.message(err_msg, flag, destination=stderr)
+
+ # should something go wrong, report this too via airbag
+ airbag.noteworthy(cmd_msg)
+ airbag.noteworthy(out_msg)
+ airbag.noteworthy(err_msg)
+
+ return str_out, p.returncode
+
+
+def run(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=DEVNULL, stderr=PIPE, decode='utf-8'):
+ """
+ A wrapper around popen, which discard the stdout and
+ will return the error code of a command
+ """
+ _, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ return code
+
+
+def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
+ expect=[0]):
+ """
+ A wrapper around popen, which returns the stdout and
+ will raise the error code of a command
+
+ raising: specify which call should be used when raising
+ the class should only require a string as parameter
+ (default is OSError) with the error code
+ expect: a list of error codes to consider as normal
+ """
+ decoded, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ if code not in expect:
+ feedback = message + '\n' if message else ''
+ feedback += f'failed to run command: {command}\n'
+ feedback += f'returned: {decoded}\n'
+ feedback += f'exit code: {code}'
+ if raising is None:
+ # error code can be recovered with .errno
+ raise OSError(code, feedback)
+ else:
+ raise raising(feedback)
+ return decoded
+
+
+def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=STDOUT, decode='utf-8'):
+ """
+ A wrapper around popen, which returns the return code
+ of a command and stdout
+
+ % rc_cmd('uname')
+ (0, 'Linux')
+ % rc_cmd('ip link show dev eth99')
+ (1, 'Device "eth99" does not exist.')
+ """
+ out, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ return code, out
+
+def call(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=None, stderr=None, decode='utf-8'):
+ """
+ A wrapper around popen, which print the stdout and
+ will return the error code of a command
+ """
+ out, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ if out:
+ print(out)
+ return code
+
+def process_running(pid_file):
+ """ Checks if a process with PID in pid_file is running """
+ from psutil import pid_exists
+ if not os.path.isfile(pid_file):
+ return False
+ with open(pid_file, 'r') as f:
+ pid = f.read().strip()
+ return pid_exists(int(pid))
+
+def process_named_running(name, cmdline: str=None):
+ """ Checks if process with given name is running and returns its PID.
+ If Process is not running, return None
+ """
+ from psutil import process_iter
+ for p in process_iter(['name', 'pid', 'cmdline']):
+ if cmdline:
+ if p.info['name'] == name and cmdline in p.info['cmdline']:
+ return p.info['pid']
+ elif p.info['name'] == name:
+ return p.info['pid']
+ return None
+
+def is_systemd_service_active(service):
+ """ Test is a specified systemd service is activated.
+ Returns True if service is active, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p ActiveState {service}')
+ return bool((tmp == 'active'))
+
+def is_systemd_service_running(service):
+ """ Test is a specified systemd service is actually running.
+ Returns True if service is running, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p SubState {service}')
+ return bool((tmp == 'running'))
diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py
new file mode 100644
index 000000000..5d41c0c05
--- /dev/null
+++ b/python/vyos/utils/system.py
@@ -0,0 +1,107 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+from subprocess import run
+
+def sysctl_read(name: str) -> str:
+ """Read and return current value of sysctl() option
+
+ Args:
+ name (str): sysctl key name
+
+ Returns:
+ str: sysctl key value
+ """
+ tmp = run(['sysctl', '-nb', name], capture_output=True)
+ return tmp.stdout.decode()
+
+def sysctl_write(name: str, value: str | int) -> bool:
+ """Change value via sysctl()
+
+ Args:
+ name (str): sysctl key name
+ value (str | int): sysctl key value
+
+ Returns:
+ bool: True if changed, False otherwise
+ """
+ # convert other types to string before comparison
+ if not isinstance(value, str):
+ value = str(value)
+ # do not change anything if a value is already configured
+ if sysctl_read(name) == value:
+ return True
+ # return False if sysctl call failed
+ if run(['sysctl', '-wq', f'{name}={value}']).returncode != 0:
+ return False
+ # compare old and new values
+ # sysctl may apply value, but its actual value will be
+ # different from requested
+ if sysctl_read(name) == value:
+ return True
+ # False in other cases
+ return False
+
+def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool:
+ """Apply sysctl values.
+
+ Args:
+ sysctl_dict (dict[str, str]): dictionary with sysctl keys with values
+ revert (bool, optional): Revert to original values if new were not
+ applied. Defaults to True.
+
+ Returns:
+ bool: True if all params configured properly, False in other cases
+ """
+ # get current values
+ sysctl_original: dict[str, str] = {}
+ for key_name in sysctl_dict.keys():
+ sysctl_original[key_name] = sysctl_read(key_name)
+ # apply new values and revert in case one of them was not applied
+ for key_name, value in sysctl_dict.items():
+ if not sysctl_write(key_name, value):
+ if revert:
+ sysctl_apply(sysctl_original, revert=False)
+ return False
+ # everything applied
+ return True
+
+def get_half_cpus():
+ """ return 1/2 of the numbers of available CPUs """
+ cpu = os.cpu_count()
+ if cpu > 1:
+ cpu /= 2
+ return int(cpu)
+
+def find_device_file(device):
+ """ Recurively search /dev for the given device file and return its full path.
+ If no device file was found 'None' is returned """
+ from fnmatch import fnmatch
+
+ for root, dirs, files in os.walk('/dev'):
+ for basename in files:
+ if fnmatch(basename, device):
+ return os.path.join(root, basename)
+
+ return None
+
+def load_as_module(name: str, path: str):
+ import importlib.util
+
+ spec = importlib.util.spec_from_file_location(name, path)
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
deleted file mode 100644
index d3e6e9087..000000000
--- a/python/vyos/validate.py
+++ /dev/null
@@ -1,259 +0,0 @@
-# Copyright 2018-2021 VyOS maintainers and contributors <maintainers@vyos.io>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
-# Important note when you are adding new validation functions:
-#
-# The Control class will analyse the signature of the function in this file
-# and will build the parameters to be passed to it.
-#
-# The parameter names "ifname" and "self" will get the Interface name and class
-# parameters with default will be left unset
-# all other paramters will receive the value to check
-
-def is_ipv6_link_local(addr):
- """ Check if addrsss is an IPv6 link-local address. Returns True/False """
- from ipaddress import ip_interface
- from vyos.template import is_ipv6
- addr = addr.split('%')[0]
- if is_ipv6(addr):
- if ip_interface(addr).is_link_local:
- return True
-
- return False
-
-def _are_same_ip(one, two):
- from socket import AF_INET
- from socket import AF_INET6
- from socket import inet_pton
- from vyos.template import is_ipv4
- # compare the binary representation of the IP
- f_one = AF_INET if is_ipv4(one) else AF_INET6
- s_two = AF_INET if is_ipv4(two) else AF_INET6
- return inet_pton(f_one, one) == inet_pton(f_one, two)
-
-def is_intf_addr_assigned(ifname, addr, netns=None):
- """Verify if the given IPv4/IPv6 address is assigned to specific interface.
- It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR
- address 192.0.2.1/24.
- """
- import json
- import jmespath
- from vyos.util import rc_cmd
- from ipaddress import ip_interface
-
- within_netns = f'ip netns exec {netns}' if netns else ''
- rc, out = rc_cmd(f'{within_netns} ip --json address show dev {ifname}')
- if rc == 0:
- json_out = json.loads(out)
- addresses = jmespath.search("[].addr_info[].{family: family, address: local, prefixlen: prefixlen}", json_out)
- for address_info in addresses:
- family = address_info['family']
- address = address_info['address']
- prefixlen = address_info['prefixlen']
- # Remove the interface name if present in the given address
- if '%' in addr:
- addr = addr.split('%')[0]
- interface = ip_interface(f"{address}/{prefixlen}")
- if ip_interface(addr) == interface or address == addr:
- return True
-
- return False
-
-def is_addr_assigned(ip_address, vrf=None) -> bool:
- """ Verify if the given IPv4/IPv6 address is assigned to any interfac """
- from netifaces import interfaces
- from vyos.util import get_interface_config
- from vyos.util import dict_search
- for interface in interfaces():
- # Check if interface belongs to the requested VRF, if this is not the
- # case there is no need to proceed with this data set - continue loop
- # with next element
- tmp = get_interface_config(interface)
- if dict_search('master', tmp) != vrf:
- continue
-
- if is_intf_addr_assigned(interface, ip_address):
- return True
-
- return False
-
-def is_loopback_addr(addr):
- """ Check if supplied IPv4/IPv6 address is a loopback address """
- from ipaddress import ip_address
- return ip_address(addr).is_loopback
-
-def is_subnet_connected(subnet, primary=False):
- """
- Verify is the given IPv4/IPv6 subnet is connected to any interface on this
- system.
-
- primary check if the subnet is reachable via the primary IP address of this
- interface, or in other words has a broadcast address configured. ISC DHCP
- for instance will complain if it should listen on non broadcast interfaces.
-
- Return True/False
- """
- from ipaddress import ip_address
- from ipaddress import ip_network
-
- from netifaces import ifaddresses
- from netifaces import interfaces
- from netifaces import AF_INET
- from netifaces import AF_INET6
-
- from vyos.template import is_ipv6
-
- # determine IP version (AF_INET or AF_INET6) depending on passed address
- addr_type = AF_INET
- if is_ipv6(subnet):
- addr_type = AF_INET6
-
- for interface in interfaces():
- # check if the requested address type is configured at all
- if addr_type not in ifaddresses(interface).keys():
- continue
-
- # An interface can have multiple addresses, but some software components
- # only support the primary address :(
- if primary:
- ip = ifaddresses(interface)[addr_type][0]['addr']
- if ip_address(ip) in ip_network(subnet):
- return True
- else:
- # Check every assigned IP address if it is connected to the subnet
- # in question
- for ip in ifaddresses(interface)[addr_type]:
- # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs
- addr = ip['addr'].split('%')[0]
- if ip_address(addr) in ip_network(subnet):
- return True
-
- return False
-
-
-def assert_boolean(b):
- if int(b) not in (0, 1):
- raise ValueError(f'Value {b} out of range')
-
-
-def assert_range(value, lower=0, count=3):
- if int(value, 16) not in range(lower, lower+count):
- raise ValueError("Value out of range")
-
-
-def assert_list(s, l):
- if s not in l:
- o = ' or '.join([f'"{n}"' for n in l])
- raise ValueError(f'state must be {o}, got {s}')
-
-
-def assert_number(n):
- if not str(n).isnumeric():
- raise ValueError(f'{n} must be a number')
-
-
-def assert_positive(n, smaller=0):
- assert_number(n)
- if int(n) < smaller:
- raise ValueError(f'{n} is smaller than {smaller}')
-
-
-def assert_mtu(mtu, ifname):
- assert_number(mtu)
-
- import json
- from vyos.util import cmd
- out = cmd(f'ip -j -d link show dev {ifname}')
- # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
- parsed = json.loads(out)[0]
- min_mtu = int(parsed.get('min_mtu', '0'))
- # cur_mtu = parsed.get('mtu',0),
- max_mtu = int(parsed.get('max_mtu', '0'))
- cur_mtu = int(mtu)
-
- if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68:
- raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}')
- if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536:
- raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}')
-
-
-def assert_mac(m):
- split = m.split(':')
- size = len(split)
-
- # a mac address consits out of 6 octets
- if size != 6:
- raise ValueError(f'wrong number of MAC octets ({size}): {m}')
-
- octets = []
- try:
- for octet in split:
- octets.append(int(octet, 16))
- except ValueError:
- raise ValueError(f'invalid hex number "{octet}" in : {m}')
-
- # validate against the first mac address byte if it's a multicast
- # address
- if octets[0] & 1:
- raise ValueError(f'{m} is a multicast MAC address')
-
- # overall mac address is not allowed to be 00:00:00:00:00:00
- if sum(octets) == 0:
- raise ValueError('00:00:00:00:00:00 is not a valid MAC address')
-
- if octets[:5] == (0, 0, 94, 0, 1):
- raise ValueError(f'{m} is a VRRP MAC address')
-
-def has_address_configured(conf, intf):
- """
- Checks if interface has an address configured.
- Checks the following config nodes:
- 'address', 'ipv6 address eui64', 'ipv6 address autoconf'
-
- Returns True if interface has address configured, False if it doesn't.
- """
- from vyos.ifconfig import Section
- ret = False
-
- old_level = conf.get_level()
- conf.set_level([])
-
- intfpath = 'interfaces ' + Section.get_config_path(intf)
- if ( conf.exists(f'{intfpath} address') or
- conf.exists(f'{intfpath} ipv6 address autoconf') or
- conf.exists(f'{intfpath} ipv6 address eui64') ):
- ret = True
-
- conf.set_level(old_level)
- return ret
-
-def has_vrf_configured(conf, intf):
- """
- Checks if interface has a VRF configured.
-
- Returns True if interface has VRF configured, False if it doesn't.
- """
- from vyos.ifconfig import Section
- ret = False
-
- old_level = conf.get_level()
- conf.set_level([])
-
- tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
- if conf.exists(tmp):
- ret = True
-
- conf.set_level(old_level)
- return ret
diff --git a/python/vyos/version.py b/python/vyos/version.py
index fb706ad44..1c5651c83 100644
--- a/python/vyos/version.py
+++ b/python/vyos/version.py
@@ -1,4 +1,4 @@
-# Copyright 2017-2020 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2017-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -34,11 +34,11 @@ import json
import requests
import vyos.defaults
-from vyos.util import read_file
-from vyos.util import read_json
-from vyos.util import popen
-from vyos.util import run
-from vyos.util import DEVNULL
+from vyos.utils.file import read_file
+from vyos.utils.file import read_json
+from vyos.utils.process import popen
+from vyos.utils.process import run
+from vyos.utils.process import DEVNULL
version_file = os.path.join(vyos.defaults.directories['data'], 'version.json')
diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py
new file mode 100644
index 000000000..76e5d29c3
--- /dev/null
+++ b/python/vyos/vpp.py
@@ -0,0 +1,315 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from functools import wraps
+from pathlib import Path
+from re import search as re_search, fullmatch as re_fullmatch, MULTILINE as re_M
+from subprocess import run
+from time import sleep
+
+from vpp_papi import VPPApiClient
+from vpp_papi import VPPIOError, VPPValueError
+
+
+class VPPControl:
+ """Control VPP network stack
+ """
+
+ class _Decorators:
+ """Decorators for VPPControl
+ """
+
+ @classmethod
+ def api_call(cls, decorated_func):
+ """Check if API is connected before API call
+
+ Args:
+ decorated_func: function to decorate
+
+ Raises:
+ VPPIOError: Connection to API is not established
+ """
+
+ @wraps(decorated_func)
+ def api_safe_wrapper(cls, *args, **kwargs):
+ if not cls.vpp_api_client.transport.connected:
+ raise VPPIOError(2, 'VPP API is not connected')
+ return decorated_func(cls, *args, **kwargs)
+
+ return api_safe_wrapper
+
+ @classmethod
+ def check_retval(cls, decorated_func):
+ """Check retval from API response
+
+ Args:
+ decorated_func: function to decorate
+
+ Raises:
+ VPPValueError: raised when retval is not 0
+ """
+
+ @wraps(decorated_func)
+ def check_retval_wrapper(cls, *args, **kwargs):
+ return_value = decorated_func(cls, *args, **kwargs)
+ if not return_value.retval == 0:
+ raise VPPValueError(
+ f'VPP API call failed: {return_value.retval}')
+ return return_value
+
+ return check_retval_wrapper
+
+ def __init__(self, attempts: int = 5, interval: int = 1000) -> None:
+ """Create VPP API connection
+
+ Args:
+ attempts (int, optional): attempts to connect. Defaults to 5.
+ interval (int, optional): interval between attempts in ms. Defaults to 1000.
+
+ Raises:
+ VPPIOError: Connection to API cannot be established
+ """
+ self.vpp_api_client = VPPApiClient()
+ # connect with interval
+ while attempts:
+ try:
+ attempts -= 1
+ self.vpp_api_client.connect('vpp-vyos')
+ break
+ except (ConnectionRefusedError, FileNotFoundError) as err:
+ print(f'VPP API connection timeout: {err}')
+ sleep(interval / 1000)
+ # raise exception if connection was not successful in the end
+ if not self.vpp_api_client.transport.connected:
+ raise VPPIOError(2, 'Cannot connect to VPP API')
+
+ def __del__(self) -> None:
+ """Disconnect from VPP API (destructor)
+ """
+ self.disconnect()
+
+ def disconnect(self) -> None:
+ """Disconnect from VPP API
+ """
+ if self.vpp_api_client.transport.connected:
+ self.vpp_api_client.disconnect()
+
+ @_Decorators.check_retval
+ @_Decorators.api_call
+ def cli_cmd(self, command: str):
+ """Send raw CLI command
+
+ Args:
+ command (str): command to send
+
+ Returns:
+ vpp_papi.vpp_serializer.cli_inband_reply: CLI reply class
+ """
+ return self.vpp_api_client.api.cli_inband(cmd=command)
+
+ @_Decorators.api_call
+ def get_mac(self, ifname: str) -> str:
+ """Find MAC address by interface name in VPP
+
+ Args:
+ ifname (str): interface name inside VPP
+
+ Returns:
+ str: MAC address
+ """
+ for iface in self.vpp_api_client.api.sw_interface_dump():
+ if iface.interface_name == ifname:
+ return iface.l2_address.mac_string
+ return ''
+
+ @_Decorators.api_call
+ def get_sw_if_index(self, ifname: str) -> int | None:
+ """Find interface index by interface name in VPP
+
+ Args:
+ ifname (str): interface name inside VPP
+
+ Returns:
+ int | None: Interface index or None (if was not fount)
+ """
+ for iface in self.vpp_api_client.api.sw_interface_dump():
+ if iface.interface_name == ifname:
+ return iface.sw_if_index
+ return None
+
+ @_Decorators.check_retval
+ @_Decorators.api_call
+ def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
+ """Create LCP interface pair between VPP and kernel
+
+ Args:
+ iface_name_vpp (str): interface name in VPP
+ iface_name_kernel (str): interface name in kernel
+ """
+ iface_index = self.get_sw_if_index(iface_name_vpp)
+ if iface_index:
+ return self.vpp_api_client.api.lcp_itf_pair_add_del(
+ is_add=True,
+ sw_if_index=iface_index,
+ host_if_name=iface_name_kernel)
+
+ @_Decorators.check_retval
+ @_Decorators.api_call
+ def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
+ """Delete LCP interface pair between VPP and kernel
+
+ Args:
+ iface_name_vpp (str): interface name in VPP
+ iface_name_kernel (str): interface name in kernel
+ """
+ iface_index = self.get_sw_if_index(iface_name_vpp)
+ if iface_index:
+ return self.vpp_api_client.api.lcp_itf_pair_add_del(
+ is_add=False,
+ sw_if_index=iface_index,
+ host_if_name=iface_name_kernel)
+
+ @_Decorators.check_retval
+ @_Decorators.api_call
+ def iface_rxmode(self, iface_name: str, rx_mode: str) -> None:
+ """Set interface rx-mode in VPP
+
+ Args:
+ iface_name (str): interface name in VPP
+ rx_mode (str): mode (polling, interrupt, adaptive)
+ """
+ modes_dict: dict[str, int] = {
+ 'polling': 1,
+ 'interrupt': 2,
+ 'adaptive': 3
+ }
+ if rx_mode not in modes_dict:
+ raise VPPValueError(f'Mode {rx_mode} is not known')
+ iface_index = self.get_sw_if_index(iface_name)
+ return self.vpp_api_client.api.sw_interface_set_rx_mode(
+ sw_if_index=iface_index, mode=modes_dict[rx_mode])
+
+ @_Decorators.api_call
+ def get_pci_addr(self, ifname: str) -> str:
+ """Find PCI address of interface by interface name in VPP
+
+ Args:
+ ifname (str): interface name inside VPP
+
+ Returns:
+ str: PCI address
+ """
+ hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}').reply
+
+ regex_filter = r'^\s+pci: device (?P<device>\w+:\w+) subsystem (?P<subsystem>\w+:\w+) address (?P<address>\w+:\w+:\w+\.\w+) numa (?P<numa>\w+)$'
+ re_obj = re_search(regex_filter, hw_info, re_M)
+
+ # return empty string if no interface or no PCI info was found
+ if not hw_info or not re_obj:
+ return ''
+
+ address = re_obj.groupdict().get('address', '')
+
+ # we need to modify address to math kernel style
+ # for example: 0000:06:14.00 -> 0000:06:14.0
+ address_chunks: list[str] = address.split('.')
+ address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}'
+
+ return address_normalized
+
+
+class HostControl:
+ """Control Linux host
+ """
+
+ @staticmethod
+ def pci_rescan(pci_addr: str = '') -> None:
+ """Rescan PCI device by removing it and rescan PCI bus
+
+ If PCI address is not defined - just rescan PCI bus
+
+ Args:
+ address (str, optional): PCI address of device. Defaults to ''.
+ """
+ if pci_addr:
+ device_file = Path(f'/sys/bus/pci/devices/{pci_addr}/remove')
+ if device_file.exists():
+ device_file.write_text('1')
+ # wait 10 seconds max until device will be removed
+ attempts = 100
+ while device_file.exists() and attempts:
+ attempts -= 1
+ sleep(0.1)
+ if device_file.exists():
+ raise TimeoutError(
+ f'Timeout was reached for removing PCI device {pci_addr}'
+ )
+ else:
+ raise FileNotFoundError(f'PCI device {pci_addr} does not exist')
+ rescan_file = Path('/sys/bus/pci/rescan')
+ rescan_file.write_text('1')
+ if pci_addr:
+ # wait 10 seconds max until device will be installed
+ attempts = 100
+ while not device_file.exists() and attempts:
+ attempts -= 1
+ sleep(0.1)
+ if not device_file.exists():
+ raise TimeoutError(
+ f'Timeout was reached for installing PCI device {pci_addr}')
+
+ @staticmethod
+ def get_eth_name(pci_addr: str) -> str:
+ """Find Ethernet interface name by PCI address
+
+ Args:
+ pci_addr (str): PCI address
+
+ Raises:
+ FileNotFoundError: no Ethernet interface was found
+
+ Returns:
+ str: Ethernet interface name
+ """
+ # find all PCI devices with eth* names
+ net_devs: dict[str, str] = {}
+ net_devs_dir = Path('/sys/class/net')
+ regex_filter = r'^/sys/devices/pci[\w/:\.]+/(?P<pci_addr>\w+:\w+:\w+\.\w+)/[\w/:\.]+/(?P<iface_name>eth\d+)$'
+ for dir in net_devs_dir.iterdir():
+ real_dir: str = dir.resolve().as_posix()
+ re_obj = re_fullmatch(regex_filter, real_dir)
+ if re_obj:
+ iface_name: str = re_obj.group('iface_name')
+ iface_addr: str = re_obj.group('pci_addr')
+ net_devs.update({iface_addr: iface_name})
+ # match to provided PCI address and return a name if found
+ if pci_addr in net_devs:
+ return net_devs[pci_addr]
+ # raise error if device was not found
+ raise FileNotFoundError(
+ f'PCI device {pci_addr} not found in ethernet interfaces')
+
+ @staticmethod
+ def rename_iface(name_old: str, name_new: str) -> None:
+ """Rename interface
+
+ Args:
+ name_old (str): old name
+ name_new (str): new name
+ """
+ rename_cmd: list[str] = [
+ 'ip', 'link', 'set', name_old, 'name', name_new
+ ]
+ run(rename_cmd)
diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py
index 2e144ef10..bf434865d 100644
--- a/python/vyos/xml_ref/__init__.py
+++ b/python/vyos/xml_ref/__init__.py
@@ -13,8 +13,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Optional, Union, TYPE_CHECKING
from vyos.xml_ref import definition
+if TYPE_CHECKING:
+ from vyos.config import ConfigDict
+
def load_reference(cache=[]):
if cache:
return cache[0]
@@ -23,11 +27,15 @@ def load_reference(cache=[]):
try:
from vyos.xml_ref.cache import reference
- xml.define(reference)
- cache.append(xml)
except Exception:
raise ImportError('no xml reference cache !!')
+ if not reference:
+ raise ValueError('empty xml reference cache !!')
+
+ xml.define(reference)
+ cache.append(xml)
+
return xml
def is_tag(path: list) -> bool:
@@ -51,6 +59,9 @@ def cli_defined(path: list, node: str, non_local=False) -> bool:
def component_version() -> dict:
return load_reference().component_version()
+def default_value(path: list) -> Optional[Union[str, list]]:
+ return load_reference().default_value(path)
+
def multi_to_list(rpath: list, conf: dict) -> dict:
return load_reference().multi_to_list(rpath, conf)
@@ -58,12 +69,15 @@ def get_defaults(path: list, get_first_key=False, recursive=False) -> dict:
return load_reference().get_defaults(path, get_first_key=get_first_key,
recursive=recursive)
-def get_config_defaults(rpath: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
+def relative_defaults(rpath: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
- return load_reference().relative_defaults(rpath, conf=conf,
+ return load_reference().relative_defaults(rpath, conf,
get_first_key=get_first_key,
recursive=recursive)
-def merge_defaults(path: list, conf: dict) -> dict:
- return load_reference().merge_defaults(path, conf)
+def from_source(d: dict, path: list) -> bool:
+ return definition.from_source(d, path)
+
+def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']):
+ return definition.ext_dict_merge(source, destination)
diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py
index 95ecc01a6..c90c5ddbc 100644
--- a/python/vyos/xml_ref/definition.py
+++ b/python/vyos/xml_ref/definition.py
@@ -13,8 +13,51 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
-from typing import Union, Any
-from vyos.configdict import dict_merge
+from typing import Optional, Union, Any, TYPE_CHECKING
+
+# https://peps.python.org/pep-0484/#forward-references
+# for type 'ConfigDict'
+if TYPE_CHECKING:
+ from vyos.config import ConfigDict
+
+def set_source_recursive(o: Union[dict, str, list], b: bool):
+ d = {}
+ if not isinstance(o, dict):
+ d = {'_source': b}
+ else:
+ for k, v in o.items():
+ d[k] = set_source_recursive(v, b)
+ d |= {'_source': b}
+ return d
+
+def source_dict_merge(src: dict, dest: dict):
+ from copy import deepcopy
+ dst = deepcopy(dest)
+ from_src = {}
+
+ for key, value in src.items():
+ if key not in dst:
+ dst[key] = value
+ from_src[key] = set_source_recursive(value, True)
+ elif isinstance(src[key], dict):
+ dst[key], f = source_dict_merge(src[key], dst[key])
+ f |= {'_source': False}
+ from_src[key] = f
+
+ return dst, from_src
+
+def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']):
+ d, f = source_dict_merge(src, dest)
+ if hasattr(d, '_from_defaults'):
+ setattr(d, '_from_defaults', f)
+ return d
+
+def from_source(d: dict, path: list) -> bool:
+ for key in path:
+ d = d[key] if key in d else {}
+ if not d or not isinstance(d, dict):
+ return False
+ return d.get('_source', False)
class Xml:
def __init__(self):
@@ -119,14 +162,11 @@ class Xml:
def component_version(self) -> dict:
d = {}
- for k, v in self.ref['component_version']:
+ for k, v in self.ref['component_version'].items():
d[k] = int(v)
return d
def multi_to_list(self, rpath: list, conf: dict) -> dict:
- if rpath and rpath[-1] in list(conf):
- raise ValueError('rpath should be disjoint from conf keys')
-
res: Any = {}
for k in list(conf):
@@ -141,9 +181,26 @@ class Xml:
return res
- def _get_default_value(self, node: dict):
+ def _get_default_value(self, node: dict) -> Optional[str]:
return self._get_ref_node_data(node, "default_value")
+ def _get_default(self, node: dict) -> Optional[Union[str, list]]:
+ default = self._get_default_value(node)
+ if default is None:
+ return None
+ if self._is_multi_node(node):
+ return default.split()
+ return default
+
+ def default_value(self, path: list) -> Optional[Union[str, list]]:
+ d = self._get_ref_path(path)
+ default = self._get_default_value(d)
+ if default is None:
+ return None
+ if self._is_multi_node(d) or self._is_tag_node(d):
+ return default.split()
+ return default
+
def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:
"""Return dict containing default values below path
@@ -153,18 +210,23 @@ class Xml:
'relative_defaults'
"""
res: dict = {}
+ if self.is_tag(path):
+ return res
+
d = self._get_ref_path(path)
+
+ if self._is_leaf_node(d):
+ default_value = self._get_default(d)
+ if default_value is not None:
+ return {path[-1]: default_value} if path else {}
+
for k in list(d):
if k in ('node_data', 'component_version') :
continue
- d_k = d[k]
- if self._is_leaf_node(d_k):
- default_value = self._get_default_value(d_k)
+ if self._is_leaf_node(d[k]):
+ default_value = self._get_default(d[k])
if default_value is not None:
- pos = default_value
- if self._is_multi_node(d_k) and not isinstance(pos, list):
- pos = [pos]
- res |= {k: pos}
+ res |= {k: default_value}
elif self.is_tag(path + [k]):
# tag node defaults are used as suggestion, not default value;
# should this change, append to path and continue if recursive
@@ -175,8 +237,6 @@ class Xml:
res |= pos
if res:
if get_first_key or not path:
- if not isinstance(res, dict):
- raise TypeError("Cannot get_first_key as data under node is not of type dict")
return res
return {path[-1]: res}
@@ -188,7 +248,7 @@ class Xml:
return [next(iter(c.keys()))] if c else []
try:
tmp = step(conf)
- if self.is_tag_value(path + tmp):
+ if tmp and self.is_tag_value(path + tmp):
c = conf[tmp[0]]
if not isinstance(c, dict):
raise ValueError
@@ -200,57 +260,43 @@ class Xml:
return False
return True
- def relative_defaults(self, rpath: list, conf: dict, get_first_key=False,
- recursive=False) -> dict:
- """Return dict containing defaults along paths of a config dict
- """
- if not conf:
- return self.get_defaults(rpath, get_first_key=get_first_key,
- recursive=recursive)
- if rpath and rpath[-1] in list(conf):
- conf = conf[rpath[-1]]
- if not isinstance(conf, dict):
- raise TypeError('conf at path is not of type dict')
-
- if not self._well_defined(rpath, conf):
- print('path to config dict does not define full config paths')
- return {}
-
+ def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:
res: dict = {}
+ res = self.get_defaults(rpath, recursive=recursive,
+ get_first_key=True)
for k in list(conf):
- pos = self.get_defaults(rpath + [k], recursive=recursive)
- res |= pos
-
if isinstance(conf[k], dict):
- step = self.relative_defaults(rpath + [k], conf=conf[k],
- recursive=recursive)
+ step = self._relative_defaults(rpath + [k], conf=conf[k],
+ recursive=recursive)
res |= step
if res:
- if get_first_key:
- return res
return {rpath[-1]: res} if rpath else res
return {}
- def merge_defaults(self, path: list, conf: dict) -> dict:
- """Return config dict with defaults non-destructively merged
-
- This merges non-recursive defaults relative to the config dict.
+ def relative_defaults(self, path: list, conf: dict, get_first_key=False,
+ recursive=False) -> dict:
+ """Return dict containing defaults along paths of a config dict
"""
- if path[-1] in list(conf):
- config = conf[path[-1]]
- if not isinstance(config, dict):
- raise TypeError('conf at path is not of type dict')
- shift = False
- else:
- config = conf
- shift = True
-
- if not self._well_defined(path, config):
- print('path to config dict does not define config paths; conf returned unchanged')
- return conf
-
- d = self.relative_defaults(path, conf=config, get_first_key=shift)
- d = dict_merge(d, conf)
- return d
+ if not conf:
+ return self.get_defaults(path, get_first_key=get_first_key,
+ recursive=recursive)
+ if not self._well_defined(path, conf):
+ # adjust for possible overlap:
+ if path and path[-1] in list(conf):
+ conf = conf[path[-1]]
+ conf = {} if not isinstance(conf, dict) else conf
+ if not self._well_defined(path, conf):
+ print('path to config dict does not define full config paths')
+ return {}
+
+ res = self._relative_defaults(path, conf, recursive=recursive)
+
+ if get_first_key and path:
+ if res.values():
+ res = next(iter(res.values()))
+ else:
+ res = {}
+
+ return res
diff --git a/python/vyos/xml_ref/generate_cache.py b/python/vyos/xml_ref/generate_cache.py
index 792c6eea7..6a05d4608 100755
--- a/python/vyos/xml_ref/generate_cache.py
+++ b/python/vyos/xml_ref/generate_cache.py
@@ -18,10 +18,14 @@
import sys
import json
-import argparse
+from argparse import ArgumentParser
+from argparse import ArgumentTypeError
+from os import getcwd
+from os import makedirs
from os.path import join
from os.path import abspath
from os.path import dirname
+from os.path import basename
from xmltodict import parse
_here = dirname(__file__)
@@ -29,9 +33,10 @@ _here = dirname(__file__)
sys.path.append(join(_here, '..'))
from configtree import reference_tree_to_json, ConfigTreeError
-xml_cache = abspath(join(_here, 'cache.py'))
xml_cache_json = 'xml_cache.json'
xml_tmp = join('/tmp', xml_cache_json)
+pkg_cache = abspath(join(_here, 'pkg_cache'))
+ref_cache = abspath(join(_here, 'cache.py'))
node_data_fields = ("node_type", "multi", "valueless", "default_value")
@@ -45,16 +50,26 @@ def trim_node_data(cache: dict):
if isinstance(cache[k], dict):
trim_node_data(cache[k])
+def non_trivial(s):
+ if not s:
+ raise ArgumentTypeError("Argument must be non empty string")
+ return s
+
def main():
- parser = argparse.ArgumentParser(description='generate and save dict from xml defintions')
+ parser = ArgumentParser(description='generate and save dict from xml defintions')
parser.add_argument('--xml-dir', type=str, required=True,
help='transcluded xml interface-definition directory')
- parser.add_argument('--save-json-dir', type=str,
- help='directory to save json cache if needed')
- args = parser.parse_args()
-
- xml_dir = abspath(args.xml_dir)
- save_dir = abspath(args.save_json_dir) if args.save_json_dir else None
+ parser.add_argument('--package-name', type=non_trivial, default='vyos-1x',
+ help='name of current package')
+ parser.add_argument('--output-path', help='path to generated cache')
+ args = vars(parser.parse_args())
+
+ xml_dir = abspath(args['xml_dir'])
+ pkg_name = args['package_name'].replace('-','_')
+ cache_name = pkg_name + '_cache.py'
+ out_path = args['output_path']
+ path = out_path if out_path is not None else pkg_cache
+ xml_cache = abspath(join(path, cache_name))
try:
reference_tree_to_json(xml_dir, xml_tmp)
@@ -67,21 +82,30 @@ def main():
trim_node_data(d)
- if save_dir is not None:
- save_file = join(save_dir, xml_cache_json)
- with open(save_file, 'w') as f:
- f.write(json.dumps(d))
-
syntax_version = join(xml_dir, 'xml-component-version.xml')
- with open(syntax_version) as f:
- content = f.read()
+ try:
+ with open(syntax_version) as f:
+ component = f.read()
+ except FileNotFoundError:
+ if pkg_name != 'vyos_1x':
+ component = ''
+ else:
+ print("\nWARNING: missing xml-component-version.xml\n")
+ sys.exit(1)
- parsed = parse(content)
- converted = parsed['interfaceDefinition']['syntaxVersion']
+ if component:
+ parsed = parse(component)
+ else:
+ parsed = None
version = {}
- for i in converted:
- tmp = {i['@component']: i['@version']}
- version |= tmp
+ # addon package definitions may have empty (== 0) version info
+ if parsed is not None and parsed['interfaceDefinition'] is not None:
+ converted = parsed['interfaceDefinition']['syntaxVersion']
+ if not isinstance(converted, list):
+ converted = [converted]
+ for i in converted:
+ tmp = {i['@component']: i['@version']}
+ version |= tmp
version = {"component_version": version}
@@ -90,5 +114,7 @@ def main():
with open(xml_cache, 'w') as f:
f.write(f'reference = {str(d)}')
+ print(cache_name)
+
if __name__ == '__main__':
main()
diff --git a/python/vyos/xml_ref/pkg_cache/__init__.py b/python/vyos/xml_ref/pkg_cache/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/vyos/xml_ref/pkg_cache/__init__.py
diff --git a/python/vyos/xml_ref/update_cache.py b/python/vyos/xml_ref/update_cache.py
new file mode 100755
index 000000000..0842bcbe9
--- /dev/null
+++ b/python/vyos/xml_ref/update_cache.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+import os
+from copy import deepcopy
+from generate_cache import pkg_cache
+from generate_cache import ref_cache
+
+def dict_merge(source, destination):
+ dest = deepcopy(destination)
+
+ for key, value in source.items():
+ if key not in dest:
+ dest[key] = value
+ elif isinstance(source[key], dict):
+ dest[key] = dict_merge(source[key], dest[key])
+
+ return dest
+
+def main():
+ res = {}
+ cache_dir = os.path.basename(pkg_cache)
+ for mod in os.listdir(pkg_cache):
+ mod = os.path.splitext(mod)[0]
+ if not mod.endswith('_cache'):
+ continue
+ d = getattr(__import__(f'{cache_dir}.{mod}', fromlist=[mod]), 'reference')
+ if mod == 'vyos_1x_cache':
+ res = dict_merge(res, d)
+ else:
+ res = dict_merge(d, res)
+
+ with open(ref_cache, 'w') as f:
+ f.write(f'reference = {str(res)}')
+
+if __name__ == '__main__':
+ main()