diff options
author | zsdc <taras@vyos.io> | 2023-01-19 20:18:42 +0200 |
---|---|---|
committer | John Estabrook <jestabro@vyos.io> | 2023-11-15 11:29:04 -0600 |
commit | 8f94262e8fa2477700c50303ea6e2c6ddad72adb (patch) | |
tree | 99848c673c7b91091a032389ee088eb0439e6ec4 /src/op_mode/image_installer.py | |
parent | e085f3e6c21a41beee2c23d8525c2029a8e0b250 (diff) | |
download | vyos-1x-8f94262e8fa2477700c50303ea6e2c6ddad72adb.tar.gz vyos-1x-8f94262e8fa2477700c50303ea6e2c6ddad72adb.zip |
image: T4516: Added system image tools
This commit adds the whole set of system image tools written from the scratch in
Python that allows performing all the operations on images:
* check information
* perform installation and deletion
* versions management
Also, it contains a new service that will update the GRUB menu and keep tracking
its version in the future.
WARNING: The commit contains non-reversible changes. Because of boot menu
changes, it will not be possible to manage images from older VyOS versions after
an update.
Diffstat (limited to 'src/op_mode/image_installer.py')
-rw-r--r-- | src/op_mode/image_installer.py | 557 |
1 files changed, 557 insertions, 0 deletions
diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py new file mode 100644 index 000000000..6ebb38e46 --- /dev/null +++ b/src/op_mode/image_installer.py @@ -0,0 +1,557 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, Namespace +from pathlib import Path +from shutil import copy, rmtree, copytree +from sys import exit +from urllib.parse import urlparse + +from psutil import disk_partitions + +from vyos.configtree import ConfigTree +from vyos.remote import download +from vyos.system import disk, grub, image +from vyos.template import render +from vyos.util import ask_input, ask_yes_no, run + +# define text messages +MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' +MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' +MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' +MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install the VyOS to your permanent storage.' +MSG_INFO_INSTALL_EXIT: str = 'Exitting from VyOS installation' +MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully' +MSG_INFO_INSTALL_DISKS_LIST: str = 'Were found the next disks:' +MSG_INFO_INSTALL_DISK_SELECT: str = 'Which one should be used for installation?' +MSG_INFO_INSTALL_DISK_CONFIRM: str = 'Installation will delete all data on the drive. Continue?' +MSG_INFO_INSTALL_PARTITONING: str = 'Creating partition table...' +MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like to copy it to the new image?' +MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' +MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set a new image as default one for boot?' +MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user' +MSG_INPUT_ROOT_SIZE_ALL: str = 'Would you like to use all free space on the drive?' +MSG_INPUT_ROOT_SIZE_SET: str = 'What should be a size (in GB) of the root partition (min is 1.5 GB)?' +MSG_INPUT_CONSOLE_TYPE: str = 'What console should be used by default? (K: KVM, S: Serial, U: USB-Serial)?' +MSG_WARN_ISO_SIGN_INVALID: str = 'Signature is not valid. Do you want to continue with installation?' +MSG_WARN_ISO_SIGN_UNAVAL: str = 'Signature is not available. Do you want to continue with installation?' +MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.' +MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' +MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n' +'It must be between 1 and 32 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' +CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB +CONST_MIN_ROOT_SIZE: int = 1610612736 # 1.5 GB +# a reserved space: 2MB for header, 1 MB for BIOS partition, 256 MB for EFI +CONST_RESERVED_SPACE: int = (2 + 1 + 256) * 1024**2 + +# define directories and paths +DIR_INSTALLATION: str = '/mnt/installation' +DIR_ROOTFS_SRC: str = f'{DIR_INSTALLATION}/root_src' +DIR_ROOTFS_DST: str = f'{DIR_INSTALLATION}/root_dst' +DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src' +DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst' +DIR_KERNEL_SRC: str = '/boot/' +FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs' +ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso' + +# default boot variables +DEFAULT_BOOT_VARS: dict[str, str] = { + 'timeout': '5', + 'console_type': 'tty', + 'console_num': '0', + 'bootmode': 'normal' +} + + +def bytes_to_gb(size: int) -> float: + """Convert Bytes to GBytes, rounded to 1 decimal number + + Args: + size (int): input size in bytes + + Returns: + float: size in GB + """ + return round(size / 1024**3, 1) + + +def gb_to_bytes(size: float) -> int: + """Convert GBytes to Bytes + + Args: + size (float): input size in GBytes + + Returns: + int: size in bytes + """ + return int(size * 1024**3) + + +def find_disk() -> tuple[str, int]: + """Find a target disk for installation + + Returns: + tuple[str, int]: disk name and size in bytes + """ + # check for available disks + disks_available: dict[str, int] = disk.disks_size() + for disk_name, disk_size in disks_available.copy().items(): + if disk_size < CONST_MIN_DISK_SIZE: + del disks_available[disk_name] + if not disks_available: + print(MSG_ERR_NO_DISK) + exit(MSG_INFO_INSTALL_EXIT) + + # select one as a target + print(MSG_INFO_INSTALL_DISKS_LIST) + default_disk: str = list(disks_available)[0] + for disk_name, disk_size in disks_available.items(): + disk_size_human: str = bytes_to_gb(disk_size) + print(f'Drive: {disk_name} ({disk_size_human} GB)') + disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT, + default=default_disk, + valid_responses=list(disks_available)) + + return disk_selected, disks_available[disk_selected] + + +def ask_root_size(available_space: int) -> int: + """Define a size of root partition + + Args: + available_space (int): available space in bytes for a root partition + + Returns: + int: defined size + """ + if ask_yes_no(MSG_INPUT_ROOT_SIZE_ALL, default=True): + return available_space + + while True: + root_size_gb: str = ask_input(MSG_INPUT_ROOT_SIZE_SET) + root_size_kbytes: int = (gb_to_bytes(float(root_size_gb))) // 1024 + + if root_size_kbytes > available_space: + print(MSG_WARN_ROOT_SIZE_TOOBIG) + continue + if root_size_kbytes < CONST_MIN_ROOT_SIZE / 1024: + print(MSG_WARN_ROOT_SIZE_TOOSMALL) + continue + + return root_size_kbytes + + +def prepare_tmp_disr() -> None: + """Create temporary directories for installation + """ + print('Creating temporary directories') + for dir in [DIR_ROOTFS_SRC, DIR_ROOTFS_DST, DIR_DST_ROOT]: + dirpath = Path(dir) + dirpath.mkdir(mode=0o755, parents=True) + + +def setup_grub(root_dir: str) -> None: + """Install GRUB configurations + + Args: + root_dir (str): a path to the root of target filesystem + """ + print('Installing GRUB configuration files') + grub_cfg_main = f'{root_dir}/{grub.GRUB_DIR_MAIN}/grub.cfg' + grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' + grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' + grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' + grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' + + # create new files + render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) + grub.common_write(root_dir) + grub.vars_write(grub_cfg_vars, DEFAULT_BOOT_VARS) + grub.modules_write(grub_cfg_modules, []) + grub.write_cfg_ver(1, root_dir) + render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) + render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) + + +def configure_authentication(config_file: str, password: str) -> None: + config = ConfigTree(config_file) + config.set([ + 'system', 'login', 'user', 'vyos', 'authentication', + 'plaintext-password' + ], + value=password, + replace=True) + config.set_tag(['system', 'login', 'user']) + + +def validate_signature(file_path: str, sign_type: str) -> None: + """Validate a file by signature and delete a signature file + + Args: + file_path (str): a path to file + sign_type (str): a signature type + """ + print('Validating signature') + signature_valid: bool = False + # validate with minisig + if sign_type == 'minisig': + for pubkey in [ + '/usr/share/vyos/keys/vyos-release.minisign.pub', + '/usr/share/vyos/keys/vyos-backup.minisign.pub' + ]: + if run(f'minisign -V -q -p {pubkey} -m {file_path} -x {file_path}.minisig' + ) == 0: + signature_valid = True + break + Path(f'{file_path}.minisig').unlink() + # validate with GPG + if sign_type == 'asc': + if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0: + signature_valid = True + Path(f'{file_path}.asc').unlink() + + # warn or pass + if not signature_valid: + if not ask_yes_no(MSG_WARN_ISO_SIGN_INVALID, default=False): + exit(MSG_INFO_INSTALL_EXIT) + else: + print('Signature is valid') + + +def image_fetch(image_path: str) -> Path: + """Fetch an ISO image + + Args: + image_path (str): a path, remote or local + + Returns: + Path: a path to a local file + """ + try: + # check a type of path + if urlparse(image_path).scheme: + # download an image + download(ISO_DOWNLOAD_PATH, image_path, True, True) + # download a signature + sign_file = (False, '') + for sign_type in ['minisig', 'asc']: + try: + download(f'{ISO_DOWNLOAD_PATH}.{sign_type}', + f'{image_path}.{sign_type}') + sign_file = (True, sign_type) + break + except Exception: + print(f'{sign_type} signature is not available') + # validate a signature if it is available + if sign_file[0]: + validate_signature(ISO_DOWNLOAD_PATH, sign_file[1]) + else: + if not ask_yes_no(MSG_WARN_ISO_SIGN_UNAVAL, default=False): + cleanup() + exit(MSG_INFO_INSTALL_EXIT) + + return Path(ISO_DOWNLOAD_PATH) + else: + local_path: Path = Path(image_path) + if local_path.is_file(): + return local_path + else: + raise + except Exception: + print(f'The image cannot be fetched from: {image_path}') + exit(1) + + +def migrate_config() -> bool: + """Check for active config and ask user for migration + + Returns: + bool: user's decision + """ + active_config_path: Path = Path('/opt/vyatta/etc/config/config.boot') + if active_config_path.exists(): + if ask_yes_no(MSG_INPUT_CONFIG_FOUND, default=True): + return True + return False + + +def cleanup(mounts: list[str] = [], remove_items: list[str] = []) -> None: + """Clean up after installation + + Args: + mounts (list[str], optional): List of mounts to unmount. + Defaults to []. + remove_items (list[str], optional): List of files or directories + to remove. Defaults to []. + """ + print('Cleaning up') + # clean up installation directory by default + mounts_all = disk_partitions(all=True) + for mounted_device in mounts_all: + if mounted_device.mountpoint.startswith(DIR_INSTALLATION) and not ( + mounted_device.device in mounts or + mounted_device.mountpoint in mounts): + mounts.append(mounted_device.mountpoint) + # add installation dir to cleanup list + if DIR_INSTALLATION not in remove_items: + remove_items.append(DIR_INSTALLATION) + # also delete an ISO file + if Path(ISO_DOWNLOAD_PATH).exists( + ) and ISO_DOWNLOAD_PATH not in remove_items: + remove_items.append(ISO_DOWNLOAD_PATH) + + if mounts: + print('Unmounting target filesystems') + for mountpoint in mounts: + disk.partition_umount(mountpoint) + if remove_items: + print('Removing temporary files') + for remove_item in remove_items: + if Path(remove_item).exists(): + if Path(remove_item).is_file(): + Path(remove_item).unlink() + if Path(remove_item).is_dir(): + rmtree(remove_item) + + +def install_image() -> None: + """Install an image to a disk + """ + if not image.is_live_boot(): + exit(MSG_ERR_NOT_LIVE) + + print(MSG_INFO_INSTALL_WELCOME) + if not ask_yes_no('Would you like to continue?'): + print(MSG_INFO_INSTALL_EXIT) + exit() + + try: + # configure image name + running_image_name: str = image.get_running_image() + while True: + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, + running_image_name) + if image.validate_name(image_name): + break + print(MSG_WARN_IMAGE_NAME_WRONG) + + # define target drive + install_target, target_size = find_disk() + + # define target rootfs size in KB (smallest unit acceptable by sgdisk) + availabe_size: int = (target_size - CONST_RESERVED_SPACE) // 1024 + rootfs_size: int = ask_root_size(availabe_size) + + # ask for password + user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos') + + # ask for default console + console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE, + default='K', + valid_responses=['K', 'S', 'U']) + console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'} + + # create partitions + if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM): + print(MSG_INFO_INSTALL_EXIT) + exit() + print(MSG_INFO_INSTALL_PARTITONING) + disk.disk_cleanup(install_target) + disk.parttable_create(install_target, rootfs_size) + disk.filesystem_create(f'{install_target}2', 'efi') + disk.filesystem_create(f'{install_target}3', 'ext4') + + # create directiroes for installation media + prepare_tmp_disr() + + # mount target filesystem and create required dirs inside + print('Mounting new partitions') + disk.partition_mount(f'{install_target}3', DIR_DST_ROOT) + Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) + disk.partition_mount(f'{install_target}2', f'{DIR_DST_ROOT}/boot/efi') + + # a config dir. It is the deepest one, so the comand will + # create all the rest in a single step + print('Creating a configuration file') + target_config_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw/opt/vyatta/etc/config/' + Path(target_config_dir).mkdir(parents=True) + # copy config + if migrate_config(): + copy('/opt/vyatta/etc/config/config.boot', target_config_dir) + else: + copy('/opt/vyatta/etc/config.boot.default', + f'{target_config_dir}/config.boot') + configure_authentication(f'{target_config_dir}/config.boot', + user_password) + Path(f'{target_config_dir}/.vyatta_config').touch() + + # create a persistence.conf + Path(f'{DIR_DST_ROOT}/persistence.conf').write_text('/ union\n') + + # copy system image and kernel files + print('Copying system image files') + for file in Path(DIR_KERNEL_SRC).iterdir(): + if file.is_file(): + copy(file, f'{DIR_DST_ROOT}/boot/{image_name}/') + copy(FILE_ROOTFS_SRC, + f'{DIR_DST_ROOT}/boot/{image_name}/{image_name}.squashfs') + + # install GRUB + print('Installing GRUB to the drive') + grub.install(install_target, f'{DIR_DST_ROOT}/boot/', + f'{DIR_DST_ROOT}/boot/efi') + setup_grub(DIR_DST_ROOT) + # add information about version + grub.create_structure() + grub.version_add(image_name, DIR_DST_ROOT) + grub.set_default(image_name, DIR_DST_ROOT) + grub.set_console_type(console_dict[console_type], DIR_DST_ROOT) + + # umount filesystems and remove temporary files + cleanup([f'{install_target}2', f'{install_target}3'], + ['/mnt/installation']) + + # we are done + print(MSG_INFO_INSTALL_SUCCESS) + exit() + + except Exception as err: + print(f'Unable to install VyOS: {err}') + # unmount filesystems and clenup + try: + cleanup([f'{install_target}2', f'{install_target}3'], + ['/mnt/installation']) + except Exception as err: + print(f'Cleanup failed: {err}') + + exit(1) + + +def add_image(image_path: str) -> None: + """Add a new image + + Args: + image_path (str): a path to an ISO image + """ + if image.is_live_boot(): + exit(MSG_ERR_LIVE) + + # fetch an image + iso_path: Path = image_fetch(image_path) + try: + # mount an ISO + Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) + disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') + + # check sums + print('Validating image checksums') + if run(f'cd {DIR_ISO_MOUNT} && sha256sum --status -c sha256sum.txt'): + cleanup() + exit('Image checksum verification failed.') + + # mount rootfs (to get a system version) + Path(DIR_ROOTFS_SRC).mkdir(mode=0o755, parents=True) + disk.partition_mount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', + DIR_ROOTFS_SRC, 'squashfs') + version_file: str = Path( + f'{DIR_ROOTFS_SRC}/opt/vyatta/etc/version').read_text() + disk.partition_umount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs') + version_name: str = version_file.lstrip('Version: ').strip() + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) + set_as_default: bool = ask_yes_no(MSG_INPUT_IMAGE_DEFAULT, default=True) + + # find target directory + root_dir: str = disk.find_persistence() + + # a config dir. It is the deepest one, so the comand will + # create all the rest in a single step + target_config_dir: str = f'{root_dir}/boot/{image_name}/rw/opt/vyatta/etc/config/' + # copy config + if migrate_config(): + print('Copying configuration directory') + copytree('/opt/vyatta/etc/config/', target_config_dir) + else: + Path(target_config_dir).mkdir(parents=True) + Path(f'{target_config_dir}/.vyatta_config').touch() + + # copy system image and kernel files + print('Copying system image files') + for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir(): + if file.is_file() and (file.match('initrd*') or + file.match('vmlinuz*')): + copy(file, f'{root_dir}/boot/{image_name}/') + copy(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', + f'{root_dir}/boot/{image_name}/{image_name}.squashfs') + + # unmount an ISO and cleanup + cleanup([str(iso_path)]) + + # add information about version + grub.version_add(image_name, root_dir) + if set_as_default: + grub.set_default(image_name, root_dir) + + except Exception as err: + # unmount an ISO and cleanup + cleanup([str(iso_path)]) + exit(f'Whooops: {err}') + + +def parse_arguments() -> Namespace: + """Parse arguments + + Returns: + Namespace: a namespace with parsed arguments + """ + parser: ArgumentParser = ArgumentParser( + description='Install new system images') + parser.add_argument('--action', + choices=['install', 'add'], + required=True, + help='action to perform with an image') + parser.add_argument( + '--image_path', + help='a path (HTTP or local file) to an image that needs to be installed' + ) + # parser.add_argument('--image_new_name', help='a new name for image') + args: Namespace = parser.parse_args() + # Validate arguments + if args.action == 'add' and not args.image_path: + exit('A path to image is required for add action') + + return args + + +if __name__ == '__main__': + try: + args: Namespace = parse_arguments() + if args.action == 'install': + install_image() + if args.action == 'add': + add_image(args.image_path) + + exit() + + except KeyboardInterrupt: + print('Stopped by Ctrl+C') + cleanup() + exit() + + except Exception as err: + exit(f'{err}') |