summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/accel_ppp_util.py202
-rw-r--r--python/vyos/config.py51
-rw-r--r--python/vyos/config_mgmt.py46
-rw-r--r--python/vyos/configdep.py9
-rw-r--r--python/vyos/configdict.py57
-rw-r--r--python/vyos/configdiff.py24
-rw-r--r--python/vyos/configsession.py16
-rw-r--r--python/vyos/configtree.py3
-rw-r--r--python/vyos/configverify.py90
-rw-r--r--python/vyos/defaults.py21
-rw-r--r--python/vyos/ethtool.py34
-rw-r--r--python/vyos/firewall.py41
-rw-r--r--python/vyos/frr.py6
-rw-r--r--python/vyos/ifconfig/bond.py13
-rw-r--r--python/vyos/ifconfig/ethernet.py72
-rw-r--r--python/vyos/ifconfig/interface.py16
-rw-r--r--python/vyos/ifconfig/macsec.py4
-rw-r--r--python/vyos/ifconfig/vxlan.py48
-rw-r--r--python/vyos/ifconfig/wireguard.py5
-rw-r--r--python/vyos/kea.py364
-rw-r--r--python/vyos/load_config.py200
-rw-r--r--python/vyos/nat.py44
-rw-r--r--python/vyos/opmode.py5
-rw-r--r--python/vyos/qos/base.py50
-rw-r--r--python/vyos/qos/cake.py2
-rw-r--r--python/vyos/qos/trafficshaper.py108
-rw-r--r--python/vyos/remote.py131
-rw-r--r--python/vyos/system/__init__.py18
-rw-r--r--python/vyos/system/compat.py327
-rw-r--r--python/vyos/system/disk.py242
-rw-r--r--python/vyos/system/grub.py424
-rw-r--r--python/vyos/system/grub_util.py70
-rw-r--r--python/vyos/system/image.py279
-rw-r--r--python/vyos/system/raid.py122
-rw-r--r--python/vyos/template.py174
-rw-r--r--python/vyos/utils/config.py9
-rw-r--r--python/vyos/utils/convert.py5
-rw-r--r--python/vyos/utils/dict.py59
-rw-r--r--python/vyos/utils/file.py41
-rw-r--r--python/vyos/utils/io.py34
-rw-r--r--python/vyos/utils/network.py106
-rw-r--r--python/vyos/utils/process.py27
-rw-r--r--python/vyos/vpp.py315
43 files changed, 3345 insertions, 569 deletions
diff --git a/python/vyos/accel_ppp_util.py b/python/vyos/accel_ppp_util.py
new file mode 100644
index 000000000..d60402e48
--- /dev/null
+++ b/python/vyos/accel_ppp_util.py
@@ -0,0 +1,202 @@
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+# The sole purpose of this module is to hold common functions used in
+# all kinds of implementations to verify the CLI configuration.
+# It is started by migrating the interfaces to the new get_config_dict()
+# approach which will lead to a lot of code that can be reused.
+
+# NOTE: imports should be as local as possible to the function which
+# makes use of it!
+
+from vyos import ConfigError
+from vyos.base import Warning
+from vyos.utils.dict import dict_search
+
+def get_pools_in_order(data: dict) -> list:
+ """Return a list of dictionaries representing pool data in the order
+ in which they should be allocated. Pool must be defined before we can
+ use it with 'next-pool' option.
+
+ Args:
+ data: A dictionary of pool data, where the keys are pool names and the
+ values are dictionaries containing the 'subnet' key and the optional
+ 'next_pool' key.
+
+ Returns:
+ list: A list of dictionaries
+
+ Raises:
+ ValueError: If a 'next_pool' key references a pool name that
+ has not been defined.
+ ValueError: If a circular reference is found in the 'next_pool' keys.
+
+ Example:
+ config_data = {
+ ... 'first-pool': {
+ ... 'next_pool': 'second-pool',
+ ... 'subnet': '192.0.2.0/25'
+ ... },
+ ... 'second-pool': {
+ ... 'next_pool': 'third-pool',
+ ... 'subnet': '203.0.113.0/25'
+ ... },
+ ... 'third-pool': {
+ ... 'subnet': '198.51.100.0/24'
+ ... },
+ ... 'foo': {
+ ... 'subnet': '100.64.0.0/24',
+ ... 'next_pool': 'second-pool'
+ ... }
+ ... }
+
+ % get_pools_in_order(config_data)
+ [{'third-pool': {'subnet': '198.51.100.0/24'}},
+ {'second-pool': {'next_pool': 'third-pool', 'subnet': '203.0.113.0/25'}},
+ {'first-pool': {'next_pool': 'second-pool', 'subnet': '192.0.2.0/25'}},
+ {'foo': {'next_pool': 'second-pool', 'subnet': '100.64.0.0/24'}}]
+ """
+ pools = []
+ unresolved_pools = {}
+
+ for pool, pool_config in data.items():
+ if "next_pool" not in pool_config or not pool_config["next_pool"]:
+ pools.insert(0, {pool: pool_config})
+ else:
+ unresolved_pools[pool] = pool_config
+
+ while unresolved_pools:
+ resolved_pools = []
+
+ for pool, pool_config in unresolved_pools.items():
+ next_pool_name = pool_config["next_pool"]
+
+ if any(p for p in pools if next_pool_name in p):
+ index = next(
+ (i for i, p in enumerate(pools) if next_pool_name in p), None
+ )
+ pools.insert(index + 1, {pool: pool_config})
+ resolved_pools.append(pool)
+ elif next_pool_name in unresolved_pools:
+ # next pool not yet resolved
+ pass
+ else:
+ raise ConfigError(
+ f"Pool '{next_pool_name}' not defined in configuration data"
+ )
+
+ if not resolved_pools:
+ raise ConfigError("Circular reference in configuration data")
+
+ for pool in resolved_pools:
+ unresolved_pools.pop(pool)
+
+ return pools
+
+
+def verify_accel_ppp_base_service(config, local_users=True):
+ """
+ Common helper function which must be used by all Accel-PPP services based
+ on get_config_dict()
+ """
+ # vertify auth settings
+ if local_users and dict_search("authentication.mode", config) == "local":
+ if (
+ dict_search("authentication.local_users", config) is None
+ or dict_search("authentication.local_users", config) == {}
+ ):
+ raise ConfigError(
+ "Authentication mode local requires local users to be configured!"
+ )
+
+ for user in dict_search("authentication.local_users.username", config):
+ user_config = config["authentication"]["local_users"]["username"][user]
+
+ if "password" not in user_config:
+ raise ConfigError(f'Password required for local user "{user}"')
+
+ if "rate_limit" in user_config:
+ # if up/download is set, check that both have a value
+ if not {"upload", "download"} <= set(user_config["rate_limit"]):
+ raise ConfigError(
+ f'User "{user}" has rate-limit configured for only one '
+ "direction but both upload and download must be given!"
+ )
+
+ elif dict_search("authentication.mode", config) == "radius":
+ if not dict_search("authentication.radius.server", config):
+ raise ConfigError("RADIUS authentication requires at least one server")
+
+ for server in dict_search("authentication.radius.server", config):
+ radius_config = config["authentication"]["radius"]["server"][server]
+ if "key" not in radius_config:
+ raise ConfigError(f'Missing RADIUS secret key for server "{server}"')
+
+ if "name_server_ipv4" in config:
+ if len(config["name_server_ipv4"]) > 2:
+ raise ConfigError(
+ "Not more then two IPv4 DNS name-servers " "can be configured"
+ )
+
+ if "name_server_ipv6" in config:
+ if len(config["name_server_ipv6"]) > 3:
+ raise ConfigError(
+ "Not more then three IPv6 DNS name-servers " "can be configured"
+ )
+
+
+
+def verify_accel_ppp_ip_pool(vpn_config):
+ """
+ Common helper function which must be used by Accel-PPP
+ services (pptp, l2tp, sstp, pppoe) to verify client-ip-pool
+ and client-ipv6-pool
+ """
+ if dict_search("client_ip_pool", vpn_config):
+ for pool_name, pool_config in vpn_config["client_ip_pool"].items():
+ next_pool = dict_search(f"next_pool", pool_config)
+ if next_pool:
+ if next_pool not in vpn_config["client_ip_pool"]:
+ raise ConfigError(
+ f'Next pool "{next_pool}" does not exist')
+ if not dict_search(f"range", pool_config):
+ raise ConfigError(
+ f'Pool "{pool_name}" does not contain range but next-pool exists'
+ )
+ if not dict_search("gateway_address", vpn_config):
+ Warning("IPv4 Server requires gateway-address to be configured!")
+
+ default_pool = dict_search("default_pool", vpn_config)
+ if default_pool:
+ if default_pool not in dict_search("client_ip_pool", vpn_config):
+ raise ConfigError(f'Default pool "{default_pool}" does not exists')
+
+ if 'client_ipv6_pool' in vpn_config:
+ for ipv6_pool, ipv6_pool_config in vpn_config['client_ipv6_pool'].items():
+ if 'delegate' in ipv6_pool_config and 'prefix' not in ipv6_pool_config:
+ raise ConfigError(
+ f'IPv6 delegate-prefix requires IPv6 prefix to be configured in "{ipv6_pool}"!')
+
+ if dict_search('authentication.mode', vpn_config) in ['local', 'noauth']:
+ if not dict_search('client_ip_pool', vpn_config) and not dict_search(
+ 'client_ipv6_pool', vpn_config):
+ raise ConfigError(
+ "Local auth mode requires local client-ip-pool or client-ipv6-pool to be configured!")
+ if dict_search('client_ip_pool', vpn_config) and not dict_search(
+ 'default_pool', vpn_config):
+ Warning("'default-pool' is not defined")
+ if dict_search('client_ipv6_pool', vpn_config) and not dict_search(
+ 'default_ipv6_pool', vpn_config):
+ Warning("'default-ipv6-pool' is not defined")
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 0ca41718f..7619ad367 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -29,7 +29,7 @@ There are multiple types of config tree nodes in VyOS, each requires
its own set of operations.
*Leaf nodes* (such as "address" in interfaces) can have values, but cannot
-have children.
+have children.
Leaf nodes can have one value, multiple values, or no values at all.
For example, "system host-name" is a single-value leaf node,
@@ -92,6 +92,38 @@ def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict:
dest = ConfigDict(dest)
return ext_dict_merge(src, dest)
+def config_dict_mangle_acme(name, cli_dict):
+ """
+ Load CLI PKI dictionary and if an ACME certificate is used, load it's content
+ and place it into the CLI dictionary as it would be a "regular" CLI PKI based
+ certificate with private key
+ """
+ from vyos.base import ConfigError
+ from vyos.defaults import directories
+ from vyos.utils.file import read_file
+ from vyos.pki import encode_certificate
+ from vyos.pki import encode_private_key
+ from vyos.pki import load_certificate
+ from vyos.pki import load_private_key
+
+ try:
+ vyos_certbot_dir = directories['certbot']
+
+ if 'acme' in cli_dict:
+ tmp = read_file(f'{vyos_certbot_dir}/live/{name}/cert.pem')
+ tmp = load_certificate(tmp, wrap_tags=False)
+ cert_base64 = "".join(encode_certificate(tmp).strip().split("\n")[1:-1])
+
+ tmp = read_file(f'{vyos_certbot_dir}/live/{name}/privkey.pem')
+ tmp = load_private_key(tmp, wrap_tags=False)
+ key_base64 = "".join(encode_private_key(tmp).strip().split("\n")[1:-1])
+ # install ACME based PEM keys into "regular" CLI config keys
+ cli_dict.update({'certificate' : cert_base64, 'private' : {'key' : key_base64}})
+ except:
+ raise ConfigError(f'Unable to load ACME certificates for "{name}"!')
+
+ return cli_dict
+
class Config(object):
"""
The class of config access objects.
@@ -258,7 +290,9 @@ class Config(object):
def get_config_dict(self, path=[], effective=False, key_mangling=None,
get_first_key=False, no_multi_convert=False,
no_tag_node_value_mangle=False,
- with_defaults=False, with_recursive_defaults=False):
+ with_defaults=False,
+ with_recursive_defaults=False,
+ with_pki=False):
"""
Args:
path (str list): Configuration tree path, can be empty
@@ -274,6 +308,7 @@ class Config(object):
del kwargs['no_multi_convert']
del kwargs['with_defaults']
del kwargs['with_recursive_defaults']
+ del kwargs['with_pki']
lpath = self._make_path(path)
root_dict = self.get_cached_root_dict(effective)
@@ -298,6 +333,18 @@ class Config(object):
else:
conf_dict = ConfigDict(conf_dict)
+ if with_pki and conf_dict:
+ pki_dict = self.get_config_dict(['pki'], key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True,
+ get_first_key=True)
+ if pki_dict:
+ if 'certificate' in pki_dict:
+ for certificate in pki_dict['certificate']:
+ pki_dict['certificate'][certificate] = config_dict_mangle_acme(
+ certificate, pki_dict['certificate'][certificate])
+
+ conf_dict['pki'] = pki_dict
+
# save optional args for a call to get_config_defaults
setattr(conf_dict, '_dict_kwargs', kwargs)
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index 654a8d698..ff078649d 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -22,13 +22,15 @@ import logging
from typing import Optional, Tuple, Union
from filecmp import cmp
from datetime import datetime
-from textwrap import dedent
+from textwrap import dedent, indent
from pathlib import Path
from tabulate import tabulate
from shutil import copy, chown
+from urllib.parse import urlsplit, urlunsplit
from vyos.config import Config
from vyos.configtree import ConfigTree, ConfigTreeError, show_diff
+from vyos.load_config import load, LoadConfigError
from vyos.defaults import directories
from vyos.version import get_full_version_data
from vyos.utils.io import ask_yes_no
@@ -124,6 +126,7 @@ class ConfigMgmt:
get_first_key=True)
self.max_revisions = int(d.get('commit_revisions', 0))
+ self.num_revisions = 0
self.locations = d.get('commit_archive', {}).get('location', [])
self.source_address = d.get('commit_archive',
{}).get('source_address', '')
@@ -232,7 +235,7 @@ Proceed ?'''
msg = ''
if not self._check_revision_number(rev):
- msg = f'Invalid revision number {rev}: must be 0 < rev < {maxrev}'
+ msg = f'Invalid revision number {rev}: must be 0 < rev < {self.num_revisions}'
return msg, 1
prompt_str = 'Proceed with reboot ?'
@@ -260,6 +263,23 @@ Proceed ?'''
return msg, 0
+ def rollback_soft(self, rev: int):
+ """Rollback without reboot (rollback-soft)
+ """
+ msg = ''
+
+ if not self._check_revision_number(rev):
+ msg = f'Invalid revision number {rev}: must be 0 < rev < {self.num_revisions}'
+ return msg, 1
+
+ rollback_ct = self._get_config_tree_revision(rev)
+ try:
+ load(rollback_ct, switch='explicit')
+ except LoadConfigError as e:
+ raise ConfigMgmtError(e) from e
+
+ return msg, 0
+
def compare(self, saved: bool=False, commands: bool=False,
rev1: Optional[int]=None,
rev2: Optional[int]=None) -> Tuple[str,int]:
@@ -377,7 +397,13 @@ Proceed ?'''
remote_file = f'config.boot-{hostname}.{timestamp}'
source_address = self.source_address
+ if self.effective_locations:
+ print("Archiving config...")
for location in self.effective_locations:
+ url = urlsplit(location)
+ _, _, netloc = url.netloc.rpartition("@")
+ redacted_location = urlunsplit(url._replace(netloc=netloc))
+ print(f" {redacted_location}", end=" ", flush=True)
upload(archive_config_file, f'{location}/{remote_file}',
source_host=source_address)
@@ -448,13 +474,10 @@ Proceed ?'''
# utility functions
#
- @staticmethod
- def _strip_version(s):
- return re.split(r'(^//)', s, maxsplit=1, flags=re.MULTILINE)[0]
def _get_saved_config_tree(self):
with open(config_file) as f:
- c = self._strip_version(f.read())
+ c = f.read()
return ConfigTree(c)
def _get_file_revision(self, rev: int):
@@ -466,7 +489,7 @@ Proceed ?'''
return r
def _get_config_tree_revision(self, rev: int):
- c = self._strip_version(self._get_file_revision(rev))
+ c = self._get_file_revision(rev)
return ConfigTree(c)
def _add_logrotate_conf(self):
@@ -546,8 +569,8 @@ Proceed ?'''
return len(l)
def _check_revision_number(self, rev: int) -> bool:
- maxrev = self._get_number_of_revisions()
- if not 0 <= rev < maxrev:
+ self.num_revisions = self._get_number_of_revisions()
+ if not 0 <= rev < self.num_revisions:
return False
return True
@@ -689,6 +712,11 @@ def run():
rollback.add_argument('-y', dest='no_prompt', action='store_true',
help="Excute without prompt")
+ rollback_soft = subparsers.add_parser('rollback_soft',
+ help="Rollback to earlier config")
+ rollback_soft.add_argument('--rev', type=int,
+ help="Revision number for rollback")
+
compare = subparsers.add_parser('compare',
help="Compare config files")
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index 05d9a3fa3..64727d355 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -43,7 +43,7 @@ def canon_name_of_path(path: str) -> str:
return canon_name(script)
def caller_name() -> str:
- return stack()[-1].filename
+ return stack()[2].filename
def read_dependency_dict(dependency_dir: str = dependency_dir) -> dict:
res = {}
@@ -107,6 +107,13 @@ def call_dependents():
f = l.pop(0)
f()
+def called_as_dependent() -> bool:
+ st = stack()[1:]
+ for f in st:
+ if f.filename == __file__:
+ return True
+ return False
+
def graph_from_dependency_dict(d: dict) -> dict:
g = {}
for k in list(d):
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 71a06b625..4111d7271 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -104,6 +104,10 @@ def list_diff(first, second):
return [item for item in first if item not in second]
def is_node_changed(conf, path):
+ """
+ Check if any key under path has been changed and return True.
+ If nothing changed, return false
+ """
from vyos.configdiff import get_config_diff
D = get_config_diff(conf, key_mangling=('-', '_'))
return D.is_node_changed(path)
@@ -139,17 +143,30 @@ def leaf_node_changed(conf, path):
return None
-def node_changed(conf, path, key_mangling=None, recursive=False):
+def node_changed(conf, path, key_mangling=None, recursive=False, expand_nodes=None) -> list:
"""
- Check if a leaf node was altered. If it has been altered - values has been
- changed, or it was added/removed, we will return the old value. If nothing
- has been changed, None is returned
+ Check if node under path (or anything under path if recursive=True) was changed. By default
+ we only check if a node or subnode (recursive) was deleted from path. If expand_nodes
+ is set to Diff.ADD we can also check if something was added to the path.
+
+ If nothing changed, an empty list is returned.
"""
- from vyos.configdiff import get_config_diff, Diff
+ from vyos.configdiff import get_config_diff
+ from vyos.configdiff import Diff
+ # to prevent circular dependencies we assign the default here
+ if not expand_nodes: expand_nodes = Diff.DELETE
D = get_config_diff(conf, key_mangling)
- # get_child_nodes() will return dict_keys(), mangle this into a list with PEP448
- keys = D.get_child_nodes_diff(path, expand_nodes=Diff.DELETE, recursive=recursive)['delete'].keys()
- return list(keys)
+ # get_child_nodes_diff() will return dict_keys()
+ tmp = D.get_child_nodes_diff(path, expand_nodes=expand_nodes, recursive=recursive)
+ output = []
+ if expand_nodes & Diff.DELETE:
+ output.extend(list(tmp['delete'].keys()))
+ if expand_nodes & Diff.ADD:
+ output.extend(list(tmp['add'].keys()))
+
+ # remove duplicate keys from list, this happens when a node (e.g. description) is altered
+ output = list(dict.fromkeys(output))
+ return output
def get_removed_vlans(conf, path, dict):
"""
@@ -258,10 +275,10 @@ def has_address_configured(conf, intf):
old_level = conf.get_level()
conf.set_level([])
- intfpath = 'interfaces ' + Section.get_config_path(intf)
- if ( conf.exists(f'{intfpath} address') or
- conf.exists(f'{intfpath} ipv6 address autoconf') or
- conf.exists(f'{intfpath} ipv6 address eui64') ):
+ intfpath = ['interfaces', Section.get_config_path(intf)]
+ if (conf.exists([intfpath, 'address']) or
+ conf.exists([intfpath, 'ipv6', 'address', 'autoconf']) or
+ conf.exists([intfpath, 'ipv6', 'address', 'eui64'])):
ret = True
conf.set_level(old_level)
@@ -279,8 +296,7 @@ def has_vrf_configured(conf, intf):
old_level = conf.get_level()
conf.set_level([])
- tmp = ['interfaces', Section.get_config_path(intf), 'vrf']
- if conf.exists(tmp):
+ if conf.exists(['interfaces', Section.get_config_path(intf), 'vrf']):
ret = True
conf.set_level(old_level)
@@ -298,8 +314,7 @@ def has_vlan_subinterface_configured(conf, intf):
ret = False
intfpath = ['interfaces', Section.section(intf), intf]
- if ( conf.exists(intfpath + ['vif']) or
- conf.exists(intfpath + ['vif-s'])):
+ if (conf.exists(intfpath + ['vif']) or conf.exists(intfpath + ['vif-s'])):
ret = True
return ret
@@ -412,7 +427,7 @@ def get_pppoe_interfaces(conf, vrf=None):
return pppoe_interfaces
-def get_interface_dict(config, base, ifname='', recursive_defaults=True):
+def get_interface_dict(config, base, ifname='', recursive_defaults=True, with_pki=False):
"""
Common utility function to retrieve and mangle the interfaces configuration
from the CLI input nodes. All interfaces have a common base where value
@@ -444,7 +459,8 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True):
get_first_key=True,
no_tag_node_value_mangle=True,
with_defaults=True,
- with_recursive_defaults=recursive_defaults)
+ with_recursive_defaults=recursive_defaults,
+ with_pki=with_pki)
# If interface does not request an IPv4 DHCP address there is no need
# to keep the dhcp-options key
@@ -608,7 +624,7 @@ def get_vlan_ids(interface):
return vlan_ids
-def get_accel_dict(config, base, chap_secrets):
+def get_accel_dict(config, base, chap_secrets, with_pki=False):
"""
Common utility function to retrieve and mangle the Accel-PPP configuration
from different CLI input nodes. All Accel-PPP services have a common base
@@ -623,7 +639,8 @@ def get_accel_dict(config, base, chap_secrets):
dict = config.get_config_dict(base, key_mangling=('-', '_'),
get_first_key=True,
no_tag_node_value_mangle=True,
- with_recursive_defaults=True)
+ with_recursive_defaults=True,
+ with_pki=with_pki)
# set CPUs cores to process requests
dict.update({'thread_count' : get_half_cpus()})
diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py
index 1ec2dfafe..03b06c6d9 100644
--- a/python/vyos/configdiff.py
+++ b/python/vyos/configdiff.py
@@ -165,6 +165,30 @@ class ConfigDiff(object):
return True
return False
+ def node_changed_presence(self, path=[]) -> bool:
+ if self._diff_tree is None:
+ raise NotImplementedError("diff_tree class not available")
+
+ path = self._make_path(path)
+ before = self._diff_tree.left.exists(path)
+ after = self._diff_tree.right.exists(path)
+ return (before and not after) or (not before and after)
+
+ def node_changed_children(self, path=[]) -> list:
+ if self._diff_tree is None:
+ raise NotImplementedError("diff_tree class not available")
+
+ path = self._make_path(path)
+ add = self._diff_tree.add
+ sub = self._diff_tree.sub
+ children = set()
+ if add.exists(path):
+ children.update(add.list_nodes(path))
+ if sub.exists(path):
+ children.update(sub.list_nodes(path))
+
+ return list(children)
+
def get_child_nodes_diff_str(self, path=[]):
ret = {'add': {}, 'change': {}, 'delete': {}}
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 6d4b2af59..90842b749 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -30,11 +30,15 @@ SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig']
LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile']
MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py']
SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py']
-INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url']
-REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del']
+INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py',
+ '--action', 'add', '--no-prompt', '--image-path']
+REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py',
+ '--action', 'delete', '--no-prompt', '--image-name']
GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate']
SHOW = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'show']
RESET = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reset']
+REBOOT = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reboot']
+POWEROFF = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'poweroff']
OP_CMD_ADD = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'add']
OP_CMD_DELETE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'delete']
@@ -220,10 +224,18 @@ class ConfigSession(object):
out = self.__run_command(SHOW + path)
return out
+ def reboot(self, path):
+ out = self.__run_command(REBOOT + path)
+ return out
+
def reset(self, path):
out = self.__run_command(RESET + path)
return out
+ def poweroff(self, path):
+ out = self.__run_command(POWEROFF + path)
+ return out
+
def add_container_image(self, name):
out = self.__run_command(OP_CMD_ADD + ['container', 'image'] + [name])
return out
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index 09cfd43d3..d048901f0 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -160,6 +160,9 @@ class ConfigTree(object):
def _get_config(self):
return self.__config
+ def get_version_string(self):
+ return self.__version
+
def to_string(self, ordered_values=False):
config_string = self.__to_string(self.__config, ordered_values).decode()
config_string = "{0}\n{1}".format(config_string, self.__version)
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 52f9238b8..5d3723876 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -25,6 +25,9 @@ from vyos import ConfigError
from vyos.utils.dict import dict_search
from vyos.utils.dict import dict_search_recursive
+# pattern re-used in ipsec migration script
+dynamic_interface_pattern = r'(ppp|pppoe|sstpc|l2tp|ipoe)[0-9]+'
+
def verify_mtu(config):
"""
Common helper function used by interface implementations to perform
@@ -281,16 +284,22 @@ def verify_source_interface(config):
perform recurring validation of the existence of a source-interface
required by e.g. peth/MACvlan, MACsec ...
"""
+ import re
from netifaces import interfaces
- if 'source_interface' not in config:
- raise ConfigError('Physical source-interface required for '
- 'interface "{ifname}"'.format(**config))
- if config['source_interface'] not in interfaces():
- raise ConfigError('Specified source-interface {source_interface} does '
- 'not exist'.format(**config))
+ ifname = config['ifname']
+ if 'source_interface' not in config:
+ raise ConfigError(f'Physical source-interface required for "{ifname}"!')
src_ifname = config['source_interface']
+ # We do not allow sourcing other interfaces (e.g. tunnel) from dynamic interfaces
+ tmp = re.compile(dynamic_interface_pattern)
+ if tmp.match(src_ifname):
+ raise ConfigError(f'Can not source "{ifname}" from dynamic interface "{src_ifname}"!')
+
+ if src_ifname not in interfaces():
+ raise ConfigError(f'Specified source-interface {src_ifname} does not exist')
+
if 'source_interface_is_bridge_member' in config:
bridge_name = next(iter(config['source_interface_is_bridge_member']))
raise ConfigError(f'Invalid source-interface "{src_ifname}". Interface '
@@ -303,7 +312,6 @@ def verify_source_interface(config):
if 'is_source_interface' in config:
tmp = config['is_source_interface']
- src_ifname = config['source_interface']
raise ConfigError(f'Can not use source-interface "{src_ifname}", it already ' \
f'belongs to interface "{tmp}"!')
@@ -385,72 +393,6 @@ def verify_vlan_config(config):
verify_mtu_parent(c_vlan, config)
verify_mtu_parent(c_vlan, s_vlan)
-def verify_accel_ppp_base_service(config, local_users=True):
- """
- Common helper function which must be used by all Accel-PPP services based
- on get_config_dict()
- """
- # vertify auth settings
- if local_users and dict_search('authentication.mode', config) == 'local':
- if (dict_search(f'authentication.local_users', config) is None or
- dict_search(f'authentication.local_users', config) == {}):
- raise ConfigError(
- 'Authentication mode local requires local users to be configured!')
-
- for user in dict_search('authentication.local_users.username', config):
- user_config = config['authentication']['local_users']['username'][user]
-
- if 'password' not in user_config:
- raise ConfigError(f'Password required for local user "{user}"')
-
- if 'rate_limit' in user_config:
- # if up/download is set, check that both have a value
- if not {'upload', 'download'} <= set(user_config['rate_limit']):
- raise ConfigError(f'User "{user}" has rate-limit configured for only one ' \
- 'direction but both upload and download must be given!')
-
- elif dict_search('authentication.mode', config) == 'radius':
- if not dict_search('authentication.radius.server', config):
- raise ConfigError('RADIUS authentication requires at least one server')
-
- for server in dict_search('authentication.radius.server', config):
- radius_config = config['authentication']['radius']['server'][server]
- if 'key' not in radius_config:
- raise ConfigError(f'Missing RADIUS secret key for server "{server}"')
-
- # Check global gateway or gateway in named pool
- gateway = False
- if 'gateway_address' in config:
- gateway = True
- else:
- if 'client_ip_pool' in config:
- if dict_search_recursive(config, 'gateway_address', ['client_ip_pool', 'name']):
- for _, v in config['client_ip_pool']['name'].items():
- if 'gateway_address' in v:
- gateway = True
- break
- if not gateway:
- raise ConfigError('Server requires gateway-address to be configured!')
-
- if 'name_server_ipv4' in config:
- if len(config['name_server_ipv4']) > 2:
- raise ConfigError('Not more then two IPv4 DNS name-servers ' \
- 'can be configured')
-
- if 'name_server_ipv6' in config:
- if len(config['name_server_ipv6']) > 3:
- raise ConfigError('Not more then three IPv6 DNS name-servers ' \
- 'can be configured')
-
- if 'client_ipv6_pool' in config:
- ipv6_pool = config['client_ipv6_pool']
- if 'delegate' in ipv6_pool:
- if 'prefix' not in ipv6_pool:
- raise ConfigError('IPv6 "delegate" also requires "prefix" to be defined!')
-
- for delegate in ipv6_pool['delegate']:
- if 'delegation_prefix' not in ipv6_pool['delegate'][delegate]:
- raise ConfigError('delegation-prefix length required!')
def verify_diffie_hellman_length(file, min_keysize):
""" Verify Diffie-Hellamn keypair length given via file. It must be greater
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index a229533bd..64145a42e 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -37,6 +37,7 @@ directories = {
}
config_status = '/tmp/vyos-config-status'
+api_config_state = '/run/http-api-state'
cfg_group = 'vyattacfg'
@@ -45,23 +46,3 @@ cfg_vintage = 'vyos'
commit_lock = '/opt/vyatta/config/.lock'
component_version_json = os.path.join(directories['data'], 'component-versions.json')
-
-https_data = {
- 'listen_addresses' : { '*': ['_'] }
-}
-
-api_data = {
- 'listen_address' : '127.0.0.1',
- 'port' : '8080',
- 'socket' : False,
- 'strict' : False,
- 'debug' : False,
- 'api_keys' : [ {'id' : 'testapp', 'key' : 'qwerty'} ]
-}
-
-vyos_cert_data = {
- 'conf' : '/etc/nginx/snippets/vyos-cert.conf',
- 'crt' : '/etc/ssl/certs/vyos-selfsigned.crt',
- 'key' : '/etc/ssl/private/vyos-selfsign',
- 'lifetime' : '365',
-}
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index f19632719..ba638b280 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -23,6 +23,7 @@ from vyos.utils.process import popen
_drivers_without_speed_duplex_flow = ['vmxnet3', 'virtio_net', 'xen_netfront',
'iavf', 'ice', 'i40e', 'hv_netvsc', 'veth', 'ixgbevf',
'tun']
+_drivers_without_eee = ['vmxnet3', 'virtio_net', 'xen_netfront', 'hv_netvsc']
class Ethtool:
"""
@@ -55,16 +56,18 @@ class Ethtool:
_auto_negotiation_supported = None
_flow_control = False
_flow_control_enabled = None
+ _eee = False
+ _eee_enabled = None
def __init__(self, ifname):
# Get driver used for interface
- out, err = popen(f'ethtool --driver {ifname}')
+ out, _ = popen(f'ethtool --driver {ifname}')
driver = re.search(r'driver:\s(\w+)', out)
if driver:
self._driver_name = driver.group(1)
# Build a dictinary of supported link-speed and dupley settings.
- out, err = popen(f'ethtool {ifname}')
+ out, _ = popen(f'ethtool {ifname}')
reading = False
pattern = re.compile(r'\d+base.*')
for line in out.splitlines()[1:]:
@@ -95,7 +98,7 @@ class Ethtool:
self._auto_negotiation = bool(tmp == 'on')
# Now populate features dictionaty
- out, err = popen(f'ethtool --show-features {ifname}')
+ out, _ = popen(f'ethtool --show-features {ifname}')
# skip the first line, it only says: "Features for eth0":
for line in out.splitlines()[1:]:
if ":" in line:
@@ -108,7 +111,7 @@ class Ethtool:
'fixed' : fixed
}
- out, err = popen(f'ethtool --show-ring {ifname}')
+ out, _ = popen(f'ethtool --show-ring {ifname}')
# We are only interested in line 2-5 which contains the device maximum
# ringbuffers
for line in out.splitlines()[2:6]:
@@ -133,13 +136,22 @@ class Ethtool:
# Get current flow control settings, but this is not supported by
# all NICs (e.g. vmxnet3 does not support is)
- out, err = popen(f'ethtool --show-pause {ifname}')
+ out, _ = popen(f'ethtool --show-pause {ifname}')
if len(out.splitlines()) > 1:
self._flow_control = True
# read current flow control setting, this returns:
# ['Autonegotiate:', 'on']
self._flow_control_enabled = out.splitlines()[1].split()[-1]
+ # Get current Energy Efficient Ethernet (EEE) settings, but this is
+ # not supported by all NICs (e.g. vmxnet3 does not support is)
+ out, _ = popen(f'ethtool --show-eee {ifname}')
+ if len(out.splitlines()) > 1:
+ self._eee = True
+ # read current EEE setting, this returns:
+ # EEE status: disabled || EEE status: enabled - inactive || EEE status: enabled - active
+ self._eee_enabled = bool('enabled' in out.splitlines()[2])
+
def check_auto_negotiation_supported(self):
""" Check if the NIC supports changing auto-negotiation """
return self._auto_negotiation_supported
@@ -227,3 +239,15 @@ class Ethtool:
raise ValueError('Interface does not support changing '\
'flow-control settings!')
return self._flow_control_enabled
+
+ def check_eee(self):
+ """ Check if the NIC supports eee """
+ if self.get_driver_name() in _drivers_without_eee:
+ return False
+ return self._eee
+
+ def get_eee(self):
+ if self._eee_enabled == None:
+ raise ValueError('Interface does not support changing '\
+ 'EEE settings!')
+ return self._eee_enabled
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index c07ed1adf..eee11bd2d 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -87,7 +87,6 @@ def nft_action(vyos_action):
def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output = []
- #def_suffix = '6' if ip_name == 'ip6' else ''
if ip_name == 'ip6':
def_suffix = '6'
@@ -97,7 +96,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
family = 'bri' if ip_name == 'bri' else 'ipv4'
if 'state' in rule_conf and rule_conf['state']:
- states = ",".join([s for s, v in rule_conf['state'].items() if v == 'enable'])
+ states = ",".join([s for s in rule_conf['state']])
if states:
output.append(f'ct state {{{states}}}')
@@ -227,6 +226,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
operator = '!=' if exclude else '=='
operator = f'& {address_mask} {operator}'
output.append(f'{ip_name} {prefix}addr {operator} @A{def_suffix}_{group_name}')
+ elif 'dynamic_address_group' in group:
+ group_name = group['dynamic_address_group']
+ operator = ''
+ exclude = group_name[0] == "!"
+ if exclude:
+ operator = '!='
+ group_name = group_name[1:]
+ output.append(f'{ip_name} {prefix}addr {operator} @DA{def_suffix}_{group_name}')
# Generate firewall group domain-group
elif 'domain_group' in group:
group_name = group['domain_group']
@@ -275,14 +282,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'inbound_interface' in rule_conf:
operator = ''
- if 'interface_name' in rule_conf['inbound_interface']:
- iiface = rule_conf['inbound_interface']['interface_name']
+ if 'name' in rule_conf['inbound_interface']:
+ iiface = rule_conf['inbound_interface']['name']
if iiface[0] == '!':
operator = '!='
iiface = iiface[1:]
output.append(f'iifname {operator} {{{iiface}}}')
- else:
- iiface = rule_conf['inbound_interface']['interface_group']
+ elif 'group' in rule_conf['inbound_interface']:
+ iiface = rule_conf['inbound_interface']['group']
if iiface[0] == '!':
operator = '!='
iiface = iiface[1:]
@@ -290,14 +297,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'outbound_interface' in rule_conf:
operator = ''
- if 'interface_name' in rule_conf['outbound_interface']:
- oiface = rule_conf['outbound_interface']['interface_name']
+ if 'name' in rule_conf['outbound_interface']:
+ oiface = rule_conf['outbound_interface']['name']
if oiface[0] == '!':
operator = '!='
oiface = oiface[1:]
output.append(f'oifname {operator} {{{oiface}}}')
- else:
- oiface = rule_conf['outbound_interface']['interface_group']
+ elif 'group' in rule_conf['outbound_interface']:
+ oiface = rule_conf['outbound_interface']['group']
if oiface[0] == '!':
operator = '!='
oiface = oiface[1:]
@@ -395,7 +402,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'priority' in rule_conf['vlan']:
output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}')
- if 'log' in rule_conf and rule_conf['log'] == 'enable':
+ if 'log' in rule_conf:
action = rule_conf['action'] if 'action' in rule_conf else 'accept'
#output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]"')
output.append(f'log prefix "[{family}-{hook}-{fw_name}-{rule_id}-{action[:1].upper()}]"')
@@ -420,6 +427,18 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output.append('counter')
+ if 'add_address_to_group' in rule_conf:
+ for side in ['destination_address', 'source_address']:
+ if side in rule_conf['add_address_to_group']:
+ prefix = side[0]
+ side_conf = rule_conf['add_address_to_group'][side]
+ dyn_group = side_conf['address_group']
+ if 'timeout' in side_conf:
+ timeout_value = side_conf['timeout']
+ output.append(f'set update ip{def_suffix} {prefix}addr timeout {timeout_value} @DA{def_suffix}_{dyn_group}')
+ else:
+ output.append(f'set update ip{def_suffix} saddr @DA{def_suffix}_{dyn_group}')
+
if 'set' in rule_conf:
output.append(parse_policy_set(rule_conf['set'], def_suffix))
diff --git a/python/vyos/frr.py b/python/vyos/frr.py
index ad5c207f5..a01d967e4 100644
--- a/python/vyos/frr.py
+++ b/python/vyos/frr.py
@@ -86,12 +86,8 @@ ch2 = logging.StreamHandler(stream=sys.stdout)
LOG.addHandler(ch)
LOG.addHandler(ch2)
-# Full list of FRR 9.0/stable daemons for reference
-#_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd',
-# 'isisd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'sharpd', 'bfdd',
-# 'fabricd', 'pathd']
_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd',
- 'isisd', 'pim6d', 'ldpd', 'babeld', 'bfdd']
+ 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd']
path_vtysh = '/usr/bin/vtysh'
path_frr_reload = '/usr/lib/frr/frr-reload.py'
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index d1d7d48c4..45e6e4c16 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -92,6 +92,19 @@ class BondIf(Interface):
}
}}
+ @staticmethod
+ def get_inherit_bond_options() -> list:
+ """
+ Returns list of option
+ which are inherited from bond interface to member interfaces
+ :return: List of interface options
+ :rtype: list
+ """
+ options = [
+ 'mtu'
+ ]
+ return options
+
def remove(self):
"""
Remove interface from operating system. Removing the interface
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 285542057..c3f5bbf47 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -19,6 +19,7 @@ from glob import glob
from vyos.base import Warning
from vyos.ethtool import Ethtool
+from vyos.ifconfig import Section
from vyos.ifconfig.interface import Interface
from vyos.utils.dict import dict_search
from vyos.utils.file import read_file
@@ -75,6 +76,40 @@ class EthernetIf(Interface):
},
}}
+ @staticmethod
+ def get_bond_member_allowed_options() -> list:
+ """
+ Return list of options which are allowed for changing,
+ when interface is a bond member
+ :return: List of interface options
+ :rtype: list
+ """
+ bond_allowed_sections = [
+ 'description',
+ 'disable',
+ 'disable_flow_control',
+ 'disable_link_detect',
+ 'duplex',
+ 'eapol.ca_certificate',
+ 'eapol.certificate',
+ 'eapol.passphrase',
+ 'mirror.egress',
+ 'mirror.ingress',
+ 'offload.gro',
+ 'offload.gso',
+ 'offload.lro',
+ 'offload.rfs',
+ 'offload.rps',
+ 'offload.sg',
+ 'offload.tso',
+ 'redirect',
+ 'ring_buffer.rx',
+ 'ring_buffer.tx',
+ 'speed',
+ 'hw_id'
+ ]
+ return bond_allowed_sections
+
def __init__(self, ifname, **kargs):
super().__init__(ifname, **kargs)
self.ethtool = Ethtool(ifname)
@@ -94,6 +129,10 @@ class EthernetIf(Interface):
# will remain visible for the operating system.
self.set_admin_state('down')
+ # Remove all VLAN subinterfaces - filter with the VLAN dot
+ for vlan in [x for x in Section.interfaces(self.iftype) if x.startswith(f'{self.ifname}.')]:
+ Interface(vlan).remove()
+
super().remove()
def set_flow_control(self, enable):
@@ -365,6 +404,34 @@ class EthernetIf(Interface):
print(f'could not set "{rx_tx}" ring-buffer for {ifname}')
return output
+ def set_eee(self, enable):
+ """
+ Enable/Disable Energy Efficient Ethernet (EEE) settings
+
+ Example:
+ >>> from vyos.ifconfig import EthernetIf
+ >>> i = EthernetIf('eth0')
+ >>> i.set_eee(enable=False)
+ """
+ if not isinstance(enable, bool):
+ raise ValueError('Value out of range')
+
+ if not self.ethtool.check_eee():
+ self._debug_msg(f'NIC driver does not support changing EEE settings!')
+ return False
+
+ current = self.ethtool.get_eee()
+ if current != enable:
+ # Assemble command executed on system. Unfortunately there is no way
+ # to change this setting via sysfs
+ cmd = f'ethtool --set-eee {self.ifname} eee '
+ cmd += 'on' if enable else 'off'
+ output, code = self._popen(cmd)
+ if code:
+ Warning(f'could not change "{self.ifname}" EEE setting!')
+ return output
+ return None
+
def update(self, config):
""" General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
@@ -375,6 +442,9 @@ class EthernetIf(Interface):
value = 'off' if 'disable_flow_control' in config else 'on'
self.set_flow_control(value)
+ # Always disable Energy Efficient Ethernet
+ self.set_eee(False)
+
# GRO (generic receive offload)
self.set_gro(dict_search('offload.gro', config) != None)
@@ -382,7 +452,7 @@ class EthernetIf(Interface):
self.set_gso(dict_search('offload.gso', config) != None)
# GSO (generic segmentation offload)
- self.set_hw_tc_offload(dict_search('offload.hw-tc-offload', config) != None)
+ self.set_hw_tc_offload(dict_search('offload.hw_tc_offload', config) != None)
# LRO (large receive offload)
self.set_lro(dict_search('offload.lro', config) != None)
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 050095364..56dcde214 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -115,7 +115,7 @@ class Interface(Control):
},
'vrf': {
'shellcmd': 'ip -json -detail link list dev {ifname}',
- 'format': lambda j: jmespath.search('[*].master | [0]', json.loads(j)),
+ 'format': lambda j: jmespath.search('[?linkinfo.info_slave_kind == `vrf`].master | [0]', json.loads(j)),
},
}
@@ -496,12 +496,12 @@ class Interface(Control):
from hashlib import sha256
# Get processor ID number
- cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"')
+ cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"')
# XXX: T3894 - it seems not all systems have eth0 - get a list of all
# available Ethernet interfaces on the system (without VLAN subinterfaces)
# and then take the first one.
- all_eth_ifs = [x for x in Section.interfaces('ethernet') if '.' not in x]
+ all_eth_ifs = Section.interfaces('ethernet', vlan=False)
first_mac = Interface(all_eth_ifs[0]).get_mac()
sha = sha256()
@@ -571,6 +571,16 @@ class Interface(Control):
self._cmd(f'ip link set dev {self.ifname} netns {netns}')
return True
+ def get_vrf(self):
+ """
+ Get VRF from interface
+
+ Example:
+ >>> from vyos.ifconfig import Interface
+ >>> Interface('eth0').get_vrf()
+ """
+ return self.get_interface('vrf')
+
def set_vrf(self, vrf: str) -> bool:
"""
Add/Remove interface from given VRF instance.
diff --git a/python/vyos/ifconfig/macsec.py b/python/vyos/ifconfig/macsec.py
index 9329c5ee7..bde1d9aec 100644
--- a/python/vyos/ifconfig/macsec.py
+++ b/python/vyos/ifconfig/macsec.py
@@ -45,6 +45,10 @@ class MACsecIf(Interface):
# create tunnel interface
cmd = 'ip link add link {source_interface} {ifname} type {type}'.format(**self.config)
cmd += f' cipher {self.config["security"]["cipher"]}'
+
+ if 'encrypt' in self.config["security"]:
+ cmd += ' encrypt on'
+
self._cmd(cmd)
# Check if using static keys
diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py
index 1fe5db7cd..a2c4aad50 100644
--- a/python/vyos/ifconfig/vxlan.py
+++ b/python/vyos/ifconfig/vxlan.py
@@ -22,6 +22,7 @@ from vyos.utils.assertion import assert_list
from vyos.utils.dict import dict_search
from vyos.utils.network import get_interface_config
from vyos.utils.network import get_vxlan_vlan_tunnels
+from vyos.utils.network import get_vxlan_vni_filter
@Interface.register
class VXLANIf(Interface):
@@ -56,6 +57,10 @@ class VXLANIf(Interface):
}
_command_set = {**Interface._command_set, **{
+ 'neigh_suppress': {
+ 'validate': lambda v: assert_list(v, ['on', 'off']),
+ 'shellcmd': 'bridge link set dev {ifname} neigh_suppress {value} learning off',
+ },
'vlan_tunnel': {
'validate': lambda v: assert_list(v, ['on', 'off']),
'shellcmd': 'bridge link set dev {ifname} vlan_tunnel {value}',
@@ -68,13 +73,14 @@ class VXLANIf(Interface):
# - https://man7.org/linux/man-pages/man8/ip-link.8.html
mapping = {
'group' : 'group',
- 'external' : 'external',
'gpe' : 'gpe',
+ 'parameters.external' : 'external',
'parameters.ip.df' : 'df',
'parameters.ip.tos' : 'tos',
'parameters.ip.ttl' : 'ttl',
'parameters.ipv6.flowlabel' : 'flowlabel',
'parameters.nolearning' : 'nolearning',
+ 'parameters.vni_filter' : 'vnifilter',
'remote' : 'remote',
'source_address' : 'local',
'source_interface' : 'dev',
@@ -106,13 +112,26 @@ class VXLANIf(Interface):
# interface is always A/D down. It needs to be enabled explicitly
self.set_admin_state('down')
- # VXLAN tunnel is always recreated on any change - see interfaces-vxlan.py
+ # VXLAN tunnel is always recreated on any change - see interfaces_vxlan.py
if remote_list:
for remote in remote_list:
cmd = f'bridge fdb append to 00:00:00:00:00:00 dst {remote} ' \
'port {port} dev {ifname}'
self._cmd(cmd.format(**self.config))
+ def set_neigh_suppress(self, state):
+ """
+ Controls whether neigh discovery (arp and nd) proxy and suppression
+ is enabled on the port. By default this flag is off.
+ """
+
+ # Determine current OS Kernel neigh_suppress setting - only adjust when needed
+ tmp = get_interface_config(self.ifname)
+ cur_state = 'on' if dict_search(f'linkinfo.info_slave_data.neigh_suppress', tmp) == True else 'off'
+ new_state = 'on' if state else 'off'
+ if cur_state != new_state:
+ self.set_interface('neigh_suppress', state)
+
def set_vlan_vni_mapping(self, state):
"""
Controls whether vlan to tunnel mapping is enabled on the port.
@@ -121,10 +140,14 @@ class VXLANIf(Interface):
if not isinstance(state, bool):
raise ValueError('Value out of range')
- cur_vlan_ids = []
if 'vlan_to_vni_removed' in self.config:
- cur_vlan_ids = self.config['vlan_to_vni_removed']
- for vlan in cur_vlan_ids:
+ cur_vni_filter = get_vxlan_vni_filter(self.ifname)
+ for vlan, vlan_config in self.config['vlan_to_vni_removed'].items():
+ # If VNI filtering is enabled, remove matching VNI filter
+ if dict_search('parameters.vni_filter', self.config) != None:
+ vni = vlan_config['vni']
+ if vni in cur_vni_filter:
+ self._cmd(f'bridge vni delete dev {self.ifname} vni {vni}')
self._cmd(f'bridge vlan del dev {self.ifname} vid {vlan}')
# Determine current OS Kernel vlan_tunnel setting - only adjust when needed
@@ -134,10 +157,9 @@ class VXLANIf(Interface):
if cur_state != new_state:
self.set_interface('vlan_tunnel', new_state)
- # Determine current OS Kernel configured VLANs
- os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname)
-
if 'vlan_to_vni' in self.config:
+ # Determine current OS Kernel configured VLANs
+ os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname)
add_vlan = list_diff(list(self.config['vlan_to_vni'].keys()), os_configured_vlan_ids)
for vlan, vlan_config in self.config['vlan_to_vni'].items():
@@ -151,6 +173,10 @@ class VXLANIf(Interface):
self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan}')
self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan} tunnel_info id {vni}')
+ # If VNI filtering is enabled, install matching VNI filter
+ if dict_search('parameters.vni_filter', self.config) != None:
+ self._cmd(f'bridge vni add dev {self.ifname} vni {vni}')
+
def update(self, config):
""" General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
@@ -163,3 +189,9 @@ class VXLANIf(Interface):
# Enable/Disable VLAN tunnel mapping
# This is only possible after the interface was assigned to the bridge
self.set_vlan_vni_mapping(dict_search('vlan_to_vni', config) != None)
+
+ # Enable/Disable neighbor suppression and learning, there is no need to
+ # explicitly "disable" it, as VXLAN interface will be recreated if anything
+ # under "parameters" changes.
+ if dict_search('parameters.neighbor_suppress', config) != None:
+ self.set_neigh_suppress('on')
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index 4aac103ec..5704f8b64 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -167,11 +167,6 @@ class WireGuardIf(Interface):
interface setup code and provide a single point of entry when workin
on any interface. """
- # remove no longer associated peers first
- if 'peer_remove' in config:
- for peer, public_key in config['peer_remove'].items():
- self._cmd(f'wg set {self.ifname} peer {public_key} remove')
-
tmp_file = NamedTemporaryFile('w')
tmp_file.write(config['private_key'])
tmp_file.flush()
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
new file mode 100644
index 000000000..720bebec3
--- /dev/null
+++ b/python/vyos/kea.py
@@ -0,0 +1,364 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import os
+import socket
+
+from datetime import datetime
+
+from vyos.template import is_ipv6
+from vyos.template import isc_static_route
+from vyos.template import netmask_from_cidr
+from vyos.utils.dict import dict_search_args
+from vyos.utils.file import file_permissions
+from vyos.utils.file import read_file
+from vyos.utils.process import run
+
+kea4_options = {
+ 'name_server': 'domain-name-servers',
+ 'domain_name': 'domain-name',
+ 'domain_search': 'domain-search',
+ 'ntp_server': 'ntp-servers',
+ 'pop_server': 'pop-server',
+ 'smtp_server': 'smtp-server',
+ 'time_server': 'time-servers',
+ 'wins_server': 'netbios-name-servers',
+ 'default_router': 'routers',
+ 'server_identifier': 'dhcp-server-identifier',
+ 'tftp_server_name': 'tftp-server-name',
+ 'bootfile_size': 'boot-size',
+ 'time_offset': 'time-offset',
+ 'wpad_url': 'wpad-url',
+ 'ipv6_only_preferred': 'v6-only-preferred',
+ 'captive_portal': 'v4-captive-portal'
+}
+
+kea6_options = {
+ 'info_refresh_time': 'information-refresh-time',
+ 'name_server': 'dns-servers',
+ 'domain_search': 'domain-search',
+ 'nis_domain': 'nis-domain-name',
+ 'nis_server': 'nis-servers',
+ 'nisplus_domain': 'nisp-domain-name',
+ 'nisplus_server': 'nisp-servers',
+ 'sntp_server': 'sntp-servers',
+ 'captive_portal': 'v6-captive-portal'
+}
+
+def kea_parse_options(config):
+ options = []
+
+ for node, option_name in kea4_options.items():
+ if node not in config:
+ continue
+
+ value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
+ options.append({'name': option_name, 'data': value})
+
+ if 'client_prefix_length' in config:
+ options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])})
+
+ if 'ip_forwarding' in config:
+ options.append({'name': 'ip-forwarding', 'data': "true"})
+
+ if 'static_route' in config:
+ default_route = ''
+
+ if 'default_router' in config:
+ default_route = isc_static_route('0.0.0.0/0', config['default_router'])
+
+ routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()]
+
+ options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])})
+ options.append({'name': 'windows-static-route', 'data': ", ".join(routes)})
+
+ if 'time_zone' in config:
+ with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f:
+ tz_string = f.read().split(b"\n")[-2].decode("utf-8")
+
+ options.append({'name': 'pcode', 'data': tz_string})
+ options.append({'name': 'tcode', 'data': config['time_zone']})
+
+ unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller')
+ if unifi_controller:
+ options.append({
+ 'name': 'unifi-controller',
+ 'data': unifi_controller,
+ 'space': 'ubnt'
+ })
+
+ return options
+
+def kea_parse_subnet(subnet, config):
+ out = {'subnet': subnet, 'id': int(config['subnet_id'])}
+ options = []
+
+ if 'option' in config:
+ out['option-data'] = kea_parse_options(config['option'])
+
+ if 'bootfile_name' in config['option']:
+ out['boot-file-name'] = config['option']['bootfile_name']
+
+ if 'bootfile_server' in config['option']:
+ out['next-server'] = config['option']['bootfile_server']
+
+ if 'lease' in config:
+ out['valid-lifetime'] = int(config['lease'])
+ out['max-valid-lifetime'] = int(config['lease'])
+
+ if 'range' in config:
+ pools = []
+ for num, range_config in config['range'].items():
+ start, stop = range_config['start'], range_config['stop']
+ pool = {
+ 'pool': f'{start} - {stop}'
+ }
+
+ if 'option' in range_config:
+ pool['option-data'] = kea_parse_options(range_config['option'])
+
+ if 'bootfile_name' in range_config['option']:
+ pool['boot-file-name'] = range_config['option']['bootfile_name']
+
+ if 'bootfile_server' in range_config['option']:
+ pool['next-server'] = range_config['option']['bootfile_server']
+
+ pools.append(pool)
+ out['pools'] = pools
+
+ if 'static_mapping' in config:
+ reservations = []
+ for host, host_config in config['static_mapping'].items():
+ if 'disable' in host_config:
+ continue
+
+ reservation = {
+ 'hostname': host,
+ }
+
+ if 'mac' in host_config:
+ reservation['hw-address'] = host_config['mac']
+
+ if 'duid' in host_config:
+ reservation['duid'] = host_config['duid']
+
+ if 'ip_address' in host_config:
+ reservation['ip-address'] = host_config['ip_address']
+
+ if 'option' in host_config:
+ reservation['option-data'] = kea_parse_options(host_config['option'])
+
+ if 'bootfile_name' in host_config['option']:
+ reservation['boot-file-name'] = host_config['option']['bootfile_name']
+
+ if 'bootfile_server' in host_config['option']:
+ reservation['next-server'] = host_config['option']['bootfile_server']
+
+ reservations.append(reservation)
+ out['reservations'] = reservations
+
+ return out
+
+def kea6_parse_options(config):
+ options = []
+
+ for node, option_name in kea6_options.items():
+ if node not in config:
+ continue
+
+ value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
+ options.append({'name': option_name, 'data': value})
+
+ if 'sip_server' in config:
+ sip_servers = config['sip_server']
+
+ addrs = []
+ hosts = []
+
+ for server in sip_servers:
+ if is_ipv6(server):
+ addrs.append(server)
+ else:
+ hosts.append(server)
+
+ if addrs:
+ options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)})
+
+ if hosts:
+ options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)})
+
+ cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server')
+ if cisco_tftp:
+ options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp})
+
+ return options
+
+def kea6_parse_subnet(subnet, config):
+ out = {'subnet': subnet, 'id': int(config['subnet_id'])}
+
+ if 'option' in config:
+ out['option-data'] = kea6_parse_options(config['option'])
+
+ if 'range' in config:
+ pools = []
+ for num, range_config in config['range'].items():
+ pool = {}
+
+ if 'prefix' in range_config:
+ pool['pool'] = range_config['prefix']
+
+ if 'start' in range_config:
+ start = range_config['start']
+ stop = range_config['stop']
+ pool['pool'] = f'{start} - {stop}'
+
+ if 'option' in range_config:
+ pool['option-data'] = kea6_parse_options(range_config['option'])
+
+ pools.append(pool)
+
+ out['pools'] = pools
+
+ if 'prefix_delegation' in config:
+ pd_pools = []
+
+ if 'prefix' in config['prefix_delegation']:
+ for prefix, pd_conf in config['prefix_delegation']['prefix'].items():
+ pd_pool = {
+ 'prefix': prefix,
+ 'prefix-len': int(pd_conf['prefix_length']),
+ 'delegated-len': int(pd_conf['delegated_length'])
+ }
+
+ if 'excluded_prefix' in pd_conf:
+ pd_pool['excluded-prefix'] = pd_conf['excluded_prefix']
+ pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length'])
+
+ pd_pools.append(pd_pool)
+
+ out['pd-pools'] = pd_pools
+
+ if 'lease_time' in config:
+ if 'default' in config['lease_time']:
+ out['valid-lifetime'] = int(config['lease_time']['default'])
+ if 'maximum' in config['lease_time']:
+ out['max-valid-lifetime'] = int(config['lease_time']['maximum'])
+ if 'minimum' in config['lease_time']:
+ out['min-valid-lifetime'] = int(config['lease_time']['minimum'])
+
+ if 'static_mapping' in config:
+ reservations = []
+ for host, host_config in config['static_mapping'].items():
+ if 'disable' in host_config:
+ continue
+
+ reservation = {
+ 'hostname': host
+ }
+
+ if 'mac' in host_config:
+ reservation['hw-address'] = host_config['mac']
+
+ if 'duid' in host_config:
+ reservation['duid'] = host_config['duid']
+
+ if 'ipv6_address' in host_config:
+ reservation['ip-addresses'] = [ host_config['ipv6_address'] ]
+
+ if 'ipv6_prefix' in host_config:
+ reservation['prefixes'] = [ host_config['ipv6_prefix'] ]
+
+ if 'option' in host_config:
+ reservation['option-data'] = kea6_parse_options(host_config['option'])
+
+ reservations.append(reservation)
+
+ out['reservations'] = reservations
+
+ return out
+
+def kea_parse_leases(lease_path):
+ contents = read_file(lease_path)
+ lines = contents.split("\n")
+ output = []
+
+ if len(lines) < 2:
+ return output
+
+ headers = lines[0].split(",")
+
+ for line in lines[1:]:
+ line_out = dict(zip(headers, line.split(",")))
+
+ lifetime = int(line_out['valid_lifetime'])
+ expiry = int(line_out['expire'])
+
+ line_out['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime)
+ line_out['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None
+
+ output.append(line_out)
+
+ return output
+
+def _ctrl_socket_command(path, command, args=None):
+ if not os.path.exists(path):
+ return None
+
+ if file_permissions(path) != '0775':
+ run(f'sudo chmod 775 {path}')
+
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ sock.connect(path)
+
+ payload = {'command': command}
+ if args:
+ payload['arguments'] = args
+
+ sock.send(bytes(json.dumps(payload), 'utf-8'))
+ result = b''
+ while True:
+ data = sock.recv(4096)
+ result += data
+ if len(data) < 4096:
+ break
+
+ return json.loads(result.decode('utf-8'))
+
+def kea_get_active_config(inet):
+ ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket'
+
+ config = _ctrl_socket_command(ctrl_socket, 'config-get')
+
+ if not config or 'result' not in config or config['result'] != 0:
+ return None
+
+ return config
+
+def kea_get_pool_from_subnet_id(config, inet, subnet_id):
+ shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
+
+ if not shared_networks:
+ return None
+
+ for network in shared_networks:
+ if f'subnet{inet}' not in network:
+ continue
+
+ for subnet in network[f'subnet{inet}']:
+ if 'id' in subnet and int(subnet['id']) == int(subnet_id):
+ return network['name']
+
+ return None
diff --git a/python/vyos/load_config.py b/python/vyos/load_config.py
new file mode 100644
index 000000000..af563614d
--- /dev/null
+++ b/python/vyos/load_config.py
@@ -0,0 +1,200 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+"""This module abstracts the loading of a config file into the running
+config. It provides several varieties of loading a config file, from the
+legacy version to the developing versions, as a means of offering
+alternatives for competing use cases, and a base for profiling the
+performance of each.
+"""
+
+import sys
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import Union, Literal, TypeAlias, get_type_hints, get_args
+
+from vyos.config import Config
+from vyos.configtree import ConfigTree, DiffTree
+from vyos.configsource import ConfigSourceSession, VyOSError
+from vyos.component_version import from_string as version_from_string
+from vyos.component_version import from_system as version_from_system
+from vyos.migrator import Migrator, VirtualMigrator, MigratorError
+from vyos.utils.process import popen, DEVNULL
+
+Variety: TypeAlias = Literal['explicit', 'batch', 'tree', 'legacy']
+ConfigObj: TypeAlias = Union[str, ConfigTree]
+
+thismod = sys.modules[__name__]
+
+class LoadConfigError(Exception):
+ """Raised when an error occurs loading a config file.
+ """
+
+# utility functions
+
+def get_running_config(config: Config) -> ConfigTree:
+ return config.get_config_tree(effective=True)
+
+def get_proposed_config(config_file: str = None) -> ConfigTree:
+ config_str = Path(config_file).read_text()
+ return ConfigTree(config_str)
+
+def migration_needed(config_obj: ConfigObj) -> bool:
+ """Check if a migration is needed for the config object.
+ """
+ if not isinstance(config_obj, ConfigTree):
+ atree = get_proposed_config(config_obj)
+ else:
+ atree = config_obj
+ version_str = atree.get_version_string()
+ if not version_str:
+ return True
+ aversion = version_from_string(version_str.splitlines()[1])
+ bversion = version_from_system()
+ return aversion != bversion
+
+def check_session(strict: bool, switch: Variety) -> None:
+ """Check if we are in a config session, with no uncommitted changes, if
+ strict. This is not needed for legacy load, as these checks are
+ implicit.
+ """
+
+ if switch == 'legacy':
+ return
+
+ context = ConfigSourceSession()
+
+ if not context.in_session():
+ raise LoadConfigError('not in a config session')
+
+ if strict and context.session_changed():
+ raise LoadConfigError('commit or discard changes before loading config')
+
+# methods to call for each variety
+
+# explicit
+def diff_to_commands(ctree: ConfigTree, ntree: ConfigTree) -> list:
+ """Calculate the diff between the current and proposed config."""
+ # Calculate the diff between the current and new config tree
+ commands = DiffTree(ctree, ntree).to_commands()
+ # on an empty set of 'add' or 'delete' commands, to_commands
+ # returns '\n'; prune below
+ command_list = commands.splitlines()
+ command_list = [c for c in command_list if c]
+ return command_list
+
+def set_commands(cmds: list) -> None:
+ """Set commands in the config session."""
+ if not cmds:
+ print('no commands to set')
+ return
+ error_out = []
+ for op in cmds:
+ out, rc = popen(f'/opt/vyatta/sbin/my_{op}', shell=True, stderr=DEVNULL)
+ if rc != 0:
+ error_out.append(out)
+ continue
+ if error_out:
+ out = '\n'.join(error_out)
+ raise LoadConfigError(out)
+
+# legacy
+class LoadConfig(ConfigSourceSession):
+ """A subclass for calling 'loadFile'.
+ """
+ def load_config(self, file_name):
+ return self._run(['/bin/cli-shell-api','loadFile', file_name])
+
+# end methods to call for each variety
+
+def migrate(config_obj: ConfigObj) -> ConfigObj:
+ """Migrate a config object to the current version.
+ """
+ if isinstance(config_obj, ConfigTree):
+ config_file = NamedTemporaryFile(delete=False).name
+ Path(config_file).write_text(config_obj.to_string())
+ else:
+ config_file = config_obj
+
+ virtual_migration = VirtualMigrator(config_file)
+ migration = Migrator(config_file)
+ try:
+ virtual_migration.run()
+ migration.run()
+ except MigratorError as e:
+ raise LoadConfigError(e) from e
+ else:
+ if isinstance(config_obj, ConfigTree):
+ return ConfigTree(Path(config_file).read_text())
+ return config_file
+ finally:
+ if isinstance(config_obj, ConfigTree):
+ Path(config_file).unlink()
+
+def load_explicit(config_obj: ConfigObj):
+ """Explicit load from file or configtree.
+ """
+ config = Config()
+ ctree = get_running_config(config)
+ if isinstance(config_obj, ConfigTree):
+ ntree = config_obj
+ else:
+ ntree = get_proposed_config(config_obj)
+ # Calculate the diff between the current and proposed config
+ cmds = diff_to_commands(ctree, ntree)
+ # Set the commands in the config session
+ set_commands(cmds)
+
+def load_batch(config_obj: ConfigObj):
+ # requires legacy backend patch
+ raise NotImplementedError('batch loading not implemented')
+
+def load_tree(config_obj: ConfigObj):
+ # requires vyconf backend patch
+ raise NotImplementedError('tree loading not implemented')
+
+def load_legacy(config_obj: ConfigObj):
+ """Legacy load from file or configtree.
+ """
+ if isinstance(config_obj, ConfigTree):
+ config_file = NamedTemporaryFile(delete=False).name
+ Path(config_file).write_text(config_obj.to_string())
+ else:
+ config_file = config_obj
+
+ config = LoadConfig()
+
+ try:
+ config.load_config(config_file)
+ except VyOSError as e:
+ raise LoadConfigError(e) from e
+ finally:
+ if isinstance(config_obj, ConfigTree):
+ Path(config_file).unlink()
+
+def load(config_obj: ConfigObj, strict: bool = True,
+ switch: Variety = 'legacy'):
+ type_hints = get_type_hints(load)
+ switch_choice = get_args(type_hints['switch'])
+ if switch not in switch_choice:
+ raise ValueError(f'invalid switch: {switch}')
+
+ check_session(strict, switch)
+
+ if migration_needed(config_obj):
+ config_obj = migrate(config_obj)
+
+ func = getattr(thismod, f'load_{switch}')
+ func(config_obj)
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 9cbc2b96e..7215aac88 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -32,14 +32,34 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
translation_str = ''
if 'inbound_interface' in rule_conf:
- ifname = rule_conf['inbound_interface']
- if ifname != 'any':
- output.append(f'iifname "{ifname}"')
+ operator = ''
+ if 'name' in rule_conf['inbound_interface']:
+ iiface = rule_conf['inbound_interface']['name']
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} {{{iiface}}}')
+ else:
+ iiface = rule_conf['inbound_interface']['group']
+ if iiface[0] == '!':
+ operator = '!='
+ iiface = iiface[1:]
+ output.append(f'iifname {operator} @I_{iiface}')
if 'outbound_interface' in rule_conf:
- ifname = rule_conf['outbound_interface']
- if ifname != 'any':
- output.append(f'oifname "{ifname}"')
+ operator = ''
+ if 'name' in rule_conf['outbound_interface']:
+ oiface = rule_conf['outbound_interface']['name']
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} {{{oiface}}}')
+ else:
+ oiface = rule_conf['outbound_interface']['group']
+ if oiface[0] == '!':
+ operator = '!='
+ oiface = oiface[1:]
+ output.append(f'oifname {operator} @I_{oiface}')
if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':
protocol = rule_conf['protocol']
@@ -69,7 +89,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if addr and is_ip_network(addr):
if not ipv6:
map_addr = dict_search_args(rule_conf, nat_type, 'address')
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
+ if port:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}')
+ else:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
ignore_type_addr = True
else:
translation_output.append(f'prefix to {addr}')
@@ -92,7 +115,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if port_mapping and port_mapping != 'none':
options.append(port_mapping)
- translation_str = " ".join(translation_output) + (f':{port}' if port else '')
+ if ((not addr) or (addr and not is_ip_network(addr))) and port:
+ translation_str = " ".join(translation_output) + (f':{port}')
+ else:
+ translation_str = " ".join(translation_output)
if options:
translation_str += f' {",".join(options)}'
@@ -150,7 +176,7 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
operator = ''
if addr_prefix[:1] == '!':
operator = '!='
- addr_prefix = addr[1:]
+ addr_prefix = addr_prefix[1:]
output.append(f'ip6 {prefix}addr {operator} {addr_prefix}')
port = dict_search_args(side_conf, 'port')
diff --git a/python/vyos/opmode.py b/python/vyos/opmode.py
index 230a85541..e1af1a682 100644
--- a/python/vyos/opmode.py
+++ b/python/vyos/opmode.py
@@ -1,4 +1,4 @@
-# Copyright 2022-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2022-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -81,7 +81,7 @@ class InternalError(Error):
def _is_op_mode_function_name(name):
- if re.match(r"^(show|clear|reset|restart|add|delete|generate|set)", name):
+ if re.match(r"^(show|clear|reset|restart|add|update|delete|generate|set)", name):
return True
else:
return False
@@ -275,4 +275,3 @@ def run(module):
# Other functions should not return anything,
# although they may print their own warnings or status messages
func(**args)
-
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index d8bbfe970..a22039e52 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -1,4 +1,4 @@
-# Copyright 2022-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2022-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -14,6 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+import jmespath
from vyos.base import Warning
from vyos.utils.process import cmd
@@ -166,14 +167,17 @@ class QoSBase:
}
if rate == 'auto' or rate.endswith('%'):
- speed = 10
+ speed = 1000
+ default_speed = speed
# Not all interfaces have valid entries in the speed file. PPPoE
# interfaces have the appropriate speed file, but you can not read it:
# cat: /sys/class/net/pppoe7/speed: Invalid argument
try:
speed = read_file(f'/sys/class/net/{self._interface}/speed')
if not speed.isnumeric():
- Warning('Interface speed cannot be determined (assuming 10 Mbit/s)')
+ Warning('Interface speed cannot be determined (assuming 1000 Mbit/s)')
+ if int(speed) < 1:
+ speed = default_speed
if rate.endswith('%'):
percent = rate.rstrip('%')
speed = int(speed) * int(percent) // 100
@@ -223,6 +227,9 @@ class QoSBase:
if 'mark' in match_config:
mark = match_config['mark']
filter_cmd += f' handle {mark} fw'
+ if 'vif' in match_config:
+ vif = match_config['vif']
+ filter_cmd += f' basic match "meta(vlan mask 0xfff eq {vif})"'
for af in ['ip', 'ipv6']:
tc_af = af
@@ -298,23 +305,28 @@ class QoSBase:
filter_cmd += f' flowid {self._parent:x}:{cls:x}'
self._cmd(filter_cmd)
+ vlan_expression = "match.*.vif"
+ match_vlan = jmespath.search(vlan_expression, cls_config)
+
if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config):
- filter_cmd += f' action police'
-
- if 'exceed' in cls_config:
- action = cls_config['exceed']
- filter_cmd += f' conform-exceed {action}'
- if 'not_exceed' in cls_config:
- action = cls_config['not_exceed']
- filter_cmd += f'/{action}'
-
- if 'bandwidth' in cls_config:
- rate = self._rate_convert(cls_config['bandwidth'])
- filter_cmd += f' rate {rate}'
-
- if 'burst' in cls_config:
- burst = cls_config['burst']
- filter_cmd += f' burst {burst}'
+ # For "vif" "basic match" is used instead of "action police" T5961
+ if not match_vlan:
+ filter_cmd += f' action police'
+
+ if 'exceed' in cls_config:
+ action = cls_config['exceed']
+ filter_cmd += f' conform-exceed {action}'
+ if 'not_exceed' in cls_config:
+ action = cls_config['not_exceed']
+ filter_cmd += f'/{action}'
+
+ if 'bandwidth' in cls_config:
+ rate = self._rate_convert(cls_config['bandwidth'])
+ filter_cmd += f' rate {rate}'
+
+ if 'burst' in cls_config:
+ burst = cls_config['burst']
+ filter_cmd += f' burst {burst}'
cls = int(cls)
filter_cmd += f' flowid {self._parent:x}:{cls:x}'
self._cmd(filter_cmd)
diff --git a/python/vyos/qos/cake.py b/python/vyos/qos/cake.py
index a89b1de1e..1ee7d0fc3 100644
--- a/python/vyos/qos/cake.py
+++ b/python/vyos/qos/cake.py
@@ -38,6 +38,8 @@ class CAKE(QoSBase):
tmp += f' dual-dsthost'
if 'dual_src_host' in config['flow_isolation']:
tmp += f' dual-srchost'
+ if 'triple_isolate' in config['flow_isolation']:
+ tmp += f' triple-isolate'
if 'flow' in config['flow_isolation']:
tmp += f' flows'
if 'host' in config['flow_isolation']:
diff --git a/python/vyos/qos/trafficshaper.py b/python/vyos/qos/trafficshaper.py
index c63c7cf39..7d580baa2 100644
--- a/python/vyos/qos/trafficshaper.py
+++ b/python/vyos/qos/trafficshaper.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2022-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -39,7 +39,7 @@ class TrafficShaper(QoSBase):
# need a bigger r2q if going fast than 16 mbits/sec
if (speed_bps // r2q) >= MAXQUANTUM: # integer division
- r2q = ceil(speed_bps // MAXQUANTUM)
+ r2q = ceil(speed_bps / MAXQUANTUM)
else:
# if there is a slow class then may need smaller value
if 'class' in config:
@@ -89,19 +89,34 @@ class TrafficShaper(QoSBase):
if 'priority' in cls_config:
priority = cls_config['priority']
tmp += f' prio {priority}'
+
+ if 'ceiling' in cls_config:
+ f_ceil = self._rate_convert(cls_config['ceiling'])
+ tmp += f' ceil {f_ceil}'
self._cmd(tmp)
tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{cls:x} sfq'
self._cmd(tmp)
if 'default' in config:
- rate = self._rate_convert(config['default']['bandwidth'])
+ if config['default']['bandwidth'].endswith('%'):
+ percent = config['default']['bandwidth'].rstrip('%')
+ rate = self._rate_convert(config['bandwidth']) * int(percent) // 100
+ else:
+ rate = self._rate_convert(config['default']['bandwidth'])
burst = config['default']['burst']
quantum = config['default']['codel_quantum']
tmp = f'tc class replace dev {self._interface} parent {self._parent:x}:1 classid {self._parent:x}:{default_minor_id:x} htb rate {rate} burst {burst} quantum {quantum}'
if 'priority' in config['default']:
priority = config['default']['priority']
tmp += f' prio {priority}'
+ if 'ceiling' in config['default']:
+ if config['default']['ceiling'].endswith('%'):
+ percent = config['default']['ceiling'].rstrip('%')
+ f_ceil = self._rate_convert(config['bandwidth']) * int(percent) // 100
+ else:
+ f_ceil = self._rate_convert(config['default']['ceiling'])
+ tmp += f' ceil {f_ceil}'
self._cmd(tmp)
tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{default_minor_id:x} sfq'
@@ -110,8 +125,91 @@ class TrafficShaper(QoSBase):
# call base class
super().update(config, direction)
-class TrafficShaperHFSC(TrafficShaper):
+class TrafficShaperHFSC(QoSBase):
+ _parent = 1
+ qostype = 'shaper_hfsc'
+
+ # https://man7.org/linux/man-pages/man8/tc-hfsc.8.html
def update(self, config, direction):
+ class_id_max = 0
+ if 'class' in config:
+ tmp = list(config['class'])
+ tmp.sort()
+ class_id_max = tmp[-1]
+
+ r2q = 10
+ # bandwidth is a mandatory CLI node
+ speed = self._rate_convert(config['bandwidth'])
+ speed_bps = int(speed) // 8
+
+ # need a bigger r2q if going fast than 16 mbits/sec
+ if (speed_bps // r2q) >= MAXQUANTUM: # integer division
+ r2q = ceil(speed_bps // MAXQUANTUM)
+ else:
+ # if there is a slow class then may need smaller value
+ if 'class' in config:
+ min_speed = speed_bps
+ for cls, cls_options in config['class'].items():
+ # find class with the lowest bandwidth used
+ if 'bandwidth' in cls_options:
+ bw_bps = int(self._rate_convert(cls_options['bandwidth'])) // 8 # bandwidth in bytes per second
+ if bw_bps < min_speed:
+ min_speed = bw_bps
+
+ while (r2q > 1) and (min_speed // r2q) < MINQUANTUM:
+ tmp = r2q -1
+ if (speed_bps // tmp) >= MAXQUANTUM:
+ break
+ r2q = tmp
+
+ default_minor_id = int(class_id_max) +1
+ tmp = f'tc qdisc replace dev {self._interface} root handle {self._parent:x}: hfsc default {default_minor_id:x}' # default is in hex
+ self._cmd(tmp)
+
+ tmp = f'tc class replace dev {self._interface} parent {self._parent:x}: classid {self._parent:x}:1 hfsc sc rate {speed} ul rate {speed}'
+ self._cmd(tmp)
+
+ if 'class' in config:
+ for cls, cls_config in config['class'].items():
+ # class id is used later on and passed as hex, thus this needs to be an int
+ cls = int(cls)
+ # ls m1
+ if cls_config.get('linkshare', {}).get('m1').endswith('%'):
+ percent = cls_config['linkshare']['m1'].rstrip('%')
+ m_one_rate = self._rate_convert(config['bandwidth']) * int(percent) // 100
+ else:
+ m_one_rate = cls_config['linkshare']['m1']
+ # ls m2
+ if cls_config.get('linkshare', {}).get('m2').endswith('%'):
+ percent = cls_config['linkshare']['m2'].rstrip('%')
+ m_two_rate = self._rate_convert(config['bandwidth']) * int(percent) // 100
+ else:
+ m_two_rate = self._rate_convert(cls_config['linkshare']['m2'])
+
+ tmp = f'tc class replace dev {self._interface} parent {self._parent:x}:1 classid {self._parent:x}:{cls:x} hfsc ls m1 {m_one_rate} m2 {m_two_rate} '
+ self._cmd(tmp)
+
+ tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{cls:x} sfq perturb 10'
+ self._cmd(tmp)
+
+ if 'default' in config:
+ # ls m1
+ if config.get('default', {}).get('linkshare', {}).get('m1').endswith('%'):
+ percent = config['default']['linkshare']['m1'].rstrip('%')
+ m_one_rate = self._rate_convert(config['default']['linkshare']['m1']) * int(percent) // 100
+ else:
+ m_one_rate = config['default']['linkshare']['m1']
+ # ls m2
+ if config.get('default', {}).get('linkshare', {}).get('m2').endswith('%'):
+ percent = config['default']['linkshare']['m2'].rstrip('%')
+ m_two_rate = self._rate_convert(config['default']['linkshare']['m2']) * int(percent) // 100
+ else:
+ m_two_rate = self._rate_convert(config['default']['linkshare']['m2'])
+ tmp = f'tc class replace dev {self._interface} parent {self._parent:x}:1 classid {self._parent:x}:{default_minor_id:x} hfsc ls m1 {m_one_rate} m2 {m_two_rate} '
+ self._cmd(tmp)
+
+ tmp = f'tc qdisc replace dev {self._interface} parent {self._parent:x}:{default_minor_id:x} sfq perturb 10'
+ self._cmd(tmp)
+
# call base class
super().update(config, direction)
-
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index b7c9ed176..129e65772 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -14,6 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+import pwd
import shutil
import socket
import ssl
@@ -22,6 +23,9 @@ import sys
import tempfile
import urllib.parse
+from contextlib import contextmanager
+from pathlib import Path
+
from ftplib import FTP
from ftplib import FTP_TLS
@@ -37,7 +41,7 @@ from vyos.utils.io import ask_yes_no
from vyos.utils.io import is_interactive
from vyos.utils.io import print_error
from vyos.utils.misc import begin
-from vyos.utils.process import cmd
+from vyos.utils.process import cmd, rc_cmd
from vyos.version import get_version
CHUNK_SIZE = 8192
@@ -73,6 +77,17 @@ class SourceAdapter(HTTPAdapter):
num_pools=connections, maxsize=maxsize,
block=block, source_address=self._source_pair)
+@contextmanager
+def umask(mask: int):
+ """
+ Context manager that temporarily sets the process umask.
+ """
+ import os
+ oldmask = os.umask(mask)
+ try:
+ yield
+ finally:
+ os.umask(oldmask)
def check_storage(path, size):
"""
@@ -133,7 +148,7 @@ class FtpC:
# Almost all FTP servers support the `SIZE' command.
size = conn.size(self.path)
if self.check_space:
- check_storage(path, size)
+ check_storage(location, size)
# No progressbar if we can't determine the size or if the file is too small.
if self.progressbar and size and size > CHUNK_SIZE:
with Progressbar(CHUNK_SIZE / size) as p:
@@ -310,25 +325,125 @@ class TftpC:
with open(location, 'rb') as f:
cmd(f'{self.command} -T - "{self.urlstring}"', input=f.read())
+class GitC:
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host=None,
+ source_port=0,
+ timeout=10,
+ ):
+ self.command = 'git'
+ self.url = url
+ self.urlstring = urllib.parse.urlunsplit(url)
+ if self.urlstring.startswith("git+"):
+ self.urlstring = self.urlstring.replace("git+", "", 1)
+
+ def download(self, location: str):
+ raise NotImplementedError("not supported")
+
+ @umask(0o077)
+ def upload(self, location: str):
+ scheme = self.url.scheme
+ _, _, scheme = scheme.partition("+")
+ netloc = self.url.netloc
+ url = Path(self.url.path).parent
+ with tempfile.TemporaryDirectory(prefix="git-commit-archive-") as directory:
+ # Determine username, fullname, email for Git commit
+ pwd_entry = pwd.getpwuid(os.getuid())
+ user = pwd_entry.pw_name
+ name = pwd_entry.pw_gecos.split(",")[0] or user
+ fqdn = socket.getfqdn()
+ email = f"{user}@{fqdn}"
+
+ # environment vars for our git commands
+ env = {
+ "GIT_TERMINAL_PROMPT": "0",
+ "GIT_AUTHOR_NAME": name,
+ "GIT_AUTHOR_EMAIL": email,
+ "GIT_COMMITTER_NAME": name,
+ "GIT_COMMITTER_EMAIL": email,
+ }
+
+ # build ssh command for git
+ ssh_command = ["ssh"]
+
+ # if we are not interactive, we use StrictHostKeyChecking=yes to avoid any prompts
+ if not sys.stdout.isatty():
+ ssh_command += ["-o", "StrictHostKeyChecking=yes"]
+
+ env["GIT_SSH_COMMAND"] = " ".join(ssh_command)
+
+ # git clone
+ path_repository = Path(directory) / "repository"
+ scheme = f"{scheme}://" if scheme else ""
+ rc, out = rc_cmd(
+ [self.command, "clone", f"{scheme}{netloc}{url}", str(path_repository), "--depth=1"],
+ env=env,
+ shell=False,
+ )
+ if rc:
+ raise Exception(out)
+
+ # git add
+ filename = Path(Path(self.url.path).name).stem
+ dst = path_repository / filename
+ shutil.copy2(location, dst)
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "add", filename],
+ env=env,
+ shell=False,
+ )
+
+ # git commit -m
+ commit_message = os.environ.get("COMMIT_COMMENT", "commit")
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "commit", "-m", commit_message],
+ env=env,
+ shell=False,
+ )
+
+ # git push
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "push"],
+ env=env,
+ shell=False,
+ )
+ if rc:
+ raise Exception(out)
+
def urlc(urlstring, *args, **kwargs):
"""
Dynamically dispatch the appropriate protocol class.
"""
- url_classes = {'http': HttpC, 'https': HttpC, 'ftp': FtpC, 'ftps': FtpC, \
- 'sftp': SshC, 'ssh': SshC, 'scp': SshC, 'tftp': TftpC}
+ url_classes = {
+ "http": HttpC,
+ "https": HttpC,
+ "ftp": FtpC,
+ "ftps": FtpC,
+ "sftp": SshC,
+ "ssh": SshC,
+ "scp": SshC,
+ "tftp": TftpC,
+ "git": GitC,
+ }
url = urllib.parse.urlsplit(urlstring)
+ scheme, _, _ = url.scheme.partition("+")
try:
- return url_classes[url.scheme](url, *args, **kwargs)
+ return url_classes[scheme](url, *args, **kwargs)
except KeyError:
- raise ValueError(f'Unsupported URL scheme: "{url.scheme}"')
+ raise ValueError(f'Unsupported URL scheme: "{scheme}"')
def download(local_path, urlstring, progressbar=False, check_space=False,
- source_host='', source_port=0, timeout=10.0):
+ source_host='', source_port=0, timeout=10.0, raise_error=False):
try:
progressbar = progressbar and is_interactive()
urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path)
except Exception as err:
+ if raise_error:
+ raise
print_error(f'Unable to download "{urlstring}": {err}')
sys.exit(1)
except KeyboardInterrupt:
@@ -339,7 +454,7 @@ def upload(local_path, urlstring, progressbar=False,
source_host='', source_port=0, timeout=10.0):
try:
progressbar = progressbar and is_interactive()
- urlc(urlstring, progressbar, source_host, source_port, timeout).upload(local_path)
+ urlc(urlstring, progressbar, False, source_host, source_port, timeout).upload(local_path)
except Exception as err:
print_error(f'Unable to upload "{urlstring}": {err}')
sys.exit(1)
diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py
new file mode 100644
index 000000000..0c91330ba
--- /dev/null
+++ b/python/vyos/system/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+__all_: list[str] = ['disk', 'grub', 'image']
+# define image-tools version
+SYSTEM_CFG_VER = 1
diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py
new file mode 100644
index 000000000..37b834ad6
--- /dev/null
+++ b/python/vyos/system/compat.py
@@ -0,0 +1,327 @@
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import compile, MULTILINE, DOTALL
+from functools import wraps
+from copy import deepcopy
+from typing import Union
+
+from vyos.system import disk, grub, image, SYSTEM_CFG_VER
+from vyos.template import render
+
+TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2'
+
+# define regexes and variables
+REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}'
+REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}'
+REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+)(,(?P<console_speed>[\d]+))?.*$'
+REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?'
+REGEX_SANIT_INIT = r'\ ?init=\S*\ ?'
+REGEX_SANIT_QUIET = r'\ ?quiet\ ?'
+PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset'
+
+
+class DowngradingImageTools(Exception):
+ """Raised when attempting to add an image with an earlier version
+ of image-tools than the current system, as indicated by the value
+ of SYSTEM_CFG_VER or absence thereof."""
+ pass
+
+
+def mode():
+ if grub.get_cfg_ver() >= SYSTEM_CFG_VER:
+ return False
+
+ return True
+
+
+def find_versions(menu_entries: list) -> list:
+ """Find unique VyOS versions from menu entries
+
+ Args:
+ menu_entries (list): a list with menu entries
+
+ Returns:
+ list: List of installed versions
+ """
+ versions = []
+ for vyos_ver in menu_entries:
+ versions.append(vyos_ver.get('version'))
+ # remove duplicates
+ versions = list(set(versions))
+ return versions
+
+
+def filter_unparsed(grub_path: str) -> str:
+ """Find currently installed VyOS version
+
+ Args:
+ grub_path (str): a path to the grub.cfg file
+
+ Returns:
+ str: unparsed grub.cfg items
+ """
+ config_text = Path(grub_path).read_text()
+ regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL)
+ filtered = regex_filter.sub('', config_text)
+ regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ # strip extra new lines
+ filtered = filtered.strip()
+ return filtered
+
+
+def get_search_root(unparsed: str) -> str:
+ unparsed_lines = unparsed.splitlines()
+ search_root = next((x for x in unparsed_lines if 'search' in x), '')
+ return search_root
+
+
+def sanitize_boot_opts(boot_opts: str) -> str:
+ """Sanitize boot options from console and init
+
+ Args:
+ boot_opts (str): boot options
+
+ Returns:
+ str: sanitized boot options
+ """
+ regex_filter = compile(REGEX_SANIT_CONSOLE)
+ boot_opts = regex_filter.sub('', boot_opts)
+ regex_filter = compile(REGEX_SANIT_INIT)
+ boot_opts = regex_filter.sub('', boot_opts)
+ # legacy tools add 'quiet' on add system image; this is not desired
+ regex_filter = compile(REGEX_SANIT_QUIET)
+ boot_opts = regex_filter.sub(' ', boot_opts)
+
+ return boot_opts
+
+
+def parse_entry(entry: tuple) -> dict:
+ """Parse GRUB menuentry
+
+ Args:
+ entry (tuple): tuple of (version, options)
+
+ Returns:
+ dict: dictionary with parsed options
+ """
+ # save version to dict
+ entry_dict = {'version': entry[0]}
+ # detect boot mode type
+ if PW_RESET_OPTION in entry[1]:
+ entry_dict['bootmode'] = 'pw_reset'
+ else:
+ entry_dict['bootmode'] = 'normal'
+ # find console type and number
+ regex_filter = compile(REGEX_CONSOLE)
+ entry_dict.update(regex_filter.match(entry[1]).groupdict())
+ speed = entry_dict.get('console_speed', None)
+ entry_dict['console_speed'] = speed if speed is not None else '115200'
+ entry_dict['boot_opts'] = sanitize_boot_opts(entry[1])
+
+ return entry_dict
+
+
+def parse_menuentries(grub_path: str) -> list:
+ """Parse all GRUB menuentries
+
+ Args:
+ grub_path (str): a path to GRUB config file
+
+ Returns:
+ list: list with menu items (each item is a dict)
+ """
+ menuentries = []
+ # read configuration file
+ config_text = Path(grub_path).read_text()
+ # parse menuentries to tuples (version, options)
+ regex_filter = compile(REGEX_MENUENTRY, MULTILINE)
+ filter_results = regex_filter.findall(config_text)
+ # parse each entry
+ for entry in filter_results:
+ menuentries.append(parse_entry(entry))
+
+ return menuentries
+
+
+def prune_vyos_versions(root_dir: str = '') -> None:
+ """Delete vyos-versions files of registered images subsequently deleted
+ or renamed by legacy image-tools
+
+ Args:
+ root_dir (str): an optional path to the root directory
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ version_files = Path(f'{root_dir}/{grub.GRUB_DIR_VYOS_VERS}').glob('*.cfg')
+
+ for file in version_files:
+ version = Path(file).stem
+ if not Path(f'{root_dir}/boot/{version}').is_dir():
+ grub.version_del(version, root_dir)
+
+
+def update_cfg_ver(root_dir:str = '') -> int:
+ """Get minumum version of image-tools across all installed images
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: minimum version of image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ prune_vyos_versions(root_dir)
+
+ images_details = image.get_images_details()
+ cfg_version = min(d['tools_version'] for d in images_details)
+
+ return cfg_version
+
+
+def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]:
+ """Translate default version to menuentry index
+
+ Args:
+ menu_entries (list): list of dicts of installed version boot data
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: index of default version in menu_entries or None
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ image_name = image.get_default_image()
+
+ sublist = list(filter(lambda x: x.get('version') == image_name,
+ menu_entries))
+ if sublist:
+ return menu_entries.index(sublist[0])
+
+ return None
+
+
+def update_version_list(root_dir: str = '') -> list[dict]:
+ """Update list of dicts of installed version boot data
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ list: list of dicts of installed version boot data
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ # get list of versions in menuentries
+ menu_entries = parse_menuentries(grub_cfg_main)
+ menu_versions = find_versions(menu_entries)
+
+ # get list of versions added/removed by image-tools
+ current_versions = grub.version_list(root_dir)
+
+ remove = list(set(menu_versions) - set(current_versions))
+ for ver in remove:
+ menu_entries = list(filter(lambda x: x.get('version') != ver,
+ menu_entries))
+
+ # reset boot_opts in case of config update
+ for entry in menu_entries:
+ entry['boot_opts'] = grub.get_boot_opts(entry['version'])
+
+ add = list(set(current_versions) - set(menu_versions))
+ for ver in add:
+ last = menu_entries[0].get('version')
+ new = deepcopy(list(filter(lambda x: x.get('version') == last,
+ menu_entries)))
+ for e in new:
+ boot_opts = grub.get_boot_opts(ver)
+ e.update({'version': ver, 'boot_opts': boot_opts})
+
+ menu_entries = new + menu_entries
+
+ return menu_entries
+
+
+def grub_cfg_fields(root_dir: str = '') -> dict:
+ """Gather fields for rendering grub.cfg
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ dict: dictionary for rendering TMPL_GRUB_COMPAT
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+ grub_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}'
+
+ fields = grub.vars_read(grub_vars)
+ # 'default' and 'timeout' from legacy grub.cfg resets 'default' to
+ # index, rather than uuid
+ fields |= grub.vars_read(grub_cfg_main)
+
+ fields['tools_version'] = SYSTEM_CFG_VER
+ menu_entries = update_version_list(root_dir)
+ fields['versions'] = menu_entries
+
+ default = get_default(menu_entries, root_dir)
+ if default is not None:
+ fields['default'] = default
+
+ modules = grub.modules_read(grub_cfg_main)
+ fields['modules'] = modules
+
+ unparsed = filter_unparsed(grub_cfg_main).splitlines()
+ search_root = next((x for x in unparsed if 'search' in x), '')
+ fields['search_root'] = search_root
+
+ return fields
+
+
+def render_grub_cfg(root_dir: str = '') -> None:
+ """Render grub.cfg for legacy compatibility"""
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ fields = grub_cfg_fields(root_dir)
+ render(grub_cfg_main, TMPL_GRUB_COMPAT, fields)
+
+
+def grub_cfg_update(func):
+ """Decorator to update grub.cfg after function call"""
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ ret = func(*args, **kwargs)
+ if mode():
+ render_grub_cfg()
+ return ret
+ return wrapper
diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py
new file mode 100644
index 000000000..c8908cd5c
--- /dev/null
+++ b/python/vyos/system/disk.py
@@ -0,0 +1,242 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from json import loads as json_loads
+from os import sync
+from dataclasses import dataclass
+from time import sleep
+
+from psutil import disk_partitions
+
+from vyos.utils.process import run, cmd
+
+
+@dataclass
+class DiskDetails:
+ """Disk details"""
+ name: str
+ partition: dict[str, str]
+
+
+def disk_cleanup(drive_path: str) -> None:
+ """Clean up disk partition table (MBR and GPT)
+ Remove partition and device signatures.
+ Zeroize primary and secondary headers - first and last 17408 bytes
+ (512 bytes * 34 LBA) on a drive
+
+ Args:
+ drive_path (str): path to a drive that needs to be cleaned
+ """
+ partitions: list[str] = partition_list(drive_path)
+ for partition in partitions:
+ run(f'wipefs -af {partition}')
+ run(f'wipefs -af {drive_path}')
+ run(f'sgdisk -Z {drive_path}')
+
+
+def find_persistence() -> str:
+ """Find a mountpoint for persistence storage
+
+ Returns:
+ str: Path where 'persistance' pertition is mounted, Empty if not found
+ """
+ mounted_partitions = disk_partitions()
+ for partition in mounted_partitions:
+ if partition.mountpoint.endswith('/persistence'):
+ return partition.mountpoint
+ return ''
+
+
+def parttable_create(drive_path: str, root_size: int) -> None:
+ """Create a hybrid MBR/GPT partition table
+ 0-2047 first sectors are free
+ 2048-4095 sectors - BIOS Boot Partition
+ 4096 + 256 MB - EFI system partition
+ Everything else till the end of a drive - Linux partition
+
+ Args:
+ drive_path (str): path to a drive
+ """
+ if not root_size:
+ root_size_text: str = '+100%'
+ else:
+ root_size_text: str = str(root_size)
+ command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \
+ -n3:0:+{root_size_text}K -t3:8300 {drive_path}'
+
+ run(command)
+ # update partitons in kernel
+ sync()
+ run(f'partx -u {drive_path}')
+
+ partitions: list[str] = partition_list(drive_path)
+
+ disk: DiskDetails = DiskDetails(
+ name = drive_path,
+ partition = {
+ 'efi': next(x for x in partitions if x.endswith('2')),
+ 'root': next(x for x in partitions if x.endswith('3'))
+ }
+ )
+
+ return disk
+
+
+def partition_list(drive_path: str) -> list[str]:
+ """Get a list of partitions on a drive
+
+ Args:
+ drive_path (str): path to a drive
+
+ Returns:
+ list[str]: a list of partition paths
+ """
+ lsblk: str = cmd(f'lsblk -Jp {drive_path}')
+ drive_info: dict = json_loads(lsblk)
+ device: list = drive_info.get('blockdevices')
+ children: list[str] = device[0].get('children', []) if device else []
+ partitions: list[str] = [child.get('name') for child in children]
+ return partitions
+
+
+def partition_parent(partition_path: str) -> str:
+ """Get a parent device for a partition
+
+ Args:
+ partition (str): path to a partition
+
+ Returns:
+ str: path to a parent device
+ """
+ parent: str = cmd(f'lsblk -ndpo pkname {partition_path}')
+ return parent
+
+
+def from_partition(partition_path: str) -> DiskDetails:
+ drive_path: str = partition_parent(partition_path)
+ partitions: list[str] = partition_list(drive_path)
+
+ disk: DiskDetails = DiskDetails(
+ name = drive_path,
+ partition = {
+ 'efi': next(x for x in partitions if x.endswith('2')),
+ 'root': next(x for x in partitions if x.endswith('3'))
+ }
+ )
+
+ return disk
+
+def filesystem_create(partition: str, fstype: str) -> None:
+ """Create a filesystem on a partition
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ fstype (str): filesystem type ('efi' or 'ext4')
+ """
+ if fstype == 'efi':
+ command = 'mkfs -t fat -n EFI'
+ run(f'{command} {partition}')
+ if fstype == 'ext4':
+ command = 'mkfs -t ext4 -L persistence'
+ run(f'{command} {partition}')
+
+
+def partition_mount(partition: str,
+ path: str,
+ fsype: str = '',
+ overlay_params: dict[str, str] = {}) -> bool:
+ """Mount a partition into a path
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ path (str): a path where to mount
+ fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660')
+ overlay_params (dict): optionally, set overlay parameters.
+ Defaults to None.
+
+ Returns:
+ bool: True on success
+ """
+ if fsype in ['squashfs', 'iso9660']:
+ command: str = f'mount -o loop,ro -t {fsype} {partition} {path}'
+ if fsype == 'overlay' and overlay_params:
+ command: str = f'mount -t overlay -o noatime,\
+ upperdir={overlay_params["upperdir"]},\
+ lowerdir={overlay_params["lowerdir"]},\
+ workdir={overlay_params["workdir"]} overlay {path}'
+
+ else:
+ command = f'mount {partition} {path}'
+
+ rc = run(command)
+ if rc == 0:
+ return True
+
+ return False
+
+
+def partition_umount(partition: str = '', path: str = '') -> None:
+ """Umount a partition by a partition name or a path
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ path (str): a path where a partition is mounted
+ """
+ if partition:
+ command = f'umount {partition}'
+ run(command)
+ if path:
+ command = f'umount {path}'
+ run(command)
+
+
+def find_device(mountpoint: str) -> str:
+ """Find a device by mountpoint
+
+ Returns:
+ str: Path to device, Empty if not found
+ """
+ mounted_partitions = disk_partitions(all=True)
+ for partition in mounted_partitions:
+ if partition.mountpoint == mountpoint:
+ return partition.mountpoint
+ return ''
+
+
+def wait_for_umount(mountpoint: str = '') -> None:
+ """Wait (within reason) for umount to complete
+ """
+ i = 0
+ while find_device(mountpoint):
+ i += 1
+ if i == 5:
+ print(f'Warning: {mountpoint} still mounted')
+ break
+ sleep(1)
+
+
+def disks_size() -> dict[str, int]:
+ """Get a dictionary with physical disks and their sizes
+
+ Returns:
+ dict[str, int]: a dictionary with name: size mapping
+ """
+ disks_size: dict[str, int] = {}
+ lsblk: str = cmd('lsblk -Jbp')
+ blk_list = json_loads(lsblk)
+ for device in blk_list.get('blockdevices'):
+ if device['type'] == 'disk':
+ disks_size.update({device['name']: device['size']})
+ return disks_size
diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py
new file mode 100644
index 000000000..2e8b20972
--- /dev/null
+++ b/python/vyos/system/grub.py
@@ -0,0 +1,424 @@
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import platform
+
+from pathlib import Path
+from re import MULTILINE, compile as re_compile
+from typing import Union
+from uuid import uuid5, NAMESPACE_URL, UUID
+
+from vyos.template import render
+from vyos.utils.process import cmd, rc_cmd
+from vyos.system import disk
+
+# Define variables
+GRUB_DIR_MAIN: str = '/boot/grub'
+GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg'
+GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d'
+CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg'
+CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg'
+CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg'
+CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg'
+CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg'
+CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg'
+CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg'
+GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions'
+
+TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2'
+TMPL_GRUB_VARS: str = 'grub/grub_vars.j2'
+TMPL_GRUB_MAIN: str = 'grub/grub_main.j2'
+TMPL_GRUB_MENU: str = 'grub/grub_menu.j2'
+TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2'
+TMPL_GRUB_OPTS: str = 'grub/grub_options.j2'
+TMPL_GRUB_COMMON: str = 'grub/grub_common.j2'
+
+# default boot options
+BOOT_OPTS_STEM: str = 'boot=live rootdelay=5 noautologin net.ifnames=0 biosdevname=0 vyos-union=/boot/'
+
+# prepare regexes
+REGEX_GRUB_VARS: str = r'^set (?P<variable_name>.+)=[\'"]?(?P<variable_value>.*)(?<![\'"])[\'"]?$'
+REGEX_GRUB_MODULES: str = r'^insmod (?P<module_name>.+)$'
+REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$'
+REGEX_GRUB_BOOT_OPTS: str = r'^\s*set boot_opts="(?P<boot_opts>[^$]+)"$'
+
+
+def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None:
+ """Install GRUB for both BIOS and EFI modes (hybrid boot)
+
+ Args:
+ drive_path (str): path to a drive where GRUB must be installed
+ boot_dir (str): a path to '/boot' directory
+ efi_dir (str): a path to '/boot/efi' directory
+ """
+
+ efi_installation_arch = "x86_64"
+ if platform.machine() == "aarch64":
+ efi_installation_arch = "arm64"
+ elif platform.machine() == "x86_64":
+ cmd(
+ f'grub-install --no-floppy --target=i386-pc \
+ --boot-directory={boot_dir} {drive_path} --force'
+ )
+
+ cmd(
+ f'grub-install --no-floppy --recheck --target={efi_installation_arch}-efi \
+ --force-extra-removable --boot-directory={boot_dir} \
+ --efi-directory={efi_dir} --bootloader-id="{id}" \
+ --no-uefi-secure-boot'
+ )
+
+
+def gen_version_uuid(version_name: str) -> str:
+ """Generate unique ID from version name
+
+ Use UUID5 / NAMESPACE_URL with prefix `uuid5-`
+
+ Args:
+ version_name (str): version name
+
+ Returns:
+ str: generated unique ID
+ """
+ ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name)
+ ver_id: str = f'uuid5-{ver_uuid}'
+ return ver_id
+
+
+def version_add(version_name: str,
+ root_dir: str = '',
+ boot_opts: str = '',
+ boot_opts_config = None) -> None:
+ """Add a new VyOS version to GRUB loader configuration
+
+ Args:
+ vyos_version (str): VyOS version name
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+ boot_opts (str): an optional boot options for Linux kernel.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg'
+ render(
+ version_config, TMPL_VYOS_VERSION, {
+ 'version_name': version_name,
+ 'version_uuid': gen_version_uuid(version_name),
+ 'boot_opts_default': BOOT_OPTS_STEM + version_name,
+ 'boot_opts': boot_opts,
+ 'boot_opts_config': boot_opts_config
+ })
+
+
+def version_del(vyos_version: str, root_dir: str = '') -> None:
+ """Delete a VyOS version from GRUB loader configuration
+
+ Args:
+ vyos_version (str): VyOS version name
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg'
+ Path(version_config).unlink(missing_ok=True)
+
+
+def version_list(root_dir: str = '') -> list[str]:
+ """Generate a list with installed VyOS versions
+
+ Args:
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ list: A list with versions names
+
+ N.B. coreutils stat reports st_birthtime, but not available in
+ Path.stat()/os.stat()
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg')
+ versions_order: dict[str, int] = {}
+ for file in versions_files:
+ p = Path(root_dir).joinpath('boot').joinpath(file.stem)
+ command = f'stat -c %W {p.as_posix()}'
+ rc, out = rc_cmd(command)
+ if rc == 0:
+ versions_order[file.stem] = int(out)
+ versions_order = sorted(versions_order, key=versions_order.get, reverse=True)
+ versions_list: list[str] = list(versions_order)
+
+ return versions_list
+
+
+def read_env(env_file: str = '') -> dict[str, str]:
+ """Read GRUB environment
+
+ Args:
+ env_file (str, optional): a path to grub environment file.
+ Defaults to empty.
+
+ Returns:
+ dict: dictionary with GRUB environment
+ """
+ if not env_file:
+ root_dir: str = disk.find_persistence()
+ env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv'
+
+ env_content: str = cmd(f'grub-editenv {env_file} list').splitlines()
+ regex_filter = re_compile(r'^(?P<variable_name>.*)=(?P<variable_value>.*)$')
+ env_dict: dict[str, str] = {}
+ for env_item in env_content:
+ search_result = regex_filter.fullmatch(env_item)
+ if search_result:
+ search_result_dict: dict[str, str] = search_result.groupdict()
+ variable_name: str = search_result_dict.get('variable_name', '')
+ variable_value: str = search_result_dict.get('variable_value', '')
+ if variable_name and variable_value:
+ env_dict.update({variable_name: variable_value})
+ return env_dict
+
+
+def get_cfg_ver(root_dir: str = '') -> int:
+ """Get current version of GRUB configuration
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ int: a configuration version
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get(
+ 'VYOS_CFG_VER')
+ if cfg_ver:
+ cfg_ver_int: int = int(cfg_ver)
+ else:
+ cfg_ver_int: int = 0
+ return cfg_ver_int
+
+
+def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None:
+ """Write version number of GRUB configuration
+
+ Args:
+ cfg_ver (int): a version number to write
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ int: a configuration version
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}'
+ vars_current: dict[str, str] = vars_read(vars_file)
+ vars_current['VYOS_CFG_VER'] = str(cfg_ver)
+ vars_write(vars_file, vars_current)
+
+
+def vars_read(grub_cfg: str) -> dict[str, str]:
+ """Read variables from a GRUB configuration file
+
+ Args:
+ grub_cfg (str): a path to the GRUB config file
+
+ Returns:
+ dict: a dictionary with variables and values
+ """
+ vars_dict: dict[str, str] = {}
+ regex_filter = re_compile(REGEX_GRUB_VARS)
+ try:
+ config_text: list[str] = Path(grub_cfg).read_text().splitlines()
+ except FileNotFoundError:
+ return vars_dict
+ for line in config_text:
+ search_result = regex_filter.fullmatch(line)
+ if search_result:
+ search_dict = search_result.groupdict()
+ variable_name: str = search_dict.get('variable_name', '')
+ variable_value: str = search_dict.get('variable_value', '')
+ if variable_name and variable_value:
+ vars_dict.update({variable_name: variable_value})
+ return vars_dict
+
+
+def modules_read(grub_cfg: str) -> list[str]:
+ """Read modules list from a GRUB configuration file
+
+ Args:
+ grub_cfg (str): a path to the GRUB config file
+
+ Returns:
+ list: a list with modules to load
+ """
+ mods_list: list[str] = []
+ regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE)
+ try:
+ config_text = Path(grub_cfg).read_text()
+ except FileNotFoundError:
+ return mods_list
+ mods_list = regex_filter.findall(config_text)
+
+ return mods_list
+
+
+def modules_write(grub_cfg: str, mods_list: list[str]) -> None:
+ """Write modules list to a GRUB configuration file (overwrite everything)
+
+ Args:
+ grub_cfg (str): a path to GRUB configuration file
+ mods_list (list): a list with modules to load
+ """
+ render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list})
+
+
+def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None:
+ """Write variables to a GRUB configuration file (overwrite everything)
+
+ Args:
+ grub_cfg (str): a path to GRUB configuration file
+ grub_vars (dict): a dictionary with new variables
+ """
+ render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars})
+
+def get_boot_opts(version_name: str, root_dir: str = '') -> str:
+ """Read boot_opts setting from version file; return default setting on
+ any failure.
+
+ Args:
+ version_name (str): version name
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ boot_opts_default: str = BOOT_OPTS_STEM + version_name
+ boot_opts: str = ''
+ regex_filter = re_compile(REGEX_GRUB_BOOT_OPTS)
+ version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg'
+ try:
+ config_text: list[str] = Path(version_config).read_text().splitlines()
+ except FileNotFoundError:
+ return boot_opts_default
+ for line in config_text:
+ search_result = regex_filter.fullmatch(line)
+ if search_result:
+ search_dict = search_result.groupdict()
+ boot_opts = search_dict.get('boot_opts', '')
+ break
+
+ if not boot_opts:
+ boot_opts = boot_opts_default
+
+ return boot_opts
+
+def set_default(version_name: str, root_dir: str = '') -> None:
+ """Set version as default boot entry
+
+ Args:
+ version_name (str): version name
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current = vars_read(vars_file)
+ vars_current['default'] = gen_version_uuid(version_name)
+ vars_write(vars_file, vars_current)
+
+
+def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None:
+ """Write common GRUB configuration file (overwrite everything)
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ common_config = f'{root_dir}/{CFG_VYOS_COMMON}'
+ render(common_config, TMPL_GRUB_COMMON, grub_common)
+
+
+def create_structure(root_dir: str = '') -> None:
+ """Create GRUB directories structure
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True)
+
+
+def set_console_type(console_type: str, root_dir: str = '') -> None:
+ """Write default console type to GRUB configuration
+
+ Args:
+ console_type (str): a default console type
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = vars_read(vars_file)
+ vars_current['console_type'] = str(console_type)
+ vars_write(vars_file, vars_current)
+
+def set_console_speed(console_speed: str, root_dir: str = '') -> None:
+ """Write default console speed to GRUB configuration
+
+ Args:
+ console_speed (str): default console speed
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = vars_read(vars_file)
+ vars_current['console_speed'] = str(console_speed)
+ vars_write(vars_file, vars_current)
+
+def set_kernel_cmdline_options(cmdline_options: str, version_name: str,
+ root_dir: str = '') -> None:
+ """Write additional cmdline options to GRUB configuration
+
+ Args:
+ cmdline_options (str): cmdline options to add to default boot line
+ version_name (str): image version name
+ root_dir (str, optional): an optional path to the root directory.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ version_add(version_name=version_name, root_dir=root_dir,
+ boot_opts_config=cmdline_options)
diff --git a/python/vyos/system/grub_util.py b/python/vyos/system/grub_util.py
new file mode 100644
index 000000000..4a3d8795e
--- /dev/null
+++ b/python/vyos/system/grub_util.py
@@ -0,0 +1,70 @@
+# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from vyos.system import disk, grub, image, compat
+
+@compat.grub_cfg_update
+def set_console_speed(console_speed: str, root_dir: str = '') -> None:
+ """Write default console speed to GRUB configuration
+
+ Args:
+ console_speed (str): default console speed
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub.set_console_speed(console_speed, root_dir)
+
+@image.if_not_live_boot
+def update_console_speed(console_speed: str, root_dir: str = '') -> None:
+ """Update console_speed if different from current value"""
+
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{grub.CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = grub.vars_read(vars_file)
+ console_speed_current = vars_current.get('console_speed', None)
+ if console_speed != console_speed_current:
+ set_console_speed(console_speed, root_dir)
+
+@compat.grub_cfg_update
+def set_kernel_cmdline_options(cmdline_options: str, version: str = '',
+ root_dir: str = '') -> None:
+ """Write Kernel CLI cmdline options to GRUB configuration"""
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ if not version:
+ version = image.get_running_image()
+
+ grub.set_kernel_cmdline_options(cmdline_options, version, root_dir)
+
+@image.if_not_live_boot
+def update_kernel_cmdline_options(cmdline_options: str,
+ root_dir: str = '') -> None:
+ """Update Kernel custom cmdline options"""
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ version = image.get_running_image()
+
+ boot_opts_current = grub.get_boot_opts(version, root_dir)
+ boot_opts_proposed = grub.BOOT_OPTS_STEM + f'{version} {cmdline_options}'
+
+ if boot_opts_proposed != boot_opts_current:
+ set_kernel_cmdline_options(cmdline_options, version, root_dir)
diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py
new file mode 100644
index 000000000..5460e6a36
--- /dev/null
+++ b/python/vyos/system/image.py
@@ -0,0 +1,279 @@
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import compile as re_compile
+from functools import wraps
+from tempfile import TemporaryDirectory
+from typing import TypedDict
+
+from vyos import version
+from vyos.system import disk, grub
+
+# Define variables
+GRUB_DIR_MAIN: str = '/boot/grub'
+GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d'
+CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg'
+GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions'
+# prepare regexes
+REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$'
+REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)'
+
+
+# structures definitions
+class ImageDetails(TypedDict):
+ name: str
+ version: str
+ tools_version: int
+ disk_ro: int
+ disk_rw: int
+ disk_total: int
+
+
+class BootDetails(TypedDict):
+ image_default: str
+ image_running: str
+ images_available: list[str]
+ console_type: str
+ console_num: int
+
+
+def bootmode_detect() -> str:
+ """Detect system boot mode
+
+ Returns:
+ str: 'bios' or 'efi'
+ """
+ if Path('/sys/firmware/efi/').exists():
+ return 'efi'
+ else:
+ return 'bios'
+
+
+def get_image_version(mount_path: str) -> str:
+ """Extract version name from rootfs mounted at mount_path
+
+ Args:
+ mount_path (str): mount path of rootfs
+
+ Returns:
+ str: version name
+ """
+ version_file: str = Path(
+ f'{mount_path}/opt/vyatta/etc/version').read_text()
+ version_name: str = version_file.lstrip('Version: ').strip()
+
+ return version_name
+
+
+def get_image_tools_version(mount_path: str) -> int:
+ """Extract image-tools version from rootfs mounted at mount_path
+
+ Args:
+ mount_path (str): mount path of rootfs
+
+ Returns:
+ str: image-tools version
+ """
+ try:
+ version_file: str = Path(
+ f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text()
+ except FileNotFoundError:
+ system_cfg_ver: int = 0
+ else:
+ res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file)
+ system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0))
+
+ return system_cfg_ver
+
+
+def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]:
+ """Return versions of image and image-tools
+
+ Args:
+ image_name (str): a name of an image
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+
+ Returns:
+ dict[str, int]: a dictionary with versions of image and image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ squashfs_file: str = next(
+ Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix()
+ with TemporaryDirectory() as squashfs_mounted:
+ disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs')
+
+ image_version: str = get_image_version(squashfs_mounted)
+ image_tools_version: int = get_image_tools_version(squashfs_mounted)
+
+ disk.partition_umount(squashfs_file)
+
+ versions: dict[str, int] = {
+ 'image': image_version,
+ 'image-tools': image_tools_version
+ }
+
+ return versions
+
+
+def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
+ """Return information about image
+
+ Args:
+ image_name (str): a name of an image
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+
+ Returns:
+ ImageDetails: a dictionary with details about an image (name, size)
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ versions = get_versions(image_name, root_dir)
+ image_version: str = versions.get('image', '')
+ image_tools_version: int = versions.get('image-tools', 0)
+
+ image_path: Path = Path(f'{root_dir}/boot/{image_name}')
+ image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw')
+
+ image_disk_ro: int = int()
+ for item in image_path.iterdir():
+ if not item.is_symlink():
+ image_disk_ro += item.stat().st_size
+
+ image_disk_rw: int = int()
+ for item in image_path_rw.rglob('*'):
+ if not item.is_symlink():
+ image_disk_rw += item.stat().st_size
+
+ image_details: ImageDetails = {
+ 'name': image_name,
+ 'version': image_version,
+ 'tools_version': image_tools_version,
+ 'disk_ro': image_disk_ro,
+ 'disk_rw': image_disk_rw,
+ 'disk_total': image_disk_ro + image_disk_rw
+ }
+
+ return image_details
+
+
+def get_images_details() -> list[ImageDetails]:
+ """Return information about all images
+
+ Returns:
+ list[ImageDetails]: a list of dictionaries with details about images
+ """
+ images: list[str] = grub.version_list()
+ images_details: list[ImageDetails] = list()
+ for image_name in images:
+ images_details.append(get_details(image_name))
+
+ return images_details
+
+
+def get_running_image() -> str:
+ """Find currently running image name
+
+ Returns:
+ str: image name
+ """
+ running_image: str = ''
+ regex_filter = re_compile(REGEX_KERNEL_CMDLINE)
+ cmdline: str = Path('/proc/cmdline').read_text()
+ running_image_result = regex_filter.match(cmdline)
+ if running_image_result:
+ running_image: str = running_image_result.groupdict().get(
+ 'image_version', '')
+ # we need to have a fallback for live systems
+ if not running_image:
+ running_image: str = version.get_version()
+
+ return running_image
+
+
+def get_default_image(root_dir: str = '') -> str:
+ """Get default boot entry
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ Returns:
+ str: a version name
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = grub.vars_read(vars_file)
+ default_uuid: str = vars_current.get('default', '')
+ if default_uuid:
+ images_list: list[str] = grub.version_list(root_dir)
+ for image_name in images_list:
+ if default_uuid == grub.gen_version_uuid(image_name):
+ return image_name
+ return ''
+ else:
+ return ''
+
+
+def validate_name(image_name: str) -> bool:
+ """Validate image name
+
+ Args:
+ image_name (str): suggested image name
+
+ Returns:
+ bool: validation result
+ """
+ regex_filter = re_compile(r'^[\w\.+-]{1,64}$')
+ if regex_filter.match(image_name):
+ return True
+ return False
+
+
+def is_live_boot() -> bool:
+ """Detect live booted system
+
+ Returns:
+ bool: True if the system currently booted in live mode
+ """
+ regex_filter = re_compile(REGEX_KERNEL_CMDLINE)
+ cmdline: str = Path('/proc/cmdline').read_text()
+ running_image_result = regex_filter.match(cmdline)
+ if running_image_result:
+ boot_type: str = running_image_result.groupdict().get('boot_type', '')
+ if boot_type == 'live':
+ return True
+ return False
+
+def if_not_live_boot(func):
+ """Decorator to call function only if not live boot"""
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ if not is_live_boot():
+ ret = func(*args, **kwargs)
+ return ret
+ return None
+ return wrapper
+
+def is_running_as_container() -> bool:
+ if Path('/.dockerenv').exists():
+ return True
+ return False
diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py
new file mode 100644
index 000000000..5b33d34da
--- /dev/null
+++ b/python/vyos/system/raid.py
@@ -0,0 +1,122 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+"""RAID related functions"""
+
+from pathlib import Path
+from shutil import copy
+from dataclasses import dataclass
+
+from vyos.utils.process import cmd, run
+from vyos.system import disk
+
+
+@dataclass
+class RaidDetails:
+ """RAID type"""
+ name: str
+ level: str
+ members: list[str]
+ disks: list[disk.DiskDetails]
+
+
+def raid_create(raid_members: list[str],
+ raid_name: str = 'md0',
+ raid_level: str = 'raid1') -> None:
+ """Create a RAID array
+
+ Args:
+ raid_name (str): a name of array (data, backup, test, etc.)
+ raid_members (list[str]): a list of array members
+ raid_level (str, optional): an array level. Defaults to 'raid1'.
+ """
+ raid_devices_num: int = len(raid_members)
+ raid_members_str: str = ' '.join(raid_members)
+ for part in raid_members:
+ drive: str = disk.partition_parent(part)
+ # set partition type GUID for raid member; cf.
+ # https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs
+ command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}'
+ cmd(command)
+ command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \
+ --raid-devices={raid_devices_num} --level={raid_level} \
+ {raid_members_str}'
+
+ cmd(command)
+
+ raid = RaidDetails(
+ name = f'/dev/{raid_name}',
+ level = raid_level,
+ members = raid_members,
+ disks = [disk.from_partition(m) for m in raid_members]
+ )
+
+ return raid
+
+def clear():
+ """Deactivate all RAID arrays"""
+ command: str = 'mdadm --examine --scan'
+ raid_config = cmd(command)
+ if not raid_config:
+ return
+ command: str = 'mdadm --run /dev/md?*'
+ run(command)
+ command: str = 'mdadm --assemble --scan --auto=yes --symlink=no'
+ run(command)
+ command: str = 'mdadm --stop --scan'
+ run(command)
+
+
+def update_initramfs() -> None:
+ """Update initramfs"""
+ mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm'
+ copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script)
+ p = Path(mdadm_script)
+ p.write_text(p.read_text().replace('$((COUNT + 1))', '20'))
+ command: str = 'update-initramfs -u'
+ cmd(command)
+
+def update_default(target_dir: str) -> None:
+ """Update /etc/default/mdadm to start MD monitoring daemon at boot
+ """
+ source_mdadm_config = '/etc/default/mdadm'
+ target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm')
+ target_mdadm_config_dir = Path(target_mdadm_config).parent
+ Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True)
+ s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false',
+ 'START_DAEMON=true')
+ Path(target_mdadm_config).write_text(s)
+
+def get_uuid(device: str) -> str:
+ """Get UUID of a device"""
+ command: str = f'tune2fs -l {device}'
+ l = cmd(command).splitlines()
+ uuid = next((x for x in l if x.startswith('Filesystem UUID')), '')
+ return uuid.split(':')[1].strip() if uuid else ''
+
+def get_uuids(raid_details: RaidDetails) -> tuple[str]:
+ """Get UUIDs of RAID members
+
+ Args:
+ raid_name (str): a name of array (data, backup, test, etc.)
+
+ Returns:
+ tuple[str]: root_disk uuid, root_md uuid
+ """
+ raid_name: str = raid_details.name
+ root_partition: str = raid_details.members[0]
+ uuid_root_disk: str = get_uuid(root_partition)
+ uuid_root_md: str = get_uuid(raid_name)
+ return uuid_root_disk, uuid_root_md
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 3be486cc4..456239568 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -316,20 +316,15 @@ def is_ipv6(text):
except: return False
@register_filter('first_host_address')
-def first_host_address(text):
+def first_host_address(prefix):
""" Return first usable (host) IP address from given prefix.
Example:
- 10.0.0.0/24 -> 10.0.0.1
- 2001:db8::/64 -> 2001:db8::
"""
from ipaddress import ip_interface
- from ipaddress import IPv4Network
- from ipaddress import IPv6Network
-
- addr = ip_interface(text)
- if addr.version == 4:
- return str(addr.ip +1)
- return str(addr.ip)
+ tmp = ip_interface(prefix).network
+ return str(tmp.network_address +1)
@register_filter('last_host_address')
def last_host_address(text):
@@ -579,19 +574,20 @@ def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):
return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name)
@register_filter('nft_default_rule')
-def nft_default_rule(fw_conf, fw_name, ipv6=False):
+def nft_default_rule(fw_conf, fw_name, family):
output = ['counter']
default_action = fw_conf['default_action']
+ #family = 'ipv6' if ipv6 else 'ipv4'
- if 'enable_default_log' in fw_conf:
+ if 'default_log' in fw_conf:
action_suffix = default_action[:1].upper()
- output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"')
+ output.append(f'log prefix "[{family}-{fw_name[:19]}-default-{action_suffix}]"')
#output.append(nft_action(default_action))
output.append(f'{default_action}')
if 'default_jump_target' in fw_conf:
target = fw_conf['default_jump_target']
- def_suffix = '6' if ipv6 else ''
+ def_suffix = '6' if family == 'ipv6' else ''
output.append(f'NAME{def_suffix}_{target}')
output.append(f'comment "{fw_name} default-action {default_action}"')
@@ -601,7 +597,7 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False):
def nft_state_policy(conf, state):
out = [f'ct state {state}']
- if 'log' in conf and 'enable' in conf['log']:
+ if 'log' in conf:
log_state = state[:3].upper()
log_action = (conf['action'] if 'action' in conf else 'accept')[:1].upper()
out.append(f'log prefix "[STATE-POLICY-{log_state}-{log_action}]"')
@@ -663,8 +659,8 @@ def nat_static_rule(rule_conf, rule_id, nat_type):
from vyos.nat import parse_nat_static_rule
return parse_nat_static_rule(rule_conf, rule_id, nat_type)
-@register_filter('conntrack_ignore_rule')
-def conntrack_ignore_rule(rule_conf, rule_id, ipv6=False):
+@register_filter('conntrack_rule')
+def conntrack_rule(rule_conf, rule_id, action, ipv6=False):
ip_prefix = 'ip6' if ipv6 else 'ip'
def_suffix = '6' if ipv6 else ''
output = []
@@ -675,11 +671,15 @@ def conntrack_ignore_rule(rule_conf, rule_id, ipv6=False):
output.append(f'iifname {ifname}')
if 'protocol' in rule_conf:
- proto = rule_conf['protocol']
+ if action != 'timeout':
+ proto = rule_conf['protocol']
+ else:
+ for protocol, protocol_config in rule_conf['protocol'].items():
+ proto = protocol
output.append(f'meta l4proto {proto}')
tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags')
- if tcp_flags:
+ if tcp_flags and action != 'timeout':
from vyos.firewall import parse_tcp_flags
output.append(parse_tcp_flags(tcp_flags))
@@ -742,11 +742,24 @@ def conntrack_ignore_rule(rule_conf, rule_id, ipv6=False):
output.append(f'{proto} {prefix}port {operator} @P_{group_name}')
- output.append('counter notrack')
- output.append(f'comment "ignore-{rule_id}"')
+ if action == 'ignore':
+ output.append('counter notrack')
+ output.append(f'comment "ignore-{rule_id}"')
+ else:
+ output.append(f'counter ct timeout set ct-timeout-{rule_id}')
+ output.append(f'comment "timeout-{rule_id}"')
return " ".join(output)
+@register_filter('conntrack_ct_policy')
+def conntrack_ct_policy(protocol_conf):
+ output = []
+ for item in protocol_conf:
+ item_value = protocol_conf[item]
+ output.append(f'{item}: {item_value}')
+
+ return ", ".join(output)
+
@register_filter('range_to_regex')
def range_to_regex(num_range):
"""Convert range of numbers or list of ranges
@@ -773,6 +786,129 @@ def range_to_regex(num_range):
regex = range_to_regex(num_range)
return f'({regex})'
+@register_filter('kea_address_json')
+def kea_address_json(addresses):
+ from json import dumps
+ from vyos.utils.network import is_addr_assigned
+
+ out = []
+
+ for address in addresses:
+ ifname = is_addr_assigned(address, return_ifname=True)
+
+ if not ifname:
+ continue
+
+ out.append(f'{ifname}/{address}')
+
+ return dumps(out)
+
+@register_filter('kea_failover_json')
+def kea_failover_json(config):
+ from json import dumps
+
+ source_addr = config['source_address']
+ remote_addr = config['remote']
+
+ data = {
+ 'this-server-name': os.uname()[1],
+ 'mode': 'hot-standby',
+ 'heartbeat-delay': 10000,
+ 'max-response-delay': 10000,
+ 'max-ack-delay': 5000,
+ 'max-unacked-clients': 0,
+ 'peers': [
+ {
+ 'name': os.uname()[1],
+ 'url': f'http://{source_addr}:647/',
+ 'role': 'standby' if config['status'] == 'secondary' else 'primary',
+ 'auto-failover': True
+ },
+ {
+ 'name': config['name'],
+ 'url': f'http://{remote_addr}:647/',
+ 'role': 'primary' if config['status'] == 'secondary' else 'standby',
+ 'auto-failover': True
+ }]
+ }
+
+ if 'ca_cert_file' in config:
+ data['trust-anchor'] = config['ca_cert_file']
+
+ if 'cert_file' in config:
+ data['cert-file'] = config['cert_file']
+
+ if 'cert_key_file' in config:
+ data['key-file'] = config['cert_key_file']
+
+ return dumps(data)
+
+@register_filter('kea_shared_network_json')
+def kea_shared_network_json(shared_networks):
+ from vyos.kea import kea_parse_options
+ from vyos.kea import kea_parse_subnet
+ from json import dumps
+ out = []
+
+ for name, config in shared_networks.items():
+ if 'disable' in config:
+ continue
+
+ network = {
+ 'name': name,
+ 'authoritative': ('authoritative' in config),
+ 'subnet4': []
+ }
+
+ if 'option' in config:
+ network['option-data'] = kea_parse_options(config['option'])
+
+ if 'bootfile_name' in config['option']:
+ network['boot-file-name'] = config['option']['bootfile_name']
+
+ if 'bootfile_server' in config['option']:
+ network['next-server'] = config['option']['bootfile_server']
+
+ if 'subnet' in config:
+ for subnet, subnet_config in config['subnet'].items():
+ if 'disable' in subnet_config:
+ continue
+ network['subnet4'].append(kea_parse_subnet(subnet, subnet_config))
+
+ out.append(network)
+
+ return dumps(out, indent=4)
+
+@register_filter('kea6_shared_network_json')
+def kea6_shared_network_json(shared_networks):
+ from vyos.kea import kea6_parse_options
+ from vyos.kea import kea6_parse_subnet
+ from json import dumps
+ out = []
+
+ for name, config in shared_networks.items():
+ if 'disable' in config:
+ continue
+
+ network = {
+ 'name': name,
+ 'subnet6': []
+ }
+
+ if 'common_options' in config:
+ network['option-data'] = kea6_parse_options(config['common_options'])
+
+ if 'interface' in config:
+ network['interface'] = config['interface']
+
+ if 'subnet' in config:
+ for subnet, subnet_config in config['subnet'].items():
+ network['subnet6'].append(kea6_parse_subnet(subnet, subnet_config))
+
+ out.append(network)
+
+ return dumps(out, indent=4)
+
@register_test('vyos_defined')
def vyos_defined(value, test_value=None, var_type=None):
"""
diff --git a/python/vyos/utils/config.py b/python/vyos/utils/config.py
index bd363ce46..33047010b 100644
--- a/python/vyos/utils/config.py
+++ b/python/vyos/utils/config.py
@@ -1,4 +1,4 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -31,4 +31,9 @@ def read_saved_value(path: list):
if not ct.exists(path):
return ''
res = ct.return_values(path)
- return res[0] if len(res) == 1 else res
+ if len(res) == 1:
+ return res[0]
+ res = ct.list_nodes(path)
+ if len(res) == 1:
+ return ' '.join(res)
+ return res
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 9a8a1ff7d..c02f0071e 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""):
return result
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
+def bytes_to_human(bytes, initial_exponent=0, precision=2,
+ int_below_exponent=0):
""" Converts a value in bytes to a human-readable size string like 640 KB
The initial_exponent parameter is the exponent of 2,
@@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2):
# log2 is a float, while range checking requires an int
exponent = int(log2(bytes))
+ if exponent < int_below_exponent:
+ precision = 0
if exponent < 10:
value = bytes
diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py
index 9484eacdd..d36b6fcfb 100644
--- a/python/vyos/utils/dict.py
+++ b/python/vyos/utils/dict.py
@@ -199,6 +199,31 @@ def dict_search_recursive(dict_object, key, path=[]):
for x in dict_search_recursive(j, key, new_path):
yield x
+
+def dict_set(key_path, value, dict_object):
+ """ Set value to Python dictionary (dict_object) using path to key delimited by dot (.).
+ The key will be added if it does not exist.
+ """
+ path_list = key_path.split(".")
+ dynamic_dict = dict_object
+ if len(path_list) > 0:
+ for i in range(0, len(path_list)-1):
+ dynamic_dict = dynamic_dict[path_list[i]]
+ dynamic_dict[path_list[len(path_list)-1]] = value
+
+def dict_delete(key_path, dict_object):
+ """ Delete key in Python dictionary (dict_object) using path to key delimited by dot (.).
+ """
+ path_dict = dict_object
+ path_list = key_path.split('.')
+ inside = path_list[:-1]
+ if not inside:
+ del dict_object[path_list]
+ else:
+ for key in path_list[:-1]:
+ path_dict = path_dict[key]
+ del path_dict[path_list[len(path_list)-1]]
+
def dict_to_list(d, save_key_to=None):
""" Convert a dict to a list of dicts.
@@ -228,6 +253,39 @@ def dict_to_list(d, save_key_to=None):
return collect
+def dict_to_paths_values(conf: dict) -> dict:
+ """
+ Convert nested dictionary to simple dictionary, where key is a path is delimited by dot (.).
+ """
+ list_of_paths = []
+ dict_of_options ={}
+ for path in dict_to_key_paths(conf):
+ str_path = '.'.join(path)
+ list_of_paths.append(str_path)
+
+ for path in list_of_paths:
+ dict_of_options[path] = dict_search(path,conf)
+
+ return dict_of_options
+def dict_to_key_paths(d: dict) -> list:
+ """ Generator to return list of key paths from dict of list[str]|str
+ """
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ for r in func(v, path + [k]):
+ yield r
+ elif isinstance(d, list):
+ yield path
+ elif isinstance(d, str):
+ yield path
+ else:
+ raise ValueError('object is not a dict of strings/list of strings')
+ for r in func(d, []):
+ yield r
+
def dict_to_paths(d: dict) -> list:
""" Generator to return list of paths from dict of list[str]|str
"""
@@ -305,3 +363,4 @@ class FixedDict(dict):
if k not in self._allowed:
raise ConfigError(f'Option "{k}" has no defined default')
super().__setitem__(k, v)
+
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index 667a2464b..c566f0334 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -83,21 +83,34 @@ def read_json(fname, defaultonfailure=None):
return defaultonfailure
raise e
-def chown(path, user, group):
+def chown(path, user=None, group=None, recursive=False):
""" change file/directory owner """
from pwd import getpwnam
from grp import getgrnam
- if user is None or group is None:
+ if user is None and group is None:
return False
# path may also be an open file descriptor
if not isinstance(path, int) and not os.path.exists(path):
return False
- uid = getpwnam(user).pw_uid
- gid = getgrnam(group).gr_gid
- os.chown(path, uid, gid)
+ # keep current value if not specified otherwise
+ uid = -1
+ gid = -1
+
+ if user:
+ uid = getpwnam(user).pw_uid
+ if group:
+ gid = getgrnam(group).gr_gid
+
+ if recursive:
+ for dirpath, dirnames, filenames in os.walk(path):
+ os.chown(dirpath, uid, gid)
+ for filename in filenames:
+ os.chown(os.path.join(dirpath, filename), uid, gid)
+ else:
+ os.chown(path, uid, gid)
return True
@@ -134,6 +147,24 @@ def chmod_755(path):
S_IROTH | S_IXOTH
chmod(path, bitmask)
+def chmod_2775(path):
+ """ user/group permissions with set-group-id bit set """
+ from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH
+
+ bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH
+ chmod(path, bitmask)
+
+def chmod_775(path):
+ """ Make file executable by all """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IWGRP, S_IXGRP, S_IROTH, S_IXOTH
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | \
+ S_IROTH | S_IXOTH
+ chmod(path, bitmask)
+
+def file_permissions(path):
+ """ Return file permissions in string format, e.g '0755' """
+ return oct(os.stat(path).st_mode)[4:]
def makedir(path, user=None, group=None):
if os.path.exists(path):
diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py
index 8790cbaac..0afaf695c 100644
--- a/python/vyos/utils/io.py
+++ b/python/vyos/utils/io.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Callable
+
def print_error(str='', end='\n'):
"""
Print `str` to stderr, terminated with `end`.
@@ -24,13 +26,18 @@ def print_error(str='', end='\n'):
sys.stderr.write(end)
sys.stderr.flush()
-def ask_input(question, default='', numeric_only=False, valid_responses=[]):
+def ask_input(question, default='', numeric_only=False, valid_responses=[],
+ no_echo=False):
+ from getpass import getpass
question_out = question
if default:
question_out += f' (Default: {default})'
response = ''
while True:
- response = input(question_out + ' ').strip()
+ if not no_echo:
+ response = input(question_out + ' ').strip()
+ else:
+ response = getpass(question_out + ' ').strip()
if not response and default:
return default
if numeric_only:
@@ -72,3 +79,26 @@ def is_dumb_terminal():
"""Check if the current TTY is dumb, so that we can disable advanced terminal features."""
import os
return os.getenv('TERM') in ['vt100', 'dumb']
+
+def select_entry(l: list, list_msg: str = '', prompt_msg: str = '',
+ list_format: Callable = None,) -> str:
+ """Select an entry from a list
+
+ Args:
+ l (list): a list of entries
+ list_msg (str): a message to print before listing the entries
+ prompt_msg (str): a message to print as prompt for selection
+
+ Returns:
+ str: a selected entry
+ """
+ en = list(enumerate(l, 1))
+ print(list_msg)
+ for i, e in en:
+ if list_format:
+ print(f'\t{i}: {list_format(e)}')
+ else:
+ print(f'\t{i}: {e}')
+ select = ask_input(prompt_msg, numeric_only=True,
+ valid_responses=range(1, len(l)+1))
+ return next(filter(lambda x: x[0] == select, en))[1]
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 9354bd495..cac59475d 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -61,14 +61,17 @@ def get_vrf_members(vrf: str) -> list:
"""
import json
from vyos.utils.process import cmd
- if not interface_exists(vrf):
- raise ValueError(f'VRF "{vrf}" does not exist!')
- output = cmd(f'ip --json --brief link show master {vrf}')
- answer = json.loads(output)
interfaces = []
- for data in answer:
- if 'ifname' in data:
- interfaces.append(data.get('ifname'))
+ try:
+ if not interface_exists(vrf):
+ raise ValueError(f'VRF "{vrf}" does not exist!')
+ output = cmd(f'ip --json --brief link show vrf {vrf}')
+ answer = json.loads(output)
+ for data in answer:
+ if 'ifname' in data:
+ interfaces.append(data.get('ifname'))
+ except:
+ pass
return interfaces
def get_interface_vrf(interface):
@@ -156,7 +159,9 @@ def is_wwan_connected(interface):
""" Determine if a given WWAN interface, e.g. wwan0 is connected to the
carrier network or not """
import json
+ from vyos.utils.dict import dict_search
from vyos.utils.process import cmd
+ from vyos.utils.process import is_systemd_service_active
if not interface.startswith('wwan'):
raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
@@ -197,6 +202,22 @@ def get_all_vrfs():
data[name] = entry
return data
+def interface_list() -> list:
+ from vyos.ifconfig import Section
+ """
+ Get list of interfaces in system
+ :rtype: list
+ """
+ return Section.interfaces()
+
+
+def vrf_list() -> list:
+ """
+ Get list of VRFs in system
+ :rtype: list
+ """
+ return list(get_all_vrfs().keys())
+
def mac2eui64(mac, prefix=None):
"""
Convert a MAC address to a EUI64 address or, with prefix provided, a full
@@ -289,7 +310,7 @@ def is_ipv6_link_local(addr):
return False
-def is_addr_assigned(ip_address, vrf=None) -> bool:
+def is_addr_assigned(ip_address, vrf=None, return_ifname=False) -> bool | str:
""" Verify if the given IPv4/IPv6 address is assigned to any interface """
from netifaces import interfaces
from vyos.utils.network import get_interface_config
@@ -304,7 +325,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
continue
if is_intf_addr_assigned(interface, ip_address):
- return True
+ return interface if return_ifname else True
return False
@@ -468,3 +489,70 @@ def get_vxlan_vlan_tunnels(interface: str) -> list:
os_configured_vlan_ids.append(str(vlanStart))
return os_configured_vlan_ids
+
+def get_vxlan_vni_filter(interface: str) -> list:
+ """ Return a list of strings with VNIs configured in the Kernel"""
+ from json import loads
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('vxlan'):
+ raise ValueError('Only applicable for VXLAN interfaces!')
+
+ # Determine current OS Kernel configured VNI filters in VXLAN interface
+ #
+ # $ bridge -j vni show dev vxlan1
+ # [{"ifname":"vxlan1","vnis":[{"vni":100},{"vni":200},{"vni":300,"vniEnd":399}]}]
+ #
+ # Example output: ['10010', '10020', '10021', '10022']
+ os_configured_vnis = []
+ tmp = loads(cmd(f'bridge --json vni show dev {interface}'))
+ if tmp:
+ for tunnel in tmp[0].get('vnis', {}):
+ vniStart = tunnel['vni']
+ if 'vniEnd' in tunnel:
+ vniEnd = tunnel['vniEnd']
+ # Build a real list for user VNIs
+ vni_list = list(range(vniStart, vniEnd +1))
+ # Convert list of integers to list or strings
+ os_configured_vnis.extend(map(str, vni_list))
+ # Proceed with next tunnel - this one is complete
+ continue
+
+ # Add single tunel id - not part of a range
+ os_configured_vnis.append(str(vniStart))
+
+ return os_configured_vnis
+
+# Calculate prefix length of an IPv6 range, where possible
+# Python-ified from source: https://gitlab.isc.org/isc-projects/dhcp/-/blob/master/keama/confparse.c#L4591
+def ipv6_prefix_length(low, high):
+ import socket
+
+ bytemasks = [0x80, 0xc0, 0xe0, 0xf0, 0xf8, 0xfc, 0xfe, 0xff]
+
+ try:
+ lo = bytearray(socket.inet_pton(socket.AF_INET6, low))
+ hi = bytearray(socket.inet_pton(socket.AF_INET6, high))
+ except:
+ return None
+
+ xor = bytearray(a ^ b for a, b in zip(lo, hi))
+
+ plen = 0
+ while plen < 128 and xor[plen // 8] == 0:
+ plen += 8
+
+ if plen == 128:
+ return plen
+
+ for i in range((plen // 8) + 1, 16):
+ if xor[i] != 0:
+ return None
+
+ for i in range(8):
+ msk = ~xor[plen // 8] & 0xff
+
+ if msk == bytemasks[i]:
+ return plen + i + 1
+
+ return None
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
index e09c7d86d..bd0644bc0 100644
--- a/python/vyos/utils/process.py
+++ b/python/vyos/utils/process.py
@@ -204,17 +204,32 @@ def process_running(pid_file):
pid = f.read().strip()
return pid_exists(int(pid))
-def process_named_running(name, cmdline: str=None):
+def process_named_running(name: str, cmdline: str=None, timeout: int=0):
""" Checks if process with given name is running and returns its PID.
If Process is not running, return None
"""
from psutil import process_iter
- for p in process_iter(['name', 'pid', 'cmdline']):
- if cmdline:
- if p.info['name'] == name and cmdline in p.info['cmdline']:
+ def check_process(name, cmdline):
+ for p in process_iter(['name', 'pid', 'cmdline']):
+ if cmdline:
+ if name in p.info['name'] and cmdline in p.info['cmdline']:
+ return p.info['pid']
+ elif name in p.info['name']:
return p.info['pid']
- elif p.info['name'] == name:
- return p.info['pid']
+ return None
+ if timeout:
+ import time
+ time_expire = time.time() + timeout
+ while True:
+ tmp = check_process(name, cmdline)
+ if not tmp:
+ if time.time() > time_expire:
+ break
+ time.sleep(0.100) # wait 250ms
+ continue
+ return tmp
+ else:
+ return check_process(name, cmdline)
return None
def is_systemd_service_active(service):
diff --git a/python/vyos/vpp.py b/python/vyos/vpp.py
deleted file mode 100644
index 76e5d29c3..000000000
--- a/python/vyos/vpp.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-
-from functools import wraps
-from pathlib import Path
-from re import search as re_search, fullmatch as re_fullmatch, MULTILINE as re_M
-from subprocess import run
-from time import sleep
-
-from vpp_papi import VPPApiClient
-from vpp_papi import VPPIOError, VPPValueError
-
-
-class VPPControl:
- """Control VPP network stack
- """
-
- class _Decorators:
- """Decorators for VPPControl
- """
-
- @classmethod
- def api_call(cls, decorated_func):
- """Check if API is connected before API call
-
- Args:
- decorated_func: function to decorate
-
- Raises:
- VPPIOError: Connection to API is not established
- """
-
- @wraps(decorated_func)
- def api_safe_wrapper(cls, *args, **kwargs):
- if not cls.vpp_api_client.transport.connected:
- raise VPPIOError(2, 'VPP API is not connected')
- return decorated_func(cls, *args, **kwargs)
-
- return api_safe_wrapper
-
- @classmethod
- def check_retval(cls, decorated_func):
- """Check retval from API response
-
- Args:
- decorated_func: function to decorate
-
- Raises:
- VPPValueError: raised when retval is not 0
- """
-
- @wraps(decorated_func)
- def check_retval_wrapper(cls, *args, **kwargs):
- return_value = decorated_func(cls, *args, **kwargs)
- if not return_value.retval == 0:
- raise VPPValueError(
- f'VPP API call failed: {return_value.retval}')
- return return_value
-
- return check_retval_wrapper
-
- def __init__(self, attempts: int = 5, interval: int = 1000) -> None:
- """Create VPP API connection
-
- Args:
- attempts (int, optional): attempts to connect. Defaults to 5.
- interval (int, optional): interval between attempts in ms. Defaults to 1000.
-
- Raises:
- VPPIOError: Connection to API cannot be established
- """
- self.vpp_api_client = VPPApiClient()
- # connect with interval
- while attempts:
- try:
- attempts -= 1
- self.vpp_api_client.connect('vpp-vyos')
- break
- except (ConnectionRefusedError, FileNotFoundError) as err:
- print(f'VPP API connection timeout: {err}')
- sleep(interval / 1000)
- # raise exception if connection was not successful in the end
- if not self.vpp_api_client.transport.connected:
- raise VPPIOError(2, 'Cannot connect to VPP API')
-
- def __del__(self) -> None:
- """Disconnect from VPP API (destructor)
- """
- self.disconnect()
-
- def disconnect(self) -> None:
- """Disconnect from VPP API
- """
- if self.vpp_api_client.transport.connected:
- self.vpp_api_client.disconnect()
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def cli_cmd(self, command: str):
- """Send raw CLI command
-
- Args:
- command (str): command to send
-
- Returns:
- vpp_papi.vpp_serializer.cli_inband_reply: CLI reply class
- """
- return self.vpp_api_client.api.cli_inband(cmd=command)
-
- @_Decorators.api_call
- def get_mac(self, ifname: str) -> str:
- """Find MAC address by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- str: MAC address
- """
- for iface in self.vpp_api_client.api.sw_interface_dump():
- if iface.interface_name == ifname:
- return iface.l2_address.mac_string
- return ''
-
- @_Decorators.api_call
- def get_sw_if_index(self, ifname: str) -> int | None:
- """Find interface index by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- int | None: Interface index or None (if was not fount)
- """
- for iface in self.vpp_api_client.api.sw_interface_dump():
- if iface.interface_name == ifname:
- return iface.sw_if_index
- return None
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def lcp_pair_add(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
- """Create LCP interface pair between VPP and kernel
-
- Args:
- iface_name_vpp (str): interface name in VPP
- iface_name_kernel (str): interface name in kernel
- """
- iface_index = self.get_sw_if_index(iface_name_vpp)
- if iface_index:
- return self.vpp_api_client.api.lcp_itf_pair_add_del(
- is_add=True,
- sw_if_index=iface_index,
- host_if_name=iface_name_kernel)
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def lcp_pair_del(self, iface_name_vpp: str, iface_name_kernel: str) -> None:
- """Delete LCP interface pair between VPP and kernel
-
- Args:
- iface_name_vpp (str): interface name in VPP
- iface_name_kernel (str): interface name in kernel
- """
- iface_index = self.get_sw_if_index(iface_name_vpp)
- if iface_index:
- return self.vpp_api_client.api.lcp_itf_pair_add_del(
- is_add=False,
- sw_if_index=iface_index,
- host_if_name=iface_name_kernel)
-
- @_Decorators.check_retval
- @_Decorators.api_call
- def iface_rxmode(self, iface_name: str, rx_mode: str) -> None:
- """Set interface rx-mode in VPP
-
- Args:
- iface_name (str): interface name in VPP
- rx_mode (str): mode (polling, interrupt, adaptive)
- """
- modes_dict: dict[str, int] = {
- 'polling': 1,
- 'interrupt': 2,
- 'adaptive': 3
- }
- if rx_mode not in modes_dict:
- raise VPPValueError(f'Mode {rx_mode} is not known')
- iface_index = self.get_sw_if_index(iface_name)
- return self.vpp_api_client.api.sw_interface_set_rx_mode(
- sw_if_index=iface_index, mode=modes_dict[rx_mode])
-
- @_Decorators.api_call
- def get_pci_addr(self, ifname: str) -> str:
- """Find PCI address of interface by interface name in VPP
-
- Args:
- ifname (str): interface name inside VPP
-
- Returns:
- str: PCI address
- """
- hw_info = self.cli_cmd(f'show hardware-interfaces {ifname}').reply
-
- regex_filter = r'^\s+pci: device (?P<device>\w+:\w+) subsystem (?P<subsystem>\w+:\w+) address (?P<address>\w+:\w+:\w+\.\w+) numa (?P<numa>\w+)$'
- re_obj = re_search(regex_filter, hw_info, re_M)
-
- # return empty string if no interface or no PCI info was found
- if not hw_info or not re_obj:
- return ''
-
- address = re_obj.groupdict().get('address', '')
-
- # we need to modify address to math kernel style
- # for example: 0000:06:14.00 -> 0000:06:14.0
- address_chunks: list[str] = address.split('.')
- address_normalized: str = f'{address_chunks[0]}.{int(address_chunks[1])}'
-
- return address_normalized
-
-
-class HostControl:
- """Control Linux host
- """
-
- @staticmethod
- def pci_rescan(pci_addr: str = '') -> None:
- """Rescan PCI device by removing it and rescan PCI bus
-
- If PCI address is not defined - just rescan PCI bus
-
- Args:
- address (str, optional): PCI address of device. Defaults to ''.
- """
- if pci_addr:
- device_file = Path(f'/sys/bus/pci/devices/{pci_addr}/remove')
- if device_file.exists():
- device_file.write_text('1')
- # wait 10 seconds max until device will be removed
- attempts = 100
- while device_file.exists() and attempts:
- attempts -= 1
- sleep(0.1)
- if device_file.exists():
- raise TimeoutError(
- f'Timeout was reached for removing PCI device {pci_addr}'
- )
- else:
- raise FileNotFoundError(f'PCI device {pci_addr} does not exist')
- rescan_file = Path('/sys/bus/pci/rescan')
- rescan_file.write_text('1')
- if pci_addr:
- # wait 10 seconds max until device will be installed
- attempts = 100
- while not device_file.exists() and attempts:
- attempts -= 1
- sleep(0.1)
- if not device_file.exists():
- raise TimeoutError(
- f'Timeout was reached for installing PCI device {pci_addr}')
-
- @staticmethod
- def get_eth_name(pci_addr: str) -> str:
- """Find Ethernet interface name by PCI address
-
- Args:
- pci_addr (str): PCI address
-
- Raises:
- FileNotFoundError: no Ethernet interface was found
-
- Returns:
- str: Ethernet interface name
- """
- # find all PCI devices with eth* names
- net_devs: dict[str, str] = {}
- net_devs_dir = Path('/sys/class/net')
- regex_filter = r'^/sys/devices/pci[\w/:\.]+/(?P<pci_addr>\w+:\w+:\w+\.\w+)/[\w/:\.]+/(?P<iface_name>eth\d+)$'
- for dir in net_devs_dir.iterdir():
- real_dir: str = dir.resolve().as_posix()
- re_obj = re_fullmatch(regex_filter, real_dir)
- if re_obj:
- iface_name: str = re_obj.group('iface_name')
- iface_addr: str = re_obj.group('pci_addr')
- net_devs.update({iface_addr: iface_name})
- # match to provided PCI address and return a name if found
- if pci_addr in net_devs:
- return net_devs[pci_addr]
- # raise error if device was not found
- raise FileNotFoundError(
- f'PCI device {pci_addr} not found in ethernet interfaces')
-
- @staticmethod
- def rename_iface(name_old: str, name_new: str) -> None:
- """Rename interface
-
- Args:
- name_old (str): old name
- name_new (str): new name
- """
- rename_cmd: list[str] = [
- 'ip', 'link', 'set', name_old, 'name', name_new
- ]
- run(rename_cmd)