From 96b65e90fbfa1fe63d97929ac86fc910abb0caa9 Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Sat, 21 Oct 2023 13:33:37 -0500 Subject: image: T4516: support for interoperability of legacy/new image tools This commit allows management of system images with either new or legacy tools: 'add/delete/rename system image' and 'set default' are translated appropriately on booting between images with the old and new tools. Consequently, the warning of the initial commit of T4516 is dropped. --- python/vyos/system/__init__.py | 4 +- python/vyos/system/compat.py | 307 +++++++++++++++++++++++++++++++++++++++++ python/vyos/system/disk.py | 2 +- python/vyos/system/grub.py | 7 +- python/vyos/system/image.py | 88 ++++++++++-- 5 files changed, 392 insertions(+), 16 deletions(-) create mode 100644 python/vyos/system/compat.py (limited to 'python') diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py index 403738e20..0c91330ba 100644 --- a/python/vyos/system/__init__.py +++ b/python/vyos/system/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,3 +14,5 @@ # License along with this library. If not, see . __all_: list[str] = ['disk', 'grub', 'image'] +# define image-tools version +SYSTEM_CFG_VER = 1 diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py new file mode 100644 index 000000000..aa9b0b4b5 --- /dev/null +++ b/python/vyos/system/compat.py @@ -0,0 +1,307 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see . + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from functools import wraps +from copy import deepcopy +from typing import Union + +from vyos.system import disk, grub, image, SYSTEM_CFG_VER +from vyos.template import render + +TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2' + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/vmlinuz (?P[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P[^\s\d]+)(?P[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +REGEX_SANIT_QUIET = r'\ ?quiet\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +class DowngradingImageTools(Exception): + """Raised when attempting to add an image with an earlier version + of image-tools than the current system, as indicated by the value + of SYSTEM_CFG_VER or absence thereof.""" + pass + + +def mode(): + if grub.get_cfg_ver() >= SYSTEM_CFG_VER: + return False + + return True + + +def find_versions(menu_entries: list) -> list: + """Find unique VyOS versions from menu entries + + Args: + menu_entries (list): a list with menu entries + + Returns: + list: List of installed versions + """ + versions = [] + for vyos_ver in menu_entries: + versions.append(vyos_ver.get('version')) + # remove duplicates + versions = list(set(versions)) + return versions + + +def filter_unparsed(grub_path: str) -> str: + """Find currently installed VyOS version + + Args: + grub_path (str): a path to the grub.cfg file + + Returns: + str: unparsed grub.cfg items + """ + config_text = Path(grub_path).read_text() + regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) + filtered = regex_filter.sub('', config_text) + regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) + filtered = regex_filter.sub('', filtered) + regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) + filtered = regex_filter.sub('', filtered) + # strip extra new lines + filtered = filtered.strip() + return filtered + + +def sanitize_boot_opts(boot_opts: str) -> str: + """Sanitize boot options from console and init + + Args: + boot_opts (str): boot options + + Returns: + str: sanitized boot options + """ + regex_filter = compile(REGEX_SANIT_CONSOLE) + boot_opts = regex_filter.sub('', boot_opts) + regex_filter = compile(REGEX_SANIT_INIT) + boot_opts = regex_filter.sub('', boot_opts) + # legacy tools add 'quiet' on add system image; this is not desired + regex_filter = compile(REGEX_SANIT_QUIET) + boot_opts = regex_filter.sub(' ', boot_opts) + + return boot_opts + + +def parse_entry(entry: tuple) -> dict: + """Parse GRUB menuentry + + Args: + entry (tuple): tuple of (version, options) + + Returns: + dict: dictionary with parsed options + """ + # save version to dict + entry_dict = {'version': entry[0]} + # detect boot mode type + if PW_RESET_OPTION in entry[1]: + entry_dict['bootmode'] = 'pw_reset' + else: + entry_dict['bootmode'] = 'normal' + # find console type and number + regex_filter = compile(REGEX_CONSOLE) + entry_dict.update(regex_filter.match(entry[1]).groupdict()) + entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + + return entry_dict + + +def parse_menuentries(grub_path: str) -> list: + """Parse all GRUB menuentries + + Args: + grub_path (str): a path to GRUB config file + + Returns: + list: list with menu items (each item is a dict) + """ + menuentries = [] + # read configuration file + config_text = Path(grub_path).read_text() + # parse menuentries to tuples (version, options) + regex_filter = compile(REGEX_MENUENTRY, MULTILINE) + filter_results = regex_filter.findall(config_text) + # parse each entry + for entry in filter_results: + menuentries.append(parse_entry(entry)) + + return menuentries + + +def prune_vyos_versions(root_dir: str = '') -> None: + """Delete vyos-versions files of registered images subsequently deleted + or renamed by legacy image-tools + + Args: + root_dir (str): an optional path to the root directory + """ + if not root_dir: + root_dir = disk.find_persistence() + + for version in grub.version_list(): + if not Path(f'{root_dir}/boot/{version}').is_dir(): + grub.version_del(version) + + +def update_cfg_ver(root_dir:str = '') -> int: + """Get minumum version of image-tools across all installed images + + Args: + root_dir (str): an optional path to the root directory + + Returns: + int: minimum version of image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + prune_vyos_versions(root_dir) + + images_details = image.get_images_details() + cfg_version = min(d['tools_version'] for d in images_details) + + return cfg_version + + +def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]: + """Translate default version to menuentry index + + Args: + menu_entries (list): list of dicts of installed version boot data + root_dir (str): an optional path to the root directory + + Returns: + int: index of default version in menu_entries or None + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + image_name = image.get_default_image() + + sublist = list(filter(lambda x: x.get('version') == image_name, + menu_entries)) + if sublist: + return menu_entries.index(sublist[0]) + + return None + + +def update_version_list(root_dir: str = '') -> list[dict]: + """Update list of dicts of installed version boot data + + Args: + root_dir (str): an optional path to the root directory + + Returns: + list: list of dicts of installed version boot data + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + # get list of versions in menuentries + menu_entries = parse_menuentries(grub_cfg_main) + menu_versions = find_versions(menu_entries) + + # get list of versions added/removed by image-tools + current_versions = grub.version_list(root_dir) + + remove = list(set(menu_versions) - set(current_versions)) + for ver in remove: + menu_entries = list(filter(lambda x: x.get('version') != ver, + menu_entries)) + + add = list(set(current_versions) - set(menu_versions)) + for ver in add: + last = menu_entries[0].get('version') + new = deepcopy(list(filter(lambda x: x.get('version') == last, + menu_entries))) + for e in new: + boot_opts = e.get('boot_opts').replace(last, ver) + e.update({'version': ver, 'boot_opts': boot_opts}) + + menu_entries = new + menu_entries + + return menu_entries + + +def grub_cfg_fields(root_dir: str = '') -> dict: + """Gather fields for rendering grub.cfg + + Args: + root_dir (str): an optional path to the root directory + + Returns: + dict: dictionary for rendering TMPL_GRUB_COMPAT + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = {'default': 0, 'timeout': 5} + # 'default' and 'timeout' from legacy grub.cfg + fields |= grub.vars_read(grub_cfg_main) + fields['tools_version'] = SYSTEM_CFG_VER + menu_entries = update_version_list(root_dir) + fields['versions'] = menu_entries + default = get_default(menu_entries, root_dir) + if default is not None: + fields['default'] = default + + p = Path('/sys/firmware/efi') + if p.is_dir(): + fields['efi'] = True + else: + fields['efi'] = False + + return fields + + +def render_grub_cfg(root_dir: str = '') -> None: + """Render grub.cfg for legacy compatibility""" + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = grub_cfg_fields(root_dir) + render(grub_cfg_main, TMPL_GRUB_COMPAT, fields) + + +def grub_cfg_update(func): + """Decorator to update grub.cfg after function call""" + @wraps(func) + def wrapper(*args, **kwargs): + ret = func(*args, **kwargs) + if mode(): + render_grub_cfg() + return ret + return wrapper diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py index b78190c06..882b4eb39 100644 --- a/python/vyos/system/disk.py +++ b/python/vyos/system/disk.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index 47c674de8..9ac205c03 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -24,6 +24,7 @@ from vyos.system import disk # Define variables GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg' GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' @@ -181,8 +182,8 @@ def get_cfg_ver(root_dir: str = '') -> int: if not root_dir: root_dir = disk.find_persistence() - cfg_ver: Union[str, None] = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( - 'VYOS_CFG_VER') + cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') if cfg_ver: cfg_ver_int: int = int(cfg_ver) else: diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py index b77c3563f..6c4e3bba5 100644 --- a/python/vyos/system/image.py +++ b/python/vyos/system/image.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -28,12 +28,14 @@ CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' # prepare regexes REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' +REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P\d+)(\r\n|\r|\n)' # structures definitions class ImageDetails(TypedDict): name: str version: str + tools_version: int disk_ro: int disk_rw: int disk_total: int @@ -59,26 +61,73 @@ def bootmode_detect() -> str: return 'bios' -def get_version(image_name: str, root_dir: str) -> str: - """Extract version name from rootfs based on image name +def get_image_version(mount_path: str) -> str: + """Extract version name from rootfs mounted at mount_path Args: - image_name (str): a name of image (from boot menu) - root_dir (str): a root directory of persistence storage + mount_path (str): mount path of rootfs Returns: str: version name """ + version_file: str = Path( + f'{mount_path}/opt/vyatta/etc/version').read_text() + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_image_tools_version(mount_path: str) -> int: + """Extract image-tools version from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: image-tools version + """ + try: + version_file: str = Path( + f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text() + except FileNotFoundError: + system_cfg_ver: int = 0 + else: + res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file) + system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0)) + + return system_cfg_ver + + +def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]: + """Return versions of image and image-tools + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + dict[str, int]: a dictionary with versions of image and image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + squashfs_file: str = next( Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() with TemporaryDirectory() as squashfs_mounted: disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') - version_file: str = Path( - f'{squashfs_mounted}/opt/vyatta/etc/version').read_text() + + image_version: str = get_image_version(squashfs_mounted) + image_tools_version: int = get_image_tools_version(squashfs_mounted) + disk.partition_umount(squashfs_file) - version_name: str = version_file.lstrip('Version: ').strip() - return version_name + versions: dict[str, int] = { + 'image': image_version, + 'image-tools': image_tools_version + } + + return versions def get_details(image_name: str, root_dir: str = '') -> ImageDetails: @@ -95,7 +144,9 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: if not root_dir: root_dir = disk.find_persistence() - image_version: str = get_version(image_name, root_dir) + versions = get_versions(image_name, root_dir) + image_version: str = versions.get('image', '') + image_tools_version: int = versions.get('image-tools', 0) image_path: Path = Path(f'{root_dir}/boot/{image_name}') image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') @@ -113,6 +164,7 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: image_details: ImageDetails = { 'name': image_name, 'version': image_version, + 'tools_version': image_tools_version, 'disk_ro': image_disk_ro, 'disk_rw': image_disk_rw, 'disk_total': image_disk_ro + image_disk_rw @@ -121,6 +173,20 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: return image_details +def get_images_details() -> list[ImageDetails]: + """Return information about all images + + Returns: + list[ImageDetails]: a list of dictionaries with details about images + """ + images: list[str] = grub.version_list() + images_details: list[ImageDetails] = list() + for image_name in images: + images_details.append(get_details(image_name)) + + return images_details + + def get_running_image() -> str: """Find currently running image name @@ -134,7 +200,7 @@ def get_running_image() -> str: if running_image_result: running_image: str = running_image_result.groupdict().get( 'image_version', '') - # we need to have a fallbak for live systems + # we need to have a fallback for live systems if not running_image: running_image: str = version.get_version() -- cgit v1.2.3