summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/config_mgmt.py20
-rw-r--r--python/vyos/configdiff.py24
-rw-r--r--python/vyos/configsession.py16
-rw-r--r--python/vyos/defaults.py9
-rw-r--r--python/vyos/frr.py6
-rw-r--r--python/vyos/ifconfig/interface.py14
-rw-r--r--python/vyos/ifconfig/macsec.py4
-rw-r--r--python/vyos/ifconfig/vxlan.py21
-rw-r--r--python/vyos/progressbar.py33
-rw-r--r--python/vyos/remote.py168
-rw-r--r--python/vyos/system/__init__.py18
-rw-r--r--python/vyos/system/compat.py316
-rw-r--r--python/vyos/system/disk.py217
-rw-r--r--python/vyos/system/grub.py340
-rw-r--r--python/vyos/system/image.py268
-rw-r--r--python/vyos/system/raid.py115
-rw-r--r--python/vyos/template.py6
-rw-r--r--python/vyos/utils/convert.py5
-rw-r--r--python/vyos/utils/file.py6
-rw-r--r--python/vyos/utils/io.py35
-rw-r--r--python/vyos/utils/network.py50
21 files changed, 1607 insertions, 84 deletions
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index 654a8d698..df7240c88 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -22,10 +22,11 @@ import logging
from typing import Optional, Tuple, Union
from filecmp import cmp
from datetime import datetime
-from textwrap import dedent
+from textwrap import dedent, indent
from pathlib import Path
from tabulate import tabulate
from shutil import copy, chown
+from urllib.parse import urlsplit, urlunsplit
from vyos.config import Config
from vyos.configtree import ConfigTree, ConfigTreeError, show_diff
@@ -377,9 +378,22 @@ Proceed ?'''
remote_file = f'config.boot-{hostname}.{timestamp}'
source_address = self.source_address
+ if self.effective_locations:
+ print("Archiving config...")
for location in self.effective_locations:
- upload(archive_config_file, f'{location}/{remote_file}',
- source_host=source_address)
+ url = urlsplit(location)
+ _, _, netloc = url.netloc.rpartition("@")
+ redacted_location = urlunsplit(url._replace(netloc=netloc))
+ print(f" {redacted_location}", end=" ", flush=True)
+ try:
+ upload(archive_config_file, f'{location}/{remote_file}',
+ source_host=source_address, raise_error=True)
+ print("OK")
+ except Exception as e:
+ print("FAILED!")
+ print()
+ print(indent(str(e), " > "))
+ print()
# op-mode functions
#
diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py
index 1ec2dfafe..03b06c6d9 100644
--- a/python/vyos/configdiff.py
+++ b/python/vyos/configdiff.py
@@ -165,6 +165,30 @@ class ConfigDiff(object):
return True
return False
+ def node_changed_presence(self, path=[]) -> bool:
+ if self._diff_tree is None:
+ raise NotImplementedError("diff_tree class not available")
+
+ path = self._make_path(path)
+ before = self._diff_tree.left.exists(path)
+ after = self._diff_tree.right.exists(path)
+ return (before and not after) or (not before and after)
+
+ def node_changed_children(self, path=[]) -> list:
+ if self._diff_tree is None:
+ raise NotImplementedError("diff_tree class not available")
+
+ path = self._make_path(path)
+ add = self._diff_tree.add
+ sub = self._diff_tree.sub
+ children = set()
+ if add.exists(path):
+ children.update(add.list_nodes(path))
+ if sub.exists(path):
+ children.update(sub.list_nodes(path))
+
+ return list(children)
+
def get_child_nodes_diff_str(self, path=[]):
ret = {'add': {}, 'change': {}, 'delete': {}}
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 6d4b2af59..90842b749 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -30,11 +30,15 @@ SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig']
LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile']
MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py']
SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py']
-INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url']
-REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del']
+INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py',
+ '--action', 'add', '--no-prompt', '--image-path']
+REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py',
+ '--action', 'delete', '--no-prompt', '--image-name']
GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate']
SHOW = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'show']
RESET = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reset']
+REBOOT = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reboot']
+POWEROFF = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'poweroff']
OP_CMD_ADD = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'add']
OP_CMD_DELETE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'delete']
@@ -220,10 +224,18 @@ class ConfigSession(object):
out = self.__run_command(SHOW + path)
return out
+ def reboot(self, path):
+ out = self.__run_command(REBOOT + path)
+ return out
+
def reset(self, path):
out = self.__run_command(RESET + path)
return out
+ def poweroff(self, path):
+ out = self.__run_command(POWEROFF + path)
+ return out
+
def add_container_image(self, name):
out = self.__run_command(OP_CMD_ADD + ['container', 'image'] + [name])
return out
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index a229533bd..2f3580571 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -50,15 +50,6 @@ https_data = {
'listen_addresses' : { '*': ['_'] }
}
-api_data = {
- 'listen_address' : '127.0.0.1',
- 'port' : '8080',
- 'socket' : False,
- 'strict' : False,
- 'debug' : False,
- 'api_keys' : [ {'id' : 'testapp', 'key' : 'qwerty'} ]
-}
-
vyos_cert_data = {
'conf' : '/etc/nginx/snippets/vyos-cert.conf',
'crt' : '/etc/ssl/certs/vyos-selfsigned.crt',
diff --git a/python/vyos/frr.py b/python/vyos/frr.py
index ad5c207f5..a01d967e4 100644
--- a/python/vyos/frr.py
+++ b/python/vyos/frr.py
@@ -86,12 +86,8 @@ ch2 = logging.StreamHandler(stream=sys.stdout)
LOG.addHandler(ch)
LOG.addHandler(ch2)
-# Full list of FRR 9.0/stable daemons for reference
-#_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd',
-# 'isisd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'sharpd', 'bfdd',
-# 'fabricd', 'pathd']
_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd',
- 'isisd', 'pim6d', 'ldpd', 'babeld', 'bfdd']
+ 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd']
path_vtysh = '/usr/bin/vtysh'
path_frr_reload = '/usr/lib/frr/frr-reload.py'
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 050095364..1586710db 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -496,12 +496,12 @@ class Interface(Control):
from hashlib import sha256
# Get processor ID number
- cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"')
+ cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"')
# XXX: T3894 - it seems not all systems have eth0 - get a list of all
# available Ethernet interfaces on the system (without VLAN subinterfaces)
# and then take the first one.
- all_eth_ifs = [x for x in Section.interfaces('ethernet') if '.' not in x]
+ all_eth_ifs = Section.interfaces('ethernet', vlan=False)
first_mac = Interface(all_eth_ifs[0]).get_mac()
sha = sha256()
@@ -571,6 +571,16 @@ class Interface(Control):
self._cmd(f'ip link set dev {self.ifname} netns {netns}')
return True
+ def get_vrf(self):
+ """
+ Get VRF from interface
+
+ Example:
+ >>> from vyos.ifconfig import Interface
+ >>> Interface('eth0').get_vrf()
+ """
+ return self.get_interface('vrf')
+
def set_vrf(self, vrf: str) -> bool:
"""
Add/Remove interface from given VRF instance.
diff --git a/python/vyos/ifconfig/macsec.py b/python/vyos/ifconfig/macsec.py
index 9329c5ee7..bde1d9aec 100644
--- a/python/vyos/ifconfig/macsec.py
+++ b/python/vyos/ifconfig/macsec.py
@@ -45,6 +45,10 @@ class MACsecIf(Interface):
# create tunnel interface
cmd = 'ip link add link {source_interface} {ifname} type {type}'.format(**self.config)
cmd += f' cipher {self.config["security"]["cipher"]}'
+
+ if 'encrypt' in self.config["security"]:
+ cmd += ' encrypt on'
+
self._cmd(cmd)
# Check if using static keys
diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py
index 8c5a0220e..23b6daa3a 100644
--- a/python/vyos/ifconfig/vxlan.py
+++ b/python/vyos/ifconfig/vxlan.py
@@ -22,6 +22,7 @@ from vyos.utils.assertion import assert_list
from vyos.utils.dict import dict_search
from vyos.utils.network import get_interface_config
from vyos.utils.network import get_vxlan_vlan_tunnels
+from vyos.utils.network import get_vxlan_vni_filter
@Interface.register
class VXLANIf(Interface):
@@ -79,6 +80,7 @@ class VXLANIf(Interface):
'parameters.ip.ttl' : 'ttl',
'parameters.ipv6.flowlabel' : 'flowlabel',
'parameters.nolearning' : 'nolearning',
+ 'parameters.vni_filter' : 'vnifilter',
'remote' : 'remote',
'source_address' : 'local',
'source_interface' : 'dev',
@@ -138,10 +140,14 @@ class VXLANIf(Interface):
if not isinstance(state, bool):
raise ValueError('Value out of range')
- cur_vlan_ids = []
if 'vlan_to_vni_removed' in self.config:
- cur_vlan_ids = self.config['vlan_to_vni_removed']
- for vlan in cur_vlan_ids:
+ cur_vni_filter = get_vxlan_vni_filter(self.ifname)
+ for vlan, vlan_config in self.config['vlan_to_vni_removed'].items():
+ # If VNI filtering is enabled, remove matching VNI filter
+ if dict_search('parameters.vni_filter', self.config) != None:
+ vni = vlan_config['vni']
+ if vni in cur_vni_filter:
+ self._cmd(f'bridge vni delete dev {self.ifname} vni {vni}')
self._cmd(f'bridge vlan del dev {self.ifname} vid {vlan}')
# Determine current OS Kernel vlan_tunnel setting - only adjust when needed
@@ -151,10 +157,9 @@ class VXLANIf(Interface):
if cur_state != new_state:
self.set_interface('vlan_tunnel', new_state)
- # Determine current OS Kernel configured VLANs
- os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname)
-
if 'vlan_to_vni' in self.config:
+ # Determine current OS Kernel configured VLANs
+ os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname)
add_vlan = list_diff(list(self.config['vlan_to_vni'].keys()), os_configured_vlan_ids)
for vlan, vlan_config in self.config['vlan_to_vni'].items():
@@ -168,6 +173,10 @@ class VXLANIf(Interface):
self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan}')
self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan} tunnel_info id {vni}')
+ # If VNI filtering is enabled, install matching VNI filter
+ if dict_search('parameters.vni_filter', self.config) != None:
+ self._cmd(f'bridge vni add dev {self.ifname} vni {vni}')
+
def update(self, config):
""" General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
diff --git a/python/vyos/progressbar.py b/python/vyos/progressbar.py
index 1793c445b..7bc9d9856 100644
--- a/python/vyos/progressbar.py
+++ b/python/vyos/progressbar.py
@@ -19,26 +19,35 @@ import signal
import subprocess
import sys
+from vyos.utils.io import is_dumb_terminal
from vyos.utils.io import print_error
+
class Progressbar:
def __init__(self, step=None):
self.total = 0.0
self.step = step
+ # Silently ignore all calls if terminal capabilities are lacking.
+ # This will also prevent the output from littering Ansible logs,
+ # as `ansible.netcommon.network_cli' coaxes the terminal into believing
+ # it is interactive.
+ self._dumb = is_dumb_terminal()
def __enter__(self):
- # Recalculate terminal width with every window resize.
- signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols())
- # Disable line wrapping to prevent the staircase effect.
- subprocess.run(['tput', 'rmam'], check=False)
- self._update_cols()
- # Print an empty progressbar with entry.
- self.progress(0, 1)
+ if not self._dumb:
+ # Recalculate terminal width with every window resize.
+ signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols())
+ # Disable line wrapping to prevent the staircase effect.
+ subprocess.run(['tput', 'rmam'], check=False)
+ self._update_cols()
+ # Print an empty progressbar with entry.
+ self.progress(0, 1)
return self
def __exit__(self, exc_type, kexc_val, exc_tb):
- # Revert to the default SIGWINCH handler (ie nothing).
- signal.signal(signal.SIGWINCH, signal.SIG_DFL)
- # Reenable line wrapping.
- subprocess.run(['tput', 'smam'], check=False)
+ if not self._dumb:
+ # Revert to the default SIGWINCH handler (ie nothing).
+ signal.signal(signal.SIGWINCH, signal.SIG_DFL)
+ # Reenable line wrapping.
+ subprocess.run(['tput', 'smam'], check=False)
def _update_cols(self):
# `os.get_terminal_size()' is fast enough for our purposes.
self.col = max(os.get_terminal_size().columns - 15, 20)
@@ -60,7 +69,7 @@ class Progressbar:
Stateless progressbar taking no input at init and current progress with
final size at callback (for SSH)
"""
- if done <= total:
+ if done <= total and not self._dumb:
length = math.ceil(self.col * done / total)
percentage = str(math.ceil(100 * done / total)).rjust(3)
# Carriage return at the end will make sure the line will get overwritten.
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 1ca8a9530..fec44b571 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -14,6 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+import pwd
import shutil
import socket
import ssl
@@ -22,6 +23,9 @@ import sys
import tempfile
import urllib.parse
+from contextlib import contextmanager
+from pathlib import Path
+
from ftplib import FTP
from ftplib import FTP_TLS
@@ -34,9 +38,10 @@ from requests.packages.urllib3 import PoolManager
from vyos.progressbar import Progressbar
from vyos.utils.io import ask_yes_no
+from vyos.utils.io import is_interactive
from vyos.utils.io import print_error
from vyos.utils.misc import begin
-from vyos.utils.process import cmd
+from vyos.utils.process import cmd, rc_cmd
from vyos.version import get_version
CHUNK_SIZE = 8192
@@ -49,7 +54,7 @@ class InteractivePolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
print_error(f"Host '{hostname}' not found in known hosts.")
print_error('Fingerprint: ' + key.get_fingerprint().hex())
- if sys.stdout.isatty() and ask_yes_no('Do you wish to continue?'):
+ if is_interactive() and ask_yes_no('Do you wish to continue?'):
if client._host_keys_filename\
and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'):
client._host_keys.add(hostname, key.get_name(), key)
@@ -72,6 +77,17 @@ class SourceAdapter(HTTPAdapter):
num_pools=connections, maxsize=maxsize,
block=block, source_address=self._source_pair)
+@contextmanager
+def umask(mask: int):
+ """
+ Context manager that temporarily sets the process umask.
+ """
+ import os
+ oldmask = os.umask(mask)
+ try:
+ yield
+ finally:
+ os.umask(oldmask)
def check_storage(path, size):
"""
@@ -250,7 +266,6 @@ class HttpC:
allow_redirects=True,
timeout=self.timeout) as r:
# Abort early if the destination is inaccessible.
- print('pre-3')
r.raise_for_status()
# If the request got redirected, keep the last URL we ended up with.
final_urlstring = r.url
@@ -310,30 +325,138 @@ class TftpC:
with open(location, 'rb') as f:
cmd(f'{self.command} -T - "{self.urlstring}"', input=f.read())
+class GitC:
+ def __init__(self,
+ url,
+ progressbar=False,
+ check_space=False,
+ source_host=None,
+ source_port=0,
+ timeout=10,
+ ):
+ self.command = 'git'
+ self.url = url
+ self.urlstring = urllib.parse.urlunsplit(url)
+ if self.urlstring.startswith("git+"):
+ self.urlstring = self.urlstring.replace("git+", "", 1)
+
+ def download(self, location: str):
+ raise NotImplementedError("not supported")
+
+ @umask(0o077)
+ def upload(self, location: str):
+ scheme = self.url.scheme
+ _, _, scheme = scheme.partition("+")
+ netloc = self.url.netloc
+ url = Path(self.url.path).parent
+ with tempfile.TemporaryDirectory(prefix="git-commit-archive-") as directory:
+ # Determine username, fullname, email for Git commit
+ pwd_entry = pwd.getpwuid(os.getuid())
+ user = pwd_entry.pw_name
+ name = pwd_entry.pw_gecos.split(",")[0] or user
+ fqdn = socket.getfqdn()
+ email = f"{user}@{fqdn}"
+
+ # environment vars for our git commands
+ env = {
+ "GIT_TERMINAL_PROMPT": "0",
+ "GIT_AUTHOR_NAME": name,
+ "GIT_AUTHOR_EMAIL": email,
+ "GIT_COMMITTER_NAME": name,
+ "GIT_COMMITTER_EMAIL": email,
+ }
+
+ # build ssh command for git
+ ssh_command = ["ssh"]
+
+ # if we are not interactive, we use StrictHostKeyChecking=yes to avoid any prompts
+ if not sys.stdout.isatty():
+ ssh_command += ["-o", "StrictHostKeyChecking=yes"]
+
+ env["GIT_SSH_COMMAND"] = " ".join(ssh_command)
+
+ # git clone
+ path_repository = Path(directory) / "repository"
+ scheme = f"{scheme}://" if scheme else ""
+ rc, out = rc_cmd(
+ [self.command, "clone", f"{scheme}{netloc}{url}", str(path_repository), "--depth=1"],
+ env=env,
+ shell=False,
+ )
+ if rc:
+ raise Exception(out)
+
+ # git add
+ filename = Path(Path(self.url.path).name).stem
+ dst = path_repository / filename
+ shutil.copy2(location, dst)
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "add", filename],
+ env=env,
+ shell=False,
+ )
+
+ # git commit -m
+ commit_message = os.environ.get("COMMIT_COMMENT", "commit")
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "commit", "-m", commit_message],
+ env=env,
+ shell=False,
+ )
+
+ # git push
+ rc, out = rc_cmd(
+ [self.command, "-C", str(path_repository), "push"],
+ env=env,
+ shell=False,
+ )
+ if rc:
+ raise Exception(out)
+
def urlc(urlstring, *args, **kwargs):
"""
Dynamically dispatch the appropriate protocol class.
"""
- url_classes = {'http': HttpC, 'https': HttpC, 'ftp': FtpC, 'ftps': FtpC, \
- 'sftp': SshC, 'ssh': SshC, 'scp': SshC, 'tftp': TftpC}
+ url_classes = {
+ "http": HttpC,
+ "https": HttpC,
+ "ftp": FtpC,
+ "ftps": FtpC,
+ "sftp": SshC,
+ "ssh": SshC,
+ "scp": SshC,
+ "tftp": TftpC,
+ "git": GitC,
+ }
url = urllib.parse.urlsplit(urlstring)
+ scheme, _, _ = url.scheme.partition("+")
try:
- return url_classes[url.scheme](url, *args, **kwargs)
+ return url_classes[scheme](url, *args, **kwargs)
except KeyError:
- raise ValueError(f'Unsupported URL scheme: "{url.scheme}"')
+ raise ValueError(f'Unsupported URL scheme: "{scheme}"')
-def download(local_path, urlstring, *args, **kwargs):
+def download(local_path, urlstring, progressbar=False, check_space=False,
+ source_host='', source_port=0, timeout=10.0, raise_error=False):
try:
- urlc(urlstring, *args, **kwargs).download(local_path)
+ progressbar = progressbar and is_interactive()
+ urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path)
except Exception as err:
+ if raise_error:
+ raise
print_error(f'Unable to download "{urlstring}": {err}')
+ except KeyboardInterrupt:
+ print_error('\nDownload aborted by user.')
-def upload(local_path, urlstring, *args, **kwargs):
+def upload(local_path, urlstring, progressbar=False,
+ source_host='', source_port=0, timeout=10.0):
try:
- urlc(urlstring, *args, **kwargs).upload(local_path)
+ progressbar = progressbar and is_interactive()
+ urlc(urlstring, progressbar, source_host, source_port, timeout).upload(local_path)
except Exception as err:
print_error(f'Unable to upload "{urlstring}": {err}')
+ except KeyboardInterrupt:
+ print_error('\nUpload aborted by user.')
def get_remote_config(urlstring, source_host='', source_port=0):
"""
@@ -346,26 +469,3 @@ def get_remote_config(urlstring, source_host='', source_port=0):
return f.read()
finally:
os.remove(temp)
-
-def friendly_download(local_path, urlstring, source_host='', source_port=0):
- """
- Download with a progress bar, reassuring messages and free space checks.
- """
- try:
- print_error('Downloading...')
- download(local_path, urlstring, True, True, source_host, source_port)
- except KeyboardInterrupt:
- print_error('\nDownload aborted by user.')
- sys.exit(1)
- except:
- import traceback
- print_error(f'Failed to download {urlstring}.')
- # There are a myriad different reasons a download could fail.
- # SSH errors, FTP errors, I/O errors, HTTP errors (403, 404...)
- # We omit the scary stack trace but print the error nevertheless.
- exc_type, exc_value, exc_traceback = sys.exc_info()
- traceback.print_exception(exc_type, exc_value, None, 0, None, False)
- sys.exit(1)
- else:
- print_error('Download complete.')
- sys.exit(0)
diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py
new file mode 100644
index 000000000..0c91330ba
--- /dev/null
+++ b/python/vyos/system/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+__all_: list[str] = ['disk', 'grub', 'image']
+# define image-tools version
+SYSTEM_CFG_VER = 1
diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py
new file mode 100644
index 000000000..319c3dabf
--- /dev/null
+++ b/python/vyos/system/compat.py
@@ -0,0 +1,316 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import compile, MULTILINE, DOTALL
+from functools import wraps
+from copy import deepcopy
+from typing import Union
+
+from vyos.system import disk, grub, image, SYSTEM_CFG_VER
+from vyos.template import render
+
+TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2'
+
+# define regexes and variables
+REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}'
+REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}'
+REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$'
+REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?'
+REGEX_SANIT_INIT = r'\ ?init=\S*\ ?'
+REGEX_SANIT_QUIET = r'\ ?quiet\ ?'
+PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset'
+
+
+class DowngradingImageTools(Exception):
+ """Raised when attempting to add an image with an earlier version
+ of image-tools than the current system, as indicated by the value
+ of SYSTEM_CFG_VER or absence thereof."""
+ pass
+
+
+def mode():
+ if grub.get_cfg_ver() >= SYSTEM_CFG_VER:
+ return False
+
+ return True
+
+
+def find_versions(menu_entries: list) -> list:
+ """Find unique VyOS versions from menu entries
+
+ Args:
+ menu_entries (list): a list with menu entries
+
+ Returns:
+ list: List of installed versions
+ """
+ versions = []
+ for vyos_ver in menu_entries:
+ versions.append(vyos_ver.get('version'))
+ # remove duplicates
+ versions = list(set(versions))
+ return versions
+
+
+def filter_unparsed(grub_path: str) -> str:
+ """Find currently installed VyOS version
+
+ Args:
+ grub_path (str): a path to the grub.cfg file
+
+ Returns:
+ str: unparsed grub.cfg items
+ """
+ config_text = Path(grub_path).read_text()
+ regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL)
+ filtered = regex_filter.sub('', config_text)
+ regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ # strip extra new lines
+ filtered = filtered.strip()
+ return filtered
+
+
+def get_search_root(unparsed: str) -> str:
+ unparsed_lines = unparsed.splitlines()
+ search_root = next((x for x in unparsed_lines if 'search' in x), '')
+ return search_root
+
+
+def sanitize_boot_opts(boot_opts: str) -> str:
+ """Sanitize boot options from console and init
+
+ Args:
+ boot_opts (str): boot options
+
+ Returns:
+ str: sanitized boot options
+ """
+ regex_filter = compile(REGEX_SANIT_CONSOLE)
+ boot_opts = regex_filter.sub('', boot_opts)
+ regex_filter = compile(REGEX_SANIT_INIT)
+ boot_opts = regex_filter.sub('', boot_opts)
+ # legacy tools add 'quiet' on add system image; this is not desired
+ regex_filter = compile(REGEX_SANIT_QUIET)
+ boot_opts = regex_filter.sub(' ', boot_opts)
+
+ return boot_opts
+
+
+def parse_entry(entry: tuple) -> dict:
+ """Parse GRUB menuentry
+
+ Args:
+ entry (tuple): tuple of (version, options)
+
+ Returns:
+ dict: dictionary with parsed options
+ """
+ # save version to dict
+ entry_dict = {'version': entry[0]}
+ # detect boot mode type
+ if PW_RESET_OPTION in entry[1]:
+ entry_dict['bootmode'] = 'pw_reset'
+ else:
+ entry_dict['bootmode'] = 'normal'
+ # find console type and number
+ regex_filter = compile(REGEX_CONSOLE)
+ entry_dict.update(regex_filter.match(entry[1]).groupdict())
+ entry_dict['boot_opts'] = sanitize_boot_opts(entry[1])
+
+ return entry_dict
+
+
+def parse_menuentries(grub_path: str) -> list:
+ """Parse all GRUB menuentries
+
+ Args:
+ grub_path (str): a path to GRUB config file
+
+ Returns:
+ list: list with menu items (each item is a dict)
+ """
+ menuentries = []
+ # read configuration file
+ config_text = Path(grub_path).read_text()
+ # parse menuentries to tuples (version, options)
+ regex_filter = compile(REGEX_MENUENTRY, MULTILINE)
+ filter_results = regex_filter.findall(config_text)
+ # parse each entry
+ for entry in filter_results:
+ menuentries.append(parse_entry(entry))
+
+ return menuentries
+
+
+def prune_vyos_versions(root_dir: str = '') -> None:
+ """Delete vyos-versions files of registered images subsequently deleted
+ or renamed by legacy image-tools
+
+ Args:
+ root_dir (str): an optional path to the root directory
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ for version in grub.version_list():
+ if not Path(f'{root_dir}/boot/{version}').is_dir():
+ grub.version_del(version)
+
+
+def update_cfg_ver(root_dir:str = '') -> int:
+ """Get minumum version of image-tools across all installed images
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: minimum version of image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ prune_vyos_versions(root_dir)
+
+ images_details = image.get_images_details()
+ cfg_version = min(d['tools_version'] for d in images_details)
+
+ return cfg_version
+
+
+def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]:
+ """Translate default version to menuentry index
+
+ Args:
+ menu_entries (list): list of dicts of installed version boot data
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: index of default version in menu_entries or None
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ image_name = image.get_default_image()
+
+ sublist = list(filter(lambda x: x.get('version') == image_name,
+ menu_entries))
+ if sublist:
+ return menu_entries.index(sublist[0])
+
+ return None
+
+
+def update_version_list(root_dir: str = '') -> list[dict]:
+ """Update list of dicts of installed version boot data
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ list: list of dicts of installed version boot data
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ # get list of versions in menuentries
+ menu_entries = parse_menuentries(grub_cfg_main)
+ menu_versions = find_versions(menu_entries)
+
+ # get list of versions added/removed by image-tools
+ current_versions = grub.version_list(root_dir)
+
+ remove = list(set(menu_versions) - set(current_versions))
+ for ver in remove:
+ menu_entries = list(filter(lambda x: x.get('version') != ver,
+ menu_entries))
+
+ add = list(set(current_versions) - set(menu_versions))
+ for ver in add:
+ last = menu_entries[0].get('version')
+ new = deepcopy(list(filter(lambda x: x.get('version') == last,
+ menu_entries)))
+ for e in new:
+ boot_opts = e.get('boot_opts').replace(last, ver)
+ e.update({'version': ver, 'boot_opts': boot_opts})
+
+ menu_entries = new + menu_entries
+
+ return menu_entries
+
+
+def grub_cfg_fields(root_dir: str = '') -> dict:
+ """Gather fields for rendering grub.cfg
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ dict: dictionary for rendering TMPL_GRUB_COMPAT
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ fields = {'default': 0, 'timeout': 5}
+ # 'default' and 'timeout' from legacy grub.cfg
+ fields |= grub.vars_read(grub_cfg_main)
+
+ fields['tools_version'] = SYSTEM_CFG_VER
+ menu_entries = update_version_list(root_dir)
+ fields['versions'] = menu_entries
+
+ default = get_default(menu_entries, root_dir)
+ if default is not None:
+ fields['default'] = default
+
+ modules = grub.modules_read(grub_cfg_main)
+ fields['modules'] = modules
+
+ unparsed = filter_unparsed(grub_cfg_main).splitlines()
+ search_root = next((x for x in unparsed if 'search' in x), '')
+ fields['search_root'] = search_root
+
+ return fields
+
+
+def render_grub_cfg(root_dir: str = '') -> None:
+ """Render grub.cfg for legacy compatibility"""
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ fields = grub_cfg_fields(root_dir)
+ render(grub_cfg_main, TMPL_GRUB_COMPAT, fields)
+
+
+def grub_cfg_update(func):
+ """Decorator to update grub.cfg after function call"""
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ ret = func(*args, **kwargs)
+ if mode():
+ render_grub_cfg()
+ return ret
+ return wrapper
diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py
new file mode 100644
index 000000000..49e6b5c5e
--- /dev/null
+++ b/python/vyos/system/disk.py
@@ -0,0 +1,217 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from json import loads as json_loads
+from os import sync
+from dataclasses import dataclass
+
+from psutil import disk_partitions
+
+from vyos.utils.process import run, cmd
+
+
+@dataclass
+class DiskDetails:
+ """Disk details"""
+ name: str
+ partition: dict[str, str]
+
+
+def disk_cleanup(drive_path: str) -> None:
+ """Clean up disk partition table (MBR and GPT)
+ Zeroize primary and secondary headers - first and last 17408 bytes
+ (512 bytes * 34 LBA) on a drive
+
+ Args:
+ drive_path (str): path to a drive that needs to be cleaned
+ """
+ run(f'sgdisk -Z {drive_path}')
+
+
+def find_persistence() -> str:
+ """Find a mountpoint for persistence storage
+
+ Returns:
+ str: Path where 'persistance' pertition is mounted, Empty if not found
+ """
+ mounted_partitions = disk_partitions()
+ for partition in mounted_partitions:
+ if partition.mountpoint.endswith('/persistence'):
+ return partition.mountpoint
+ return ''
+
+
+def parttable_create(drive_path: str, root_size: int) -> None:
+ """Create a hybrid MBR/GPT partition table
+ 0-2047 first sectors are free
+ 2048-4095 sectors - BIOS Boot Partition
+ 4096 + 256 MB - EFI system partition
+ Everything else till the end of a drive - Linux partition
+
+ Args:
+ drive_path (str): path to a drive
+ """
+ if not root_size:
+ root_size_text: str = '+100%'
+ else:
+ root_size_text: str = str(root_size)
+ command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \
+ -n3:0:+{root_size_text}K -t3:8300 {drive_path}'
+
+ run(command)
+ # update partitons in kernel
+ sync()
+ run(f'partprobe {drive_path}')
+
+ partitions: list[str] = partition_list(drive_path)
+
+ disk: DiskDetails = DiskDetails(
+ name = drive_path,
+ partition = {
+ 'efi': next(x for x in partitions if x.endswith('2')),
+ 'root': next(x for x in partitions if x.endswith('3'))
+ }
+ )
+
+ return disk
+
+
+def partition_list(drive_path: str) -> list[str]:
+ """Get a list of partitions on a drive
+
+ Args:
+ drive_path (str): path to a drive
+
+ Returns:
+ list[str]: a list of partition paths
+ """
+ lsblk: str = cmd(f'lsblk -Jp {drive_path}')
+ drive_info: dict = json_loads(lsblk)
+ device: list = drive_info.get('blockdevices')
+ children: list[str] = device[0].get('children', []) if device else []
+ partitions: list[str] = [child.get('name') for child in children]
+ return partitions
+
+
+def partition_parent(partition_path: str) -> str:
+ """Get a parent device for a partition
+
+ Args:
+ partition (str): path to a partition
+
+ Returns:
+ str: path to a parent device
+ """
+ parent: str = cmd(f'lsblk -ndpo pkname {partition_path}')
+ return parent
+
+
+def from_partition(partition_path: str) -> DiskDetails:
+ drive_path: str = partition_parent(partition_path)
+ partitions: list[str] = partition_list(drive_path)
+
+ disk: DiskDetails = DiskDetails(
+ name = drive_path,
+ partition = {
+ 'efi': next(x for x in partitions if x.endswith('2')),
+ 'root': next(x for x in partitions if x.endswith('3'))
+ }
+ )
+
+ return disk
+
+def filesystem_create(partition: str, fstype: str) -> None:
+ """Create a filesystem on a partition
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ fstype (str): filesystem type ('efi' or 'ext4')
+ """
+ if fstype == 'efi':
+ command = 'mkfs -t fat -n EFI'
+ run(f'{command} {partition}')
+ if fstype == 'ext4':
+ command = 'mkfs -t ext4 -L persistence'
+ run(f'{command} {partition}')
+
+
+def partition_mount(partition: str,
+ path: str,
+ fsype: str = '',
+ overlay_params: dict[str, str] = {}) -> None:
+ """Mount a partition into a path
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ path (str): a path where to mount
+ fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660')
+ overlay_params (dict): optionally, set overlay parameters.
+ Defaults to None.
+ """
+ if fsype in ['squashfs', 'iso9660']:
+ command: str = f'mount -o loop,ro -t {fsype} {partition} {path}'
+ if fsype == 'overlay' and overlay_params:
+ command: str = f'mount -t overlay -o noatime,\
+ upperdir={overlay_params["upperdir"]},\
+ lowerdir={overlay_params["lowerdir"]},\
+ workdir={overlay_params["workdir"]} overlay {path}'
+
+ else:
+ command = f'mount {partition} {path}'
+
+ run(command)
+
+
+def partition_umount(partition: str = '', path: str = '') -> None:
+ """Umount a partition by a partition name or a path
+
+ Args:
+ partition (str): path to a partition (for example: '/dev/sda1')
+ path (str): a path where a partition is mounted
+ """
+ if partition:
+ command = f'umount {partition}'
+ run(command)
+ if path:
+ command = f'umount {path}'
+ run(command)
+
+
+def find_device(mountpoint: str) -> str:
+ """Find a device by mountpoint
+
+ Returns:
+ str: Path to device, Empty if not found
+ """
+ mounted_partitions = disk_partitions()
+ for partition in mounted_partitions:
+ if partition.mountpoint == mountpoint:
+ return partition.mountpoint
+ return ''
+
+
+def disks_size() -> dict[str, int]:
+ """Get a dictionary with physical disks and their sizes
+
+ Returns:
+ dict[str, int]: a dictionary with name: size mapping
+ """
+ disks_size: dict[str, int] = {}
+ lsblk: str = cmd('lsblk -Jbp')
+ blk_list = json_loads(lsblk)
+ for device in blk_list.get('blockdevices'):
+ if device['type'] == 'disk':
+ disks_size.update({device['name']: device['size']})
+ return disks_size
diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py
new file mode 100644
index 000000000..0ac16af9a
--- /dev/null
+++ b/python/vyos/system/grub.py
@@ -0,0 +1,340 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import MULTILINE, compile as re_compile
+from typing import Union
+from uuid import uuid5, NAMESPACE_URL, UUID
+
+from vyos.template import render
+from vyos.utils.process import cmd
+from vyos.system import disk
+
+# Define variables
+GRUB_DIR_MAIN: str = '/boot/grub'
+GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg'
+GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d'
+CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg'
+CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg'
+CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg'
+CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg'
+CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg'
+CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg'
+CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg'
+GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions'
+
+TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2'
+TMPL_GRUB_VARS: str = 'grub/grub_vars.j2'
+TMPL_GRUB_MAIN: str = 'grub/grub_main.j2'
+TMPL_GRUB_MENU: str = 'grub/grub_menu.j2'
+TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2'
+TMPL_GRUB_OPTS: str = 'grub/grub_options.j2'
+TMPL_GRUB_COMMON: str = 'grub/grub_common.j2'
+
+# prepare regexes
+REGEX_GRUB_VARS: str = r'^set (?P<variable_name>.+)=[\'"]?(?P<variable_value>.*)(?<![\'"])[\'"]?$'
+REGEX_GRUB_MODULES: str = r'^insmod (?P<module_name>.+)$'
+REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$'
+
+
+def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None:
+ """Install GRUB for both BIOS and EFI modes (hybrid boot)
+
+ Args:
+ drive_path (str): path to a drive where GRUB must be installed
+ boot_dir (str): a path to '/boot' directory
+ efi_dir (str): a path to '/boot/efi' directory
+ """
+ commands: list[str] = [
+ f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \
+ {drive_path} --force',
+ f'grub-install --no-floppy --recheck --target=x86_64-efi \
+ --force-extra-removable --boot-directory={boot_dir} \
+ --efi-directory={efi_dir} --bootloader-id="{id}" \
+ --no-uefi-secure-boot'
+ ]
+ for command in commands:
+ cmd(command)
+
+
+def gen_version_uuid(version_name: str) -> str:
+ """Generate unique ID from version name
+
+ Use UUID5 / NAMESPACE_URL with prefix `uuid5-`
+
+ Args:
+ version_name (str): version name
+
+ Returns:
+ str: generated unique ID
+ """
+ ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name)
+ ver_id: str = f'uuid5-{ver_uuid}'
+ return ver_id
+
+
+def version_add(version_name: str,
+ root_dir: str = '',
+ boot_opts: str = '') -> None:
+ """Add a new VyOS version to GRUB loader configuration
+
+ Args:
+ vyos_version (str): VyOS version name
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+ boot_opts (str): an optional boot options for Linux kernel.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg'
+ render(
+ version_config, TMPL_VYOS_VERSION, {
+ 'version_name': version_name,
+ 'version_uuid': gen_version_uuid(version_name),
+ 'boot_opts': boot_opts
+ })
+
+
+def version_del(vyos_version: str, root_dir: str = '') -> None:
+ """Delete a VyOS version from GRUB loader configuration
+
+ Args:
+ vyos_version (str): VyOS version name
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg'
+ Path(version_config).unlink(missing_ok=True)
+
+
+def version_list(root_dir: str = '') -> list[str]:
+ """Generate a list with installed VyOS versions
+
+ Args:
+ root_dir (str): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ list: A list with versions names
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg')
+ versions_list: list[str] = []
+ for file in versions_files:
+ versions_list.append(file.stem)
+ return versions_list
+
+
+def read_env(env_file: str = '') -> dict[str, str]:
+ """Read GRUB environment
+
+ Args:
+ env_file (str, optional): a path to grub environment file.
+ Defaults to empty.
+
+ Returns:
+ dict: dictionary with GRUB environment
+ """
+ if not env_file:
+ root_dir: str = disk.find_persistence()
+ env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv'
+
+ env_content: str = cmd(f'grub-editenv {env_file} list').splitlines()
+ regex_filter = re_compile(r'^(?P<variable_name>.*)=(?P<variable_value>.*)$')
+ env_dict: dict[str, str] = {}
+ for env_item in env_content:
+ search_result = regex_filter.fullmatch(env_item)
+ if search_result:
+ search_result_dict: dict[str, str] = search_result.groupdict()
+ variable_name: str = search_result_dict.get('variable_name', '')
+ variable_value: str = search_result_dict.get('variable_value', '')
+ if variable_name and variable_value:
+ env_dict.update({variable_name: variable_value})
+ return env_dict
+
+
+def get_cfg_ver(root_dir: str = '') -> int:
+ """Get current version of GRUB configuration
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ int: a configuration version
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get(
+ 'VYOS_CFG_VER')
+ if cfg_ver:
+ cfg_ver_int: int = int(cfg_ver)
+ else:
+ cfg_ver_int: int = 0
+ return cfg_ver_int
+
+
+def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None:
+ """Write version number of GRUB configuration
+
+ Args:
+ cfg_ver (int): a version number to write
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+
+ Returns:
+ int: a configuration version
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}'
+ vars_current: dict[str, str] = vars_read(vars_file)
+ vars_current['VYOS_CFG_VER'] = str(cfg_ver)
+ vars_write(vars_file, vars_current)
+
+
+def vars_read(grub_cfg: str) -> dict[str, str]:
+ """Read variables from a GRUB configuration file
+
+ Args:
+ grub_cfg (str): a path to the GRUB config file
+
+ Returns:
+ dict: a dictionary with variables and values
+ """
+ vars_dict: dict[str, str] = {}
+ regex_filter = re_compile(REGEX_GRUB_VARS)
+ try:
+ config_text: list[str] = Path(grub_cfg).read_text().splitlines()
+ except FileNotFoundError:
+ return vars_dict
+ for line in config_text:
+ search_result = regex_filter.fullmatch(line)
+ if search_result:
+ search_dict = search_result.groupdict()
+ variable_name: str = search_dict.get('variable_name', '')
+ variable_value: str = search_dict.get('variable_value', '')
+ if variable_name and variable_value:
+ vars_dict.update({variable_name: variable_value})
+ return vars_dict
+
+
+def modules_read(grub_cfg: str) -> list[str]:
+ """Read modules list from a GRUB configuration file
+
+ Args:
+ grub_cfg (str): a path to the GRUB config file
+
+ Returns:
+ list: a list with modules to load
+ """
+ mods_list: list[str] = []
+ regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE)
+ try:
+ config_text = Path(grub_cfg).read_text()
+ except FileNotFoundError:
+ return mods_list
+ mods_list = regex_filter.findall(config_text)
+
+ return mods_list
+
+
+def modules_write(grub_cfg: str, mods_list: list[str]) -> None:
+ """Write modules list to a GRUB configuration file (overwrite everything)
+
+ Args:
+ grub_cfg (str): a path to GRUB configuration file
+ mods_list (list): a list with modules to load
+ """
+ render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list})
+
+
+def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None:
+ """Write variables to a GRUB configuration file (overwrite everything)
+
+ Args:
+ grub_cfg (str): a path to GRUB configuration file
+ grub_vars (dict): a dictionary with new variables
+ """
+ render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars})
+
+
+def set_default(version_name: str, root_dir: str = '') -> None:
+ """Set version as default boot entry
+
+ Args:
+ version_name (str): versio name
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current = vars_read(vars_file)
+ vars_current['default'] = gen_version_uuid(version_name)
+ vars_write(vars_file, vars_current)
+
+
+def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None:
+ """Write common GRUB configuration file (overwrite everything)
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+ common_config = f'{root_dir}/{CFG_VYOS_COMMON}'
+ render(common_config, TMPL_GRUB_COMMON, grub_common)
+
+
+def create_structure(root_dir: str = '') -> None:
+ """Create GRUB directories structure
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True)
+
+
+def set_console_type(console_type: str, root_dir: str = '') -> None:
+ """Write default console type to GRUB configuration
+
+ Args:
+ console_type (str): a default console type
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = vars_read(vars_file)
+ vars_current['console_type'] = str(console_type)
+ vars_write(vars_file, vars_current)
+
+def set_raid(root_dir: str = '') -> None:
+ pass
diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py
new file mode 100644
index 000000000..c03ce02d5
--- /dev/null
+++ b/python/vyos/system/image.py
@@ -0,0 +1,268 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import compile as re_compile
+from tempfile import TemporaryDirectory
+from typing import TypedDict
+
+from vyos import version
+from vyos.system import disk, grub
+
+# Define variables
+GRUB_DIR_MAIN: str = '/boot/grub'
+GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d'
+CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg'
+GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions'
+# prepare regexes
+REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$'
+REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)'
+
+
+# structures definitions
+class ImageDetails(TypedDict):
+ name: str
+ version: str
+ tools_version: int
+ disk_ro: int
+ disk_rw: int
+ disk_total: int
+
+
+class BootDetails(TypedDict):
+ image_default: str
+ image_running: str
+ images_available: list[str]
+ console_type: str
+ console_num: int
+
+
+def bootmode_detect() -> str:
+ """Detect system boot mode
+
+ Returns:
+ str: 'bios' or 'efi'
+ """
+ if Path('/sys/firmware/efi/').exists():
+ return 'efi'
+ else:
+ return 'bios'
+
+
+def get_image_version(mount_path: str) -> str:
+ """Extract version name from rootfs mounted at mount_path
+
+ Args:
+ mount_path (str): mount path of rootfs
+
+ Returns:
+ str: version name
+ """
+ version_file: str = Path(
+ f'{mount_path}/opt/vyatta/etc/version').read_text()
+ version_name: str = version_file.lstrip('Version: ').strip()
+
+ return version_name
+
+
+def get_image_tools_version(mount_path: str) -> int:
+ """Extract image-tools version from rootfs mounted at mount_path
+
+ Args:
+ mount_path (str): mount path of rootfs
+
+ Returns:
+ str: image-tools version
+ """
+ try:
+ version_file: str = Path(
+ f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text()
+ except FileNotFoundError:
+ system_cfg_ver: int = 0
+ else:
+ res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file)
+ system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0))
+
+ return system_cfg_ver
+
+
+def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]:
+ """Return versions of image and image-tools
+
+ Args:
+ image_name (str): a name of an image
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+
+ Returns:
+ dict[str, int]: a dictionary with versions of image and image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ squashfs_file: str = next(
+ Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix()
+ with TemporaryDirectory() as squashfs_mounted:
+ disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs')
+
+ image_version: str = get_image_version(squashfs_mounted)
+ image_tools_version: int = get_image_tools_version(squashfs_mounted)
+
+ disk.partition_umount(squashfs_file)
+
+ versions: dict[str, int] = {
+ 'image': image_version,
+ 'image-tools': image_tools_version
+ }
+
+ return versions
+
+
+def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
+ """Return information about image
+
+ Args:
+ image_name (str): a name of an image
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+
+ Returns:
+ ImageDetails: a dictionary with details about an image (name, size)
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ versions = get_versions(image_name, root_dir)
+ image_version: str = versions.get('image', '')
+ image_tools_version: int = versions.get('image-tools', 0)
+
+ image_path: Path = Path(f'{root_dir}/boot/{image_name}')
+ image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw')
+
+ image_disk_ro: int = int()
+ for item in image_path.iterdir():
+ if not item.is_symlink():
+ image_disk_ro += item.stat().st_size
+
+ image_disk_rw: int = int()
+ for item in image_path_rw.rglob('*'):
+ if not item.is_symlink():
+ image_disk_rw += item.stat().st_size
+
+ image_details: ImageDetails = {
+ 'name': image_name,
+ 'version': image_version,
+ 'tools_version': image_tools_version,
+ 'disk_ro': image_disk_ro,
+ 'disk_rw': image_disk_rw,
+ 'disk_total': image_disk_ro + image_disk_rw
+ }
+
+ return image_details
+
+
+def get_images_details() -> list[ImageDetails]:
+ """Return information about all images
+
+ Returns:
+ list[ImageDetails]: a list of dictionaries with details about images
+ """
+ images: list[str] = grub.version_list()
+ images_details: list[ImageDetails] = list()
+ for image_name in images:
+ images_details.append(get_details(image_name))
+
+ return images_details
+
+
+def get_running_image() -> str:
+ """Find currently running image name
+
+ Returns:
+ str: image name
+ """
+ running_image: str = ''
+ regex_filter = re_compile(REGEX_KERNEL_CMDLINE)
+ cmdline: str = Path('/proc/cmdline').read_text()
+ running_image_result = regex_filter.match(cmdline)
+ if running_image_result:
+ running_image: str = running_image_result.groupdict().get(
+ 'image_version', '')
+ # we need to have a fallback for live systems
+ if not running_image:
+ running_image: str = version.get_version()
+
+ return running_image
+
+
+def get_default_image(root_dir: str = '') -> str:
+ """Get default boot entry
+
+ Args:
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to empty.
+ Returns:
+ str: a version name
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}'
+ vars_current: dict[str, str] = grub.vars_read(vars_file)
+ default_uuid: str = vars_current.get('default', '')
+ if default_uuid:
+ images_list: list[str] = grub.version_list(root_dir)
+ for image_name in images_list:
+ if default_uuid == grub.gen_version_uuid(image_name):
+ return image_name
+ return ''
+ else:
+ return ''
+
+
+def validate_name(image_name: str) -> bool:
+ """Validate image name
+
+ Args:
+ image_name (str): suggested image name
+
+ Returns:
+ bool: validation result
+ """
+ regex_filter = re_compile(r'^[\w\.+-]{1,32}$')
+ if regex_filter.match(image_name):
+ return True
+ return False
+
+
+def is_live_boot() -> bool:
+ """Detect live booted system
+
+ Returns:
+ bool: True if the system currently booted in live mode
+ """
+ regex_filter = re_compile(REGEX_KERNEL_CMDLINE)
+ cmdline: str = Path('/proc/cmdline').read_text()
+ running_image_result = regex_filter.match(cmdline)
+ if running_image_result:
+ boot_type: str = running_image_result.groupdict().get('boot_type', '')
+ if boot_type == 'live':
+ return True
+ return False
+
+def is_running_as_container() -> bool:
+ if Path('/.dockerenv').exists():
+ return True
+ return False
diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py
new file mode 100644
index 000000000..13b99fa69
--- /dev/null
+++ b/python/vyos/system/raid.py
@@ -0,0 +1,115 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+"""RAID related functions"""
+
+from pathlib import Path
+from shutil import copy
+from dataclasses import dataclass
+
+from vyos.utils.process import cmd
+from vyos.system import disk
+
+
+@dataclass
+class RaidDetails:
+ """RAID type"""
+ name: str
+ level: str
+ members: list[str]
+ disks: list[disk.DiskDetails]
+
+
+def raid_create(raid_members: list[str],
+ raid_name: str = 'md0',
+ raid_level: str = 'raid1') -> None:
+ """Create a RAID array
+
+ Args:
+ raid_name (str): a name of array (data, backup, test, etc.)
+ raid_members (list[str]): a list of array members
+ raid_level (str, optional): an array level. Defaults to 'raid1'.
+ """
+ raid_devices_num: int = len(raid_members)
+ raid_members_str: str = ' '.join(raid_members)
+ if Path('/sys/firmware/efi').exists():
+ for part in raid_members:
+ drive: str = disk.partition_parent(part)
+ command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}'
+ cmd(command)
+ else:
+ for part in raid_members:
+ drive: str = disk.partition_parent(part)
+ command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}'
+ cmd(command)
+ for part in raid_members:
+ command: str = f'mdadm --zero-superblock {part}'
+ cmd(command)
+ command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \
+ --raid-devices={raid_devices_num} --level={raid_level} \
+ {raid_members_str}'
+
+ cmd(command)
+
+ raid = RaidDetails(
+ name = f'/dev/{raid_name}',
+ level = raid_level,
+ members = raid_members,
+ disks = [disk.from_partition(m) for m in raid_members]
+ )
+
+ return raid
+
+def update_initramfs() -> None:
+ """Update initramfs"""
+ mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm'
+ copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script)
+ p = Path(mdadm_script)
+ p.write_text(p.read_text().replace('$((COUNT + 1))', '20'))
+ command: str = 'update-initramfs -u'
+ cmd(command)
+
+def update_default(target_dir: str) -> None:
+ """Update /etc/default/mdadm to start MD monitoring daemon at boot
+ """
+ source_mdadm_config = '/etc/default/mdadm'
+ target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm')
+ target_mdadm_config_dir = Path(target_mdadm_config).parent
+ Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True)
+ s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false',
+ 'START_DAEMON=true')
+ Path(target_mdadm_config).write_text(s)
+
+def get_uuid(device: str) -> str:
+ """Get UUID of a device"""
+ command: str = f'tune2fs -l {device}'
+ l = cmd(command).splitlines()
+ uuid = next((x for x in l if x.startswith('Filesystem UUID')), '')
+ return uuid.split(':')[1].strip() if uuid else ''
+
+def get_uuids(raid_details: RaidDetails) -> tuple[str]:
+ """Get UUIDs of RAID members
+
+ Args:
+ raid_name (str): a name of array (data, backup, test, etc.)
+
+ Returns:
+ tuple[str]: root_disk uuid, root_md uuid
+ """
+ raid_name: str = raid_details.name
+ root_partition: str = raid_details.members[0]
+ uuid_root_disk: str = get_uuid(root_partition)
+ uuid_root_md: str = get_uuid(raid_name)
+ return uuid_root_disk, uuid_root_md
diff --git a/python/vyos/template.py b/python/vyos/template.py
index c778d0de8..1e683b605 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -579,10 +579,10 @@ def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):
return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name)
@register_filter('nft_default_rule')
-def nft_default_rule(fw_conf, fw_name, ipv6=False):
+def nft_default_rule(fw_conf, fw_name, family):
output = ['counter']
default_action = fw_conf['default_action']
- family = 'ipv6' if ipv6 else 'ipv4'
+ #family = 'ipv6' if ipv6 else 'ipv4'
if 'enable_default_log' in fw_conf:
action_suffix = default_action[:1].upper()
@@ -592,7 +592,7 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False):
output.append(f'{default_action}')
if 'default_jump_target' in fw_conf:
target = fw_conf['default_jump_target']
- def_suffix = '6' if ipv6 else ''
+ def_suffix = '6' if family == 'ipv6' else ''
output.append(f'NAME{def_suffix}_{target}')
output.append(f'comment "{fw_name} default-action {default_action}"')
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 9a8a1ff7d..c02f0071e 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""):
return result
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
+def bytes_to_human(bytes, initial_exponent=0, precision=2,
+ int_below_exponent=0):
""" Converts a value in bytes to a human-readable size string like 640 KB
The initial_exponent parameter is the exponent of 2,
@@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2):
# log2 is a float, while range checking requires an int
exponent = int(log2(bytes))
+ if exponent < int_below_exponent:
+ precision = 0
if exponent < 10:
value = bytes
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index 667a2464b..9f27a7fb9 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -134,6 +134,12 @@ def chmod_755(path):
S_IROTH | S_IXOTH
chmod(path, bitmask)
+def chmod_2775(path):
+ """ user/group permissions with set-group-id bit set """
+ from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH
+
+ bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH
+ chmod(path, bitmask)
def makedir(path, user=None, group=None):
if os.path.exists(path):
diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py
index 5fffa62f8..74099b502 100644
--- a/python/vyos/utils/io.py
+++ b/python/vyos/utils/io.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Callable
+
def print_error(str='', end='\n'):
"""
Print `str` to stderr, terminated with `end`.
@@ -62,3 +64,36 @@ def ask_yes_no(question, default=False) -> bool:
stdout.write("Please respond with yes/y or no/n\n")
except EOFError:
stdout.write("\nPlease respond with yes/y or no/n\n")
+
+def is_interactive():
+ """Try to determine if the routine was called from an interactive shell."""
+ import os, sys
+ return os.getenv('TERM', default=False) and sys.stderr.isatty() and sys.stdout.isatty()
+
+def is_dumb_terminal():
+ """Check if the current TTY is dumb, so that we can disable advanced terminal features."""
+ import os
+ return os.getenv('TERM') in ['vt100', 'dumb']
+
+def select_entry(l: list, list_msg: str = '', prompt_msg: str = '',
+ list_format: Callable = None,) -> str:
+ """Select an entry from a list
+
+ Args:
+ l (list): a list of entries
+ list_msg (str): a message to print before listing the entries
+ prompt_msg (str): a message to print as prompt for selection
+
+ Returns:
+ str: a selected entry
+ """
+ en = list(enumerate(l, 1))
+ print(list_msg)
+ for i, e in en:
+ if list_format:
+ print(f'\t{i}: {list_format(e)}')
+ else:
+ print(f'\t{i}: {e}')
+ select = ask_input(prompt_msg, numeric_only=True,
+ valid_responses=range(1, len(l)+1))
+ return next(filter(lambda x: x[0] == select, en))[1]
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 5d19f256b..2a0808fca 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -61,14 +61,17 @@ def get_vrf_members(vrf: str) -> list:
"""
import json
from vyos.utils.process import cmd
- if not interface_exists(vrf):
- raise ValueError(f'VRF "{vrf}" does not exist!')
- output = cmd(f'ip --json --brief link show master {vrf}')
- answer = json.loads(output)
interfaces = []
- for data in answer:
- if 'ifname' in data:
- interfaces.append(data.get('ifname'))
+ try:
+ if not interface_exists(vrf):
+ raise ValueError(f'VRF "{vrf}" does not exist!')
+ output = cmd(f'ip --json --brief link show vrf {vrf}')
+ answer = json.loads(output)
+ for data in answer:
+ if 'ifname' in data:
+ interfaces.append(data.get('ifname'))
+ except:
+ pass
return interfaces
def get_interface_vrf(interface):
@@ -483,3 +486,36 @@ def get_vxlan_vlan_tunnels(interface: str) -> list:
os_configured_vlan_ids.append(str(vlanStart))
return os_configured_vlan_ids
+
+def get_vxlan_vni_filter(interface: str) -> list:
+ """ Return a list of strings with VNIs configured in the Kernel"""
+ from json import loads
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('vxlan'):
+ raise ValueError('Only applicable for VXLAN interfaces!')
+
+ # Determine current OS Kernel configured VNI filters in VXLAN interface
+ #
+ # $ bridge -j vni show dev vxlan1
+ # [{"ifname":"vxlan1","vnis":[{"vni":100},{"vni":200},{"vni":300,"vniEnd":399}]}]
+ #
+ # Example output: ['10010', '10020', '10021', '10022']
+ os_configured_vnis = []
+ tmp = loads(cmd(f'bridge --json vni show dev {interface}'))
+ if tmp:
+ for tunnel in tmp[0].get('vnis', {}):
+ vniStart = tunnel['vni']
+ if 'vniEnd' in tunnel:
+ vniEnd = tunnel['vniEnd']
+ # Build a real list for user VNIs
+ vni_list = list(range(vniStart, vniEnd +1))
+ # Convert list of integers to list or strings
+ os_configured_vnis.extend(map(str, vni_list))
+ # Proceed with next tunnel - this one is complete
+ continue
+
+ # Add single tunel id - not part of a range
+ os_configured_vnis.append(str(vniStart))
+
+ return os_configured_vnis