summaryrefslogtreecommitdiff
path: root/src/op_mode/image_installer.py
diff options
context:
space:
mode:
authorzsdc <taras@vyos.io>2023-01-19 20:18:42 +0200
committerJohn Estabrook <jestabro@vyos.io>2023-11-15 11:29:04 -0600
commit8f94262e8fa2477700c50303ea6e2c6ddad72adb (patch)
tree99848c673c7b91091a032389ee088eb0439e6ec4 /src/op_mode/image_installer.py
parente085f3e6c21a41beee2c23d8525c2029a8e0b250 (diff)
downloadvyos-1x-8f94262e8fa2477700c50303ea6e2c6ddad72adb.tar.gz
vyos-1x-8f94262e8fa2477700c50303ea6e2c6ddad72adb.zip
image: T4516: Added system image tools
This commit adds the whole set of system image tools written from the scratch in Python that allows performing all the operations on images: * check information * perform installation and deletion * versions management Also, it contains a new service that will update the GRUB menu and keep tracking its version in the future. WARNING: The commit contains non-reversible changes. Because of boot menu changes, it will not be possible to manage images from older VyOS versions after an update.
Diffstat (limited to 'src/op_mode/image_installer.py')
-rw-r--r--src/op_mode/image_installer.py557
1 files changed, 557 insertions, 0 deletions
diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py
new file mode 100644
index 000000000..6ebb38e46
--- /dev/null
+++ b/src/op_mode/image_installer.py
@@ -0,0 +1,557 @@
+#!/usr/bin/env python3
+#
+# Copyright 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This file is part of VyOS.
+#
+# VyOS is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# VyOS. If not, see <https://www.gnu.org/licenses/>.
+
+from argparse import ArgumentParser, Namespace
+from pathlib import Path
+from shutil import copy, rmtree, copytree
+from sys import exit
+from urllib.parse import urlparse
+
+from psutil import disk_partitions
+
+from vyos.configtree import ConfigTree
+from vyos.remote import download
+from vyos.system import disk, grub, image
+from vyos.template import render
+from vyos.util import ask_input, ask_yes_no, run
+
+# define text messages
+MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.'
+MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.'
+MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.'
+MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install the VyOS to your permanent storage.'
+MSG_INFO_INSTALL_EXIT: str = 'Exitting from VyOS installation'
+MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully'
+MSG_INFO_INSTALL_DISKS_LIST: str = 'Were found the next disks:'
+MSG_INFO_INSTALL_DISK_SELECT: str = 'Which one should be used for installation?'
+MSG_INFO_INSTALL_DISK_CONFIRM: str = 'Installation will delete all data on the drive. Continue?'
+MSG_INFO_INSTALL_PARTITONING: str = 'Creating partition table...'
+MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like to copy it to the new image?'
+MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?'
+MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set a new image as default one for boot?'
+MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user'
+MSG_INPUT_ROOT_SIZE_ALL: str = 'Would you like to use all free space on the drive?'
+MSG_INPUT_ROOT_SIZE_SET: str = 'What should be a size (in GB) of the root partition (min is 1.5 GB)?'
+MSG_INPUT_CONSOLE_TYPE: str = 'What console should be used by default? (K: KVM, S: Serial, U: USB-Serial)?'
+MSG_WARN_ISO_SIGN_INVALID: str = 'Signature is not valid. Do you want to continue with installation?'
+MSG_WARN_ISO_SIGN_UNAVAL: str = 'Signature is not available. Do you want to continue with installation?'
+MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.'
+MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again'
+MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n'
+'It must be between 1 and 32 characters long and contains only the next characters: .+-_ a-z A-Z 0-9'
+CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB
+CONST_MIN_ROOT_SIZE: int = 1610612736 # 1.5 GB
+# a reserved space: 2MB for header, 1 MB for BIOS partition, 256 MB for EFI
+CONST_RESERVED_SPACE: int = (2 + 1 + 256) * 1024**2
+
+# define directories and paths
+DIR_INSTALLATION: str = '/mnt/installation'
+DIR_ROOTFS_SRC: str = f'{DIR_INSTALLATION}/root_src'
+DIR_ROOTFS_DST: str = f'{DIR_INSTALLATION}/root_dst'
+DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src'
+DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst'
+DIR_KERNEL_SRC: str = '/boot/'
+FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs'
+ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso'
+
+# default boot variables
+DEFAULT_BOOT_VARS: dict[str, str] = {
+ 'timeout': '5',
+ 'console_type': 'tty',
+ 'console_num': '0',
+ 'bootmode': 'normal'
+}
+
+
+def bytes_to_gb(size: int) -> float:
+ """Convert Bytes to GBytes, rounded to 1 decimal number
+
+ Args:
+ size (int): input size in bytes
+
+ Returns:
+ float: size in GB
+ """
+ return round(size / 1024**3, 1)
+
+
+def gb_to_bytes(size: float) -> int:
+ """Convert GBytes to Bytes
+
+ Args:
+ size (float): input size in GBytes
+
+ Returns:
+ int: size in bytes
+ """
+ return int(size * 1024**3)
+
+
+def find_disk() -> tuple[str, int]:
+ """Find a target disk for installation
+
+ Returns:
+ tuple[str, int]: disk name and size in bytes
+ """
+ # check for available disks
+ disks_available: dict[str, int] = disk.disks_size()
+ for disk_name, disk_size in disks_available.copy().items():
+ if disk_size < CONST_MIN_DISK_SIZE:
+ del disks_available[disk_name]
+ if not disks_available:
+ print(MSG_ERR_NO_DISK)
+ exit(MSG_INFO_INSTALL_EXIT)
+
+ # select one as a target
+ print(MSG_INFO_INSTALL_DISKS_LIST)
+ default_disk: str = list(disks_available)[0]
+ for disk_name, disk_size in disks_available.items():
+ disk_size_human: str = bytes_to_gb(disk_size)
+ print(f'Drive: {disk_name} ({disk_size_human} GB)')
+ disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT,
+ default=default_disk,
+ valid_responses=list(disks_available))
+
+ return disk_selected, disks_available[disk_selected]
+
+
+def ask_root_size(available_space: int) -> int:
+ """Define a size of root partition
+
+ Args:
+ available_space (int): available space in bytes for a root partition
+
+ Returns:
+ int: defined size
+ """
+ if ask_yes_no(MSG_INPUT_ROOT_SIZE_ALL, default=True):
+ return available_space
+
+ while True:
+ root_size_gb: str = ask_input(MSG_INPUT_ROOT_SIZE_SET)
+ root_size_kbytes: int = (gb_to_bytes(float(root_size_gb))) // 1024
+
+ if root_size_kbytes > available_space:
+ print(MSG_WARN_ROOT_SIZE_TOOBIG)
+ continue
+ if root_size_kbytes < CONST_MIN_ROOT_SIZE / 1024:
+ print(MSG_WARN_ROOT_SIZE_TOOSMALL)
+ continue
+
+ return root_size_kbytes
+
+
+def prepare_tmp_disr() -> None:
+ """Create temporary directories for installation
+ """
+ print('Creating temporary directories')
+ for dir in [DIR_ROOTFS_SRC, DIR_ROOTFS_DST, DIR_DST_ROOT]:
+ dirpath = Path(dir)
+ dirpath.mkdir(mode=0o755, parents=True)
+
+
+def setup_grub(root_dir: str) -> None:
+ """Install GRUB configurations
+
+ Args:
+ root_dir (str): a path to the root of target filesystem
+ """
+ print('Installing GRUB configuration files')
+ grub_cfg_main = f'{root_dir}/{grub.GRUB_DIR_MAIN}/grub.cfg'
+ grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}'
+ grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}'
+ grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}'
+ grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}'
+
+ # create new files
+ render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {})
+ grub.common_write(root_dir)
+ grub.vars_write(grub_cfg_vars, DEFAULT_BOOT_VARS)
+ grub.modules_write(grub_cfg_modules, [])
+ grub.write_cfg_ver(1, root_dir)
+ render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {})
+ render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {})
+
+
+def configure_authentication(config_file: str, password: str) -> None:
+ config = ConfigTree(config_file)
+ config.set([
+ 'system', 'login', 'user', 'vyos', 'authentication',
+ 'plaintext-password'
+ ],
+ value=password,
+ replace=True)
+ config.set_tag(['system', 'login', 'user'])
+
+
+def validate_signature(file_path: str, sign_type: str) -> None:
+ """Validate a file by signature and delete a signature file
+
+ Args:
+ file_path (str): a path to file
+ sign_type (str): a signature type
+ """
+ print('Validating signature')
+ signature_valid: bool = False
+ # validate with minisig
+ if sign_type == 'minisig':
+ for pubkey in [
+ '/usr/share/vyos/keys/vyos-release.minisign.pub',
+ '/usr/share/vyos/keys/vyos-backup.minisign.pub'
+ ]:
+ if run(f'minisign -V -q -p {pubkey} -m {file_path} -x {file_path}.minisig'
+ ) == 0:
+ signature_valid = True
+ break
+ Path(f'{file_path}.minisig').unlink()
+ # validate with GPG
+ if sign_type == 'asc':
+ if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0:
+ signature_valid = True
+ Path(f'{file_path}.asc').unlink()
+
+ # warn or pass
+ if not signature_valid:
+ if not ask_yes_no(MSG_WARN_ISO_SIGN_INVALID, default=False):
+ exit(MSG_INFO_INSTALL_EXIT)
+ else:
+ print('Signature is valid')
+
+
+def image_fetch(image_path: str) -> Path:
+ """Fetch an ISO image
+
+ Args:
+ image_path (str): a path, remote or local
+
+ Returns:
+ Path: a path to a local file
+ """
+ try:
+ # check a type of path
+ if urlparse(image_path).scheme:
+ # download an image
+ download(ISO_DOWNLOAD_PATH, image_path, True, True)
+ # download a signature
+ sign_file = (False, '')
+ for sign_type in ['minisig', 'asc']:
+ try:
+ download(f'{ISO_DOWNLOAD_PATH}.{sign_type}',
+ f'{image_path}.{sign_type}')
+ sign_file = (True, sign_type)
+ break
+ except Exception:
+ print(f'{sign_type} signature is not available')
+ # validate a signature if it is available
+ if sign_file[0]:
+ validate_signature(ISO_DOWNLOAD_PATH, sign_file[1])
+ else:
+ if not ask_yes_no(MSG_WARN_ISO_SIGN_UNAVAL, default=False):
+ cleanup()
+ exit(MSG_INFO_INSTALL_EXIT)
+
+ return Path(ISO_DOWNLOAD_PATH)
+ else:
+ local_path: Path = Path(image_path)
+ if local_path.is_file():
+ return local_path
+ else:
+ raise
+ except Exception:
+ print(f'The image cannot be fetched from: {image_path}')
+ exit(1)
+
+
+def migrate_config() -> bool:
+ """Check for active config and ask user for migration
+
+ Returns:
+ bool: user's decision
+ """
+ active_config_path: Path = Path('/opt/vyatta/etc/config/config.boot')
+ if active_config_path.exists():
+ if ask_yes_no(MSG_INPUT_CONFIG_FOUND, default=True):
+ return True
+ return False
+
+
+def cleanup(mounts: list[str] = [], remove_items: list[str] = []) -> None:
+ """Clean up after installation
+
+ Args:
+ mounts (list[str], optional): List of mounts to unmount.
+ Defaults to [].
+ remove_items (list[str], optional): List of files or directories
+ to remove. Defaults to [].
+ """
+ print('Cleaning up')
+ # clean up installation directory by default
+ mounts_all = disk_partitions(all=True)
+ for mounted_device in mounts_all:
+ if mounted_device.mountpoint.startswith(DIR_INSTALLATION) and not (
+ mounted_device.device in mounts or
+ mounted_device.mountpoint in mounts):
+ mounts.append(mounted_device.mountpoint)
+ # add installation dir to cleanup list
+ if DIR_INSTALLATION not in remove_items:
+ remove_items.append(DIR_INSTALLATION)
+ # also delete an ISO file
+ if Path(ISO_DOWNLOAD_PATH).exists(
+ ) and ISO_DOWNLOAD_PATH not in remove_items:
+ remove_items.append(ISO_DOWNLOAD_PATH)
+
+ if mounts:
+ print('Unmounting target filesystems')
+ for mountpoint in mounts:
+ disk.partition_umount(mountpoint)
+ if remove_items:
+ print('Removing temporary files')
+ for remove_item in remove_items:
+ if Path(remove_item).exists():
+ if Path(remove_item).is_file():
+ Path(remove_item).unlink()
+ if Path(remove_item).is_dir():
+ rmtree(remove_item)
+
+
+def install_image() -> None:
+ """Install an image to a disk
+ """
+ if not image.is_live_boot():
+ exit(MSG_ERR_NOT_LIVE)
+
+ print(MSG_INFO_INSTALL_WELCOME)
+ if not ask_yes_no('Would you like to continue?'):
+ print(MSG_INFO_INSTALL_EXIT)
+ exit()
+
+ try:
+ # configure image name
+ running_image_name: str = image.get_running_image()
+ while True:
+ image_name: str = ask_input(MSG_INPUT_IMAGE_NAME,
+ running_image_name)
+ if image.validate_name(image_name):
+ break
+ print(MSG_WARN_IMAGE_NAME_WRONG)
+
+ # define target drive
+ install_target, target_size = find_disk()
+
+ # define target rootfs size in KB (smallest unit acceptable by sgdisk)
+ availabe_size: int = (target_size - CONST_RESERVED_SPACE) // 1024
+ rootfs_size: int = ask_root_size(availabe_size)
+
+ # ask for password
+ user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos')
+
+ # ask for default console
+ console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE,
+ default='K',
+ valid_responses=['K', 'S', 'U'])
+ console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'}
+
+ # create partitions
+ if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM):
+ print(MSG_INFO_INSTALL_EXIT)
+ exit()
+ print(MSG_INFO_INSTALL_PARTITONING)
+ disk.disk_cleanup(install_target)
+ disk.parttable_create(install_target, rootfs_size)
+ disk.filesystem_create(f'{install_target}2', 'efi')
+ disk.filesystem_create(f'{install_target}3', 'ext4')
+
+ # create directiroes for installation media
+ prepare_tmp_disr()
+
+ # mount target filesystem and create required dirs inside
+ print('Mounting new partitions')
+ disk.partition_mount(f'{install_target}3', DIR_DST_ROOT)
+ Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True)
+ disk.partition_mount(f'{install_target}2', f'{DIR_DST_ROOT}/boot/efi')
+
+ # a config dir. It is the deepest one, so the comand will
+ # create all the rest in a single step
+ print('Creating a configuration file')
+ target_config_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw/opt/vyatta/etc/config/'
+ Path(target_config_dir).mkdir(parents=True)
+ # copy config
+ if migrate_config():
+ copy('/opt/vyatta/etc/config/config.boot', target_config_dir)
+ else:
+ copy('/opt/vyatta/etc/config.boot.default',
+ f'{target_config_dir}/config.boot')
+ configure_authentication(f'{target_config_dir}/config.boot',
+ user_password)
+ Path(f'{target_config_dir}/.vyatta_config').touch()
+
+ # create a persistence.conf
+ Path(f'{DIR_DST_ROOT}/persistence.conf').write_text('/ union\n')
+
+ # copy system image and kernel files
+ print('Copying system image files')
+ for file in Path(DIR_KERNEL_SRC).iterdir():
+ if file.is_file():
+ copy(file, f'{DIR_DST_ROOT}/boot/{image_name}/')
+ copy(FILE_ROOTFS_SRC,
+ f'{DIR_DST_ROOT}/boot/{image_name}/{image_name}.squashfs')
+
+ # install GRUB
+ print('Installing GRUB to the drive')
+ grub.install(install_target, f'{DIR_DST_ROOT}/boot/',
+ f'{DIR_DST_ROOT}/boot/efi')
+ setup_grub(DIR_DST_ROOT)
+ # add information about version
+ grub.create_structure()
+ grub.version_add(image_name, DIR_DST_ROOT)
+ grub.set_default(image_name, DIR_DST_ROOT)
+ grub.set_console_type(console_dict[console_type], DIR_DST_ROOT)
+
+ # umount filesystems and remove temporary files
+ cleanup([f'{install_target}2', f'{install_target}3'],
+ ['/mnt/installation'])
+
+ # we are done
+ print(MSG_INFO_INSTALL_SUCCESS)
+ exit()
+
+ except Exception as err:
+ print(f'Unable to install VyOS: {err}')
+ # unmount filesystems and clenup
+ try:
+ cleanup([f'{install_target}2', f'{install_target}3'],
+ ['/mnt/installation'])
+ except Exception as err:
+ print(f'Cleanup failed: {err}')
+
+ exit(1)
+
+
+def add_image(image_path: str) -> None:
+ """Add a new image
+
+ Args:
+ image_path (str): a path to an ISO image
+ """
+ if image.is_live_boot():
+ exit(MSG_ERR_LIVE)
+
+ # fetch an image
+ iso_path: Path = image_fetch(image_path)
+ try:
+ # mount an ISO
+ Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True)
+ disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660')
+
+ # check sums
+ print('Validating image checksums')
+ if run(f'cd {DIR_ISO_MOUNT} && sha256sum --status -c sha256sum.txt'):
+ cleanup()
+ exit('Image checksum verification failed.')
+
+ # mount rootfs (to get a system version)
+ Path(DIR_ROOTFS_SRC).mkdir(mode=0o755, parents=True)
+ disk.partition_mount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs',
+ DIR_ROOTFS_SRC, 'squashfs')
+ version_file: str = Path(
+ f'{DIR_ROOTFS_SRC}/opt/vyatta/etc/version').read_text()
+ disk.partition_umount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs')
+ version_name: str = version_file.lstrip('Version: ').strip()
+ image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name)
+ set_as_default: bool = ask_yes_no(MSG_INPUT_IMAGE_DEFAULT, default=True)
+
+ # find target directory
+ root_dir: str = disk.find_persistence()
+
+ # a config dir. It is the deepest one, so the comand will
+ # create all the rest in a single step
+ target_config_dir: str = f'{root_dir}/boot/{image_name}/rw/opt/vyatta/etc/config/'
+ # copy config
+ if migrate_config():
+ print('Copying configuration directory')
+ copytree('/opt/vyatta/etc/config/', target_config_dir)
+ else:
+ Path(target_config_dir).mkdir(parents=True)
+ Path(f'{target_config_dir}/.vyatta_config').touch()
+
+ # copy system image and kernel files
+ print('Copying system image files')
+ for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir():
+ if file.is_file() and (file.match('initrd*') or
+ file.match('vmlinuz*')):
+ copy(file, f'{root_dir}/boot/{image_name}/')
+ copy(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs',
+ f'{root_dir}/boot/{image_name}/{image_name}.squashfs')
+
+ # unmount an ISO and cleanup
+ cleanup([str(iso_path)])
+
+ # add information about version
+ grub.version_add(image_name, root_dir)
+ if set_as_default:
+ grub.set_default(image_name, root_dir)
+
+ except Exception as err:
+ # unmount an ISO and cleanup
+ cleanup([str(iso_path)])
+ exit(f'Whooops: {err}')
+
+
+def parse_arguments() -> Namespace:
+ """Parse arguments
+
+ Returns:
+ Namespace: a namespace with parsed arguments
+ """
+ parser: ArgumentParser = ArgumentParser(
+ description='Install new system images')
+ parser.add_argument('--action',
+ choices=['install', 'add'],
+ required=True,
+ help='action to perform with an image')
+ parser.add_argument(
+ '--image_path',
+ help='a path (HTTP or local file) to an image that needs to be installed'
+ )
+ # parser.add_argument('--image_new_name', help='a new name for image')
+ args: Namespace = parser.parse_args()
+ # Validate arguments
+ if args.action == 'add' and not args.image_path:
+ exit('A path to image is required for add action')
+
+ return args
+
+
+if __name__ == '__main__':
+ try:
+ args: Namespace = parse_arguments()
+ if args.action == 'install':
+ install_image()
+ if args.action == 'add':
+ add_image(args.image_path)
+
+ exit()
+
+ except KeyboardInterrupt:
+ print('Stopped by Ctrl+C')
+ cleanup()
+ exit()
+
+ except Exception as err:
+ exit(f'{err}')