summaryrefslogtreecommitdiff
path: root/python/vyos/system
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/system')
-rw-r--r--python/vyos/system/__init__.py4
-rw-r--r--python/vyos/system/compat.py307
-rw-r--r--python/vyos/system/disk.py2
-rw-r--r--python/vyos/system/grub.py7
-rw-r--r--python/vyos/system/image.py88
5 files changed, 392 insertions, 16 deletions
diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py
index 403738e20..0c91330ba 100644
--- a/python/vyos/system/__init__.py
+++ b/python/vyos/system/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -14,3 +14,5 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
__all_: list[str] = ['disk', 'grub', 'image']
+# define image-tools version
+SYSTEM_CFG_VER = 1
diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py
new file mode 100644
index 000000000..aa9b0b4b5
--- /dev/null
+++ b/python/vyos/system/compat.py
@@ -0,0 +1,307 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from re import compile, MULTILINE, DOTALL
+from functools import wraps
+from copy import deepcopy
+from typing import Union
+
+from vyos.system import disk, grub, image, SYSTEM_CFG_VER
+from vyos.template import render
+
+TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2'
+
+# define regexes and variables
+REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}'
+REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}'
+REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$'
+REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?'
+REGEX_SANIT_INIT = r'\ ?init=\S*\ ?'
+REGEX_SANIT_QUIET = r'\ ?quiet\ ?'
+PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset'
+
+
+class DowngradingImageTools(Exception):
+ """Raised when attempting to add an image with an earlier version
+ of image-tools than the current system, as indicated by the value
+ of SYSTEM_CFG_VER or absence thereof."""
+ pass
+
+
+def mode():
+ if grub.get_cfg_ver() >= SYSTEM_CFG_VER:
+ return False
+
+ return True
+
+
+def find_versions(menu_entries: list) -> list:
+ """Find unique VyOS versions from menu entries
+
+ Args:
+ menu_entries (list): a list with menu entries
+
+ Returns:
+ list: List of installed versions
+ """
+ versions = []
+ for vyos_ver in menu_entries:
+ versions.append(vyos_ver.get('version'))
+ # remove duplicates
+ versions = list(set(versions))
+ return versions
+
+
+def filter_unparsed(grub_path: str) -> str:
+ """Find currently installed VyOS version
+
+ Args:
+ grub_path (str): a path to the grub.cfg file
+
+ Returns:
+ str: unparsed grub.cfg items
+ """
+ config_text = Path(grub_path).read_text()
+ regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL)
+ filtered = regex_filter.sub('', config_text)
+ regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE)
+ filtered = regex_filter.sub('', filtered)
+ # strip extra new lines
+ filtered = filtered.strip()
+ return filtered
+
+
+def sanitize_boot_opts(boot_opts: str) -> str:
+ """Sanitize boot options from console and init
+
+ Args:
+ boot_opts (str): boot options
+
+ Returns:
+ str: sanitized boot options
+ """
+ regex_filter = compile(REGEX_SANIT_CONSOLE)
+ boot_opts = regex_filter.sub('', boot_opts)
+ regex_filter = compile(REGEX_SANIT_INIT)
+ boot_opts = regex_filter.sub('', boot_opts)
+ # legacy tools add 'quiet' on add system image; this is not desired
+ regex_filter = compile(REGEX_SANIT_QUIET)
+ boot_opts = regex_filter.sub(' ', boot_opts)
+
+ return boot_opts
+
+
+def parse_entry(entry: tuple) -> dict:
+ """Parse GRUB menuentry
+
+ Args:
+ entry (tuple): tuple of (version, options)
+
+ Returns:
+ dict: dictionary with parsed options
+ """
+ # save version to dict
+ entry_dict = {'version': entry[0]}
+ # detect boot mode type
+ if PW_RESET_OPTION in entry[1]:
+ entry_dict['bootmode'] = 'pw_reset'
+ else:
+ entry_dict['bootmode'] = 'normal'
+ # find console type and number
+ regex_filter = compile(REGEX_CONSOLE)
+ entry_dict.update(regex_filter.match(entry[1]).groupdict())
+ entry_dict['boot_opts'] = sanitize_boot_opts(entry[1])
+
+ return entry_dict
+
+
+def parse_menuentries(grub_path: str) -> list:
+ """Parse all GRUB menuentries
+
+ Args:
+ grub_path (str): a path to GRUB config file
+
+ Returns:
+ list: list with menu items (each item is a dict)
+ """
+ menuentries = []
+ # read configuration file
+ config_text = Path(grub_path).read_text()
+ # parse menuentries to tuples (version, options)
+ regex_filter = compile(REGEX_MENUENTRY, MULTILINE)
+ filter_results = regex_filter.findall(config_text)
+ # parse each entry
+ for entry in filter_results:
+ menuentries.append(parse_entry(entry))
+
+ return menuentries
+
+
+def prune_vyos_versions(root_dir: str = '') -> None:
+ """Delete vyos-versions files of registered images subsequently deleted
+ or renamed by legacy image-tools
+
+ Args:
+ root_dir (str): an optional path to the root directory
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ for version in grub.version_list():
+ if not Path(f'{root_dir}/boot/{version}').is_dir():
+ grub.version_del(version)
+
+
+def update_cfg_ver(root_dir:str = '') -> int:
+ """Get minumum version of image-tools across all installed images
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: minimum version of image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ prune_vyos_versions(root_dir)
+
+ images_details = image.get_images_details()
+ cfg_version = min(d['tools_version'] for d in images_details)
+
+ return cfg_version
+
+
+def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]:
+ """Translate default version to menuentry index
+
+ Args:
+ menu_entries (list): list of dicts of installed version boot data
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ int: index of default version in menu_entries or None
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ image_name = image.get_default_image()
+
+ sublist = list(filter(lambda x: x.get('version') == image_name,
+ menu_entries))
+ if sublist:
+ return menu_entries.index(sublist[0])
+
+ return None
+
+
+def update_version_list(root_dir: str = '') -> list[dict]:
+ """Update list of dicts of installed version boot data
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ list: list of dicts of installed version boot data
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ # get list of versions in menuentries
+ menu_entries = parse_menuentries(grub_cfg_main)
+ menu_versions = find_versions(menu_entries)
+
+ # get list of versions added/removed by image-tools
+ current_versions = grub.version_list(root_dir)
+
+ remove = list(set(menu_versions) - set(current_versions))
+ for ver in remove:
+ menu_entries = list(filter(lambda x: x.get('version') != ver,
+ menu_entries))
+
+ add = list(set(current_versions) - set(menu_versions))
+ for ver in add:
+ last = menu_entries[0].get('version')
+ new = deepcopy(list(filter(lambda x: x.get('version') == last,
+ menu_entries)))
+ for e in new:
+ boot_opts = e.get('boot_opts').replace(last, ver)
+ e.update({'version': ver, 'boot_opts': boot_opts})
+
+ menu_entries = new + menu_entries
+
+ return menu_entries
+
+
+def grub_cfg_fields(root_dir: str = '') -> dict:
+ """Gather fields for rendering grub.cfg
+
+ Args:
+ root_dir (str): an optional path to the root directory
+
+ Returns:
+ dict: dictionary for rendering TMPL_GRUB_COMPAT
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ fields = {'default': 0, 'timeout': 5}
+ # 'default' and 'timeout' from legacy grub.cfg
+ fields |= grub.vars_read(grub_cfg_main)
+ fields['tools_version'] = SYSTEM_CFG_VER
+ menu_entries = update_version_list(root_dir)
+ fields['versions'] = menu_entries
+ default = get_default(menu_entries, root_dir)
+ if default is not None:
+ fields['default'] = default
+
+ p = Path('/sys/firmware/efi')
+ if p.is_dir():
+ fields['efi'] = True
+ else:
+ fields['efi'] = False
+
+ return fields
+
+
+def render_grub_cfg(root_dir: str = '') -> None:
+ """Render grub.cfg for legacy compatibility"""
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}'
+
+ fields = grub_cfg_fields(root_dir)
+ render(grub_cfg_main, TMPL_GRUB_COMPAT, fields)
+
+
+def grub_cfg_update(func):
+ """Decorator to update grub.cfg after function call"""
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ ret = func(*args, **kwargs)
+ if mode():
+ render_grub_cfg()
+ return ret
+ return wrapper
diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py
index b78190c06..882b4eb39 100644
--- a/python/vyos/system/disk.py
+++ b/python/vyos/system/disk.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py
index 47c674de8..9ac205c03 100644
--- a/python/vyos/system/grub.py
+++ b/python/vyos/system/grub.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -24,6 +24,7 @@ from vyos.system import disk
# Define variables
GRUB_DIR_MAIN: str = '/boot/grub'
+GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg'
GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d'
CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg'
CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg'
@@ -181,8 +182,8 @@ def get_cfg_ver(root_dir: str = '') -> int:
if not root_dir:
root_dir = disk.find_persistence()
- cfg_ver: Union[str, None] = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get(
- 'VYOS_CFG_VER')
+ cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get(
+ 'VYOS_CFG_VER')
if cfg_ver:
cfg_ver_int: int = int(cfg_ver)
else:
diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py
index b77c3563f..6c4e3bba5 100644
--- a/python/vyos/system/image.py
+++ b/python/vyos/system/image.py
@@ -1,4 +1,4 @@
-# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -28,12 +28,14 @@ CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg'
GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions'
# prepare regexes
REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$'
+REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)'
# structures definitions
class ImageDetails(TypedDict):
name: str
version: str
+ tools_version: int
disk_ro: int
disk_rw: int
disk_total: int
@@ -59,26 +61,73 @@ def bootmode_detect() -> str:
return 'bios'
-def get_version(image_name: str, root_dir: str) -> str:
- """Extract version name from rootfs based on image name
+def get_image_version(mount_path: str) -> str:
+ """Extract version name from rootfs mounted at mount_path
Args:
- image_name (str): a name of image (from boot menu)
- root_dir (str): a root directory of persistence storage
+ mount_path (str): mount path of rootfs
Returns:
str: version name
"""
+ version_file: str = Path(
+ f'{mount_path}/opt/vyatta/etc/version').read_text()
+ version_name: str = version_file.lstrip('Version: ').strip()
+
+ return version_name
+
+
+def get_image_tools_version(mount_path: str) -> int:
+ """Extract image-tools version from rootfs mounted at mount_path
+
+ Args:
+ mount_path (str): mount path of rootfs
+
+ Returns:
+ str: image-tools version
+ """
+ try:
+ version_file: str = Path(
+ f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text()
+ except FileNotFoundError:
+ system_cfg_ver: int = 0
+ else:
+ res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file)
+ system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0))
+
+ return system_cfg_ver
+
+
+def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]:
+ """Return versions of image and image-tools
+
+ Args:
+ image_name (str): a name of an image
+ root_dir (str, optional): an optional path to the root directory.
+ Defaults to ''.
+
+ Returns:
+ dict[str, int]: a dictionary with versions of image and image-tools
+ """
+ if not root_dir:
+ root_dir = disk.find_persistence()
+
squashfs_file: str = next(
Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix()
with TemporaryDirectory() as squashfs_mounted:
disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs')
- version_file: str = Path(
- f'{squashfs_mounted}/opt/vyatta/etc/version').read_text()
+
+ image_version: str = get_image_version(squashfs_mounted)
+ image_tools_version: int = get_image_tools_version(squashfs_mounted)
+
disk.partition_umount(squashfs_file)
- version_name: str = version_file.lstrip('Version: ').strip()
- return version_name
+ versions: dict[str, int] = {
+ 'image': image_version,
+ 'image-tools': image_tools_version
+ }
+
+ return versions
def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
@@ -95,7 +144,9 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
if not root_dir:
root_dir = disk.find_persistence()
- image_version: str = get_version(image_name, root_dir)
+ versions = get_versions(image_name, root_dir)
+ image_version: str = versions.get('image', '')
+ image_tools_version: int = versions.get('image-tools', 0)
image_path: Path = Path(f'{root_dir}/boot/{image_name}')
image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw')
@@ -113,6 +164,7 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
image_details: ImageDetails = {
'name': image_name,
'version': image_version,
+ 'tools_version': image_tools_version,
'disk_ro': image_disk_ro,
'disk_rw': image_disk_rw,
'disk_total': image_disk_ro + image_disk_rw
@@ -121,6 +173,20 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails:
return image_details
+def get_images_details() -> list[ImageDetails]:
+ """Return information about all images
+
+ Returns:
+ list[ImageDetails]: a list of dictionaries with details about images
+ """
+ images: list[str] = grub.version_list()
+ images_details: list[ImageDetails] = list()
+ for image_name in images:
+ images_details.append(get_details(image_name))
+
+ return images_details
+
+
def get_running_image() -> str:
"""Find currently running image name
@@ -134,7 +200,7 @@ def get_running_image() -> str:
if running_image_result:
running_image: str = running_image_result.groupdict().get(
'image_version', '')
- # we need to have a fallbak for live systems
+ # we need to have a fallback for live systems
if not running_image:
running_image: str = version.get_version()