From 8f94262e8fa2477700c50303ea6e2c6ddad72adb Mon Sep 17 00:00:00 2001 From: zsdc Date: Thu, 19 Jan 2023 20:18:42 +0200 Subject: image: T4516: Added system image tools This commit adds the whole set of system image tools written from the scratch in Python that allows performing all the operations on images: * check information * perform installation and deletion * versions management Also, it contains a new service that will update the GRUB menu and keep tracking its version in the future. WARNING: The commit contains non-reversible changes. Because of boot menu changes, it will not be possible to manage images from older VyOS versions after an update. --- python/vyos/image.py | 918 +++++++++++++++++++++++++++++++++++++++++ python/vyos/system/__init__.py | 16 + python/vyos/system/disk.py | 172 ++++++++ python/vyos/system/grub.py | 336 +++++++++++++++ python/vyos/system/image.py | 197 +++++++++ 5 files changed, 1639 insertions(+) create mode 100644 python/vyos/image.py create mode 100644 python/vyos/system/__init__.py create mode 100644 python/vyos/system/disk.py create mode 100644 python/vyos/system/grub.py create mode 100644 python/vyos/system/image.py (limited to 'python') diff --git a/python/vyos/image.py b/python/vyos/image.py new file mode 100644 index 000000000..cae25b891 --- /dev/null +++ b/python/vyos/image.py @@ -0,0 +1,918 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see . + +from json import loads as json_loads +from os import sync +from pathlib import Path +from re import MULTILINE, compile as re_compile +from tempfile import TemporaryDirectory +from typing import TypedDict, Union +from uuid import uuid5, NAMESPACE_URL + +from psutil import disk_partitions + +from vyos.template import render +from vyos.util import run, cmd +from vyos import version + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' +CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' +CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' +CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' +CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' + +TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' +TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' +TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' +TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' +TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' +TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' +TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' + +# prepare regexes +REGEX_GRUB_VARS: str = r'^set (?P.+)=[\'"]?(?P.*)(?.+)$' +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' + + +# structures definitions +class ImageDetails(TypedDict): + name: str + version: str + disk_ro: int + disk_rw: int + disk_total: int + + +class BootDetails(TypedDict): + image_default: str + image_running: str + images_available: list[str] + console_type: str + console_num: int + + +class Grub: + + def install(self, drive_path: str, boot_dir: str, + efi_dir: str) -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force', + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="VyOS" \ + --no-uefi-secure-boot' + ] + for command in commands: + run(command) + + def gen_version_uuid(self, version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid = uuid5(NAMESPACE_URL, version_name) + ver_id = f'uuid5-{ver_uuid}' + return ver_id + + def version_add(self, + version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': self.gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + def version_del(self, vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + version_config = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + def grub_version_list(self, root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = find_presistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + return versions_list + + def grub_read_env(self, env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = find_presistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile( + r'^(?P.*)=(?P.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get( + 'variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + def grub_get_cfg_ver(self, root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = find_presistence() + + cfg_ver: Union[str, None] = grub_vars_read( + f'{root_dir}/{CFG_VYOS_HEADER}').get('VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + def grub_write_cfg_ver(self, cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = find_presistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = grub_vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + grub_vars_write(vars_file, vars_current) + + def grub_vars_read(self, grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + def grub_modules_read(self, grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + def grub_modules_write(self, grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + def grub_vars_write(self, grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + def grub_set_default(self, version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = grub_vars_read(vars_file) + vars_current['default'] = self.gen_version_uuid(version_name) + grub_vars_write(vars_file, vars_current) + + def grub_common_write(self, root_dir: str = '') -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, {}) + + def create_grub_structure(self, root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = find_presistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, + exist_ok=True) + + +def disk_cleanup(drive_path: str) -> None: + """Clean up disk partition table (MBR and GPT) + Zeroize primary and secondary headers - first and last 17408 bytes + (512 bytes * 34 LBA) on a drive + + Args: + drive_path (str): path to a drive that needs to be cleaned + """ + # with open(drive_path, 'w+b') as drive: + # drive.seek(0) + # drive.write(b'0' * 17408) + # drive.seek(-17408, 2) + # drive.write(b'0' * 17408) + # # update partitons in kernel + # sync() + # run(f'partprobe {drive_path}') + run(f'sgdisk -Z {drive_path}') + + +def bootmode_detect() -> str: + """Detect system boot mode + + Returns: + str: 'bios' or 'efi' + """ + if Path('/sys/firmware/efi/').exists(): + return 'efi' + else: + return 'bios' + + +def parttable_create(drive_path: str, root_size: int) -> None: + """Create a hybrid MBR/GPT partition table + 0-2047 first sectors are free + 2048-4095 sectors - BIOS Boot Partition + 4096 + 256 MB - EFI system partition + Everything else till the end of a drive - Linux partition + + Args: + drive_path (str): path to a drive + """ + if not root_size: + root_size_text: str = '+100%' + else: + root_size_text: str = str(root_size) + command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ + -n3:0:+{root_size_text}K -t3:8300 {drive_path}' + + run(command) + # update partitons in kernel + sync() + run(f'partprobe {drive_path}') + + +def filesystem_create(partition: str, fstype: str) -> None: + """Create a filesystem on a partition + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + fstype (str): filesystem type ('efi' or 'ext4') + """ + if fstype == 'efi': + command = 'mkfs -t fat -n EFI' + run(f'{command} {partition}') + if fstype == 'ext4': + command = 'mkfs -t ext4 -L persistence' + run(f'{command} {partition}') + + +def partition_mount(partition: str, + path: str, + fsype: str = '', + overlay_params: dict[str, str] = {}) -> None: + """Mount a partition into a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where to mount + fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') + overlay_params (dict): optionally, set overlay parameters. + Defaults to None. + """ + if fsype in ['squashfs', 'iso9660']: + command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' + if fsype == 'overlay' and overlay_params: + command: str = f'mount -t overlay -o noatime,\ + upperdir={overlay_params["upperdir"]},\ + lowerdir={overlay_params["lowerdir"]},\ + workdir={overlay_params["workdir"]} overlay {path}' + + else: + command = f'mount {partition} {path}' + + run(command) + + +def partition_umount(partition: str = '', path: str = '') -> None: + """Umount a partition by a partition name or a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where a partition is mounted + """ + if partition: + command = f'umount {partition}' + run(command) + if path: + command = f'umount {path}' + run(command) + + +def grub_install(drive_path: str, boot_dir: str, efi_dir: str) -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force' , + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="VyOS" \ + --no-uefi-secure-boot' + ] + for command in commands: + run(command) + + +def find_presistence() -> str: + """Find a mountpoint for persistence storage + + Returns: + str: Path where 'persistance' pertition is mounted, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint.endswith('/persistence'): + return partition.mountpoint + return '' + + +def find_device(mountpoint: str) -> str: + """Find a device by mountpoint + + Returns: + str: Path to device, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint == mountpoint: + return partition.mountpoint + return '' + + +def gen_version_uuid(version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid = uuid5(NAMESPACE_URL, version_name) + ver_id = f'uuid5-{ver_uuid}' + return ver_id + + +def grub_version_add(version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + +def grub_version_del(vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + version_config = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + +def grub_version_list(root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = find_presistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + return versions_list + + +def grub_read_env(env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = find_presistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile(r'^(?P.*)=(?P.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get('variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + +def grub_get_cfg_ver(root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = find_presistence() + + cfg_ver: Union[str, None] = grub_vars_read( + f'{root_dir}/{CFG_VYOS_HEADER}').get('VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + +def grub_write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = find_presistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = grub_vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + grub_vars_write(vars_file, vars_current) + + +def grub_vars_read(grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + +def grub_modules_read(grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + +def grub_modules_write(grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + +def grub_vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + +def grub_set_default(version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = grub_vars_read(vars_file) + vars_current['default'] = gen_version_uuid(version_name) + grub_vars_write(vars_file, vars_current) + + +def grub_common_write(root_dir: str = '') -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = find_presistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, {}) + + +def raid_create(raid_name: str, + raid_members: list[str], + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + command: str = f'mdadm --create /dev/md/{raid_name} --metadata=1.2 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + run(command) + + +def disks_size() -> dict[str, int]: + """Get a dictionary with physical disks and their sizes + + Returns: + dict[str, int]: a dictionary with name: size mapping + """ + disks_size: dict[str, int] = {} + lsblk: str = cmd('lsblk -Jbp') + blk_list = json_loads(lsblk) + for device in blk_list.get('blockdevices'): + if device['type'] == 'disk': + disks_size.update({device['name']: device['size']}) + return disks_size + + +def image_get_version(image_name: str, root_dir: str) -> str: + """Extract version name from rootfs based on image name + + Args: + image_name (str): a name of image (from boot menu) + root_dir (str): a root directory of persistence storage + + Returns: + str: version name + """ + squashfs_file: str = next( + Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() + with TemporaryDirectory() as squashfs_mounted: + partition_mount(squashfs_file, squashfs_mounted, 'squashfs') + version_file: str = Path( + f'{squashfs_mounted}/opt/vyatta/etc/version').read_text() + partition_umount(squashfs_file) + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def image_details(image_name: str, root_dir: str = '') -> ImageDetails: + """Return information about image + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + ImageDetails: a dictionary with details about an image (name, size) + """ + if not root_dir: + root_dir = find_presistence() + + image_version: str = image_get_version(image_name, root_dir) + + image_path: Path = Path(f'{root_dir}/boot/{image_name}') + image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') + + image_disk_ro: int = int() + for item in image_path.iterdir(): + if not item.is_symlink(): + image_disk_ro += item.stat().st_size + + image_disk_rw: int = int() + for item in image_path_rw.rglob('*'): + if not item.is_symlink(): + image_disk_rw += item.stat().st_size + + image_details: ImageDetails = { + 'name': image_name, + 'version': image_version, + 'disk_ro': image_disk_ro, + 'disk_rw': image_disk_rw, + 'disk_total': image_disk_ro + image_disk_rw + } + + return image_details + + +def get_running_image() -> str: + """Find currently running image name + + Returns: + str: image name + """ + running_image: str = '' + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + running_image: str = running_image_result.groupdict().get( + 'image_version', '') + # we need to have a fallbak for live systems + if not running_image: + running_image: str = version.get_version() + + return running_image + + +def get_default_image(root_dir: str = '') -> str: + """Get default boot entry + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + Returns: + str: a version name + """ + if not root_dir: + root_dir = find_presistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = grub_vars_read(vars_file) + default_uuid: str = vars_current.get('default', '') + if default_uuid: + images_list: list[str] = Grub.grub_version_list(root_dir) + for image_name in images_list: + if default_uuid == gen_version_uuid(image_name): + return image_name + return '' + else: + return '' + + +def image_name_validate(image_name: str) -> bool: + """Validate image name + + Args: + image_name (str): suggested image name + + Returns: + bool: validation result + """ + regex_filter = re_compile(r'^[\w\.+-]{1,32}$') + if regex_filter.match(image_name): + return True + return False + + +def is_live_boot() -> bool: + """Detect live booted system + + Returns: + bool: True if the system currently booted in live mode + """ + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + boot_type: str = running_image_result.groupdict().get('boot_type', '') + if boot_type == 'live': + return True + return False + + +def create_grub_structure(root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = find_presistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py new file mode 100644 index 000000000..403738e20 --- /dev/null +++ b/python/vyos/system/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2022 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +__all_: list[str] = ['disk', 'grub', 'image'] diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py new file mode 100644 index 000000000..e20cf32be --- /dev/null +++ b/python/vyos/system/disk.py @@ -0,0 +1,172 @@ +# Copyright 2022 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +from json import loads as json_loads +from os import sync + +from psutil import disk_partitions + +from vyos.util import run, cmd + + +def disk_cleanup(drive_path: str) -> None: + """Clean up disk partition table (MBR and GPT) + Zeroize primary and secondary headers - first and last 17408 bytes + (512 bytes * 34 LBA) on a drive + + Args: + drive_path (str): path to a drive that needs to be cleaned + """ + run(f'sgdisk -Z {drive_path}') + + +def find_persistence() -> str: + """Find a mountpoint for persistence storage + + Returns: + str: Path where 'persistance' pertition is mounted, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint.endswith('/persistence'): + return partition.mountpoint + return '' + + +def parttable_create(drive_path: str, root_size: int) -> None: + """Create a hybrid MBR/GPT partition table + 0-2047 first sectors are free + 2048-4095 sectors - BIOS Boot Partition + 4096 + 256 MB - EFI system partition + Everything else till the end of a drive - Linux partition + + Args: + drive_path (str): path to a drive + """ + if not root_size: + root_size_text: str = '+100%' + else: + root_size_text: str = str(root_size) + command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ + -n3:0:+{root_size_text}K -t3:8300 {drive_path}' + + run(command) + # update partitons in kernel + sync() + run(f'partprobe {drive_path}') + + +def filesystem_create(partition: str, fstype: str) -> None: + """Create a filesystem on a partition + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + fstype (str): filesystem type ('efi' or 'ext4') + """ + if fstype == 'efi': + command = 'mkfs -t fat -n EFI' + run(f'{command} {partition}') + if fstype == 'ext4': + command = 'mkfs -t ext4 -L persistence' + run(f'{command} {partition}') + + +def partition_mount(partition: str, + path: str, + fsype: str = '', + overlay_params: dict[str, str] = {}) -> None: + """Mount a partition into a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where to mount + fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') + overlay_params (dict): optionally, set overlay parameters. + Defaults to None. + """ + if fsype in ['squashfs', 'iso9660']: + command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' + if fsype == 'overlay' and overlay_params: + command: str = f'mount -t overlay -o noatime,\ + upperdir={overlay_params["upperdir"]},\ + lowerdir={overlay_params["lowerdir"]},\ + workdir={overlay_params["workdir"]} overlay {path}' + + else: + command = f'mount {partition} {path}' + + run(command) + + +def partition_umount(partition: str = '', path: str = '') -> None: + """Umount a partition by a partition name or a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where a partition is mounted + """ + if partition: + command = f'umount {partition}' + run(command) + if path: + command = f'umount {path}' + run(command) + + +def find_device(mountpoint: str) -> str: + """Find a device by mountpoint + + Returns: + str: Path to device, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint == mountpoint: + return partition.mountpoint + return '' + + +def raid_create(raid_name: str, + raid_members: list[str], + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + command: str = f'mdadm --create /dev/md/{raid_name} --metadata=1.2 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + run(command) + + +def disks_size() -> dict[str, int]: + """Get a dictionary with physical disks and their sizes + + Returns: + dict[str, int]: a dictionary with name: size mapping + """ + disks_size: dict[str, int] = {} + lsblk: str = cmd('lsblk -Jbp') + blk_list = json_loads(lsblk) + for device in blk_list.get('blockdevices'): + if device['type'] == 'disk': + disks_size.update({device['name']: device['size']}) + return disks_size diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py new file mode 100644 index 000000000..11c214675 --- /dev/null +++ b/python/vyos/system/grub.py @@ -0,0 +1,336 @@ +# Copyright 2022 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +from pathlib import Path +from re import MULTILINE, compile as re_compile +from typing import Union +from uuid import uuid5, NAMESPACE_URL, UUID + +from vyos.template import render +from vyos.util import run, cmd +from vyos.system import disk + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' +CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' +CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' +CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' +CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' + +TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' +TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' +TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' +TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' +TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' +TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' +TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' + +# prepare regexes +REGEX_GRUB_VARS: str = r'^set (?P.+)=[\'"]?(?P.*)(?.+)$' +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' + + +def install(drive_path: str, boot_dir: str, efi_dir: str) -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force', + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="VyOS" \ + --no-uefi-secure-boot' + ] + for command in commands: + run(command) + + +def gen_version_uuid(version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name) + ver_id: str = f'uuid5-{ver_uuid}' + return ver_id + + +def version_add(version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + +def version_del(vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + +def version_list(root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = disk.find_persistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + return versions_list + + +def read_env(env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = disk.find_persistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile(r'^(?P.*)=(?P.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get('variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + +def get_cfg_ver(root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + cfg_ver: Union[str, None] = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + +def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + vars_write(vars_file, vars_current) + + +def vars_read(grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + +def modules_read(grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + +def modules_write(grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + +def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + +def set_default(version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = vars_read(vars_file) + vars_current['default'] = gen_version_uuid(version_name) + vars_write(vars_file, vars_current) + + +def common_write(root_dir: str = '') -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, {}) + + +def create_structure(root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = disk.find_persistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) + + +def set_console_type(console_type: str, root_dir: str = '') -> None: + """Write default console type to GRUB configuration + + Args: + console_type (str): a default console type + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['console_type'] = str(console_type) + vars_write(vars_file, vars_current) diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py new file mode 100644 index 000000000..b77c3563f --- /dev/null +++ b/python/vyos/system/image.py @@ -0,0 +1,197 @@ +# Copyright 2022 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +from pathlib import Path +from re import compile as re_compile +from tempfile import TemporaryDirectory +from typing import TypedDict + +from vyos import version +from vyos.system import disk, grub + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' +# prepare regexes +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' + + +# structures definitions +class ImageDetails(TypedDict): + name: str + version: str + disk_ro: int + disk_rw: int + disk_total: int + + +class BootDetails(TypedDict): + image_default: str + image_running: str + images_available: list[str] + console_type: str + console_num: int + + +def bootmode_detect() -> str: + """Detect system boot mode + + Returns: + str: 'bios' or 'efi' + """ + if Path('/sys/firmware/efi/').exists(): + return 'efi' + else: + return 'bios' + + +def get_version(image_name: str, root_dir: str) -> str: + """Extract version name from rootfs based on image name + + Args: + image_name (str): a name of image (from boot menu) + root_dir (str): a root directory of persistence storage + + Returns: + str: version name + """ + squashfs_file: str = next( + Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() + with TemporaryDirectory() as squashfs_mounted: + disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') + version_file: str = Path( + f'{squashfs_mounted}/opt/vyatta/etc/version').read_text() + disk.partition_umount(squashfs_file) + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_details(image_name: str, root_dir: str = '') -> ImageDetails: + """Return information about image + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + ImageDetails: a dictionary with details about an image (name, size) + """ + if not root_dir: + root_dir = disk.find_persistence() + + image_version: str = get_version(image_name, root_dir) + + image_path: Path = Path(f'{root_dir}/boot/{image_name}') + image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') + + image_disk_ro: int = int() + for item in image_path.iterdir(): + if not item.is_symlink(): + image_disk_ro += item.stat().st_size + + image_disk_rw: int = int() + for item in image_path_rw.rglob('*'): + if not item.is_symlink(): + image_disk_rw += item.stat().st_size + + image_details: ImageDetails = { + 'name': image_name, + 'version': image_version, + 'disk_ro': image_disk_ro, + 'disk_rw': image_disk_rw, + 'disk_total': image_disk_ro + image_disk_rw + } + + return image_details + + +def get_running_image() -> str: + """Find currently running image name + + Returns: + str: image name + """ + running_image: str = '' + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + running_image: str = running_image_result.groupdict().get( + 'image_version', '') + # we need to have a fallbak for live systems + if not running_image: + running_image: str = version.get_version() + + return running_image + + +def get_default_image(root_dir: str = '') -> str: + """Get default boot entry + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + Returns: + str: a version name + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = grub.vars_read(vars_file) + default_uuid: str = vars_current.get('default', '') + if default_uuid: + images_list: list[str] = grub.version_list(root_dir) + for image_name in images_list: + if default_uuid == grub.gen_version_uuid(image_name): + return image_name + return '' + else: + return '' + + +def validate_name(image_name: str) -> bool: + """Validate image name + + Args: + image_name (str): suggested image name + + Returns: + bool: validation result + """ + regex_filter = re_compile(r'^[\w\.+-]{1,32}$') + if regex_filter.match(image_name): + return True + return False + + +def is_live_boot() -> bool: + """Detect live booted system + + Returns: + bool: True if the system currently booted in live mode + """ + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + boot_type: str = running_image_result.groupdict().get('boot_type', '') + if boot_type == 'live': + return True + return False -- cgit v1.2.3 From 74b00c1f6961d1bd3a59768021f154bdb64c154e Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Mon, 10 Apr 2023 14:04:00 -0500 Subject: image: T4516: correct permissions on creation of config directory --- python/vyos/utils/file.py | 6 ++++++ src/op_mode/image_installer.py | 17 ++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 667a2464b..9f27a7fb9 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -134,6 +134,12 @@ def chmod_755(path): S_IROTH | S_IXOTH chmod(path, bitmask) +def chmod_2775(path): + """ user/group permissions with set-group-id bit set """ + from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH + + bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH + chmod(path, bitmask) def makedir(path, user=None, group=None): if os.path.exists(path): diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 6ebb38e46..77bb6460f 100644 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -19,7 +19,7 @@ from argparse import ArgumentParser, Namespace from pathlib import Path -from shutil import copy, rmtree, copytree +from shutil import copy, chown, rmtree, copytree from sys import exit from urllib.parse import urlparse @@ -29,7 +29,9 @@ from vyos.configtree import ConfigTree from vyos.remote import download from vyos.system import disk, grub, image from vyos.template import render -from vyos.util import ask_input, ask_yes_no, run +from vyos.utils.io import ask_input, ask_yes_no +from vyos.utils.file import chmod_2775 +from vyos.util import run # define text messages MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' @@ -391,6 +393,8 @@ def install_image() -> None: print('Creating a configuration file') target_config_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw/opt/vyatta/etc/config/' Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) # copy config if migrate_config(): copy('/opt/vyatta/etc/config/config.boot', target_config_dir) @@ -485,9 +489,16 @@ def add_image(image_path: str) -> None: # copy config if migrate_config(): print('Copying configuration directory') - copytree('/opt/vyatta/etc/config/', target_config_dir) + # copytree preserves perms but not ownership: + Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) + copytree('/opt/vyatta/etc/config/', target_config_dir, + dirs_exist_ok=True) else: Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) Path(f'{target_config_dir}/.vyatta_config').touch() # copy system image and kernel files -- cgit v1.2.3 From 9e3b769f8402a816f6c7fa80ff12c9579c3f5243 Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Mon, 22 May 2023 20:35:58 -0500 Subject: image: T4516: remove unused file, replaced by vyos/system/image.py --- python/vyos/image.py | 918 --------------------------------------------------- 1 file changed, 918 deletions(-) delete mode 100644 python/vyos/image.py (limited to 'python') diff --git a/python/vyos/image.py b/python/vyos/image.py deleted file mode 100644 index cae25b891..000000000 --- a/python/vyos/image.py +++ /dev/null @@ -1,918 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2022 VyOS maintainers and contributors -# -# This file is part of VyOS. -# -# VyOS is free software: you can redistribute it and/or modify it under the -# terms of the GNU General Public License as published by the Free Software -# Foundation, either version 3 of the License, or (at your option) any later -# version. -# -# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# VyOS. If not, see . - -from json import loads as json_loads -from os import sync -from pathlib import Path -from re import MULTILINE, compile as re_compile -from tempfile import TemporaryDirectory -from typing import TypedDict, Union -from uuid import uuid5, NAMESPACE_URL - -from psutil import disk_partitions - -from vyos.template import render -from vyos.util import run, cmd -from vyos import version - -# Define variables -GRUB_DIR_MAIN: str = '/boot/grub' -GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' -CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' -CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' -CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' -CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' -CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' -CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' -CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' -GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' - -TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' -TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' -TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' -TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' -TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' -TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' -TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' - -# prepare regexes -REGEX_GRUB_VARS: str = r'^set (?P.+)=[\'"]?(?P.*)(?.+)$' -REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' - - -# structures definitions -class ImageDetails(TypedDict): - name: str - version: str - disk_ro: int - disk_rw: int - disk_total: int - - -class BootDetails(TypedDict): - image_default: str - image_running: str - images_available: list[str] - console_type: str - console_num: int - - -class Grub: - - def install(self, drive_path: str, boot_dir: str, - efi_dir: str) -> None: - """Install GRUB for both BIOS and EFI modes (hybrid boot) - - Args: - drive_path (str): path to a drive where GRUB must be installed - boot_dir (str): a path to '/boot' directory - efi_dir (str): a path to '/boot/efi' directory - """ - commands: list[str] = [ - f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ - {drive_path} --force', - f'grub-install --no-floppy --recheck --target=x86_64-efi \ - --force-extra-removable --boot-directory={boot_dir} \ - --efi-directory={efi_dir} --bootloader-id="VyOS" \ - --no-uefi-secure-boot' - ] - for command in commands: - run(command) - - def gen_version_uuid(self, version_name: str) -> str: - """Generate unique ID from version name - - Use UUID5 / NAMESPACE_URL with prefix `uuid5-` - - Args: - version_name (str): version name - - Returns: - str: generated unique ID - """ - ver_uuid = uuid5(NAMESPACE_URL, version_name) - ver_id = f'uuid5-{ver_uuid}' - return ver_id - - def version_add(self, - version_name: str, - root_dir: str = '', - boot_opts: str = '') -> None: - """Add a new VyOS version to GRUB loader configuration - - Args: - vyos_version (str): VyOS version name - root_dir (str): an optional path to the root directory. - Defaults to empty. - boot_opts (str): an optional boot options for Linux kernel. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' - render( - version_config, TMPL_VYOS_VERSION, { - 'version_name': version_name, - 'version_uuid': self.gen_version_uuid(version_name), - 'boot_opts': boot_opts - }) - - def version_del(self, vyos_version: str, root_dir: str = '') -> None: - """Delete a VyOS version from GRUB loader configuration - - Args: - vyos_version (str): VyOS version name - root_dir (str): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - version_config = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' - Path(version_config).unlink(missing_ok=True) - - def grub_version_list(self, root_dir: str = '') -> list[str]: - """Generate a list with installed VyOS versions - - Args: - root_dir (str): an optional path to the root directory. - Defaults to empty. - - Returns: - list: A list with versions names - """ - if not root_dir: - root_dir = find_presistence() - versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') - versions_list: list[str] = [] - for file in versions_files: - versions_list.append(file.stem) - return versions_list - - def grub_read_env(self, env_file: str = '') -> dict[str, str]: - """Read GRUB environment - - Args: - env_file (str, optional): a path to grub environment file. - Defaults to empty. - - Returns: - dict: dictionary with GRUB environment - """ - if not env_file: - root_dir: str = find_presistence() - env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' - - env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() - regex_filter = re_compile( - r'^(?P.*)=(?P.*)$') - env_dict: dict[str, str] = {} - for env_item in env_content: - search_result = regex_filter.fullmatch(env_item) - if search_result: - search_result_dict: dict[str, str] = search_result.groupdict() - variable_name: str = search_result_dict.get('variable_name', '') - variable_value: str = search_result_dict.get( - 'variable_value', '') - if variable_name and variable_value: - env_dict.update({variable_name: variable_value}) - return env_dict - - def grub_get_cfg_ver(self, root_dir: str = '') -> int: - """Get current version of GRUB configuration - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - - Returns: - int: a configuration version - """ - if not root_dir: - root_dir = find_presistence() - - cfg_ver: Union[str, None] = grub_vars_read( - f'{root_dir}/{CFG_VYOS_HEADER}').get('VYOS_CFG_VER') - if cfg_ver: - cfg_ver_int: int = int(cfg_ver) - else: - cfg_ver_int: int = 0 - return cfg_ver_int - - def grub_write_cfg_ver(self, cfg_ver: int, root_dir: str = '') -> None: - """Write version number of GRUB configuration - - Args: - cfg_ver (int): a version number to write - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - - Returns: - int: a configuration version - """ - if not root_dir: - root_dir = find_presistence() - - vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' - vars_current: dict[str, str] = grub_vars_read(vars_file) - vars_current['VYOS_CFG_VER'] = str(cfg_ver) - grub_vars_write(vars_file, vars_current) - - def grub_vars_read(self, grub_cfg: str) -> dict[str, str]: - """Read variables from a GRUB configuration file - - Args: - grub_cfg (str): a path to the GRUB config file - - Returns: - dict: a dictionary with variables and values - """ - vars_dict: dict[str, str] = {} - regex_filter = re_compile(REGEX_GRUB_VARS) - try: - config_text: list[str] = Path(grub_cfg).read_text().splitlines() - except FileNotFoundError: - return vars_dict - for line in config_text: - search_result = regex_filter.fullmatch(line) - if search_result: - search_dict = search_result.groupdict() - variable_name: str = search_dict.get('variable_name', '') - variable_value: str = search_dict.get('variable_value', '') - if variable_name and variable_value: - vars_dict.update({variable_name: variable_value}) - return vars_dict - - def grub_modules_read(self, grub_cfg: str) -> list[str]: - """Read modules list from a GRUB configuration file - - Args: - grub_cfg (str): a path to the GRUB config file - - Returns: - list: a list with modules to load - """ - mods_list: list[str] = [] - regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) - try: - config_text = Path(grub_cfg).read_text() - except FileNotFoundError: - return mods_list - mods_list = regex_filter.findall(config_text) - - return mods_list - - def grub_modules_write(self, grub_cfg: str, mods_list: list[str]) -> None: - """Write modules list to a GRUB configuration file (overwrite everything) - - Args: - grub_cfg (str): a path to GRUB configuration file - mods_list (list): a list with modules to load - """ - render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) - - def grub_vars_write(self, grub_cfg: str, grub_vars: dict[str, str]) -> None: - """Write variables to a GRUB configuration file (overwrite everything) - - Args: - grub_cfg (str): a path to GRUB configuration file - grub_vars (dict): a dictionary with new variables - """ - render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) - - def grub_set_default(self, version_name: str, root_dir: str = '') -> None: - """Set version as default boot entry - - Args: - version_name (str): versio name - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - - vars_file = f'{root_dir}/{CFG_VYOS_VARS}' - vars_current = grub_vars_read(vars_file) - vars_current['default'] = self.gen_version_uuid(version_name) - grub_vars_write(vars_file, vars_current) - - def grub_common_write(self, root_dir: str = '') -> None: - """Write common GRUB configuration file (overwrite everything) - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - common_config = f'{root_dir}/{CFG_VYOS_COMMON}' - render(common_config, TMPL_GRUB_COMMON, {}) - - def create_grub_structure(self, root_dir: str = '') -> None: - """Create GRUB directories structure - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to ''. - """ - if not root_dir: - root_dir = find_presistence() - - Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, - exist_ok=True) - - -def disk_cleanup(drive_path: str) -> None: - """Clean up disk partition table (MBR and GPT) - Zeroize primary and secondary headers - first and last 17408 bytes - (512 bytes * 34 LBA) on a drive - - Args: - drive_path (str): path to a drive that needs to be cleaned - """ - # with open(drive_path, 'w+b') as drive: - # drive.seek(0) - # drive.write(b'0' * 17408) - # drive.seek(-17408, 2) - # drive.write(b'0' * 17408) - # # update partitons in kernel - # sync() - # run(f'partprobe {drive_path}') - run(f'sgdisk -Z {drive_path}') - - -def bootmode_detect() -> str: - """Detect system boot mode - - Returns: - str: 'bios' or 'efi' - """ - if Path('/sys/firmware/efi/').exists(): - return 'efi' - else: - return 'bios' - - -def parttable_create(drive_path: str, root_size: int) -> None: - """Create a hybrid MBR/GPT partition table - 0-2047 first sectors are free - 2048-4095 sectors - BIOS Boot Partition - 4096 + 256 MB - EFI system partition - Everything else till the end of a drive - Linux partition - - Args: - drive_path (str): path to a drive - """ - if not root_size: - root_size_text: str = '+100%' - else: - root_size_text: str = str(root_size) - command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ - -n3:0:+{root_size_text}K -t3:8300 {drive_path}' - - run(command) - # update partitons in kernel - sync() - run(f'partprobe {drive_path}') - - -def filesystem_create(partition: str, fstype: str) -> None: - """Create a filesystem on a partition - - Args: - partition (str): path to a partition (for example: '/dev/sda1') - fstype (str): filesystem type ('efi' or 'ext4') - """ - if fstype == 'efi': - command = 'mkfs -t fat -n EFI' - run(f'{command} {partition}') - if fstype == 'ext4': - command = 'mkfs -t ext4 -L persistence' - run(f'{command} {partition}') - - -def partition_mount(partition: str, - path: str, - fsype: str = '', - overlay_params: dict[str, str] = {}) -> None: - """Mount a partition into a path - - Args: - partition (str): path to a partition (for example: '/dev/sda1') - path (str): a path where to mount - fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') - overlay_params (dict): optionally, set overlay parameters. - Defaults to None. - """ - if fsype in ['squashfs', 'iso9660']: - command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' - if fsype == 'overlay' and overlay_params: - command: str = f'mount -t overlay -o noatime,\ - upperdir={overlay_params["upperdir"]},\ - lowerdir={overlay_params["lowerdir"]},\ - workdir={overlay_params["workdir"]} overlay {path}' - - else: - command = f'mount {partition} {path}' - - run(command) - - -def partition_umount(partition: str = '', path: str = '') -> None: - """Umount a partition by a partition name or a path - - Args: - partition (str): path to a partition (for example: '/dev/sda1') - path (str): a path where a partition is mounted - """ - if partition: - command = f'umount {partition}' - run(command) - if path: - command = f'umount {path}' - run(command) - - -def grub_install(drive_path: str, boot_dir: str, efi_dir: str) -> None: - """Install GRUB for both BIOS and EFI modes (hybrid boot) - - Args: - drive_path (str): path to a drive where GRUB must be installed - boot_dir (str): a path to '/boot' directory - efi_dir (str): a path to '/boot/efi' directory - """ - commands: list[str] = [ - f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ - {drive_path} --force' , - f'grub-install --no-floppy --recheck --target=x86_64-efi \ - --force-extra-removable --boot-directory={boot_dir} \ - --efi-directory={efi_dir} --bootloader-id="VyOS" \ - --no-uefi-secure-boot' - ] - for command in commands: - run(command) - - -def find_presistence() -> str: - """Find a mountpoint for persistence storage - - Returns: - str: Path where 'persistance' pertition is mounted, Empty if not found - """ - mounted_partitions = disk_partitions() - for partition in mounted_partitions: - if partition.mountpoint.endswith('/persistence'): - return partition.mountpoint - return '' - - -def find_device(mountpoint: str) -> str: - """Find a device by mountpoint - - Returns: - str: Path to device, Empty if not found - """ - mounted_partitions = disk_partitions() - for partition in mounted_partitions: - if partition.mountpoint == mountpoint: - return partition.mountpoint - return '' - - -def gen_version_uuid(version_name: str) -> str: - """Generate unique ID from version name - - Use UUID5 / NAMESPACE_URL with prefix `uuid5-` - - Args: - version_name (str): version name - - Returns: - str: generated unique ID - """ - ver_uuid = uuid5(NAMESPACE_URL, version_name) - ver_id = f'uuid5-{ver_uuid}' - return ver_id - - -def grub_version_add(version_name: str, - root_dir: str = '', - boot_opts: str = '') -> None: - """Add a new VyOS version to GRUB loader configuration - - Args: - vyos_version (str): VyOS version name - root_dir (str): an optional path to the root directory. - Defaults to empty. - boot_opts (str): an optional boot options for Linux kernel. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' - render( - version_config, TMPL_VYOS_VERSION, { - 'version_name': version_name, - 'version_uuid': gen_version_uuid(version_name), - 'boot_opts': boot_opts - }) - - -def grub_version_del(vyos_version: str, root_dir: str = '') -> None: - """Delete a VyOS version from GRUB loader configuration - - Args: - vyos_version (str): VyOS version name - root_dir (str): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - version_config = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' - Path(version_config).unlink(missing_ok=True) - - -def grub_version_list(root_dir: str = '') -> list[str]: - """Generate a list with installed VyOS versions - - Args: - root_dir (str): an optional path to the root directory. - Defaults to empty. - - Returns: - list: A list with versions names - """ - if not root_dir: - root_dir = find_presistence() - versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') - versions_list: list[str] = [] - for file in versions_files: - versions_list.append(file.stem) - return versions_list - - -def grub_read_env(env_file: str = '') -> dict[str, str]: - """Read GRUB environment - - Args: - env_file (str, optional): a path to grub environment file. - Defaults to empty. - - Returns: - dict: dictionary with GRUB environment - """ - if not env_file: - root_dir: str = find_presistence() - env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' - - env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() - regex_filter = re_compile(r'^(?P.*)=(?P.*)$') - env_dict: dict[str, str] = {} - for env_item in env_content: - search_result = regex_filter.fullmatch(env_item) - if search_result: - search_result_dict: dict[str, str] = search_result.groupdict() - variable_name: str = search_result_dict.get('variable_name', '') - variable_value: str = search_result_dict.get('variable_value', '') - if variable_name and variable_value: - env_dict.update({variable_name: variable_value}) - return env_dict - - -def grub_get_cfg_ver(root_dir: str = '') -> int: - """Get current version of GRUB configuration - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - - Returns: - int: a configuration version - """ - if not root_dir: - root_dir = find_presistence() - - cfg_ver: Union[str, None] = grub_vars_read( - f'{root_dir}/{CFG_VYOS_HEADER}').get('VYOS_CFG_VER') - if cfg_ver: - cfg_ver_int: int = int(cfg_ver) - else: - cfg_ver_int: int = 0 - return cfg_ver_int - - -def grub_write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: - """Write version number of GRUB configuration - - Args: - cfg_ver (int): a version number to write - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - - Returns: - int: a configuration version - """ - if not root_dir: - root_dir = find_presistence() - - vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' - vars_current: dict[str, str] = grub_vars_read(vars_file) - vars_current['VYOS_CFG_VER'] = str(cfg_ver) - grub_vars_write(vars_file, vars_current) - - -def grub_vars_read(grub_cfg: str) -> dict[str, str]: - """Read variables from a GRUB configuration file - - Args: - grub_cfg (str): a path to the GRUB config file - - Returns: - dict: a dictionary with variables and values - """ - vars_dict: dict[str, str] = {} - regex_filter = re_compile(REGEX_GRUB_VARS) - try: - config_text: list[str] = Path(grub_cfg).read_text().splitlines() - except FileNotFoundError: - return vars_dict - for line in config_text: - search_result = regex_filter.fullmatch(line) - if search_result: - search_dict = search_result.groupdict() - variable_name: str = search_dict.get('variable_name', '') - variable_value: str = search_dict.get('variable_value', '') - if variable_name and variable_value: - vars_dict.update({variable_name: variable_value}) - return vars_dict - - -def grub_modules_read(grub_cfg: str) -> list[str]: - """Read modules list from a GRUB configuration file - - Args: - grub_cfg (str): a path to the GRUB config file - - Returns: - list: a list with modules to load - """ - mods_list: list[str] = [] - regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) - try: - config_text = Path(grub_cfg).read_text() - except FileNotFoundError: - return mods_list - mods_list = regex_filter.findall(config_text) - - return mods_list - - -def grub_modules_write(grub_cfg: str, mods_list: list[str]) -> None: - """Write modules list to a GRUB configuration file (overwrite everything) - - Args: - grub_cfg (str): a path to GRUB configuration file - mods_list (list): a list with modules to load - """ - render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) - - -def grub_vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: - """Write variables to a GRUB configuration file (overwrite everything) - - Args: - grub_cfg (str): a path to GRUB configuration file - grub_vars (dict): a dictionary with new variables - """ - render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) - - -def grub_set_default(version_name: str, root_dir: str = '') -> None: - """Set version as default boot entry - - Args: - version_name (str): versio name - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - - vars_file = f'{root_dir}/{CFG_VYOS_VARS}' - vars_current = grub_vars_read(vars_file) - vars_current['default'] = gen_version_uuid(version_name) - grub_vars_write(vars_file, vars_current) - - -def grub_common_write(root_dir: str = '') -> None: - """Write common GRUB configuration file (overwrite everything) - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - """ - if not root_dir: - root_dir = find_presistence() - common_config = f'{root_dir}/{CFG_VYOS_COMMON}' - render(common_config, TMPL_GRUB_COMMON, {}) - - -def raid_create(raid_name: str, - raid_members: list[str], - raid_level: str = 'raid1') -> None: - """Create a RAID array - - Args: - raid_name (str): a name of array (data, backup, test, etc.) - raid_members (list[str]): a list of array members - raid_level (str, optional): an array level. Defaults to 'raid1'. - """ - raid_devices_num: int = len(raid_members) - raid_members_str: str = ' '.join(raid_members) - command: str = f'mdadm --create /dev/md/{raid_name} --metadata=1.2 \ - --raid-devices={raid_devices_num} --level={raid_level} \ - {raid_members_str}' - - run(command) - - -def disks_size() -> dict[str, int]: - """Get a dictionary with physical disks and their sizes - - Returns: - dict[str, int]: a dictionary with name: size mapping - """ - disks_size: dict[str, int] = {} - lsblk: str = cmd('lsblk -Jbp') - blk_list = json_loads(lsblk) - for device in blk_list.get('blockdevices'): - if device['type'] == 'disk': - disks_size.update({device['name']: device['size']}) - return disks_size - - -def image_get_version(image_name: str, root_dir: str) -> str: - """Extract version name from rootfs based on image name - - Args: - image_name (str): a name of image (from boot menu) - root_dir (str): a root directory of persistence storage - - Returns: - str: version name - """ - squashfs_file: str = next( - Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() - with TemporaryDirectory() as squashfs_mounted: - partition_mount(squashfs_file, squashfs_mounted, 'squashfs') - version_file: str = Path( - f'{squashfs_mounted}/opt/vyatta/etc/version').read_text() - partition_umount(squashfs_file) - version_name: str = version_file.lstrip('Version: ').strip() - - return version_name - - -def image_details(image_name: str, root_dir: str = '') -> ImageDetails: - """Return information about image - - Args: - image_name (str): a name of an image - root_dir (str, optional): an optional path to the root directory. - Defaults to ''. - - Returns: - ImageDetails: a dictionary with details about an image (name, size) - """ - if not root_dir: - root_dir = find_presistence() - - image_version: str = image_get_version(image_name, root_dir) - - image_path: Path = Path(f'{root_dir}/boot/{image_name}') - image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') - - image_disk_ro: int = int() - for item in image_path.iterdir(): - if not item.is_symlink(): - image_disk_ro += item.stat().st_size - - image_disk_rw: int = int() - for item in image_path_rw.rglob('*'): - if not item.is_symlink(): - image_disk_rw += item.stat().st_size - - image_details: ImageDetails = { - 'name': image_name, - 'version': image_version, - 'disk_ro': image_disk_ro, - 'disk_rw': image_disk_rw, - 'disk_total': image_disk_ro + image_disk_rw - } - - return image_details - - -def get_running_image() -> str: - """Find currently running image name - - Returns: - str: image name - """ - running_image: str = '' - regex_filter = re_compile(REGEX_KERNEL_CMDLINE) - cmdline: str = Path('/proc/cmdline').read_text() - running_image_result = regex_filter.match(cmdline) - if running_image_result: - running_image: str = running_image_result.groupdict().get( - 'image_version', '') - # we need to have a fallbak for live systems - if not running_image: - running_image: str = version.get_version() - - return running_image - - -def get_default_image(root_dir: str = '') -> str: - """Get default boot entry - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to empty. - Returns: - str: a version name - """ - if not root_dir: - root_dir = find_presistence() - - vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' - vars_current: dict[str, str] = grub_vars_read(vars_file) - default_uuid: str = vars_current.get('default', '') - if default_uuid: - images_list: list[str] = Grub.grub_version_list(root_dir) - for image_name in images_list: - if default_uuid == gen_version_uuid(image_name): - return image_name - return '' - else: - return '' - - -def image_name_validate(image_name: str) -> bool: - """Validate image name - - Args: - image_name (str): suggested image name - - Returns: - bool: validation result - """ - regex_filter = re_compile(r'^[\w\.+-]{1,32}$') - if regex_filter.match(image_name): - return True - return False - - -def is_live_boot() -> bool: - """Detect live booted system - - Returns: - bool: True if the system currently booted in live mode - """ - regex_filter = re_compile(REGEX_KERNEL_CMDLINE) - cmdline: str = Path('/proc/cmdline').read_text() - running_image_result = regex_filter.match(cmdline) - if running_image_result: - boot_type: str = running_image_result.groupdict().get('boot_type', '') - if boot_type == 'live': - return True - return False - - -def create_grub_structure(root_dir: str = '') -> None: - """Create GRUB directories structure - - Args: - root_dir (str, optional): an optional path to the root directory. - Defaults to ''. - """ - if not root_dir: - root_dir = find_presistence() - - Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) -- cgit v1.2.3 From fcded7930b5426193e8490c6df2a70e300a60e31 Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Wed, 20 Sep 2023 14:53:24 -0500 Subject: image: T5195: vyos.util -> vyos.utils package refactoring --- python/vyos/system/disk.py | 2 +- python/vyos/system/grub.py | 2 +- src/op_mode/image_installer.py | 2 +- src/op_mode/image_manager.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'python') diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py index e20cf32be..b78190c06 100644 --- a/python/vyos/system/disk.py +++ b/python/vyos/system/disk.py @@ -18,7 +18,7 @@ from os import sync from psutil import disk_partitions -from vyos.util import run, cmd +from vyos.utils.process import run, cmd def disk_cleanup(drive_path: str) -> None: diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index 11c214675..47c674de8 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -19,7 +19,7 @@ from typing import Union from uuid import uuid5, NAMESPACE_URL, UUID from vyos.template import render -from vyos.util import run, cmd +from vyos.utils.process import run, cmd from vyos.system import disk # Define variables diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 972faef8c..12d32968c 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -32,7 +32,7 @@ from vyos.system import disk, grub, image from vyos.template import render from vyos.utils.io import ask_input, ask_yes_no from vyos.utils.file import chmod_2775 -from vyos.util import run +from vyos.utils.process import run # define text messages MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' diff --git a/src/op_mode/image_manager.py b/src/op_mode/image_manager.py index ac889da38..76fc4367f 100755 --- a/src/op_mode/image_manager.py +++ b/src/op_mode/image_manager.py @@ -23,7 +23,7 @@ from shutil import rmtree from sys import exit from vyos.system import disk, grub, image -from vyos.util import ask_yes_no +from vyos.utils.io import ask_yes_no def delete_image(image_name: str) -> None: -- cgit v1.2.3 From 8efab9ee8cdb0e65dddb9d3ba97de8ddcf3666dc Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Fri, 20 Oct 2023 23:47:20 -0500 Subject: image: T4516: improve format of 'show system image details' --- python/vyos/utils/convert.py | 5 ++++- src/op_mode/image_info.py | 14 +++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) (limited to 'python') diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 9a8a1ff7d..c02f0071e 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""): return result -def bytes_to_human(bytes, initial_exponent=0, precision=2): +def bytes_to_human(bytes, initial_exponent=0, precision=2, + int_below_exponent=0): """ Converts a value in bytes to a human-readable size string like 640 KB The initial_exponent parameter is the exponent of 2, @@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2): # log2 is a float, while range checking requires an int exponent = int(log2(bytes)) + if exponent < int_below_exponent: + precision = 0 if exponent < 10: value = bytes diff --git a/src/op_mode/image_info.py b/src/op_mode/image_info.py index ae0677196..14dca7476 100755 --- a/src/op_mode/image_info.py +++ b/src/op_mode/image_info.py @@ -20,11 +20,11 @@ import sys from typing import List, Union -from hurry.filesize import size from tabulate import tabulate from vyos import opmode from vyos.system import disk, grub, image +from vyos.utils.convert import bytes_to_human def _format_show_images_summary(images_summary: image.BootDetails) -> str: @@ -58,11 +58,15 @@ def _format_show_images_details( for image_item in images_details: name: str = image_item.get('name') version: str = image_item.get('version') - disk_ro: int = size(image_item.get('disk_ro')) - disk_rw: int = size(image_item.get('disk_rw')) - disk_total: int = size(image_item.get('disk_total')) + disk_ro: str = bytes_to_human(image_item.get('disk_ro'), + precision=1, int_below_exponent=30) + disk_rw: str = bytes_to_human(image_item.get('disk_rw'), + precision=1, int_below_exponent=30) + disk_total: str = bytes_to_human(image_item.get('disk_total'), + precision=1, int_below_exponent=30) table_data.append([name, version, disk_ro, disk_rw, disk_total]) - tabulated: str = tabulate(table_data, headers) + tabulated: str = tabulate(table_data, headers, + colalign=('left', 'left', 'right', 'right', 'right')) return tabulated -- cgit v1.2.3 From 96b65e90fbfa1fe63d97929ac86fc910abb0caa9 Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Sat, 21 Oct 2023 13:33:37 -0500 Subject: image: T4516: support for interoperability of legacy/new image tools This commit allows management of system images with either new or legacy tools: 'add/delete/rename system image' and 'set default' are translated appropriately on booting between images with the old and new tools. Consequently, the warning of the initial commit of T4516 is dropped. --- data/templates/grub/grub_compat.j2 | 58 +++++++ python/vyos/system/__init__.py | 4 +- python/vyos/system/compat.py | 307 +++++++++++++++++++++++++++++++++++++ python/vyos/system/disk.py | 2 +- python/vyos/system/grub.py | 7 +- python/vyos/system/image.py | 88 +++++++++-- src/op_mode/image_info.py | 7 +- src/op_mode/image_installer.py | 17 +- src/op_mode/image_manager.py | 7 +- src/system/grub_update.py | 141 ++--------------- 10 files changed, 486 insertions(+), 152 deletions(-) create mode 100644 data/templates/grub/grub_compat.j2 create mode 100644 python/vyos/system/compat.py (limited to 'python') diff --git a/data/templates/grub/grub_compat.j2 b/data/templates/grub/grub_compat.j2 new file mode 100644 index 000000000..935172005 --- /dev/null +++ b/data/templates/grub/grub_compat.j2 @@ -0,0 +1,58 @@ +{# j2lint: disable=S6 #} +### Generated by VyOS image-tools v.{{ tools_version }} ### +{% macro menu_name(mode) -%} +{% if mode == 'normal' -%} + VyOS +{%- elif mode == 'pw_reset' -%} + Lost password change +{%- else -%} + Unknown +{%- endif %} +{%- endmacro %} +{% macro console_name(type) -%} +{% if type == 'tty' -%} + KVM +{%- elif type == 'ttyS' -%} + Serial +{%- elif type == 'ttyUSB' -%} + USB +{%- else -%} + Unknown +{%- endif %} +{%- endmacro %} +{% macro console_opts(type) -%} +{% if type == 'tty' -%} + console=ttyS0,115200 console=tty0 +{%- elif type == 'ttyS' -%} + console=tty0 console=ttyS0,115200 +{%- elif type == 'ttyUSB' -%} + console=tty0 console=ttyUSB0,115200 +{%- else -%} + console=tty0 console=ttyS0,115200 +{%- endif %} +{%- endmacro %} +{% macro passwd_opts(mode) -%} +{% if mode == 'pw_reset' -%} + init=/opt/vyatta/sbin/standalone_root_pw_reset +{%- endif %} +{%- endmacro %} +set default={{ default }} +set timeout={{ timeout }} +{% if console_type == 'ttyS' %} +serial --unit={{ console_num }} --speed=115200 +{% else %} +serial --unit=0 --speed=115200 +{% endif %} +terminal_output --append serial +terminal_input serial console +{% if efi %} +insmod efi_gop +insmod efi_uga +{% endif %} + +{% for v in versions %} +menuentry "{{ menu_name(v.bootmode) }} {{ v.version }} ({{ console_name(v.console_type) }} console)" { + linux /boot/{{ v.version }}/vmlinuz {{ v.boot_opts }} {{ console_opts(v.console_type) }} {{ passwd_opts(v.bootmode) }} + initrd /boot/{{ v.version }}/initrd.img +} +{% endfor %} diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py index 403738e20..0c91330ba 100644 --- a/python/vyos/system/__init__.py +++ b/python/vyos/system/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,3 +14,5 @@ # License along with this library. If not, see . __all_: list[str] = ['disk', 'grub', 'image'] +# define image-tools version +SYSTEM_CFG_VER = 1 diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py new file mode 100644 index 000000000..aa9b0b4b5 --- /dev/null +++ b/python/vyos/system/compat.py @@ -0,0 +1,307 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see . + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from functools import wraps +from copy import deepcopy +from typing import Union + +from vyos.system import disk, grub, image, SYSTEM_CFG_VER +from vyos.template import render + +TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2' + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/vmlinuz (?P[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P[^\s\d]+)(?P[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +REGEX_SANIT_QUIET = r'\ ?quiet\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +class DowngradingImageTools(Exception): + """Raised when attempting to add an image with an earlier version + of image-tools than the current system, as indicated by the value + of SYSTEM_CFG_VER or absence thereof.""" + pass + + +def mode(): + if grub.get_cfg_ver() >= SYSTEM_CFG_VER: + return False + + return True + + +def find_versions(menu_entries: list) -> list: + """Find unique VyOS versions from menu entries + + Args: + menu_entries (list): a list with menu entries + + Returns: + list: List of installed versions + """ + versions = [] + for vyos_ver in menu_entries: + versions.append(vyos_ver.get('version')) + # remove duplicates + versions = list(set(versions)) + return versions + + +def filter_unparsed(grub_path: str) -> str: + """Find currently installed VyOS version + + Args: + grub_path (str): a path to the grub.cfg file + + Returns: + str: unparsed grub.cfg items + """ + config_text = Path(grub_path).read_text() + regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) + filtered = regex_filter.sub('', config_text) + regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) + filtered = regex_filter.sub('', filtered) + regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) + filtered = regex_filter.sub('', filtered) + # strip extra new lines + filtered = filtered.strip() + return filtered + + +def sanitize_boot_opts(boot_opts: str) -> str: + """Sanitize boot options from console and init + + Args: + boot_opts (str): boot options + + Returns: + str: sanitized boot options + """ + regex_filter = compile(REGEX_SANIT_CONSOLE) + boot_opts = regex_filter.sub('', boot_opts) + regex_filter = compile(REGEX_SANIT_INIT) + boot_opts = regex_filter.sub('', boot_opts) + # legacy tools add 'quiet' on add system image; this is not desired + regex_filter = compile(REGEX_SANIT_QUIET) + boot_opts = regex_filter.sub(' ', boot_opts) + + return boot_opts + + +def parse_entry(entry: tuple) -> dict: + """Parse GRUB menuentry + + Args: + entry (tuple): tuple of (version, options) + + Returns: + dict: dictionary with parsed options + """ + # save version to dict + entry_dict = {'version': entry[0]} + # detect boot mode type + if PW_RESET_OPTION in entry[1]: + entry_dict['bootmode'] = 'pw_reset' + else: + entry_dict['bootmode'] = 'normal' + # find console type and number + regex_filter = compile(REGEX_CONSOLE) + entry_dict.update(regex_filter.match(entry[1]).groupdict()) + entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + + return entry_dict + + +def parse_menuentries(grub_path: str) -> list: + """Parse all GRUB menuentries + + Args: + grub_path (str): a path to GRUB config file + + Returns: + list: list with menu items (each item is a dict) + """ + menuentries = [] + # read configuration file + config_text = Path(grub_path).read_text() + # parse menuentries to tuples (version, options) + regex_filter = compile(REGEX_MENUENTRY, MULTILINE) + filter_results = regex_filter.findall(config_text) + # parse each entry + for entry in filter_results: + menuentries.append(parse_entry(entry)) + + return menuentries + + +def prune_vyos_versions(root_dir: str = '') -> None: + """Delete vyos-versions files of registered images subsequently deleted + or renamed by legacy image-tools + + Args: + root_dir (str): an optional path to the root directory + """ + if not root_dir: + root_dir = disk.find_persistence() + + for version in grub.version_list(): + if not Path(f'{root_dir}/boot/{version}').is_dir(): + grub.version_del(version) + + +def update_cfg_ver(root_dir:str = '') -> int: + """Get minumum version of image-tools across all installed images + + Args: + root_dir (str): an optional path to the root directory + + Returns: + int: minimum version of image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + prune_vyos_versions(root_dir) + + images_details = image.get_images_details() + cfg_version = min(d['tools_version'] for d in images_details) + + return cfg_version + + +def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]: + """Translate default version to menuentry index + + Args: + menu_entries (list): list of dicts of installed version boot data + root_dir (str): an optional path to the root directory + + Returns: + int: index of default version in menu_entries or None + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + image_name = image.get_default_image() + + sublist = list(filter(lambda x: x.get('version') == image_name, + menu_entries)) + if sublist: + return menu_entries.index(sublist[0]) + + return None + + +def update_version_list(root_dir: str = '') -> list[dict]: + """Update list of dicts of installed version boot data + + Args: + root_dir (str): an optional path to the root directory + + Returns: + list: list of dicts of installed version boot data + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + # get list of versions in menuentries + menu_entries = parse_menuentries(grub_cfg_main) + menu_versions = find_versions(menu_entries) + + # get list of versions added/removed by image-tools + current_versions = grub.version_list(root_dir) + + remove = list(set(menu_versions) - set(current_versions)) + for ver in remove: + menu_entries = list(filter(lambda x: x.get('version') != ver, + menu_entries)) + + add = list(set(current_versions) - set(menu_versions)) + for ver in add: + last = menu_entries[0].get('version') + new = deepcopy(list(filter(lambda x: x.get('version') == last, + menu_entries))) + for e in new: + boot_opts = e.get('boot_opts').replace(last, ver) + e.update({'version': ver, 'boot_opts': boot_opts}) + + menu_entries = new + menu_entries + + return menu_entries + + +def grub_cfg_fields(root_dir: str = '') -> dict: + """Gather fields for rendering grub.cfg + + Args: + root_dir (str): an optional path to the root directory + + Returns: + dict: dictionary for rendering TMPL_GRUB_COMPAT + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = {'default': 0, 'timeout': 5} + # 'default' and 'timeout' from legacy grub.cfg + fields |= grub.vars_read(grub_cfg_main) + fields['tools_version'] = SYSTEM_CFG_VER + menu_entries = update_version_list(root_dir) + fields['versions'] = menu_entries + default = get_default(menu_entries, root_dir) + if default is not None: + fields['default'] = default + + p = Path('/sys/firmware/efi') + if p.is_dir(): + fields['efi'] = True + else: + fields['efi'] = False + + return fields + + +def render_grub_cfg(root_dir: str = '') -> None: + """Render grub.cfg for legacy compatibility""" + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = grub_cfg_fields(root_dir) + render(grub_cfg_main, TMPL_GRUB_COMPAT, fields) + + +def grub_cfg_update(func): + """Decorator to update grub.cfg after function call""" + @wraps(func) + def wrapper(*args, **kwargs): + ret = func(*args, **kwargs) + if mode(): + render_grub_cfg() + return ret + return wrapper diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py index b78190c06..882b4eb39 100644 --- a/python/vyos/system/disk.py +++ b/python/vyos/system/disk.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index 47c674de8..9ac205c03 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -24,6 +24,7 @@ from vyos.system import disk # Define variables GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg' GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' @@ -181,8 +182,8 @@ def get_cfg_ver(root_dir: str = '') -> int: if not root_dir: root_dir = disk.find_persistence() - cfg_ver: Union[str, None] = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( - 'VYOS_CFG_VER') + cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') if cfg_ver: cfg_ver_int: int = int(cfg_ver) else: diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py index b77c3563f..6c4e3bba5 100644 --- a/python/vyos/system/image.py +++ b/python/vyos/system/image.py @@ -1,4 +1,4 @@ -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -28,12 +28,14 @@ CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' # prepare regexes REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' +REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P\d+)(\r\n|\r|\n)' # structures definitions class ImageDetails(TypedDict): name: str version: str + tools_version: int disk_ro: int disk_rw: int disk_total: int @@ -59,26 +61,73 @@ def bootmode_detect() -> str: return 'bios' -def get_version(image_name: str, root_dir: str) -> str: - """Extract version name from rootfs based on image name +def get_image_version(mount_path: str) -> str: + """Extract version name from rootfs mounted at mount_path Args: - image_name (str): a name of image (from boot menu) - root_dir (str): a root directory of persistence storage + mount_path (str): mount path of rootfs Returns: str: version name """ + version_file: str = Path( + f'{mount_path}/opt/vyatta/etc/version').read_text() + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_image_tools_version(mount_path: str) -> int: + """Extract image-tools version from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: image-tools version + """ + try: + version_file: str = Path( + f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text() + except FileNotFoundError: + system_cfg_ver: int = 0 + else: + res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file) + system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0)) + + return system_cfg_ver + + +def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]: + """Return versions of image and image-tools + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + dict[str, int]: a dictionary with versions of image and image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + squashfs_file: str = next( Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() with TemporaryDirectory() as squashfs_mounted: disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') - version_file: str = Path( - f'{squashfs_mounted}/opt/vyatta/etc/version').read_text() + + image_version: str = get_image_version(squashfs_mounted) + image_tools_version: int = get_image_tools_version(squashfs_mounted) + disk.partition_umount(squashfs_file) - version_name: str = version_file.lstrip('Version: ').strip() - return version_name + versions: dict[str, int] = { + 'image': image_version, + 'image-tools': image_tools_version + } + + return versions def get_details(image_name: str, root_dir: str = '') -> ImageDetails: @@ -95,7 +144,9 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: if not root_dir: root_dir = disk.find_persistence() - image_version: str = get_version(image_name, root_dir) + versions = get_versions(image_name, root_dir) + image_version: str = versions.get('image', '') + image_tools_version: int = versions.get('image-tools', 0) image_path: Path = Path(f'{root_dir}/boot/{image_name}') image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') @@ -113,6 +164,7 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: image_details: ImageDetails = { 'name': image_name, 'version': image_version, + 'tools_version': image_tools_version, 'disk_ro': image_disk_ro, 'disk_rw': image_disk_rw, 'disk_total': image_disk_ro + image_disk_rw @@ -121,6 +173,20 @@ def get_details(image_name: str, root_dir: str = '') -> ImageDetails: return image_details +def get_images_details() -> list[ImageDetails]: + """Return information about all images + + Returns: + list[ImageDetails]: a list of dictionaries with details about images + """ + images: list[str] = grub.version_list() + images_details: list[ImageDetails] = list() + for image_name in images: + images_details.append(get_details(image_name)) + + return images_details + + def get_running_image() -> str: """Find currently running image name @@ -134,7 +200,7 @@ def get_running_image() -> str: if running_image_result: running_image: str = running_image_result.groupdict().get( 'image_version', '') - # we need to have a fallbak for live systems + # we need to have a fallback for live systems if not running_image: running_image: str = version.get_version() diff --git a/src/op_mode/image_info.py b/src/op_mode/image_info.py index 14dca7476..791001e00 100755 --- a/src/op_mode/image_info.py +++ b/src/op_mode/image_info.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This file is part of VyOS. # @@ -91,10 +91,7 @@ def show_images_summary(raw: bool) -> Union[image.BootDetails, str]: def show_images_details(raw: bool) -> Union[list[image.ImageDetails], str]: - images: list[str] = grub.version_list() - images_details: list[image.ImageDetails] = list() - for image_name in images: - images_details.append(image.get_details(image_name)) + images_details = image.get_images_details() if raw: return images_details diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 12d32968c..2d998f5e1 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This file is part of VyOS. # @@ -28,7 +28,7 @@ from psutil import disk_partitions from vyos.configtree import ConfigTree from vyos.remote import download -from vyos.system import disk, grub, image +from vyos.system import disk, grub, image, compat, SYSTEM_CFG_VER from vyos.template import render from vyos.utils.io import ask_input, ask_yes_no from vyos.utils.file import chmod_2775 @@ -462,6 +462,7 @@ def install_image() -> None: exit(1) +@compat.grub_cfg_update def add_image(image_path: str) -> None: """Add a new image @@ -488,10 +489,16 @@ def add_image(image_path: str) -> None: Path(DIR_ROOTFS_SRC).mkdir(mode=0o755, parents=True) disk.partition_mount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', DIR_ROOTFS_SRC, 'squashfs') - version_file: str = Path( - f'{DIR_ROOTFS_SRC}/opt/vyatta/etc/version').read_text() + + cfg_ver: str = image.get_image_tools_version(DIR_ROOTFS_SRC) + version_name: str = image.get_image_version(DIR_ROOTFS_SRC) + disk.partition_umount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs') - version_name: str = version_file.lstrip('Version: ').strip() + + if cfg_ver < SYSTEM_CFG_VER: + raise compat.DowngradingImageTools( + f'Adding image would downgrade image tools to v.{cfg_ver}; disallowed') + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) set_as_default: bool = ask_yes_no(MSG_INPUT_IMAGE_DEFAULT, default=True) diff --git a/src/op_mode/image_manager.py b/src/op_mode/image_manager.py index 76fc4367f..725c40613 100755 --- a/src/op_mode/image_manager.py +++ b/src/op_mode/image_manager.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This file is part of VyOS. # @@ -22,10 +22,11 @@ from pathlib import Path from shutil import rmtree from sys import exit -from vyos.system import disk, grub, image +from vyos.system import disk, grub, image, compat from vyos.utils.io import ask_yes_no +@compat.grub_cfg_update def delete_image(image_name: str) -> None: """Remove installed image files and boot entry @@ -57,6 +58,7 @@ def delete_image(image_name: str) -> None: exit(f'Unable to remove the image "{image_name}": {err}') +@compat.grub_cfg_update def set_image(image_name: str) -> None: """Set default boot image @@ -86,6 +88,7 @@ def set_image(image_name: str) -> None: exit(f'Unable to set default image "{image_name}": {err}') +@compat.grub_cfg_update def rename_image(name_old: str, name_new: str) -> None: """Rename installed image diff --git a/src/system/grub_update.py b/src/system/grub_update.py index 1ae66464b..da1986e9d 100644 --- a/src/system/grub_update.py +++ b/src/system/grub_update.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2022 VyOS maintainers and contributors +# Copyright 2023 VyOS maintainers and contributors # # This file is part of VyOS. # @@ -18,23 +18,11 @@ # VyOS. If not, see . from pathlib import Path -from re import compile, MULTILINE, DOTALL from sys import exit -from vyos.system import disk, grub, image +from vyos.system import disk, grub, image, compat, SYSTEM_CFG_VER from vyos.template import render -# define configuration version -CFG_VER = 1 - -# define regexes and variables -REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/[^}]*}' -REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P\S+)/vmlinuz (?P[^\n]+)\n[^}]*}' -REGEX_CONSOLE = r'^.*console=(?P[^\s\d]+)(?P[\d]+).*$' -REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' -REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' -PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' - def cfg_check_update() -> bool: """Check if GRUB structure update is required @@ -43,111 +31,10 @@ def cfg_check_update() -> bool: bool: False if not required, True if required """ current_ver = grub.get_cfg_ver() - if current_ver and current_ver >= CFG_VER: + if current_ver and current_ver >= SYSTEM_CFG_VER: return False - else: - return True - - -def find_versions(menu_entries: list) -> list: - """Find unique VyOS versions from menu entries - - Args: - menu_entries (list): a list with menu entries - - Returns: - list: List of installed versions - """ - versions = [] - for vyos_ver in menu_entries: - versions.append(vyos_ver.get('version')) - # remove duplicates - versions = list(set(versions)) - return versions - -def filter_unparsed(grub_path: str) -> str: - """Find currently installed VyOS version - - Args: - grub_path (str): a path to the grub.cfg file - - Returns: - str: unparsed grub.cfg items - """ - config_text = Path(grub_path).read_text() - regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) - filtered = regex_filter.sub('', config_text) - regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) - filtered = regex_filter.sub('', filtered) - regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) - filtered = regex_filter.sub('', filtered) - # strip extra new lines - filtered = filtered.strip() - return filtered - - -def sanitize_boot_opts(boot_opts: str) -> str: - """Sanitize boot options from console and init - - Args: - boot_opts (str): boot options - - Returns: - str: sanitized boot options - """ - regex_filter = compile(REGEX_SANIT_CONSOLE) - boot_opts = regex_filter.sub('', boot_opts) - regex_filter = compile(REGEX_SANIT_INIT) - boot_opts = regex_filter.sub('', boot_opts) - - return boot_opts - - -def parse_entry(entry: tuple) -> dict: - """Parse GRUB menuentry - - Args: - entry (tuple): tuple of (version, options) - - Returns: - dict: dictionary with parsed options - """ - # save version to dict - entry_dict = {'version': entry[0]} - # detect boot mode type - if PW_RESET_OPTION in entry[1]: - entry_dict['bootmode'] = 'pw_reset' - else: - entry_dict['bootmode'] = 'normal' - # find console type and number - regex_filter = compile(REGEX_CONSOLE) - entry_dict.update(regex_filter.match(entry[1]).groupdict()) - entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) - - return entry_dict - - -def parse_menuntries(grub_path: str) -> list: - """Parse all GRUB menuentries - - Args: - grub_path (str): a path to GRUB config file - - Returns: - list: list with menu items (each item is a dict) - """ - menuentries = [] - # read configuration file - config_text = Path(grub_path).read_text() - # parse menuentries to tuples (version, options) - regex_filter = compile(REGEX_MENUENTRY, MULTILINE) - filter_results = regex_filter.findall(config_text) - # parse each entry - for entry in filter_results: - menuentries.append(parse_entry(entry)) - - return menuentries + return True if __name__ == '__main__': @@ -162,12 +49,12 @@ if __name__ == '__main__': root_dir = disk.find_persistence() # read current GRUB config - grub_cfg_main = f'{root_dir}/{image.GRUB_DIR_MAIN}/grub.cfg' + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' vars = grub.vars_read(grub_cfg_main) modules = grub.modules_read(grub_cfg_main) - vyos_menuentries = parse_menuntries(grub_cfg_main) - vyos_versions = find_versions(vyos_menuentries) - unparsed_items = filter_unparsed(grub_cfg_main) + vyos_menuentries = compat.parse_menuentries(grub_cfg_main) + vyos_versions = compat.find_versions(vyos_menuentries) + unparsed_items = compat.filter_unparsed(grub_cfg_main) # find default values default_entry = vyos_menuentries[int(vars['default'])] @@ -185,13 +72,12 @@ if __name__ == '__main__': # print(f'unparsed_items: {unparsed_items}') # create new files - grub_cfg_vars = f'{root_dir}/{image.CFG_VYOS_VARS}' + grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' grub_cfg_platform = f'{root_dir}/{grub.CFG_VYOS_PLATFORM}' grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' - render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) Path(image.GRUB_DIR_VYOS).mkdir(exist_ok=True) grub.vars_write(grub_cfg_vars, vars) grub.modules_write(grub_cfg_modules, modules) @@ -210,5 +96,12 @@ if __name__ == '__main__': grub.version_add(vyos_ver, root_dir, boot_opts) # update structure version - grub.write_cfg_ver(CFG_VER, root_dir) + cfg_ver = compat.update_cfg_ver(root_dir) + grub.write_cfg_ver(cfg_ver, root_dir) + + if compat.mode(): + compat.render_grub_cfg(root_dir) + else: + render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) + exit(0) -- cgit v1.2.3 From 9ffa3e82d951756696367578dd5e82ef0f690065 Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Tue, 31 Oct 2023 12:53:52 -0500 Subject: image: T4516: restore select entry to set/delete image --- op-mode-definitions/system-image.xml.in | 12 ++++++++++ python/vyos/utils/io.py | 19 +++++++++++++++ src/op_mode/image_manager.py | 42 ++++++++++++++++++++++++--------- 3 files changed, 62 insertions(+), 11 deletions(-) (limited to 'python') diff --git a/op-mode-definitions/system-image.xml.in b/op-mode-definitions/system-image.xml.in index 57aeb7bb4..463b985d6 100644 --- a/op-mode-definitions/system-image.xml.in +++ b/op-mode-definitions/system-image.xml.in @@ -77,6 +77,12 @@ Set system image parameters + + + Set default image to boot. + + sudo ${vyos_op_scripts_dir}/image_manager.py --action set + Set default image to boot. @@ -115,6 +121,12 @@ Delete system objects + + + Remove an installed image from the system + + sudo ${vyos_op_scripts_dir}/image_manager.py --action delete + Remove an installed image from the system diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 8790cbaac..e34a1ba32 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -72,3 +72,22 @@ def is_dumb_terminal(): """Check if the current TTY is dumb, so that we can disable advanced terminal features.""" import os return os.getenv('TERM') in ['vt100', 'dumb'] + +def select_entry(l: list, list_msg: str = '', prompt_msg: str = '') -> str: + """Select an entry from a list + + Args: + l (list): a list of entries + list_msg (str): a message to print before listing the entries + prompt_msg (str): a message to print as prompt for selection + + Returns: + str: a selected entry + """ + en = list(enumerate(l, 1)) + print(list_msg) + for i, e in en: + print(f'\t{i}: {e}') + select = ask_input(prompt_msg, numeric_only=True, + valid_responses=range(1, len(l)+1)) + return next(filter(lambda x: x[0] == select, en))[1] diff --git a/src/op_mode/image_manager.py b/src/op_mode/image_manager.py index 55fd5c07d..de53c4cf0 100755 --- a/src/op_mode/image_manager.py +++ b/src/op_mode/image_manager.py @@ -21,23 +21,39 @@ from argparse import ArgumentParser, Namespace from pathlib import Path from shutil import rmtree from sys import exit +from typing import Optional from vyos.system import disk, grub, image, compat -from vyos.utils.io import ask_yes_no +from vyos.utils.io import ask_yes_no, select_entry + +SET_IMAGE_LIST_MSG: str = 'The following images are available:' +SET_IMAGE_PROMPT_MSG: str = 'Select an image to set as default:' +DELETE_IMAGE_LIST_MSG: str = 'The following images are installed:' +DELETE_IMAGE_PROMPT_MSG: str = 'Select an image to delete:' +MSG_DELETE_IMAGE_RUNNING: str = 'Currently running image cannot be deleted; reboot into another image first' +MSG_DELETE_IMAGE_DEFAULT: str = 'Default image cannot be deleted; set another image as default first' @compat.grub_cfg_update -def delete_image(image_name: str) -> None: +def delete_image(image_name: Optional[str] = None, + prompt: bool = True) -> None: """Remove installed image files and boot entry Args: image_name (str): a name of image to delete """ + available_images: list[str] = grub.version_list() + if image_name is None: + if not prompt: + exit('An image name is required for delete action') + else: + image_name = select_entry(available_images, + DELETE_IMAGE_LIST_MSG, + DELETE_IMAGE_PROMPT_MSG) if image_name == image.get_running_image(): - exit('Currently running image cannot be deleted') + exit(MSG_DELETE_IMAGE_RUNNING) if image_name == image.get_default_image(): - exit('Default image cannot be deleted') - available_images: list[str] = grub.version_list() + exit(MSG_DELETE_IMAGE_DEFAULT) if image_name not in available_images: exit(f'The image "{image_name}" cannot be found') presistence_storage: str = disk.find_persistence() @@ -59,15 +75,23 @@ def delete_image(image_name: str) -> None: @compat.grub_cfg_update -def set_image(image_name: str) -> None: +def set_image(image_name: Optional[str] = None, + prompt: bool = True) -> None: """Set default boot image Args: image_name (str): an image name """ + available_images: list[str] = grub.version_list() + if image_name is None: + if not prompt: + exit('An image name is required for set action') + else: + image_name = select_entry(available_images, + SET_IMAGE_LIST_MSG, + SET_IMAGE_PROMPT_MSG) if image_name == image.get_default_image(): exit(f'The image "{image_name}" already configured as default') - available_images: list[str] = grub.version_list() if image_name not in available_images: exit(f'The image "{image_name}" cannot be found') presistence_storage: str = disk.find_persistence() @@ -154,10 +178,6 @@ def parse_arguments() -> Namespace: parser.add_argument('--image_new_name', help='a new name for image') args: Namespace = parser.parse_args() # Validate arguments - if args.action == 'delete' and not args.image_name: - exit('An image name is required for delete action') - if args.action == 'set' and not args.image_name: - exit('An image name is required for set action') if args.action == 'rename' and (not args.image_name or not args.image_new_name): exit('Both old and new image names are required for rename action') -- cgit v1.2.3 From bd701768796d6ebb03ca943faf96d1dbea030edd Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Thu, 9 Nov 2023 14:34:24 -0600 Subject: image: T4516: ensure compatibility with legacy RAID 1 installs --- data/templates/grub/grub_common.j2 | 5 +++-- data/templates/grub/grub_compat.j2 | 11 ++++++++--- python/vyos/system/compat.py | 19 ++++++++++++++----- src/system/grub_update.py | 13 +++++-------- 4 files changed, 30 insertions(+), 18 deletions(-) (limited to 'python') diff --git a/data/templates/grub/grub_common.j2 b/data/templates/grub/grub_common.j2 index 78df3f48c..278ffbf2c 100644 --- a/data/templates/grub/grub_common.j2 +++ b/data/templates/grub/grub_common.j2 @@ -18,5 +18,6 @@ function setup_serial { setup_serial -# find root device -#search --no-floppy --fs-uuid --set=root ${root_uuid} +{% if search_root %} +{{ search_root }} +{% endif %} diff --git a/data/templates/grub/grub_compat.j2 b/data/templates/grub/grub_compat.j2 index 935172005..887d5d0bd 100644 --- a/data/templates/grub/grub_compat.j2 +++ b/data/templates/grub/grub_compat.j2 @@ -45,9 +45,14 @@ serial --unit=0 --speed=115200 {% endif %} terminal_output --append serial terminal_input serial console -{% if efi %} -insmod efi_gop -insmod efi_uga +{% for mod in modules %} +insmod {{ mod }} +{% endfor %} +{% if root %} +set root={{ root }} +{% endif %} +{% if search_root %} +{{ search_root }} {% endif %} {% for v in versions %} diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py index aa9b0b4b5..319c3dabf 100644 --- a/python/vyos/system/compat.py +++ b/python/vyos/system/compat.py @@ -86,6 +86,12 @@ def filter_unparsed(grub_path: str) -> str: return filtered +def get_search_root(unparsed: str) -> str: + unparsed_lines = unparsed.splitlines() + search_root = next((x for x in unparsed_lines if 'search' in x), '') + return search_root + + def sanitize_boot_opts(boot_opts: str) -> str: """Sanitize boot options from console and init @@ -269,18 +275,21 @@ def grub_cfg_fields(root_dir: str = '') -> dict: fields = {'default': 0, 'timeout': 5} # 'default' and 'timeout' from legacy grub.cfg fields |= grub.vars_read(grub_cfg_main) + fields['tools_version'] = SYSTEM_CFG_VER menu_entries = update_version_list(root_dir) fields['versions'] = menu_entries + default = get_default(menu_entries, root_dir) if default is not None: fields['default'] = default - p = Path('/sys/firmware/efi') - if p.is_dir(): - fields['efi'] = True - else: - fields['efi'] = False + modules = grub.modules_read(grub_cfg_main) + fields['modules'] = modules + + unparsed = filter_unparsed(grub_cfg_main).splitlines() + search_root = next((x for x in unparsed if 'search' in x), '') + fields['search_root'] = search_root return fields diff --git a/src/system/grub_update.py b/src/system/grub_update.py index da1986e9d..366a85344 100644 --- a/src/system/grub_update.py +++ b/src/system/grub_update.py @@ -55,7 +55,10 @@ if __name__ == '__main__': vyos_menuentries = compat.parse_menuentries(grub_cfg_main) vyos_versions = compat.find_versions(vyos_menuentries) unparsed_items = compat.filter_unparsed(grub_cfg_main) - + # compatibilty for raid installs + search_root = compat.get_search_root(unparsed_items) + common_dict = {} + common_dict['search_root'] = search_root # find default values default_entry = vyos_menuentries[int(vars['default'])] default_settings = { @@ -66,11 +69,6 @@ if __name__ == '__main__': } vars.update(default_settings) - # print(f'vars: {vars}') - # print(f'modules: {modules}') - # print(f'vyos_menuentries: {vyos_menuentries}') - # print(f'unparsed_items: {unparsed_items}') - # create new files grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' @@ -81,8 +79,7 @@ if __name__ == '__main__': Path(image.GRUB_DIR_VYOS).mkdir(exist_ok=True) grub.vars_write(grub_cfg_vars, vars) grub.modules_write(grub_cfg_modules, modules) - # Path(grub_cfg_platform).write_text(unparsed_items) - grub.common_write() + grub.common_write(grub_common=common_dict) render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) -- cgit v1.2.3 From e036f783bc85e4d2bad5f5cbfd688a03a352223e Mon Sep 17 00:00:00 2001 From: John Estabrook Date: Wed, 15 Nov 2023 11:56:52 -0600 Subject: image: T4516: add raid-1 install support --- python/vyos/system/disk.py | 83 +++++++++--- python/vyos/system/grub.py | 15 ++- python/vyos/system/raid.py | 115 ++++++++++++++++ python/vyos/utils/io.py | 10 +- src/op_mode/image_installer.py | 295 ++++++++++++++++++++++++++++++++--------- 5 files changed, 426 insertions(+), 92 deletions(-) create mode 100644 python/vyos/system/raid.py (limited to 'python') diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py index 882b4eb39..49e6b5c5e 100644 --- a/python/vyos/system/disk.py +++ b/python/vyos/system/disk.py @@ -15,12 +15,20 @@ from json import loads as json_loads from os import sync +from dataclasses import dataclass from psutil import disk_partitions from vyos.utils.process import run, cmd +@dataclass +class DiskDetails: + """Disk details""" + name: str + partition: dict[str, str] + + def disk_cleanup(drive_path: str) -> None: """Clean up disk partition table (MBR and GPT) Zeroize primary and secondary headers - first and last 17408 bytes @@ -67,6 +75,62 @@ def parttable_create(drive_path: str, root_size: int) -> None: sync() run(f'partprobe {drive_path}') + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + + +def partition_list(drive_path: str) -> list[str]: + """Get a list of partitions on a drive + + Args: + drive_path (str): path to a drive + + Returns: + list[str]: a list of partition paths + """ + lsblk: str = cmd(f'lsblk -Jp {drive_path}') + drive_info: dict = json_loads(lsblk) + device: list = drive_info.get('blockdevices') + children: list[str] = device[0].get('children', []) if device else [] + partitions: list[str] = [child.get('name') for child in children] + return partitions + + +def partition_parent(partition_path: str) -> str: + """Get a parent device for a partition + + Args: + partition (str): path to a partition + + Returns: + str: path to a parent device + """ + parent: str = cmd(f'lsblk -ndpo pkname {partition_path}') + return parent + + +def from_partition(partition_path: str) -> DiskDetails: + drive_path: str = partition_parent(partition_path) + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk def filesystem_create(partition: str, fstype: str) -> None: """Create a filesystem on a partition @@ -138,25 +202,6 @@ def find_device(mountpoint: str) -> str: return '' -def raid_create(raid_name: str, - raid_members: list[str], - raid_level: str = 'raid1') -> None: - """Create a RAID array - - Args: - raid_name (str): a name of array (data, backup, test, etc.) - raid_members (list[str]): a list of array members - raid_level (str, optional): an array level. Defaults to 'raid1'. - """ - raid_devices_num: int = len(raid_members) - raid_members_str: str = ' '.join(raid_members) - command: str = f'mdadm --create /dev/md/{raid_name} --metadata=1.2 \ - --raid-devices={raid_devices_num} --level={raid_level} \ - {raid_members_str}' - - run(command) - - def disks_size() -> dict[str, int]: """Get a dictionary with physical disks and their sizes diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index 9ac205c03..0ac16af9a 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -19,7 +19,7 @@ from typing import Union from uuid import uuid5, NAMESPACE_URL, UUID from vyos.template import render -from vyos.utils.process import run, cmd +from vyos.utils.process import cmd from vyos.system import disk # Define variables @@ -49,7 +49,7 @@ REGEX_GRUB_MODULES: str = r'^insmod (?P.+)$' REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?Pboot|live)/((?P.+)/)?vmlinuz.*$' -def install(drive_path: str, boot_dir: str, efi_dir: str) -> None: +def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None: """Install GRUB for both BIOS and EFI modes (hybrid boot) Args: @@ -62,11 +62,11 @@ def install(drive_path: str, boot_dir: str, efi_dir: str) -> None: {drive_path} --force', f'grub-install --no-floppy --recheck --target=x86_64-efi \ --force-extra-removable --boot-directory={boot_dir} \ - --efi-directory={efi_dir} --bootloader-id="VyOS" \ + --efi-directory={efi_dir} --bootloader-id="{id}" \ --no-uefi-secure-boot' ] for command in commands: - run(command) + cmd(command) def gen_version_uuid(version_name: str) -> str: @@ -294,7 +294,7 @@ def set_default(version_name: str, root_dir: str = '') -> None: vars_write(vars_file, vars_current) -def common_write(root_dir: str = '') -> None: +def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None: """Write common GRUB configuration file (overwrite everything) Args: @@ -304,7 +304,7 @@ def common_write(root_dir: str = '') -> None: if not root_dir: root_dir = disk.find_persistence() common_config = f'{root_dir}/{CFG_VYOS_COMMON}' - render(common_config, TMPL_GRUB_COMMON, {}) + render(common_config, TMPL_GRUB_COMMON, grub_common) def create_structure(root_dir: str = '') -> None: @@ -335,3 +335,6 @@ def set_console_type(console_type: str, root_dir: str = '') -> None: vars_current: dict[str, str] = vars_read(vars_file) vars_current['console_type'] = str(console_type) vars_write(vars_file, vars_current) + +def set_raid(root_dir: str = '') -> None: + pass diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py new file mode 100644 index 000000000..13b99fa69 --- /dev/null +++ b/python/vyos/system/raid.py @@ -0,0 +1,115 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see . + +"""RAID related functions""" + +from pathlib import Path +from shutil import copy +from dataclasses import dataclass + +from vyos.utils.process import cmd +from vyos.system import disk + + +@dataclass +class RaidDetails: + """RAID type""" + name: str + level: str + members: list[str] + disks: list[disk.DiskDetails] + + +def raid_create(raid_members: list[str], + raid_name: str = 'md0', + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + if Path('/sys/firmware/efi').exists(): + for part in raid_members: + drive: str = disk.partition_parent(part) + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + else: + for part in raid_members: + drive: str = disk.partition_parent(part) + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + for part in raid_members: + command: str = f'mdadm --zero-superblock {part}' + cmd(command) + command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + cmd(command) + + raid = RaidDetails( + name = f'/dev/{raid_name}', + level = raid_level, + members = raid_members, + disks = [disk.from_partition(m) for m in raid_members] + ) + + return raid + +def update_initramfs() -> None: + """Update initramfs""" + mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm' + copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script) + p = Path(mdadm_script) + p.write_text(p.read_text().replace('$((COUNT + 1))', '20')) + command: str = 'update-initramfs -u' + cmd(command) + +def update_default(target_dir: str) -> None: + """Update /etc/default/mdadm to start MD monitoring daemon at boot + """ + source_mdadm_config = '/etc/default/mdadm' + target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm') + target_mdadm_config_dir = Path(target_mdadm_config).parent + Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True) + s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false', + 'START_DAEMON=true') + Path(target_mdadm_config).write_text(s) + +def get_uuid(device: str) -> str: + """Get UUID of a device""" + command: str = f'tune2fs -l {device}' + l = cmd(command).splitlines() + uuid = next((x for x in l if x.startswith('Filesystem UUID')), '') + return uuid.split(':')[1].strip() if uuid else '' + +def get_uuids(raid_details: RaidDetails) -> tuple[str]: + """Get UUIDs of RAID members + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + + Returns: + tuple[str]: root_disk uuid, root_md uuid + """ + raid_name: str = raid_details.name + root_partition: str = raid_details.members[0] + uuid_root_disk: str = get_uuid(root_partition) + uuid_root_md: str = get_uuid(raid_name) + return uuid_root_disk, uuid_root_md diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index e34a1ba32..74099b502 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -13,6 +13,8 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . +from typing import Callable + def print_error(str='', end='\n'): """ Print `str` to stderr, terminated with `end`. @@ -73,7 +75,8 @@ def is_dumb_terminal(): import os return os.getenv('TERM') in ['vt100', 'dumb'] -def select_entry(l: list, list_msg: str = '', prompt_msg: str = '') -> str: +def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', + list_format: Callable = None,) -> str: """Select an entry from a list Args: @@ -87,7 +90,10 @@ def select_entry(l: list, list_msg: str = '', prompt_msg: str = '') -> str: en = list(enumerate(l, 1)) print(list_msg) for i, e in en: - print(f'\t{i}: {e}') + if list_format: + print(f'\t{i}: {list_format(e)}') + else: + print(f'\t{i}: {e}') select = ask_input(prompt_msg, numeric_only=True, valid_responses=range(1, len(l)+1)) return next(filter(lambda x: x[0] == select, en))[1] diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 88d5eeb48..aa4cf301b 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -21,18 +21,20 @@ from argparse import ArgumentParser, Namespace from pathlib import Path from shutil import copy, chown, rmtree, copytree from sys import exit -from passlib.hosts import linux_context +from time import sleep +from typing import Union from urllib.parse import urlparse +from passlib.hosts import linux_context from psutil import disk_partitions from vyos.configtree import ConfigTree from vyos.remote import download -from vyos.system import disk, grub, image, compat, SYSTEM_CFG_VER +from vyos.system import disk, grub, image, compat, raid, SYSTEM_CFG_VER from vyos.template import render -from vyos.utils.io import ask_input, ask_yes_no +from vyos.utils.io import ask_input, ask_yes_no, select_entry from vyos.utils.file import chmod_2775 -from vyos.utils.process import run +from vyos.utils.process import cmd, run # define text messages MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' @@ -44,11 +46,15 @@ MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation' MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.' MSG_INFO_INSTALL_DISKS_LIST: str = 'The following disks were found:' MSG_INFO_INSTALL_DISK_SELECT: str = 'Which one should be used for installation?' +MSG_INFO_INSTALL_RAID_CONFIGURE: str = 'Would you like to configure RAID-1 mirroring?' +MSG_INFO_INSTALL_RAID_FOUND_DISKS: str = 'Would you like to configure RAID-1 mirroring on them?' +MSG_INFO_INSTALL_RAID_CHOOSE_DISKS: str = 'Would you like to choose two disks for RAID-1 mirroring?' MSG_INFO_INSTALL_DISK_CONFIRM: str = 'Installation will delete all data on the drive. Continue?' +MSG_INFO_INSTALL_RAID_CONFIRM: str = 'Installation will delete all data on both drives. Continue?' MSG_INFO_INSTALL_PARTITONING: str = 'Creating partition table...' MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like to copy it to the new image?' MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' -MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set a new image as default one for boot?' +MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set the new image as the default one for boot?' MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user' MSG_INPUT_ROOT_SIZE_ALL: str = 'Would you like to use all the free space on the drive?' MSG_INPUT_ROOT_SIZE_SET: str = 'Please specify the size (in GB) of the root partition (min is 1.5 GB)?' @@ -107,13 +113,14 @@ def gb_to_bytes(size: float) -> int: return int(size * 1024**3) -def find_disk() -> tuple[str, int]: +def find_disks() -> dict[str, int]: """Find a target disk for installation Returns: - tuple[str, int]: disk name and size in bytes + dict[str, int]: a list of available disks by name and size """ # check for available disks + print('Probing disks') disks_available: dict[str, int] = disk.disks_size() for disk_name, disk_size in disks_available.copy().items(): if disk_size < CONST_MIN_DISK_SIZE: @@ -122,17 +129,10 @@ def find_disk() -> tuple[str, int]: print(MSG_ERR_NO_DISK) exit(MSG_INFO_INSTALL_EXIT) - # select one as a target - print(MSG_INFO_INSTALL_DISKS_LIST) - default_disk: str = list(disks_available)[0] - for disk_name, disk_size in disks_available.items(): - disk_size_human: str = bytes_to_gb(disk_size) - print(f'Drive: {disk_name} ({disk_size_human} GB)') - disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT, - default=default_disk, - valid_responses=list(disks_available)) + num_disks: int = len(disks_available) + print(f'{num_disks} disk(s) found') - return disk_selected, disks_available[disk_selected] + return disks_available def ask_root_size(available_space: int) -> int: @@ -160,6 +160,126 @@ def ask_root_size(available_space: int) -> int: return root_size_kbytes +def create_partitions(target_disk: str, target_size: int, + prompt: bool = True) -> None: + """Create partitions on a target disk + + Args: + target_disk (str): a target disk + target_size (int): size of disk in bytes + """ + # define target rootfs size in KB (smallest unit acceptable by sgdisk) + available_size: int = (target_size - CONST_RESERVED_SPACE) // 1024 + if prompt: + rootfs_size: int = ask_root_size(available_size) + else: + rootfs_size: int = available_size + + print(MSG_INFO_INSTALL_PARTITONING) + disk.disk_cleanup(target_disk) + disk_details: disk.DiskDetails = disk.parttable_create(target_disk, + rootfs_size) + + return disk_details + + +def ask_single_disk(disks_available: dict[str, int]) -> str: + """Ask user to select a disk for installation + + Args: + disks_available (dict[str, int]): a list of available disks + """ + print(MSG_INFO_INSTALL_DISKS_LIST) + default_disk: str = list(disks_available)[0] + for disk_name, disk_size in disks_available.items(): + disk_size_human: str = bytes_to_gb(disk_size) + print(f'Drive: {disk_name} ({disk_size_human} GB)') + disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT, + default=default_disk, + valid_responses=list(disks_available)) + + # create partitions + if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM): + print(MSG_INFO_INSTALL_EXIT) + exit() + + disk_details: disk.DiskDetails = create_partitions(disk_selected, + disks_available[disk_selected]) + + disk.filesystem_create(disk_details.partition['efi'], 'efi') + disk.filesystem_create(disk_details.partition['root'], 'ext4') + + return disk_details + + +def check_raid_install(disks_available: dict[str, int]) -> Union[str, None]: + """Ask user to select disks for RAID installation + + Args: + disks_available (dict[str, int]): a list of available disks + """ + if len(disks_available) < 2: + return None + + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CONFIGURE, default=True): + return None + + def format_selection(disk_name: str) -> str: + return f'{disk_name}\t({bytes_to_gb(disks_available[disk_name])} GB)' + + disk0, disk1 = list(disks_available)[0], list(disks_available)[1] + disks_selected: dict[str, int] = { disk0: disks_available[disk0], + disk1: disks_available[disk1] } + + target_size: int = min(disks_selected[disk0], disks_selected[disk1]) + + print(MSG_INFO_INSTALL_DISKS_LIST) + for disk_name, disk_size in disks_selected.items(): + disk_size_human: str = bytes_to_gb(disk_size) + print(f'\t{disk_name} ({disk_size_human} GB)') + if not ask_yes_no(MSG_INFO_INSTALL_RAID_FOUND_DISKS, default=True): + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CHOOSE_DISKS, default=True): + return None + else: + disks_selected = {} + disk0 = select_entry(list(disks_available), 'Disks available:', + 'Select first disk:', format_selection) + + disks_selected[disk0] = disks_available[disk0] + del disks_available[disk0] + disk1 = select_entry(list(disks_available), 'Remaining disks:', + 'Select second disk:', format_selection) + disks_selected[disk1] = disks_available[disk1] + + target_size: int = min(disks_selected[disk0], + disks_selected[disk1]) + + # create partitions + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CONFIRM): + print(MSG_INFO_INSTALL_EXIT) + exit() + + disks: list[disk.DiskDetails] = [] + for disk_selected in list(disks_selected): + print(f'Creating partitions on {disk_selected}') + disk_details = create_partitions(disk_selected, target_size, + prompt=False) + disk.filesystem_create(disk_details.partition['efi'], 'efi') + + disks.append(disk_details) + + print('Creating RAID array') + members = [disk.partition['root'] for disk in disks] + raid_details: raid.RaidDetails = raid.raid_create(members) + # raid init stuff + print('Updating initramfs') + raid.update_initramfs() + # end init + print('Creating filesystem on RAID array') + disk.filesystem_create(raid_details.name, 'ext4') + + return raid_details + def prepare_tmp_disr() -> None: """Create temporary directories for installation @@ -294,7 +414,7 @@ def image_fetch(image_path: str) -> Path: if local_path.is_file(): return local_path else: - raise + raise FileNotFoundError except Exception: print(f'The image cannot be fetched from: {image_path}') exit(1) @@ -351,6 +471,27 @@ def cleanup(mounts: list[str] = [], remove_items: list[str] = []) -> None: if Path(remove_item).is_dir(): rmtree(remove_item) +def cleanup_raid(details: raid.RaidDetails) -> None: + efiparts = [] + for raid_disk in details.disks: + efiparts.append(raid_disk.partition['efi']) + cleanup([details.name, *efiparts], + ['/mnt/installation']) + + +def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) -> bool: + """Check if installation target is a RAID array + + Args: + install_object (Union[disk.DiskDetails, raid.RaidDetails]): a target disk + + Returns: + bool: True if it is a RAID array + """ + if isinstance(install_object, raid.RaidDetails): + return True + return False + def install_image() -> None: """Install an image to a disk @@ -363,50 +504,44 @@ def install_image() -> None: print(MSG_INFO_INSTALL_EXIT) exit() + # configure image name + running_image_name: str = image.get_running_image() + while True: + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, + running_image_name) + if image.validate_name(image_name): + break + print(MSG_WARN_IMAGE_NAME_WRONG) + + # ask for password + user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos') + + # ask for default console + console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE, + default='K', + valid_responses=['K', 'S', 'U']) + console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'} + + disks: dict[str, int] = find_disks() + + install_target: Union[disk.DiskDetails, raid.RaidDetails, None] = None try: - # configure image name - running_image_name: str = image.get_running_image() - while True: - image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, - running_image_name) - if image.validate_name(image_name): - break - print(MSG_WARN_IMAGE_NAME_WRONG) - - # define target drive - install_target, target_size = find_disk() - - # define target rootfs size in KB (smallest unit acceptable by sgdisk) - availabe_size: int = (target_size - CONST_RESERVED_SPACE) // 1024 - rootfs_size: int = ask_root_size(availabe_size) - - # ask for password - user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos') - - # ask for default console - console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE, - default='K', - valid_responses=['K', 'S', 'U']) - console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'} - - # create partitions - if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM): - print(MSG_INFO_INSTALL_EXIT) - exit() - print(MSG_INFO_INSTALL_PARTITONING) - disk.disk_cleanup(install_target) - disk.parttable_create(install_target, rootfs_size) - disk.filesystem_create(f'{install_target}2', 'efi') - disk.filesystem_create(f'{install_target}3', 'ext4') - - # create directiroes for installation media + install_target = check_raid_install(disks) + if install_target is None: + install_target = ask_single_disk(disks) + + # create directories for installation media prepare_tmp_disr() # mount target filesystem and create required dirs inside print('Mounting new partitions') - disk.partition_mount(f'{install_target}3', DIR_DST_ROOT) - Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) - disk.partition_mount(f'{install_target}2', f'{DIR_DST_ROOT}/boot/efi') + if is_raid_install(install_target): + disk.partition_mount(install_target.name, DIR_DST_ROOT) + Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) + else: + disk.partition_mount(install_target.partition['root'], DIR_DST_ROOT) + Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) + disk.partition_mount(install_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') # a config dir. It is the deepest one, so the comand will # create all the rest in a single step @@ -432,10 +567,10 @@ def install_image() -> None: copy(FILE_ROOTFS_SRC, f'{DIR_DST_ROOT}/boot/{image_name}/{image_name}.squashfs') - # install GRUB - print('Installing GRUB to the drive') - grub.install(install_target, f'{DIR_DST_ROOT}/boot/', - f'{DIR_DST_ROOT}/boot/efi') + if is_raid_install(install_target): + write_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw' + raid.update_default(write_dir) + setup_grub(DIR_DST_ROOT) # add information about version grub.create_structure() @@ -443,9 +578,34 @@ def install_image() -> None: grub.set_default(image_name, DIR_DST_ROOT) grub.set_console_type(console_dict[console_type], DIR_DST_ROOT) + if is_raid_install(install_target): + # add RAID specific modules + grub.modules_write(f'{DIR_DST_ROOT}/{grub.CFG_VYOS_MODULES}', + ['part_msdos', 'part_gpt', 'diskfilter', + 'ext2','mdraid1x']) + # install GRUB + if is_raid_install(install_target): + print('Installing GRUB to the drives') + l = install_target.disks + for disk_target in l: + disk.partition_mount(disk_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') + grub.install(disk_target.name, f'{DIR_DST_ROOT}/boot/', + f'{DIR_DST_ROOT}/boot/efi', + id=f'VyOS (RAID disk {l.index(disk_target) + 1})') + disk.partition_umount(disk_target.partition['efi']) + else: + print('Installing GRUB to the drive') + grub.install(install_target.name, f'{DIR_DST_ROOT}/boot/', + f'{DIR_DST_ROOT}/boot/efi') + # umount filesystems and remove temporary files - cleanup([f'{install_target}2', f'{install_target}3'], - ['/mnt/installation']) + if is_raid_install(install_target): + cleanup([install_target.name], + ['/mnt/installation']) + else: + cleanup([install_target.partition['efi'], + install_target.partition['root']], + ['/mnt/installation']) # we are done print(MSG_INFO_INSTALL_SUCCESS) @@ -455,8 +615,13 @@ def install_image() -> None: print(f'Unable to install VyOS: {err}') # unmount filesystems and clenup try: - cleanup([f'{install_target}2', f'{install_target}3'], - ['/mnt/installation']) + if install_target is not None: + if is_raid_install(install_target): + cleanup_raid(install_target) + else: + cleanup([install_target.partition['efi'], + install_target.partition['root']], + ['/mnt/installation']) except Exception as err: print(f'Cleanup failed: {err}') -- cgit v1.2.3