diff options
author | Christian Breunig <christian@breunig.cc> | 2023-12-17 08:23:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-17 08:23:24 +0100 |
commit | f991faab2c0d95cbec5d46996b154145955572d7 (patch) | |
tree | 7b6157e73946826cdb4128cf5100ba627934136d /python | |
parent | d5375ce02376a91c07ec68f8d410a08dcdc57ef9 (diff) | |
parent | 8809d2e799ae1130b3328b081466b7d772a3da23 (diff) | |
download | vyos-1x-f991faab2c0d95cbec5d46996b154145955572d7.tar.gz vyos-1x-f991faab2c0d95cbec5d46996b154145955572d7.zip |
Merge pull request #2648 from jestabro/sagitta-image-tools
image-tools: T4516: revise system image tools
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/configsession.py | 6 | ||||
-rw-r--r-- | python/vyos/remote.py | 4 | ||||
-rw-r--r-- | python/vyos/system/__init__.py | 18 | ||||
-rw-r--r-- | python/vyos/system/compat.py | 316 | ||||
-rw-r--r-- | python/vyos/system/disk.py | 229 | ||||
-rw-r--r-- | python/vyos/system/grub.py | 342 | ||||
-rw-r--r-- | python/vyos/system/image.py | 268 | ||||
-rw-r--r-- | python/vyos/system/raid.py | 122 | ||||
-rw-r--r-- | python/vyos/utils/convert.py | 5 | ||||
-rw-r--r-- | python/vyos/utils/file.py | 6 | ||||
-rw-r--r-- | python/vyos/utils/io.py | 34 |
11 files changed, 1344 insertions, 6 deletions
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 9802ebae4..90842b749 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -30,8 +30,10 @@ SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig'] LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile'] MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py'] SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py'] -INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url'] -REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del'] +INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py', + '--action', 'add', '--no-prompt', '--image-path'] +REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py', + '--action', 'delete', '--no-prompt', '--image-name'] GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate'] SHOW = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'show'] RESET = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reset'] diff --git a/python/vyos/remote.py b/python/vyos/remote.py index f86d2ef35..b1efcd10b 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -437,11 +437,13 @@ def urlc(urlstring, *args, **kwargs): raise ValueError(f'Unsupported URL scheme: "{scheme}"') def download(local_path, urlstring, progressbar=False, check_space=False, - source_host='', source_port=0, timeout=10.0): + source_host='', source_port=0, timeout=10.0, raise_error=False): try: progressbar = progressbar and is_interactive() urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path) except Exception as err: + if raise_error: + raise print_error(f'Unable to download "{urlstring}": {err}') except KeyboardInterrupt: print_error('\nDownload aborted by user.') diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py new file mode 100644 index 000000000..0c91330ba --- /dev/null +++ b/python/vyos/system/__init__.py @@ -0,0 +1,18 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +__all_: list[str] = ['disk', 'grub', 'image'] +# define image-tools version +SYSTEM_CFG_VER = 1 diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py new file mode 100644 index 000000000..319c3dabf --- /dev/null +++ b/python/vyos/system/compat.py @@ -0,0 +1,316 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from functools import wraps +from copy import deepcopy +from typing import Union + +from vyos.system import disk, grub, image, SYSTEM_CFG_VER +from vyos.template import render + +TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2' + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +REGEX_SANIT_QUIET = r'\ ?quiet\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +class DowngradingImageTools(Exception): + """Raised when attempting to add an image with an earlier version + of image-tools than the current system, as indicated by the value + of SYSTEM_CFG_VER or absence thereof.""" + pass + + +def mode(): + if grub.get_cfg_ver() >= SYSTEM_CFG_VER: + return False + + return True + + +def find_versions(menu_entries: list) -> list: + """Find unique VyOS versions from menu entries + + Args: + menu_entries (list): a list with menu entries + + Returns: + list: List of installed versions + """ + versions = [] + for vyos_ver in menu_entries: + versions.append(vyos_ver.get('version')) + # remove duplicates + versions = list(set(versions)) + return versions + + +def filter_unparsed(grub_path: str) -> str: + """Find currently installed VyOS version + + Args: + grub_path (str): a path to the grub.cfg file + + Returns: + str: unparsed grub.cfg items + """ + config_text = Path(grub_path).read_text() + regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) + filtered = regex_filter.sub('', config_text) + regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) + filtered = regex_filter.sub('', filtered) + regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) + filtered = regex_filter.sub('', filtered) + # strip extra new lines + filtered = filtered.strip() + return filtered + + +def get_search_root(unparsed: str) -> str: + unparsed_lines = unparsed.splitlines() + search_root = next((x for x in unparsed_lines if 'search' in x), '') + return search_root + + +def sanitize_boot_opts(boot_opts: str) -> str: + """Sanitize boot options from console and init + + Args: + boot_opts (str): boot options + + Returns: + str: sanitized boot options + """ + regex_filter = compile(REGEX_SANIT_CONSOLE) + boot_opts = regex_filter.sub('', boot_opts) + regex_filter = compile(REGEX_SANIT_INIT) + boot_opts = regex_filter.sub('', boot_opts) + # legacy tools add 'quiet' on add system image; this is not desired + regex_filter = compile(REGEX_SANIT_QUIET) + boot_opts = regex_filter.sub(' ', boot_opts) + + return boot_opts + + +def parse_entry(entry: tuple) -> dict: + """Parse GRUB menuentry + + Args: + entry (tuple): tuple of (version, options) + + Returns: + dict: dictionary with parsed options + """ + # save version to dict + entry_dict = {'version': entry[0]} + # detect boot mode type + if PW_RESET_OPTION in entry[1]: + entry_dict['bootmode'] = 'pw_reset' + else: + entry_dict['bootmode'] = 'normal' + # find console type and number + regex_filter = compile(REGEX_CONSOLE) + entry_dict.update(regex_filter.match(entry[1]).groupdict()) + entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + + return entry_dict + + +def parse_menuentries(grub_path: str) -> list: + """Parse all GRUB menuentries + + Args: + grub_path (str): a path to GRUB config file + + Returns: + list: list with menu items (each item is a dict) + """ + menuentries = [] + # read configuration file + config_text = Path(grub_path).read_text() + # parse menuentries to tuples (version, options) + regex_filter = compile(REGEX_MENUENTRY, MULTILINE) + filter_results = regex_filter.findall(config_text) + # parse each entry + for entry in filter_results: + menuentries.append(parse_entry(entry)) + + return menuentries + + +def prune_vyos_versions(root_dir: str = '') -> None: + """Delete vyos-versions files of registered images subsequently deleted + or renamed by legacy image-tools + + Args: + root_dir (str): an optional path to the root directory + """ + if not root_dir: + root_dir = disk.find_persistence() + + for version in grub.version_list(): + if not Path(f'{root_dir}/boot/{version}').is_dir(): + grub.version_del(version) + + +def update_cfg_ver(root_dir:str = '') -> int: + """Get minumum version of image-tools across all installed images + + Args: + root_dir (str): an optional path to the root directory + + Returns: + int: minimum version of image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + prune_vyos_versions(root_dir) + + images_details = image.get_images_details() + cfg_version = min(d['tools_version'] for d in images_details) + + return cfg_version + + +def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]: + """Translate default version to menuentry index + + Args: + menu_entries (list): list of dicts of installed version boot data + root_dir (str): an optional path to the root directory + + Returns: + int: index of default version in menu_entries or None + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + image_name = image.get_default_image() + + sublist = list(filter(lambda x: x.get('version') == image_name, + menu_entries)) + if sublist: + return menu_entries.index(sublist[0]) + + return None + + +def update_version_list(root_dir: str = '') -> list[dict]: + """Update list of dicts of installed version boot data + + Args: + root_dir (str): an optional path to the root directory + + Returns: + list: list of dicts of installed version boot data + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + # get list of versions in menuentries + menu_entries = parse_menuentries(grub_cfg_main) + menu_versions = find_versions(menu_entries) + + # get list of versions added/removed by image-tools + current_versions = grub.version_list(root_dir) + + remove = list(set(menu_versions) - set(current_versions)) + for ver in remove: + menu_entries = list(filter(lambda x: x.get('version') != ver, + menu_entries)) + + add = list(set(current_versions) - set(menu_versions)) + for ver in add: + last = menu_entries[0].get('version') + new = deepcopy(list(filter(lambda x: x.get('version') == last, + menu_entries))) + for e in new: + boot_opts = e.get('boot_opts').replace(last, ver) + e.update({'version': ver, 'boot_opts': boot_opts}) + + menu_entries = new + menu_entries + + return menu_entries + + +def grub_cfg_fields(root_dir: str = '') -> dict: + """Gather fields for rendering grub.cfg + + Args: + root_dir (str): an optional path to the root directory + + Returns: + dict: dictionary for rendering TMPL_GRUB_COMPAT + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = {'default': 0, 'timeout': 5} + # 'default' and 'timeout' from legacy grub.cfg + fields |= grub.vars_read(grub_cfg_main) + + fields['tools_version'] = SYSTEM_CFG_VER + menu_entries = update_version_list(root_dir) + fields['versions'] = menu_entries + + default = get_default(menu_entries, root_dir) + if default is not None: + fields['default'] = default + + modules = grub.modules_read(grub_cfg_main) + fields['modules'] = modules + + unparsed = filter_unparsed(grub_cfg_main).splitlines() + search_root = next((x for x in unparsed if 'search' in x), '') + fields['search_root'] = search_root + + return fields + + +def render_grub_cfg(root_dir: str = '') -> None: + """Render grub.cfg for legacy compatibility""" + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = grub_cfg_fields(root_dir) + render(grub_cfg_main, TMPL_GRUB_COMPAT, fields) + + +def grub_cfg_update(func): + """Decorator to update grub.cfg after function call""" + @wraps(func) + def wrapper(*args, **kwargs): + ret = func(*args, **kwargs) + if mode(): + render_grub_cfg() + return ret + return wrapper diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py new file mode 100644 index 000000000..b8a2c0f35 --- /dev/null +++ b/python/vyos/system/disk.py @@ -0,0 +1,229 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from json import loads as json_loads +from os import sync +from dataclasses import dataclass + +from psutil import disk_partitions + +from vyos.utils.process import run, cmd + + +@dataclass +class DiskDetails: + """Disk details""" + name: str + partition: dict[str, str] + + +def disk_cleanup(drive_path: str) -> None: + """Clean up disk partition table (MBR and GPT) + Remove partition and device signatures. + Zeroize primary and secondary headers - first and last 17408 bytes + (512 bytes * 34 LBA) on a drive + + Args: + drive_path (str): path to a drive that needs to be cleaned + """ + partitions: list[str] = partition_list(drive_path) + for partition in partitions: + run(f'wipefs -af {partition}') + run(f'wipefs -af {drive_path}') + run(f'sgdisk -Z {drive_path}') + + +def find_persistence() -> str: + """Find a mountpoint for persistence storage + + Returns: + str: Path where 'persistance' pertition is mounted, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint.endswith('/persistence'): + return partition.mountpoint + return '' + + +def parttable_create(drive_path: str, root_size: int) -> None: + """Create a hybrid MBR/GPT partition table + 0-2047 first sectors are free + 2048-4095 sectors - BIOS Boot Partition + 4096 + 256 MB - EFI system partition + Everything else till the end of a drive - Linux partition + + Args: + drive_path (str): path to a drive + """ + if not root_size: + root_size_text: str = '+100%' + else: + root_size_text: str = str(root_size) + command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ + -n3:0:+{root_size_text}K -t3:8300 {drive_path}' + + run(command) + # update partitons in kernel + sync() + run(f'partprobe {drive_path}') + + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + + +def partition_list(drive_path: str) -> list[str]: + """Get a list of partitions on a drive + + Args: + drive_path (str): path to a drive + + Returns: + list[str]: a list of partition paths + """ + lsblk: str = cmd(f'lsblk -Jp {drive_path}') + drive_info: dict = json_loads(lsblk) + device: list = drive_info.get('blockdevices') + children: list[str] = device[0].get('children', []) if device else [] + partitions: list[str] = [child.get('name') for child in children] + return partitions + + +def partition_parent(partition_path: str) -> str: + """Get a parent device for a partition + + Args: + partition (str): path to a partition + + Returns: + str: path to a parent device + """ + parent: str = cmd(f'lsblk -ndpo pkname {partition_path}') + return parent + + +def from_partition(partition_path: str) -> DiskDetails: + drive_path: str = partition_parent(partition_path) + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + +def filesystem_create(partition: str, fstype: str) -> None: + """Create a filesystem on a partition + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + fstype (str): filesystem type ('efi' or 'ext4') + """ + if fstype == 'efi': + command = 'mkfs -t fat -n EFI' + run(f'{command} {partition}') + if fstype == 'ext4': + command = 'mkfs -t ext4 -L persistence' + run(f'{command} {partition}') + + +def partition_mount(partition: str, + path: str, + fsype: str = '', + overlay_params: dict[str, str] = {}) -> bool: + """Mount a partition into a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where to mount + fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') + overlay_params (dict): optionally, set overlay parameters. + Defaults to None. + + Returns: + bool: True on success + """ + if fsype in ['squashfs', 'iso9660']: + command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' + if fsype == 'overlay' and overlay_params: + command: str = f'mount -t overlay -o noatime,\ + upperdir={overlay_params["upperdir"]},\ + lowerdir={overlay_params["lowerdir"]},\ + workdir={overlay_params["workdir"]} overlay {path}' + + else: + command = f'mount {partition} {path}' + + rc = run(command) + if rc == 0: + return True + + return False + + +def partition_umount(partition: str = '', path: str = '') -> None: + """Umount a partition by a partition name or a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where a partition is mounted + """ + if partition: + command = f'umount {partition}' + run(command) + if path: + command = f'umount {path}' + run(command) + + +def find_device(mountpoint: str) -> str: + """Find a device by mountpoint + + Returns: + str: Path to device, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint == mountpoint: + return partition.mountpoint + return '' + + +def disks_size() -> dict[str, int]: + """Get a dictionary with physical disks and their sizes + + Returns: + dict[str, int]: a dictionary with name: size mapping + """ + disks_size: dict[str, int] = {} + lsblk: str = cmd('lsblk -Jbp') + blk_list = json_loads(lsblk) + for device in blk_list.get('blockdevices'): + if device['type'] == 'disk': + disks_size.update({device['name']: device['size']}) + return disks_size diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py new file mode 100644 index 000000000..4ebf229a0 --- /dev/null +++ b/python/vyos/system/grub.py @@ -0,0 +1,342 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import MULTILINE, compile as re_compile +from typing import Union +from uuid import uuid5, NAMESPACE_URL, UUID + +from vyos.template import render +from vyos.utils.process import cmd +from vyos.system import disk + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' +CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' +CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' +CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' +CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' + +TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' +TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' +TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' +TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' +TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' +TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' +TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' + +# prepare regexes +REGEX_GRUB_VARS: str = r'^set (?P<variable_name>.+)=[\'"]?(?P<variable_value>.*)(?<![\'"])[\'"]?$' +REGEX_GRUB_MODULES: str = r'^insmod (?P<module_name>.+)$' +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' + + +def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force', + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="{id}" \ + --no-uefi-secure-boot' + ] + for command in commands: + cmd(command) + + +def gen_version_uuid(version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name) + ver_id: str = f'uuid5-{ver_uuid}' + return ver_id + + +def version_add(version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + +def version_del(vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + +def version_list(root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = disk.find_persistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + versions_list.sort() + + return versions_list + + +def read_env(env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = disk.find_persistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile(r'^(?P<variable_name>.*)=(?P<variable_value>.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get('variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + +def get_cfg_ver(root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + +def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + vars_write(vars_file, vars_current) + + +def vars_read(grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + +def modules_read(grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + +def modules_write(grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + +def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + +def set_default(version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = vars_read(vars_file) + vars_current['default'] = gen_version_uuid(version_name) + vars_write(vars_file, vars_current) + + +def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, grub_common) + + +def create_structure(root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = disk.find_persistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) + + +def set_console_type(console_type: str, root_dir: str = '') -> None: + """Write default console type to GRUB configuration + + Args: + console_type (str): a default console type + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['console_type'] = str(console_type) + vars_write(vars_file, vars_current) + +def set_raid(root_dir: str = '') -> None: + pass diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py new file mode 100644 index 000000000..c03ce02d5 --- /dev/null +++ b/python/vyos/system/image.py @@ -0,0 +1,268 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile as re_compile +from tempfile import TemporaryDirectory +from typing import TypedDict + +from vyos import version +from vyos.system import disk, grub + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' +# prepare regexes +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' +REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)' + + +# structures definitions +class ImageDetails(TypedDict): + name: str + version: str + tools_version: int + disk_ro: int + disk_rw: int + disk_total: int + + +class BootDetails(TypedDict): + image_default: str + image_running: str + images_available: list[str] + console_type: str + console_num: int + + +def bootmode_detect() -> str: + """Detect system boot mode + + Returns: + str: 'bios' or 'efi' + """ + if Path('/sys/firmware/efi/').exists(): + return 'efi' + else: + return 'bios' + + +def get_image_version(mount_path: str) -> str: + """Extract version name from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: version name + """ + version_file: str = Path( + f'{mount_path}/opt/vyatta/etc/version').read_text() + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_image_tools_version(mount_path: str) -> int: + """Extract image-tools version from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: image-tools version + """ + try: + version_file: str = Path( + f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text() + except FileNotFoundError: + system_cfg_ver: int = 0 + else: + res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file) + system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0)) + + return system_cfg_ver + + +def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]: + """Return versions of image and image-tools + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + dict[str, int]: a dictionary with versions of image and image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + squashfs_file: str = next( + Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() + with TemporaryDirectory() as squashfs_mounted: + disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') + + image_version: str = get_image_version(squashfs_mounted) + image_tools_version: int = get_image_tools_version(squashfs_mounted) + + disk.partition_umount(squashfs_file) + + versions: dict[str, int] = { + 'image': image_version, + 'image-tools': image_tools_version + } + + return versions + + +def get_details(image_name: str, root_dir: str = '') -> ImageDetails: + """Return information about image + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + ImageDetails: a dictionary with details about an image (name, size) + """ + if not root_dir: + root_dir = disk.find_persistence() + + versions = get_versions(image_name, root_dir) + image_version: str = versions.get('image', '') + image_tools_version: int = versions.get('image-tools', 0) + + image_path: Path = Path(f'{root_dir}/boot/{image_name}') + image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') + + image_disk_ro: int = int() + for item in image_path.iterdir(): + if not item.is_symlink(): + image_disk_ro += item.stat().st_size + + image_disk_rw: int = int() + for item in image_path_rw.rglob('*'): + if not item.is_symlink(): + image_disk_rw += item.stat().st_size + + image_details: ImageDetails = { + 'name': image_name, + 'version': image_version, + 'tools_version': image_tools_version, + 'disk_ro': image_disk_ro, + 'disk_rw': image_disk_rw, + 'disk_total': image_disk_ro + image_disk_rw + } + + return image_details + + +def get_images_details() -> list[ImageDetails]: + """Return information about all images + + Returns: + list[ImageDetails]: a list of dictionaries with details about images + """ + images: list[str] = grub.version_list() + images_details: list[ImageDetails] = list() + for image_name in images: + images_details.append(get_details(image_name)) + + return images_details + + +def get_running_image() -> str: + """Find currently running image name + + Returns: + str: image name + """ + running_image: str = '' + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + running_image: str = running_image_result.groupdict().get( + 'image_version', '') + # we need to have a fallback for live systems + if not running_image: + running_image: str = version.get_version() + + return running_image + + +def get_default_image(root_dir: str = '') -> str: + """Get default boot entry + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + Returns: + str: a version name + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = grub.vars_read(vars_file) + default_uuid: str = vars_current.get('default', '') + if default_uuid: + images_list: list[str] = grub.version_list(root_dir) + for image_name in images_list: + if default_uuid == grub.gen_version_uuid(image_name): + return image_name + return '' + else: + return '' + + +def validate_name(image_name: str) -> bool: + """Validate image name + + Args: + image_name (str): suggested image name + + Returns: + bool: validation result + """ + regex_filter = re_compile(r'^[\w\.+-]{1,32}$') + if regex_filter.match(image_name): + return True + return False + + +def is_live_boot() -> bool: + """Detect live booted system + + Returns: + bool: True if the system currently booted in live mode + """ + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + boot_type: str = running_image_result.groupdict().get('boot_type', '') + if boot_type == 'live': + return True + return False + +def is_running_as_container() -> bool: + if Path('/.dockerenv').exists(): + return True + return False diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py new file mode 100644 index 000000000..5b33d34da --- /dev/null +++ b/python/vyos/system/raid.py @@ -0,0 +1,122 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +"""RAID related functions""" + +from pathlib import Path +from shutil import copy +from dataclasses import dataclass + +from vyos.utils.process import cmd, run +from vyos.system import disk + + +@dataclass +class RaidDetails: + """RAID type""" + name: str + level: str + members: list[str] + disks: list[disk.DiskDetails] + + +def raid_create(raid_members: list[str], + raid_name: str = 'md0', + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + for part in raid_members: + drive: str = disk.partition_parent(part) + # set partition type GUID for raid member; cf. + # https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + cmd(command) + + raid = RaidDetails( + name = f'/dev/{raid_name}', + level = raid_level, + members = raid_members, + disks = [disk.from_partition(m) for m in raid_members] + ) + + return raid + +def clear(): + """Deactivate all RAID arrays""" + command: str = 'mdadm --examine --scan' + raid_config = cmd(command) + if not raid_config: + return + command: str = 'mdadm --run /dev/md?*' + run(command) + command: str = 'mdadm --assemble --scan --auto=yes --symlink=no' + run(command) + command: str = 'mdadm --stop --scan' + run(command) + + +def update_initramfs() -> None: + """Update initramfs""" + mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm' + copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script) + p = Path(mdadm_script) + p.write_text(p.read_text().replace('$((COUNT + 1))', '20')) + command: str = 'update-initramfs -u' + cmd(command) + +def update_default(target_dir: str) -> None: + """Update /etc/default/mdadm to start MD monitoring daemon at boot + """ + source_mdadm_config = '/etc/default/mdadm' + target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm') + target_mdadm_config_dir = Path(target_mdadm_config).parent + Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True) + s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false', + 'START_DAEMON=true') + Path(target_mdadm_config).write_text(s) + +def get_uuid(device: str) -> str: + """Get UUID of a device""" + command: str = f'tune2fs -l {device}' + l = cmd(command).splitlines() + uuid = next((x for x in l if x.startswith('Filesystem UUID')), '') + return uuid.split(':')[1].strip() if uuid else '' + +def get_uuids(raid_details: RaidDetails) -> tuple[str]: + """Get UUIDs of RAID members + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + + Returns: + tuple[str]: root_disk uuid, root_md uuid + """ + raid_name: str = raid_details.name + root_partition: str = raid_details.members[0] + uuid_root_disk: str = get_uuid(root_partition) + uuid_root_md: str = get_uuid(raid_name) + return uuid_root_disk, uuid_root_md diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 9a8a1ff7d..c02f0071e 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""): return result -def bytes_to_human(bytes, initial_exponent=0, precision=2): +def bytes_to_human(bytes, initial_exponent=0, precision=2, + int_below_exponent=0): """ Converts a value in bytes to a human-readable size string like 640 KB The initial_exponent parameter is the exponent of 2, @@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2): # log2 is a float, while range checking requires an int exponent = int(log2(bytes)) + if exponent < int_below_exponent: + precision = 0 if exponent < 10: value = bytes diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 667a2464b..9f27a7fb9 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -134,6 +134,12 @@ def chmod_755(path): S_IROTH | S_IXOTH chmod(path, bitmask) +def chmod_2775(path): + """ user/group permissions with set-group-id bit set """ + from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH + + bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH + chmod(path, bitmask) def makedir(path, user=None, group=None): if os.path.exists(path): diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 8790cbaac..0afaf695c 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -13,6 +13,8 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +from typing import Callable + def print_error(str='', end='\n'): """ Print `str` to stderr, terminated with `end`. @@ -24,13 +26,18 @@ def print_error(str='', end='\n'): sys.stderr.write(end) sys.stderr.flush() -def ask_input(question, default='', numeric_only=False, valid_responses=[]): +def ask_input(question, default='', numeric_only=False, valid_responses=[], + no_echo=False): + from getpass import getpass question_out = question if default: question_out += f' (Default: {default})' response = '' while True: - response = input(question_out + ' ').strip() + if not no_echo: + response = input(question_out + ' ').strip() + else: + response = getpass(question_out + ' ').strip() if not response and default: return default if numeric_only: @@ -72,3 +79,26 @@ def is_dumb_terminal(): """Check if the current TTY is dumb, so that we can disable advanced terminal features.""" import os return os.getenv('TERM') in ['vt100', 'dumb'] + +def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', + list_format: Callable = None,) -> str: + """Select an entry from a list + + Args: + l (list): a list of entries + list_msg (str): a message to print before listing the entries + prompt_msg (str): a message to print as prompt for selection + + Returns: + str: a selected entry + """ + en = list(enumerate(l, 1)) + print(list_msg) + for i, e in en: + if list_format: + print(f'\t{i}: {list_format(e)}') + else: + print(f'\t{i}: {e}') + select = ask_input(prompt_msg, numeric_only=True, + valid_responses=range(1, len(l)+1)) + return next(filter(lambda x: x[0] == select, en))[1] |