diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/op_mode/image_info.py | 108 | ||||
| -rw-r--r-- | src/op_mode/image_installer.py | 557 | ||||
| -rw-r--r-- | src/op_mode/image_manager.py | 190 | ||||
| -rw-r--r-- | src/system/grub_update.py | 211 | ||||
| -rw-r--r-- | src/systemd/vyos-grub-update.service | 14 | 
5 files changed, 1080 insertions, 0 deletions
| diff --git a/src/op_mode/image_info.py b/src/op_mode/image_info.py new file mode 100644 index 000000000..ae0677196 --- /dev/null +++ b/src/op_mode/image_info.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +import sys +from typing import List, Union + +from hurry.filesize import size +from tabulate import tabulate + +from vyos import opmode +from vyos.system import disk, grub, image + + +def _format_show_images_summary(images_summary: image.BootDetails) -> str: +    headers: list[str] = ['Name', 'Default boot', 'Running'] +    table_data: list[list[str]] = list() +    for image_item in images_summary.get('images_available', []): +        name: str = image_item +        if images_summary.get('image_default') == name: +            default: str = 'Yes' +        else: +            default: str = '' + +        if images_summary.get('image_running') == name: +            running: str = 'Yes' +        else: +            running: str = '' + +        table_data.append([name, default, running]) +    tabulated: str = tabulate(table_data, headers) + +    return tabulated + + +def _format_show_images_details( +        images_details: list[image.ImageDetails]) -> str: +    headers: list[str] = [ +        'Name', 'Version', 'Storage Read-Only', 'Storage Read-Write', +        'Storage Total' +    ] +    table_data: list[list[Union[str, int]]] = list() +    for image_item in images_details: +        name: str = image_item.get('name') +        version: str = image_item.get('version') +        disk_ro: int = size(image_item.get('disk_ro')) +        disk_rw: int = size(image_item.get('disk_rw')) +        disk_total: int = size(image_item.get('disk_total')) +        table_data.append([name, version, disk_ro, disk_rw, disk_total]) +    tabulated: str = tabulate(table_data, headers) + +    return tabulated + + +def show_images_summary(raw: bool) -> Union[image.BootDetails, str]: +    images_available: list[str] = grub.version_list() +    root_dir: str = disk.find_persistence() +    boot_vars: dict = grub.vars_read(f'{root_dir}/{image.CFG_VYOS_VARS}') + +    images_summary: image.BootDetails = dict() + +    images_summary['image_default'] = image.get_default_image() +    images_summary['image_running'] = image.get_running_image() +    images_summary['images_available'] = images_available +    images_summary['console_type'] = boot_vars.get('console_type') +    images_summary['console_num'] = boot_vars.get('console_num') + +    if raw: +        return images_summary +    else: +        return _format_show_images_summary(images_summary) + + +def show_images_details(raw: bool) -> Union[list[image.ImageDetails], str]: +    images: list[str] = grub.version_list() +    images_details: list[image.ImageDetails] = list() +    for image_name in images: +        images_details.append(image.get_details(image_name)) + +    if raw: +        return images_details +    else: +        return _format_show_images_details(images_details) + + +if __name__ == '__main__': +    try: +        res = opmode.run(sys.modules[__name__]) +        if res: +            print(res) +    except (ValueError, opmode.Error) as e: +        print(e) +        sys.exit(1) diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py new file mode 100644 index 000000000..6ebb38e46 --- /dev/null +++ b/src/op_mode/image_installer.py @@ -0,0 +1,557 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, Namespace +from pathlib import Path +from shutil import copy, rmtree, copytree +from sys import exit +from urllib.parse import urlparse + +from psutil import disk_partitions + +from vyos.configtree import ConfigTree +from vyos.remote import download +from vyos.system import disk, grub, image +from vyos.template import render +from vyos.util import ask_input, ask_yes_no, run + +# define text messages +MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' +MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' +MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' +MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install the VyOS to your permanent storage.' +MSG_INFO_INSTALL_EXIT: str = 'Exitting from VyOS installation' +MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully' +MSG_INFO_INSTALL_DISKS_LIST: str = 'Were found the next disks:' +MSG_INFO_INSTALL_DISK_SELECT: str = 'Which one should be used for installation?' +MSG_INFO_INSTALL_DISK_CONFIRM: str = 'Installation will delete all data on the drive. Continue?' +MSG_INFO_INSTALL_PARTITONING: str = 'Creating partition table...' +MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like to copy it to the new image?' +MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' +MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set a new image as default one for boot?' +MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user' +MSG_INPUT_ROOT_SIZE_ALL: str = 'Would you like to use all free space on the drive?' +MSG_INPUT_ROOT_SIZE_SET: str = 'What should be a size (in GB) of the root partition (min is 1.5 GB)?' +MSG_INPUT_CONSOLE_TYPE: str = 'What console should be used by default? (K: KVM, S: Serial, U: USB-Serial)?' +MSG_WARN_ISO_SIGN_INVALID: str = 'Signature is not valid. Do you want to continue with installation?' +MSG_WARN_ISO_SIGN_UNAVAL: str = 'Signature is not available. Do you want to continue with installation?' +MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.' +MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' +MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n' +'It must be between 1 and 32 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' +CONST_MIN_DISK_SIZE: int = 2147483648  # 2 GB +CONST_MIN_ROOT_SIZE: int = 1610612736  # 1.5 GB +# a reserved space: 2MB for header, 1 MB for BIOS partition, 256 MB for EFI +CONST_RESERVED_SPACE: int = (2 + 1 + 256) * 1024**2 + +# define directories and paths +DIR_INSTALLATION: str = '/mnt/installation' +DIR_ROOTFS_SRC: str = f'{DIR_INSTALLATION}/root_src' +DIR_ROOTFS_DST: str = f'{DIR_INSTALLATION}/root_dst' +DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src' +DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst' +DIR_KERNEL_SRC: str = '/boot/' +FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs' +ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso' + +# default boot variables +DEFAULT_BOOT_VARS: dict[str, str] = { +    'timeout': '5', +    'console_type': 'tty', +    'console_num': '0', +    'bootmode': 'normal' +} + + +def bytes_to_gb(size: int) -> float: +    """Convert Bytes to GBytes, rounded to 1 decimal number + +    Args: +        size (int): input size in bytes + +    Returns: +        float: size in GB +    """ +    return round(size / 1024**3, 1) + + +def gb_to_bytes(size: float) -> int: +    """Convert GBytes to Bytes + +    Args: +        size (float): input size in GBytes + +    Returns: +        int: size in bytes +    """ +    return int(size * 1024**3) + + +def find_disk() -> tuple[str, int]: +    """Find a target disk for installation + +    Returns: +        tuple[str, int]: disk name and size in bytes +    """ +    # check for available disks +    disks_available: dict[str, int] = disk.disks_size() +    for disk_name, disk_size in disks_available.copy().items(): +        if disk_size < CONST_MIN_DISK_SIZE: +            del disks_available[disk_name] +    if not disks_available: +        print(MSG_ERR_NO_DISK) +        exit(MSG_INFO_INSTALL_EXIT) + +    # select one as a target +    print(MSG_INFO_INSTALL_DISKS_LIST) +    default_disk: str = list(disks_available)[0] +    for disk_name, disk_size in disks_available.items(): +        disk_size_human: str = bytes_to_gb(disk_size) +        print(f'Drive: {disk_name} ({disk_size_human} GB)') +    disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT, +                                   default=default_disk, +                                   valid_responses=list(disks_available)) + +    return disk_selected, disks_available[disk_selected] + + +def ask_root_size(available_space: int) -> int: +    """Define a size of root partition + +    Args: +        available_space (int): available space in bytes for a root partition + +    Returns: +        int: defined size +    """ +    if ask_yes_no(MSG_INPUT_ROOT_SIZE_ALL, default=True): +        return available_space + +    while True: +        root_size_gb: str = ask_input(MSG_INPUT_ROOT_SIZE_SET) +        root_size_kbytes: int = (gb_to_bytes(float(root_size_gb))) // 1024 + +        if root_size_kbytes > available_space: +            print(MSG_WARN_ROOT_SIZE_TOOBIG) +            continue +        if root_size_kbytes < CONST_MIN_ROOT_SIZE / 1024: +            print(MSG_WARN_ROOT_SIZE_TOOSMALL) +            continue + +        return root_size_kbytes + + +def prepare_tmp_disr() -> None: +    """Create temporary directories for installation +    """ +    print('Creating temporary directories') +    for dir in [DIR_ROOTFS_SRC, DIR_ROOTFS_DST, DIR_DST_ROOT]: +        dirpath = Path(dir) +        dirpath.mkdir(mode=0o755, parents=True) + + +def setup_grub(root_dir: str) -> None: +    """Install GRUB configurations + +    Args: +        root_dir (str): a path to the root of target filesystem +    """ +    print('Installing GRUB configuration files') +    grub_cfg_main = f'{root_dir}/{grub.GRUB_DIR_MAIN}/grub.cfg' +    grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' +    grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' +    grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' +    grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' + +    # create new files +    render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) +    grub.common_write(root_dir) +    grub.vars_write(grub_cfg_vars, DEFAULT_BOOT_VARS) +    grub.modules_write(grub_cfg_modules, []) +    grub.write_cfg_ver(1, root_dir) +    render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) +    render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) + + +def configure_authentication(config_file: str, password: str) -> None: +    config = ConfigTree(config_file) +    config.set([ +        'system', 'login', 'user', 'vyos', 'authentication', +        'plaintext-password' +    ], +               value=password, +               replace=True) +    config.set_tag(['system', 'login', 'user']) + + +def validate_signature(file_path: str, sign_type: str) -> None: +    """Validate a file by signature and delete a signature file + +    Args: +        file_path (str): a path to file +        sign_type (str): a signature type +    """ +    print('Validating signature') +    signature_valid: bool = False +    # validate with minisig +    if sign_type == 'minisig': +        for pubkey in [ +                '/usr/share/vyos/keys/vyos-release.minisign.pub', +                '/usr/share/vyos/keys/vyos-backup.minisign.pub' +        ]: +            if run(f'minisign -V -q -p {pubkey} -m {file_path} -x {file_path}.minisig' +                  ) == 0: +                signature_valid = True +                break +        Path(f'{file_path}.minisig').unlink() +    # validate with GPG +    if sign_type == 'asc': +        if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0: +            signature_valid = True +        Path(f'{file_path}.asc').unlink() + +    # warn or pass +    if not signature_valid: +        if not ask_yes_no(MSG_WARN_ISO_SIGN_INVALID, default=False): +            exit(MSG_INFO_INSTALL_EXIT) +    else: +        print('Signature is valid') + + +def image_fetch(image_path: str) -> Path: +    """Fetch an ISO image + +    Args: +        image_path (str): a path, remote or local + +    Returns: +        Path: a path to a local file +    """ +    try: +        # check a type of path +        if urlparse(image_path).scheme: +            # download an image +            download(ISO_DOWNLOAD_PATH, image_path, True, True) +            # download a signature +            sign_file = (False, '') +            for sign_type in ['minisig', 'asc']: +                try: +                    download(f'{ISO_DOWNLOAD_PATH}.{sign_type}', +                             f'{image_path}.{sign_type}') +                    sign_file = (True, sign_type) +                    break +                except Exception: +                    print(f'{sign_type} signature is not available') +            # validate a signature if it is available +            if sign_file[0]: +                validate_signature(ISO_DOWNLOAD_PATH, sign_file[1]) +            else: +                if not ask_yes_no(MSG_WARN_ISO_SIGN_UNAVAL, default=False): +                    cleanup() +                    exit(MSG_INFO_INSTALL_EXIT) + +            return Path(ISO_DOWNLOAD_PATH) +        else: +            local_path: Path = Path(image_path) +            if local_path.is_file(): +                return local_path +            else: +                raise +    except Exception: +        print(f'The image cannot be fetched from: {image_path}') +        exit(1) + + +def migrate_config() -> bool: +    """Check for active config and ask user for migration + +    Returns: +        bool: user's decision +    """ +    active_config_path: Path = Path('/opt/vyatta/etc/config/config.boot') +    if active_config_path.exists(): +        if ask_yes_no(MSG_INPUT_CONFIG_FOUND, default=True): +            return True +    return False + + +def cleanup(mounts: list[str] = [], remove_items: list[str] = []) -> None: +    """Clean up after installation + +    Args: +        mounts (list[str], optional): List of mounts to unmount. +        Defaults to []. +        remove_items (list[str], optional): List of files or directories +        to remove. Defaults to []. +    """ +    print('Cleaning up') +    # clean up installation directory by default +    mounts_all = disk_partitions(all=True) +    for mounted_device in mounts_all: +        if mounted_device.mountpoint.startswith(DIR_INSTALLATION) and not ( +                mounted_device.device in mounts or +                mounted_device.mountpoint in mounts): +            mounts.append(mounted_device.mountpoint) +    # add installation dir to cleanup list +    if DIR_INSTALLATION not in remove_items: +        remove_items.append(DIR_INSTALLATION) +    # also delete an ISO file +    if Path(ISO_DOWNLOAD_PATH).exists( +    ) and ISO_DOWNLOAD_PATH not in remove_items: +        remove_items.append(ISO_DOWNLOAD_PATH) + +    if mounts: +        print('Unmounting target filesystems') +        for mountpoint in mounts: +            disk.partition_umount(mountpoint) +    if remove_items: +        print('Removing temporary files') +        for remove_item in remove_items: +            if Path(remove_item).exists(): +                if Path(remove_item).is_file(): +                    Path(remove_item).unlink() +                if Path(remove_item).is_dir(): +                    rmtree(remove_item) + + +def install_image() -> None: +    """Install an image to a disk +    """ +    if not image.is_live_boot(): +        exit(MSG_ERR_NOT_LIVE) + +    print(MSG_INFO_INSTALL_WELCOME) +    if not ask_yes_no('Would you like to continue?'): +        print(MSG_INFO_INSTALL_EXIT) +        exit() + +    try: +        # configure image name +        running_image_name: str = image.get_running_image() +        while True: +            image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, +                                        running_image_name) +            if image.validate_name(image_name): +                break +            print(MSG_WARN_IMAGE_NAME_WRONG) + +        # define target drive +        install_target, target_size = find_disk() + +        # define target rootfs size in KB (smallest unit acceptable by sgdisk) +        availabe_size: int = (target_size - CONST_RESERVED_SPACE) // 1024 +        rootfs_size: int = ask_root_size(availabe_size) + +        # ask for password +        user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos') + +        # ask for default console +        console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE, +                                      default='K', +                                      valid_responses=['K', 'S', 'U']) +        console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'} + +        # create partitions +        if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM): +            print(MSG_INFO_INSTALL_EXIT) +            exit() +        print(MSG_INFO_INSTALL_PARTITONING) +        disk.disk_cleanup(install_target) +        disk.parttable_create(install_target, rootfs_size) +        disk.filesystem_create(f'{install_target}2', 'efi') +        disk.filesystem_create(f'{install_target}3', 'ext4') + +        # create directiroes for installation media +        prepare_tmp_disr() + +        # mount target filesystem and create required dirs inside +        print('Mounting new partitions') +        disk.partition_mount(f'{install_target}3', DIR_DST_ROOT) +        Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) +        disk.partition_mount(f'{install_target}2', f'{DIR_DST_ROOT}/boot/efi') + +        # a config dir. It is the deepest one, so the comand will +        # create all the rest in a single step +        print('Creating a configuration file') +        target_config_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw/opt/vyatta/etc/config/' +        Path(target_config_dir).mkdir(parents=True) +        # copy config +        if migrate_config(): +            copy('/opt/vyatta/etc/config/config.boot', target_config_dir) +        else: +            copy('/opt/vyatta/etc/config.boot.default', +                 f'{target_config_dir}/config.boot') +        configure_authentication(f'{target_config_dir}/config.boot', +                                 user_password) +        Path(f'{target_config_dir}/.vyatta_config').touch() + +        # create a persistence.conf +        Path(f'{DIR_DST_ROOT}/persistence.conf').write_text('/ union\n') + +        # copy system image and kernel files +        print('Copying system image files') +        for file in Path(DIR_KERNEL_SRC).iterdir(): +            if file.is_file(): +                copy(file, f'{DIR_DST_ROOT}/boot/{image_name}/') +        copy(FILE_ROOTFS_SRC, +             f'{DIR_DST_ROOT}/boot/{image_name}/{image_name}.squashfs') + +        # install GRUB +        print('Installing GRUB to the drive') +        grub.install(install_target, f'{DIR_DST_ROOT}/boot/', +                     f'{DIR_DST_ROOT}/boot/efi') +        setup_grub(DIR_DST_ROOT) +        # add information about version +        grub.create_structure() +        grub.version_add(image_name, DIR_DST_ROOT) +        grub.set_default(image_name, DIR_DST_ROOT) +        grub.set_console_type(console_dict[console_type], DIR_DST_ROOT) + +        # umount filesystems and remove temporary files +        cleanup([f'{install_target}2', f'{install_target}3'], +                ['/mnt/installation']) + +        # we are done +        print(MSG_INFO_INSTALL_SUCCESS) +        exit() + +    except Exception as err: +        print(f'Unable to install VyOS: {err}') +        # unmount filesystems and clenup +        try: +            cleanup([f'{install_target}2', f'{install_target}3'], +                    ['/mnt/installation']) +        except Exception as err: +            print(f'Cleanup failed: {err}') + +        exit(1) + + +def add_image(image_path: str) -> None: +    """Add a new image + +    Args: +        image_path (str): a path to an ISO image +    """ +    if image.is_live_boot(): +        exit(MSG_ERR_LIVE) + +    # fetch an image +    iso_path: Path = image_fetch(image_path) +    try: +        # mount an ISO +        Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) +        disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') + +        # check sums +        print('Validating image checksums') +        if run(f'cd {DIR_ISO_MOUNT} && sha256sum --status -c sha256sum.txt'): +            cleanup() +            exit('Image checksum verification failed.') + +        # mount rootfs (to get a system version) +        Path(DIR_ROOTFS_SRC).mkdir(mode=0o755, parents=True) +        disk.partition_mount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', +                             DIR_ROOTFS_SRC, 'squashfs') +        version_file: str = Path( +            f'{DIR_ROOTFS_SRC}/opt/vyatta/etc/version').read_text() +        disk.partition_umount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs') +        version_name: str = version_file.lstrip('Version: ').strip() +        image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) +        set_as_default: bool = ask_yes_no(MSG_INPUT_IMAGE_DEFAULT, default=True) + +        # find target directory +        root_dir: str = disk.find_persistence() + +        # a config dir. It is the deepest one, so the comand will +        # create all the rest in a single step +        target_config_dir: str = f'{root_dir}/boot/{image_name}/rw/opt/vyatta/etc/config/' +        # copy config +        if migrate_config(): +            print('Copying configuration directory') +            copytree('/opt/vyatta/etc/config/', target_config_dir) +        else: +            Path(target_config_dir).mkdir(parents=True) +            Path(f'{target_config_dir}/.vyatta_config').touch() + +        # copy system image and kernel files +        print('Copying system image files') +        for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir(): +            if file.is_file() and (file.match('initrd*') or +                                   file.match('vmlinuz*')): +                copy(file, f'{root_dir}/boot/{image_name}/') +        copy(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', +             f'{root_dir}/boot/{image_name}/{image_name}.squashfs') + +        # unmount an ISO and cleanup +        cleanup([str(iso_path)]) + +        # add information about version +        grub.version_add(image_name, root_dir) +        if set_as_default: +            grub.set_default(image_name, root_dir) + +    except Exception as err: +        # unmount an ISO and cleanup +        cleanup([str(iso_path)]) +        exit(f'Whooops: {err}') + + +def parse_arguments() -> Namespace: +    """Parse arguments + +    Returns: +        Namespace: a namespace with parsed arguments +    """ +    parser: ArgumentParser = ArgumentParser( +        description='Install new system images') +    parser.add_argument('--action', +                        choices=['install', 'add'], +                        required=True, +                        help='action to perform with an image') +    parser.add_argument( +        '--image_path', +        help='a path (HTTP or local file) to an image that needs to be installed' +    ) +    # parser.add_argument('--image_new_name', help='a new name for image') +    args: Namespace = parser.parse_args() +    # Validate arguments +    if args.action == 'add' and not args.image_path: +        exit('A path to image is required for add action') + +    return args + + +if __name__ == '__main__': +    try: +        args: Namespace = parse_arguments() +        if args.action == 'install': +            install_image() +        if args.action == 'add': +            add_image(args.image_path) + +        exit() + +    except KeyboardInterrupt: +        print('Stopped by Ctrl+C') +        cleanup() +        exit() + +    except Exception as err: +        exit(f'{err}') diff --git a/src/op_mode/image_manager.py b/src/op_mode/image_manager.py new file mode 100644 index 000000000..ac889da38 --- /dev/null +++ b/src/op_mode/image_manager.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, Namespace +from pathlib import Path +from shutil import rmtree +from sys import exit + +from vyos.system import disk, grub, image +from vyos.util import ask_yes_no + + +def delete_image(image_name: str) -> None: +    """Remove installed image files and boot entry + +    Args: +        image_name (str): a name of image to delete +    """ +    if image_name == image.get_running_image(): +        exit('Currently running image cannot be deleted') +    if image_name == image.get_default_image(): +        exit('Default image cannot be deleted') +    available_images: list[str] = grub.version_list() +    if image_name not in available_images: +        exit(f'The image "{image_name}" cannot be found') +    presistence_storage: str = disk.find_persistence() +    if not presistence_storage: +        exit('Persistence storage cannot be found') + +    if not ask_yes_no(f'Do you really want to delete the image {image_name}?', +                      default=False): +        exit() + +    # remove files and menu entry +    version_path: Path = Path(f'{presistence_storage}/boot/{image_name}') +    try: +        rmtree(version_path) +        grub.version_del(image_name, presistence_storage) +        print(f'The image "{image_name}" was successfully deleted') +    except Exception as err: +        exit(f'Unable to remove the image "{image_name}": {err}') + + +def set_image(image_name: str) -> None: +    """Set default boot image + +    Args: +        image_name (str): an image name +    """ +    if image_name == image.get_default_image(): +        exit(f'The image "{image_name}" already configured as default') +    available_images: list[str] = grub.version_list() +    if image_name not in available_images: +        exit(f'The image "{image_name}" cannot be found') +    presistence_storage: str = disk.find_persistence() +    if not presistence_storage: +        exit('Persistence storage cannot be found') + +    if not ask_yes_no( +            f'Do you really want to set the image {image_name} ' +            'as default boot image?', +            default=False): +        exit() + +    # set default boot image +    try: +        grub.set_default(image_name, presistence_storage) +        print(f'The image "{image_name}" is now default boot image') +    except Exception as err: +        exit(f'Unable to set default image "{image_name}": {err}') + + +def rename_image(name_old: str, name_new: str) -> None: +    """Rename installed image + +    Args: +        name_old (str): old name +        name_new (str): new name +    """ +    if name_old == image.get_running_image(): +        exit('Currently running image cannot be renamed') +    available_images: list[str] = grub.version_list() +    if name_old not in available_images: +        exit(f'The image "{name_old}" cannot be found') +    if name_new in available_images: +        exit(f'The image "{name_new}" already exists') +    if not image.validate_name(name_new): +        exit(f'The image name "{name_new}" is not allowed') + +    presistence_storage: str = disk.find_persistence() +    if not presistence_storage: +        exit('Persistence storage cannot be found') + +    if not ask_yes_no( +            f'Do you really want to rename the image {name_old} ' +            f'to the {name_new}?', +            default=False): +        exit() + +    try: +        # replace default boot item +        if name_old == image.get_default_image(): +            grub.set_default(name_new, presistence_storage) + +        # rename files and dirs +        old_path: Path = Path(f'{presistence_storage}/boot/{name_old}') +        new_path: Path = Path(f'{presistence_storage}/boot/{name_new}') +        old_path.rename(new_path) + +        # replace boot item +        grub.version_del(name_old, presistence_storage) +        grub.version_add(name_new, presistence_storage) + +        print(f'The image "{name_old}" was renamed to "{name_new}"') +    except Exception as err: +        exit(f'Unable to rename image "{name_old}" to "{name_new}": {err}') + + +def list_images() -> None: +    """Print list of available images for CLI hints""" +    images_list: list[str] = grub.version_list() +    for image_name in images_list: +        print(image_name) + + +def parse_arguments() -> Namespace: +    """Parse arguments + +    Returns: +        Namespace: a namespace with parsed arguments +    """ +    parser: ArgumentParser = ArgumentParser(description='Manage system images') +    parser.add_argument('--action', +                        choices=['delete', 'set', 'rename', 'list'], +                        required=True, +                        help='action to perform with an image') +    parser.add_argument( +        '--image_name', +        help= +        'a name of an image to add, delete, install, rename, or set as default') +    parser.add_argument('--image_new_name', help='a new name for image') +    args: Namespace = parser.parse_args() +    # Validate arguments +    if args.action == 'delete' and not args.image_name: +        exit('An image name is required for delete action') +    if args.action == 'set' and not args.image_name: +        exit('An image name is required for set action') +    if args.action == 'rename' and (not args.image_name or +                                    not args.image_new_name): +        exit('Both old and new image names are required for rename action') + +    return args + + +if __name__ == '__main__': +    try: +        args: Namespace = parse_arguments() +        if args.action == 'delete': +            delete_image(args.image_name) +        if args.action == 'set': +            set_image(args.image_name) +        if args.action == 'rename': +            rename_image(args.image_name, args.image_new_name) +        if args.action == 'list': +            list_images() + +        exit() + +    except KeyboardInterrupt: +        print('Stopped by Ctrl+C') +        exit() + +    except Exception as err: +        exit(f'{err}') diff --git a/src/system/grub_update.py b/src/system/grub_update.py new file mode 100644 index 000000000..ebdc73af0 --- /dev/null +++ b/src/system/grub_update.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from sys import exit + +from vyos.system import disk, grub, image +from vyos.template import render + +# define configuration version +CFG_VER = 1 + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +def cfg_check_update() -> bool: +    """Check if GRUB structure update is required + +    Returns: +        bool: False if not required, True if required +    """ +    current_ver = grub.get_cfg_ver() +    if current_ver and current_ver >= CFG_VER: +        return False +    else: +        return True + + +def find_versions(menu_entries: list) -> list: +    """Find unique VyOS versions from menu entries + +    Args: +        menu_entries (list): a list with menu entries + +    Returns: +        list: List of installed versions +    """ +    versions = [] +    for vyos_ver in menu_entries: +        versions.append(vyos_ver.get('version')) +    # remove duplicates +    versions = list(set(versions)) +    return versions + + +def filter_unparsed(grub_path: str) -> str: +    """Find currently installed VyOS version + +    Args: +        grub_path (str): a path to the grub.cfg file + +    Returns: +        str: unparsed grub.cfg items +    """ +    config_text = Path(grub_path).read_text() +    regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) +    filtered = regex_filter.sub('', config_text) +    regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) +    filtered = regex_filter.sub('', filtered) +    regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) +    filtered = regex_filter.sub('', filtered) +    # strip extra new lines +    filtered = filtered.strip() +    return filtered + + +def sanitize_boot_opts(boot_opts: str) -> str: +    """Sanitize boot options from console and init + +    Args: +        boot_opts (str): boot options + +    Returns: +        str: sanitized boot options +    """ +    regex_filter = compile(REGEX_SANIT_CONSOLE) +    boot_opts = regex_filter.sub('', boot_opts) +    regex_filter = compile(REGEX_SANIT_INIT) +    boot_opts = regex_filter.sub('', boot_opts) + +    return boot_opts + + +def parse_entry(entry: tuple) -> dict: +    """Parse GRUB menuentry + +    Args: +        entry (tuple): tuple of (version, options) + +    Returns: +        dict: dictionary with parsed options +    """ +    # save version to dict +    entry_dict = {'version': entry[0]} +    # detect boot mode type +    if PW_RESET_OPTION in entry[1]: +        entry_dict['bootmode'] = 'pw_reset' +    else: +        entry_dict['bootmode'] = 'normal' +    # find console type and number +    regex_filter = compile(REGEX_CONSOLE) +    entry_dict.update(regex_filter.match(entry[1]).groupdict()) +    entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + +    return entry_dict + + +def parse_menuntries(grub_path: str) -> list: +    """Parse all GRUB menuentries + +    Args: +        grub_path (str): a path to GRUB config file + +    Returns: +        list: list with menu items (each item is a dict) +    """ +    menuentries = [] +    # read configuration file +    config_text = Path(grub_path).read_text() +    # parse menuentries to tuples (version, options) +    regex_filter = compile(REGEX_MENUENTRY, MULTILINE) +    filter_results = regex_filter.findall(config_text) +    # parse each entry +    for entry in filter_results: +        menuentries.append(parse_entry(entry)) + +    return menuentries + + +if __name__ == '__main__': +    # Skip everything if update is not required +    if not cfg_check_update(): +        exit(0) + +    # find root directory of persistent storage +    root_dir = disk.find_persistence() + +    # read current GRUB config +    grub_cfg_main = f'{root_dir}/{image.GRUB_DIR_MAIN}/grub.cfg' +    vars = grub.vars_read(grub_cfg_main) +    modules = grub.modules_read(grub_cfg_main) +    vyos_menuentries = parse_menuntries(grub_cfg_main) +    vyos_versions = find_versions(vyos_menuentries) +    unparsed_items = filter_unparsed(grub_cfg_main) + +    # find default values +    default_entry = vyos_menuentries[int(vars['default'])] +    default_settings = { +        'default': grub.gen_version_uuid(default_entry['version']), +        'bootmode': default_entry['bootmode'], +        'console_type': default_entry['console_type'], +        'console_num': default_entry['console_num'] +    } +    vars.update(default_settings) + +    # print(f'vars: {vars}') +    # print(f'modules: {modules}') +    # print(f'vyos_menuentries: {vyos_menuentries}') +    # print(f'unparsed_items: {unparsed_items}') + +    # create new files +    grub_cfg_vars = f'{root_dir}/{image.CFG_VYOS_VARS}' +    grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' +    grub_cfg_platform = f'{root_dir}/{grub.CFG_VYOS_PLATFORM}' +    grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' +    grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' + +    render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) +    Path(image.GRUB_DIR_VYOS).mkdir(exist_ok=True) +    grub.vars_write(grub_cfg_vars, vars) +    grub.modules_write(grub_cfg_modules, modules) +    # Path(grub_cfg_platform).write_text(unparsed_items) +    grub.common_write() +    render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) +    render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) + +    # create menu entries +    for vyos_ver in vyos_versions: +        boot_opts = None +        for entry in vyos_menuentries: +            if entry.get('version') == vyos_ver and entry.get( +                    'bootmode') == 'normal': +                boot_opts = entry.get('boot_opts') +        grub.version_add(vyos_ver, root_dir, boot_opts) + +    # update structure version +    grub.write_cfg_ver(CFG_VER, root_dir) +    exit(0) diff --git a/src/systemd/vyos-grub-update.service b/src/systemd/vyos-grub-update.service new file mode 100644 index 000000000..522b13a33 --- /dev/null +++ b/src/systemd/vyos-grub-update.service @@ -0,0 +1,14 @@ +[Unit] +Description=Update GRUB loader configuration structure +After=local-fs.target +Before=vyos-router.service + +[Service] +Type=oneshot +ExecStart=/usr/libexec/vyos/system/grub_update.py +TimeoutSec=5 +KillMode=process +StandardOutput=journal+console + +[Install] +WantedBy=vyos-router.service
\ No newline at end of file | 
