diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/config_mgmt.py | 20 | ||||
-rw-r--r-- | python/vyos/configdiff.py | 24 | ||||
-rw-r--r-- | python/vyos/configsession.py | 16 | ||||
-rw-r--r-- | python/vyos/defaults.py | 9 | ||||
-rw-r--r-- | python/vyos/frr.py | 6 | ||||
-rw-r--r-- | python/vyos/ifconfig/interface.py | 14 | ||||
-rw-r--r-- | python/vyos/ifconfig/macsec.py | 4 | ||||
-rw-r--r-- | python/vyos/ifconfig/vxlan.py | 21 | ||||
-rw-r--r-- | python/vyos/progressbar.py | 33 | ||||
-rw-r--r-- | python/vyos/remote.py | 168 | ||||
-rw-r--r-- | python/vyos/system/__init__.py | 18 | ||||
-rw-r--r-- | python/vyos/system/compat.py | 316 | ||||
-rw-r--r-- | python/vyos/system/disk.py | 217 | ||||
-rw-r--r-- | python/vyos/system/grub.py | 340 | ||||
-rw-r--r-- | python/vyos/system/image.py | 268 | ||||
-rw-r--r-- | python/vyos/system/raid.py | 115 | ||||
-rw-r--r-- | python/vyos/template.py | 6 | ||||
-rw-r--r-- | python/vyos/utils/convert.py | 5 | ||||
-rw-r--r-- | python/vyos/utils/file.py | 6 | ||||
-rw-r--r-- | python/vyos/utils/io.py | 35 | ||||
-rw-r--r-- | python/vyos/utils/network.py | 50 |
21 files changed, 1607 insertions, 84 deletions
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py index 654a8d698..df7240c88 100644 --- a/python/vyos/config_mgmt.py +++ b/python/vyos/config_mgmt.py @@ -22,10 +22,11 @@ import logging from typing import Optional, Tuple, Union from filecmp import cmp from datetime import datetime -from textwrap import dedent +from textwrap import dedent, indent from pathlib import Path from tabulate import tabulate from shutil import copy, chown +from urllib.parse import urlsplit, urlunsplit from vyos.config import Config from vyos.configtree import ConfigTree, ConfigTreeError, show_diff @@ -377,9 +378,22 @@ Proceed ?''' remote_file = f'config.boot-{hostname}.{timestamp}' source_address = self.source_address + if self.effective_locations: + print("Archiving config...") for location in self.effective_locations: - upload(archive_config_file, f'{location}/{remote_file}', - source_host=source_address) + url = urlsplit(location) + _, _, netloc = url.netloc.rpartition("@") + redacted_location = urlunsplit(url._replace(netloc=netloc)) + print(f" {redacted_location}", end=" ", flush=True) + try: + upload(archive_config_file, f'{location}/{remote_file}', + source_host=source_address, raise_error=True) + print("OK") + except Exception as e: + print("FAILED!") + print() + print(indent(str(e), " > ")) + print() # op-mode functions # diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py index 1ec2dfafe..03b06c6d9 100644 --- a/python/vyos/configdiff.py +++ b/python/vyos/configdiff.py @@ -165,6 +165,30 @@ class ConfigDiff(object): return True return False + def node_changed_presence(self, path=[]) -> bool: + if self._diff_tree is None: + raise NotImplementedError("diff_tree class not available") + + path = self._make_path(path) + before = self._diff_tree.left.exists(path) + after = self._diff_tree.right.exists(path) + return (before and not after) or (not before and after) + + def node_changed_children(self, path=[]) -> list: + if self._diff_tree is None: + raise NotImplementedError("diff_tree class not available") + + path = self._make_path(path) + add = self._diff_tree.add + sub = self._diff_tree.sub + children = set() + if add.exists(path): + children.update(add.list_nodes(path)) + if sub.exists(path): + children.update(sub.list_nodes(path)) + + return list(children) + def get_child_nodes_diff_str(self, path=[]): ret = {'add': {}, 'change': {}, 'delete': {}} diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 6d4b2af59..90842b749 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -30,11 +30,15 @@ SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig'] LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile'] MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py'] SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py'] -INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url'] -REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del'] +INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py', + '--action', 'add', '--no-prompt', '--image-path'] +REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py', + '--action', 'delete', '--no-prompt', '--image-name'] GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate'] SHOW = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'show'] RESET = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reset'] +REBOOT = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reboot'] +POWEROFF = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'poweroff'] OP_CMD_ADD = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'add'] OP_CMD_DELETE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'delete'] @@ -220,10 +224,18 @@ class ConfigSession(object): out = self.__run_command(SHOW + path) return out + def reboot(self, path): + out = self.__run_command(REBOOT + path) + return out + def reset(self, path): out = self.__run_command(RESET + path) return out + def poweroff(self, path): + out = self.__run_command(POWEROFF + path) + return out + def add_container_image(self, name): out = self.__run_command(OP_CMD_ADD + ['container', 'image'] + [name]) return out diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index a229533bd..2f3580571 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -50,15 +50,6 @@ https_data = { 'listen_addresses' : { '*': ['_'] } } -api_data = { - 'listen_address' : '127.0.0.1', - 'port' : '8080', - 'socket' : False, - 'strict' : False, - 'debug' : False, - 'api_keys' : [ {'id' : 'testapp', 'key' : 'qwerty'} ] -} - vyos_cert_data = { 'conf' : '/etc/nginx/snippets/vyos-cert.conf', 'crt' : '/etc/ssl/certs/vyos-selfsigned.crt', diff --git a/python/vyos/frr.py b/python/vyos/frr.py index ad5c207f5..a01d967e4 100644 --- a/python/vyos/frr.py +++ b/python/vyos/frr.py @@ -86,12 +86,8 @@ ch2 = logging.StreamHandler(stream=sys.stdout) LOG.addHandler(ch) LOG.addHandler(ch2) -# Full list of FRR 9.0/stable daemons for reference -#_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd', -# 'isisd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'sharpd', 'bfdd', -# 'fabricd', 'pathd'] _frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd', - 'isisd', 'pim6d', 'ldpd', 'babeld', 'bfdd'] + 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd'] path_vtysh = '/usr/bin/vtysh' path_frr_reload = '/usr/lib/frr/frr-reload.py' diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index 050095364..1586710db 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -496,12 +496,12 @@ class Interface(Control): from hashlib import sha256 # Get processor ID number - cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"') + cpu_id = self._cmd('sudo dmidecode -t 4 | grep ID | head -n1 | sed "s/.*ID://;s/ //g"') # XXX: T3894 - it seems not all systems have eth0 - get a list of all # available Ethernet interfaces on the system (without VLAN subinterfaces) # and then take the first one. - all_eth_ifs = [x for x in Section.interfaces('ethernet') if '.' not in x] + all_eth_ifs = Section.interfaces('ethernet', vlan=False) first_mac = Interface(all_eth_ifs[0]).get_mac() sha = sha256() @@ -571,6 +571,16 @@ class Interface(Control): self._cmd(f'ip link set dev {self.ifname} netns {netns}') return True + def get_vrf(self): + """ + Get VRF from interface + + Example: + >>> from vyos.ifconfig import Interface + >>> Interface('eth0').get_vrf() + """ + return self.get_interface('vrf') + def set_vrf(self, vrf: str) -> bool: """ Add/Remove interface from given VRF instance. diff --git a/python/vyos/ifconfig/macsec.py b/python/vyos/ifconfig/macsec.py index 9329c5ee7..bde1d9aec 100644 --- a/python/vyos/ifconfig/macsec.py +++ b/python/vyos/ifconfig/macsec.py @@ -45,6 +45,10 @@ class MACsecIf(Interface): # create tunnel interface cmd = 'ip link add link {source_interface} {ifname} type {type}'.format(**self.config) cmd += f' cipher {self.config["security"]["cipher"]}' + + if 'encrypt' in self.config["security"]: + cmd += ' encrypt on' + self._cmd(cmd) # Check if using static keys diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py index 8c5a0220e..23b6daa3a 100644 --- a/python/vyos/ifconfig/vxlan.py +++ b/python/vyos/ifconfig/vxlan.py @@ -22,6 +22,7 @@ from vyos.utils.assertion import assert_list from vyos.utils.dict import dict_search from vyos.utils.network import get_interface_config from vyos.utils.network import get_vxlan_vlan_tunnels +from vyos.utils.network import get_vxlan_vni_filter @Interface.register class VXLANIf(Interface): @@ -79,6 +80,7 @@ class VXLANIf(Interface): 'parameters.ip.ttl' : 'ttl', 'parameters.ipv6.flowlabel' : 'flowlabel', 'parameters.nolearning' : 'nolearning', + 'parameters.vni_filter' : 'vnifilter', 'remote' : 'remote', 'source_address' : 'local', 'source_interface' : 'dev', @@ -138,10 +140,14 @@ class VXLANIf(Interface): if not isinstance(state, bool): raise ValueError('Value out of range') - cur_vlan_ids = [] if 'vlan_to_vni_removed' in self.config: - cur_vlan_ids = self.config['vlan_to_vni_removed'] - for vlan in cur_vlan_ids: + cur_vni_filter = get_vxlan_vni_filter(self.ifname) + for vlan, vlan_config in self.config['vlan_to_vni_removed'].items(): + # If VNI filtering is enabled, remove matching VNI filter + if dict_search('parameters.vni_filter', self.config) != None: + vni = vlan_config['vni'] + if vni in cur_vni_filter: + self._cmd(f'bridge vni delete dev {self.ifname} vni {vni}') self._cmd(f'bridge vlan del dev {self.ifname} vid {vlan}') # Determine current OS Kernel vlan_tunnel setting - only adjust when needed @@ -151,10 +157,9 @@ class VXLANIf(Interface): if cur_state != new_state: self.set_interface('vlan_tunnel', new_state) - # Determine current OS Kernel configured VLANs - os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname) - if 'vlan_to_vni' in self.config: + # Determine current OS Kernel configured VLANs + os_configured_vlan_ids = get_vxlan_vlan_tunnels(self.ifname) add_vlan = list_diff(list(self.config['vlan_to_vni'].keys()), os_configured_vlan_ids) for vlan, vlan_config in self.config['vlan_to_vni'].items(): @@ -168,6 +173,10 @@ class VXLANIf(Interface): self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan}') self._cmd(f'bridge vlan add dev {self.ifname} vid {vlan} tunnel_info id {vni}') + # If VNI filtering is enabled, install matching VNI filter + if dict_search('parameters.vni_filter', self.config) != None: + self._cmd(f'bridge vni add dev {self.ifname} vni {vni}') + def update(self, config): """ General helper function which works on a dictionary retrived by get_config_dict(). It's main intention is to consolidate the scattered diff --git a/python/vyos/progressbar.py b/python/vyos/progressbar.py index 1793c445b..7bc9d9856 100644 --- a/python/vyos/progressbar.py +++ b/python/vyos/progressbar.py @@ -19,26 +19,35 @@ import signal import subprocess import sys +from vyos.utils.io import is_dumb_terminal from vyos.utils.io import print_error + class Progressbar: def __init__(self, step=None): self.total = 0.0 self.step = step + # Silently ignore all calls if terminal capabilities are lacking. + # This will also prevent the output from littering Ansible logs, + # as `ansible.netcommon.network_cli' coaxes the terminal into believing + # it is interactive. + self._dumb = is_dumb_terminal() def __enter__(self): - # Recalculate terminal width with every window resize. - signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols()) - # Disable line wrapping to prevent the staircase effect. - subprocess.run(['tput', 'rmam'], check=False) - self._update_cols() - # Print an empty progressbar with entry. - self.progress(0, 1) + if not self._dumb: + # Recalculate terminal width with every window resize. + signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols()) + # Disable line wrapping to prevent the staircase effect. + subprocess.run(['tput', 'rmam'], check=False) + self._update_cols() + # Print an empty progressbar with entry. + self.progress(0, 1) return self def __exit__(self, exc_type, kexc_val, exc_tb): - # Revert to the default SIGWINCH handler (ie nothing). - signal.signal(signal.SIGWINCH, signal.SIG_DFL) - # Reenable line wrapping. - subprocess.run(['tput', 'smam'], check=False) + if not self._dumb: + # Revert to the default SIGWINCH handler (ie nothing). + signal.signal(signal.SIGWINCH, signal.SIG_DFL) + # Reenable line wrapping. + subprocess.run(['tput', 'smam'], check=False) def _update_cols(self): # `os.get_terminal_size()' is fast enough for our purposes. self.col = max(os.get_terminal_size().columns - 15, 20) @@ -60,7 +69,7 @@ class Progressbar: Stateless progressbar taking no input at init and current progress with final size at callback (for SSH) """ - if done <= total: + if done <= total and not self._dumb: length = math.ceil(self.col * done / total) percentage = str(math.ceil(100 * done / total)).rjust(3) # Carriage return at the end will make sure the line will get overwritten. diff --git a/python/vyos/remote.py b/python/vyos/remote.py index 1ca8a9530..fec44b571 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -14,6 +14,7 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os +import pwd import shutil import socket import ssl @@ -22,6 +23,9 @@ import sys import tempfile import urllib.parse +from contextlib import contextmanager +from pathlib import Path + from ftplib import FTP from ftplib import FTP_TLS @@ -34,9 +38,10 @@ from requests.packages.urllib3 import PoolManager from vyos.progressbar import Progressbar from vyos.utils.io import ask_yes_no +from vyos.utils.io import is_interactive from vyos.utils.io import print_error from vyos.utils.misc import begin -from vyos.utils.process import cmd +from vyos.utils.process import cmd, rc_cmd from vyos.version import get_version CHUNK_SIZE = 8192 @@ -49,7 +54,7 @@ class InteractivePolicy(MissingHostKeyPolicy): def missing_host_key(self, client, hostname, key): print_error(f"Host '{hostname}' not found in known hosts.") print_error('Fingerprint: ' + key.get_fingerprint().hex()) - if sys.stdout.isatty() and ask_yes_no('Do you wish to continue?'): + if is_interactive() and ask_yes_no('Do you wish to continue?'): if client._host_keys_filename\ and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'): client._host_keys.add(hostname, key.get_name(), key) @@ -72,6 +77,17 @@ class SourceAdapter(HTTPAdapter): num_pools=connections, maxsize=maxsize, block=block, source_address=self._source_pair) +@contextmanager +def umask(mask: int): + """ + Context manager that temporarily sets the process umask. + """ + import os + oldmask = os.umask(mask) + try: + yield + finally: + os.umask(oldmask) def check_storage(path, size): """ @@ -250,7 +266,6 @@ class HttpC: allow_redirects=True, timeout=self.timeout) as r: # Abort early if the destination is inaccessible. - print('pre-3') r.raise_for_status() # If the request got redirected, keep the last URL we ended up with. final_urlstring = r.url @@ -310,30 +325,138 @@ class TftpC: with open(location, 'rb') as f: cmd(f'{self.command} -T - "{self.urlstring}"', input=f.read()) +class GitC: + def __init__(self, + url, + progressbar=False, + check_space=False, + source_host=None, + source_port=0, + timeout=10, + ): + self.command = 'git' + self.url = url + self.urlstring = urllib.parse.urlunsplit(url) + if self.urlstring.startswith("git+"): + self.urlstring = self.urlstring.replace("git+", "", 1) + + def download(self, location: str): + raise NotImplementedError("not supported") + + @umask(0o077) + def upload(self, location: str): + scheme = self.url.scheme + _, _, scheme = scheme.partition("+") + netloc = self.url.netloc + url = Path(self.url.path).parent + with tempfile.TemporaryDirectory(prefix="git-commit-archive-") as directory: + # Determine username, fullname, email for Git commit + pwd_entry = pwd.getpwuid(os.getuid()) + user = pwd_entry.pw_name + name = pwd_entry.pw_gecos.split(",")[0] or user + fqdn = socket.getfqdn() + email = f"{user}@{fqdn}" + + # environment vars for our git commands + env = { + "GIT_TERMINAL_PROMPT": "0", + "GIT_AUTHOR_NAME": name, + "GIT_AUTHOR_EMAIL": email, + "GIT_COMMITTER_NAME": name, + "GIT_COMMITTER_EMAIL": email, + } + + # build ssh command for git + ssh_command = ["ssh"] + + # if we are not interactive, we use StrictHostKeyChecking=yes to avoid any prompts + if not sys.stdout.isatty(): + ssh_command += ["-o", "StrictHostKeyChecking=yes"] + + env["GIT_SSH_COMMAND"] = " ".join(ssh_command) + + # git clone + path_repository = Path(directory) / "repository" + scheme = f"{scheme}://" if scheme else "" + rc, out = rc_cmd( + [self.command, "clone", f"{scheme}{netloc}{url}", str(path_repository), "--depth=1"], + env=env, + shell=False, + ) + if rc: + raise Exception(out) + + # git add + filename = Path(Path(self.url.path).name).stem + dst = path_repository / filename + shutil.copy2(location, dst) + rc, out = rc_cmd( + [self.command, "-C", str(path_repository), "add", filename], + env=env, + shell=False, + ) + + # git commit -m + commit_message = os.environ.get("COMMIT_COMMENT", "commit") + rc, out = rc_cmd( + [self.command, "-C", str(path_repository), "commit", "-m", commit_message], + env=env, + shell=False, + ) + + # git push + rc, out = rc_cmd( + [self.command, "-C", str(path_repository), "push"], + env=env, + shell=False, + ) + if rc: + raise Exception(out) + def urlc(urlstring, *args, **kwargs): """ Dynamically dispatch the appropriate protocol class. """ - url_classes = {'http': HttpC, 'https': HttpC, 'ftp': FtpC, 'ftps': FtpC, \ - 'sftp': SshC, 'ssh': SshC, 'scp': SshC, 'tftp': TftpC} + url_classes = { + "http": HttpC, + "https": HttpC, + "ftp": FtpC, + "ftps": FtpC, + "sftp": SshC, + "ssh": SshC, + "scp": SshC, + "tftp": TftpC, + "git": GitC, + } url = urllib.parse.urlsplit(urlstring) + scheme, _, _ = url.scheme.partition("+") try: - return url_classes[url.scheme](url, *args, **kwargs) + return url_classes[scheme](url, *args, **kwargs) except KeyError: - raise ValueError(f'Unsupported URL scheme: "{url.scheme}"') + raise ValueError(f'Unsupported URL scheme: "{scheme}"') -def download(local_path, urlstring, *args, **kwargs): +def download(local_path, urlstring, progressbar=False, check_space=False, + source_host='', source_port=0, timeout=10.0, raise_error=False): try: - urlc(urlstring, *args, **kwargs).download(local_path) + progressbar = progressbar and is_interactive() + urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path) except Exception as err: + if raise_error: + raise print_error(f'Unable to download "{urlstring}": {err}') + except KeyboardInterrupt: + print_error('\nDownload aborted by user.') -def upload(local_path, urlstring, *args, **kwargs): +def upload(local_path, urlstring, progressbar=False, + source_host='', source_port=0, timeout=10.0): try: - urlc(urlstring, *args, **kwargs).upload(local_path) + progressbar = progressbar and is_interactive() + urlc(urlstring, progressbar, source_host, source_port, timeout).upload(local_path) except Exception as err: print_error(f'Unable to upload "{urlstring}": {err}') + except KeyboardInterrupt: + print_error('\nUpload aborted by user.') def get_remote_config(urlstring, source_host='', source_port=0): """ @@ -346,26 +469,3 @@ def get_remote_config(urlstring, source_host='', source_port=0): return f.read() finally: os.remove(temp) - -def friendly_download(local_path, urlstring, source_host='', source_port=0): - """ - Download with a progress bar, reassuring messages and free space checks. - """ - try: - print_error('Downloading...') - download(local_path, urlstring, True, True, source_host, source_port) - except KeyboardInterrupt: - print_error('\nDownload aborted by user.') - sys.exit(1) - except: - import traceback - print_error(f'Failed to download {urlstring}.') - # There are a myriad different reasons a download could fail. - # SSH errors, FTP errors, I/O errors, HTTP errors (403, 404...) - # We omit the scary stack trace but print the error nevertheless. - exc_type, exc_value, exc_traceback = sys.exc_info() - traceback.print_exception(exc_type, exc_value, None, 0, None, False) - sys.exit(1) - else: - print_error('Download complete.') - sys.exit(0) diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py new file mode 100644 index 000000000..0c91330ba --- /dev/null +++ b/python/vyos/system/__init__.py @@ -0,0 +1,18 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +__all_: list[str] = ['disk', 'grub', 'image'] +# define image-tools version +SYSTEM_CFG_VER = 1 diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py new file mode 100644 index 000000000..319c3dabf --- /dev/null +++ b/python/vyos/system/compat.py @@ -0,0 +1,316 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from functools import wraps +from copy import deepcopy +from typing import Union + +from vyos.system import disk, grub, image, SYSTEM_CFG_VER +from vyos.template import render + +TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2' + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +REGEX_SANIT_QUIET = r'\ ?quiet\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +class DowngradingImageTools(Exception): + """Raised when attempting to add an image with an earlier version + of image-tools than the current system, as indicated by the value + of SYSTEM_CFG_VER or absence thereof.""" + pass + + +def mode(): + if grub.get_cfg_ver() >= SYSTEM_CFG_VER: + return False + + return True + + +def find_versions(menu_entries: list) -> list: + """Find unique VyOS versions from menu entries + + Args: + menu_entries (list): a list with menu entries + + Returns: + list: List of installed versions + """ + versions = [] + for vyos_ver in menu_entries: + versions.append(vyos_ver.get('version')) + # remove duplicates + versions = list(set(versions)) + return versions + + +def filter_unparsed(grub_path: str) -> str: + """Find currently installed VyOS version + + Args: + grub_path (str): a path to the grub.cfg file + + Returns: + str: unparsed grub.cfg items + """ + config_text = Path(grub_path).read_text() + regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) + filtered = regex_filter.sub('', config_text) + regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) + filtered = regex_filter.sub('', filtered) + regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) + filtered = regex_filter.sub('', filtered) + # strip extra new lines + filtered = filtered.strip() + return filtered + + +def get_search_root(unparsed: str) -> str: + unparsed_lines = unparsed.splitlines() + search_root = next((x for x in unparsed_lines if 'search' in x), '') + return search_root + + +def sanitize_boot_opts(boot_opts: str) -> str: + """Sanitize boot options from console and init + + Args: + boot_opts (str): boot options + + Returns: + str: sanitized boot options + """ + regex_filter = compile(REGEX_SANIT_CONSOLE) + boot_opts = regex_filter.sub('', boot_opts) + regex_filter = compile(REGEX_SANIT_INIT) + boot_opts = regex_filter.sub('', boot_opts) + # legacy tools add 'quiet' on add system image; this is not desired + regex_filter = compile(REGEX_SANIT_QUIET) + boot_opts = regex_filter.sub(' ', boot_opts) + + return boot_opts + + +def parse_entry(entry: tuple) -> dict: + """Parse GRUB menuentry + + Args: + entry (tuple): tuple of (version, options) + + Returns: + dict: dictionary with parsed options + """ + # save version to dict + entry_dict = {'version': entry[0]} + # detect boot mode type + if PW_RESET_OPTION in entry[1]: + entry_dict['bootmode'] = 'pw_reset' + else: + entry_dict['bootmode'] = 'normal' + # find console type and number + regex_filter = compile(REGEX_CONSOLE) + entry_dict.update(regex_filter.match(entry[1]).groupdict()) + entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + + return entry_dict + + +def parse_menuentries(grub_path: str) -> list: + """Parse all GRUB menuentries + + Args: + grub_path (str): a path to GRUB config file + + Returns: + list: list with menu items (each item is a dict) + """ + menuentries = [] + # read configuration file + config_text = Path(grub_path).read_text() + # parse menuentries to tuples (version, options) + regex_filter = compile(REGEX_MENUENTRY, MULTILINE) + filter_results = regex_filter.findall(config_text) + # parse each entry + for entry in filter_results: + menuentries.append(parse_entry(entry)) + + return menuentries + + +def prune_vyos_versions(root_dir: str = '') -> None: + """Delete vyos-versions files of registered images subsequently deleted + or renamed by legacy image-tools + + Args: + root_dir (str): an optional path to the root directory + """ + if not root_dir: + root_dir = disk.find_persistence() + + for version in grub.version_list(): + if not Path(f'{root_dir}/boot/{version}').is_dir(): + grub.version_del(version) + + +def update_cfg_ver(root_dir:str = '') -> int: + """Get minumum version of image-tools across all installed images + + Args: + root_dir (str): an optional path to the root directory + + Returns: + int: minimum version of image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + prune_vyos_versions(root_dir) + + images_details = image.get_images_details() + cfg_version = min(d['tools_version'] for d in images_details) + + return cfg_version + + +def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]: + """Translate default version to menuentry index + + Args: + menu_entries (list): list of dicts of installed version boot data + root_dir (str): an optional path to the root directory + + Returns: + int: index of default version in menu_entries or None + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + image_name = image.get_default_image() + + sublist = list(filter(lambda x: x.get('version') == image_name, + menu_entries)) + if sublist: + return menu_entries.index(sublist[0]) + + return None + + +def update_version_list(root_dir: str = '') -> list[dict]: + """Update list of dicts of installed version boot data + + Args: + root_dir (str): an optional path to the root directory + + Returns: + list: list of dicts of installed version boot data + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + # get list of versions in menuentries + menu_entries = parse_menuentries(grub_cfg_main) + menu_versions = find_versions(menu_entries) + + # get list of versions added/removed by image-tools + current_versions = grub.version_list(root_dir) + + remove = list(set(menu_versions) - set(current_versions)) + for ver in remove: + menu_entries = list(filter(lambda x: x.get('version') != ver, + menu_entries)) + + add = list(set(current_versions) - set(menu_versions)) + for ver in add: + last = menu_entries[0].get('version') + new = deepcopy(list(filter(lambda x: x.get('version') == last, + menu_entries))) + for e in new: + boot_opts = e.get('boot_opts').replace(last, ver) + e.update({'version': ver, 'boot_opts': boot_opts}) + + menu_entries = new + menu_entries + + return menu_entries + + +def grub_cfg_fields(root_dir: str = '') -> dict: + """Gather fields for rendering grub.cfg + + Args: + root_dir (str): an optional path to the root directory + + Returns: + dict: dictionary for rendering TMPL_GRUB_COMPAT + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = {'default': 0, 'timeout': 5} + # 'default' and 'timeout' from legacy grub.cfg + fields |= grub.vars_read(grub_cfg_main) + + fields['tools_version'] = SYSTEM_CFG_VER + menu_entries = update_version_list(root_dir) + fields['versions'] = menu_entries + + default = get_default(menu_entries, root_dir) + if default is not None: + fields['default'] = default + + modules = grub.modules_read(grub_cfg_main) + fields['modules'] = modules + + unparsed = filter_unparsed(grub_cfg_main).splitlines() + search_root = next((x for x in unparsed if 'search' in x), '') + fields['search_root'] = search_root + + return fields + + +def render_grub_cfg(root_dir: str = '') -> None: + """Render grub.cfg for legacy compatibility""" + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = grub_cfg_fields(root_dir) + render(grub_cfg_main, TMPL_GRUB_COMPAT, fields) + + +def grub_cfg_update(func): + """Decorator to update grub.cfg after function call""" + @wraps(func) + def wrapper(*args, **kwargs): + ret = func(*args, **kwargs) + if mode(): + render_grub_cfg() + return ret + return wrapper diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py new file mode 100644 index 000000000..49e6b5c5e --- /dev/null +++ b/python/vyos/system/disk.py @@ -0,0 +1,217 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from json import loads as json_loads +from os import sync +from dataclasses import dataclass + +from psutil import disk_partitions + +from vyos.utils.process import run, cmd + + +@dataclass +class DiskDetails: + """Disk details""" + name: str + partition: dict[str, str] + + +def disk_cleanup(drive_path: str) -> None: + """Clean up disk partition table (MBR and GPT) + Zeroize primary and secondary headers - first and last 17408 bytes + (512 bytes * 34 LBA) on a drive + + Args: + drive_path (str): path to a drive that needs to be cleaned + """ + run(f'sgdisk -Z {drive_path}') + + +def find_persistence() -> str: + """Find a mountpoint for persistence storage + + Returns: + str: Path where 'persistance' pertition is mounted, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint.endswith('/persistence'): + return partition.mountpoint + return '' + + +def parttable_create(drive_path: str, root_size: int) -> None: + """Create a hybrid MBR/GPT partition table + 0-2047 first sectors are free + 2048-4095 sectors - BIOS Boot Partition + 4096 + 256 MB - EFI system partition + Everything else till the end of a drive - Linux partition + + Args: + drive_path (str): path to a drive + """ + if not root_size: + root_size_text: str = '+100%' + else: + root_size_text: str = str(root_size) + command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ + -n3:0:+{root_size_text}K -t3:8300 {drive_path}' + + run(command) + # update partitons in kernel + sync() + run(f'partprobe {drive_path}') + + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + + +def partition_list(drive_path: str) -> list[str]: + """Get a list of partitions on a drive + + Args: + drive_path (str): path to a drive + + Returns: + list[str]: a list of partition paths + """ + lsblk: str = cmd(f'lsblk -Jp {drive_path}') + drive_info: dict = json_loads(lsblk) + device: list = drive_info.get('blockdevices') + children: list[str] = device[0].get('children', []) if device else [] + partitions: list[str] = [child.get('name') for child in children] + return partitions + + +def partition_parent(partition_path: str) -> str: + """Get a parent device for a partition + + Args: + partition (str): path to a partition + + Returns: + str: path to a parent device + """ + parent: str = cmd(f'lsblk -ndpo pkname {partition_path}') + return parent + + +def from_partition(partition_path: str) -> DiskDetails: + drive_path: str = partition_parent(partition_path) + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + +def filesystem_create(partition: str, fstype: str) -> None: + """Create a filesystem on a partition + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + fstype (str): filesystem type ('efi' or 'ext4') + """ + if fstype == 'efi': + command = 'mkfs -t fat -n EFI' + run(f'{command} {partition}') + if fstype == 'ext4': + command = 'mkfs -t ext4 -L persistence' + run(f'{command} {partition}') + + +def partition_mount(partition: str, + path: str, + fsype: str = '', + overlay_params: dict[str, str] = {}) -> None: + """Mount a partition into a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where to mount + fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') + overlay_params (dict): optionally, set overlay parameters. + Defaults to None. + """ + if fsype in ['squashfs', 'iso9660']: + command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' + if fsype == 'overlay' and overlay_params: + command: str = f'mount -t overlay -o noatime,\ + upperdir={overlay_params["upperdir"]},\ + lowerdir={overlay_params["lowerdir"]},\ + workdir={overlay_params["workdir"]} overlay {path}' + + else: + command = f'mount {partition} {path}' + + run(command) + + +def partition_umount(partition: str = '', path: str = '') -> None: + """Umount a partition by a partition name or a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where a partition is mounted + """ + if partition: + command = f'umount {partition}' + run(command) + if path: + command = f'umount {path}' + run(command) + + +def find_device(mountpoint: str) -> str: + """Find a device by mountpoint + + Returns: + str: Path to device, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint == mountpoint: + return partition.mountpoint + return '' + + +def disks_size() -> dict[str, int]: + """Get a dictionary with physical disks and their sizes + + Returns: + dict[str, int]: a dictionary with name: size mapping + """ + disks_size: dict[str, int] = {} + lsblk: str = cmd('lsblk -Jbp') + blk_list = json_loads(lsblk) + for device in blk_list.get('blockdevices'): + if device['type'] == 'disk': + disks_size.update({device['name']: device['size']}) + return disks_size diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py new file mode 100644 index 000000000..0ac16af9a --- /dev/null +++ b/python/vyos/system/grub.py @@ -0,0 +1,340 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import MULTILINE, compile as re_compile +from typing import Union +from uuid import uuid5, NAMESPACE_URL, UUID + +from vyos.template import render +from vyos.utils.process import cmd +from vyos.system import disk + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' +CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' +CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' +CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' +CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' + +TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' +TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' +TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' +TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' +TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' +TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' +TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' + +# prepare regexes +REGEX_GRUB_VARS: str = r'^set (?P<variable_name>.+)=[\'"]?(?P<variable_value>.*)(?<![\'"])[\'"]?$' +REGEX_GRUB_MODULES: str = r'^insmod (?P<module_name>.+)$' +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' + + +def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force', + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="{id}" \ + --no-uefi-secure-boot' + ] + for command in commands: + cmd(command) + + +def gen_version_uuid(version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name) + ver_id: str = f'uuid5-{ver_uuid}' + return ver_id + + +def version_add(version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + +def version_del(vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + +def version_list(root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = disk.find_persistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + return versions_list + + +def read_env(env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = disk.find_persistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile(r'^(?P<variable_name>.*)=(?P<variable_value>.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get('variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + +def get_cfg_ver(root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + +def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + vars_write(vars_file, vars_current) + + +def vars_read(grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + +def modules_read(grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + +def modules_write(grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + +def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + +def set_default(version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = vars_read(vars_file) + vars_current['default'] = gen_version_uuid(version_name) + vars_write(vars_file, vars_current) + + +def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, grub_common) + + +def create_structure(root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = disk.find_persistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) + + +def set_console_type(console_type: str, root_dir: str = '') -> None: + """Write default console type to GRUB configuration + + Args: + console_type (str): a default console type + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['console_type'] = str(console_type) + vars_write(vars_file, vars_current) + +def set_raid(root_dir: str = '') -> None: + pass diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py new file mode 100644 index 000000000..c03ce02d5 --- /dev/null +++ b/python/vyos/system/image.py @@ -0,0 +1,268 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile as re_compile +from tempfile import TemporaryDirectory +from typing import TypedDict + +from vyos import version +from vyos.system import disk, grub + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' +# prepare regexes +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' +REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)' + + +# structures definitions +class ImageDetails(TypedDict): + name: str + version: str + tools_version: int + disk_ro: int + disk_rw: int + disk_total: int + + +class BootDetails(TypedDict): + image_default: str + image_running: str + images_available: list[str] + console_type: str + console_num: int + + +def bootmode_detect() -> str: + """Detect system boot mode + + Returns: + str: 'bios' or 'efi' + """ + if Path('/sys/firmware/efi/').exists(): + return 'efi' + else: + return 'bios' + + +def get_image_version(mount_path: str) -> str: + """Extract version name from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: version name + """ + version_file: str = Path( + f'{mount_path}/opt/vyatta/etc/version').read_text() + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_image_tools_version(mount_path: str) -> int: + """Extract image-tools version from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: image-tools version + """ + try: + version_file: str = Path( + f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text() + except FileNotFoundError: + system_cfg_ver: int = 0 + else: + res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file) + system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0)) + + return system_cfg_ver + + +def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]: + """Return versions of image and image-tools + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + dict[str, int]: a dictionary with versions of image and image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + squashfs_file: str = next( + Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() + with TemporaryDirectory() as squashfs_mounted: + disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') + + image_version: str = get_image_version(squashfs_mounted) + image_tools_version: int = get_image_tools_version(squashfs_mounted) + + disk.partition_umount(squashfs_file) + + versions: dict[str, int] = { + 'image': image_version, + 'image-tools': image_tools_version + } + + return versions + + +def get_details(image_name: str, root_dir: str = '') -> ImageDetails: + """Return information about image + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + ImageDetails: a dictionary with details about an image (name, size) + """ + if not root_dir: + root_dir = disk.find_persistence() + + versions = get_versions(image_name, root_dir) + image_version: str = versions.get('image', '') + image_tools_version: int = versions.get('image-tools', 0) + + image_path: Path = Path(f'{root_dir}/boot/{image_name}') + image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') + + image_disk_ro: int = int() + for item in image_path.iterdir(): + if not item.is_symlink(): + image_disk_ro += item.stat().st_size + + image_disk_rw: int = int() + for item in image_path_rw.rglob('*'): + if not item.is_symlink(): + image_disk_rw += item.stat().st_size + + image_details: ImageDetails = { + 'name': image_name, + 'version': image_version, + 'tools_version': image_tools_version, + 'disk_ro': image_disk_ro, + 'disk_rw': image_disk_rw, + 'disk_total': image_disk_ro + image_disk_rw + } + + return image_details + + +def get_images_details() -> list[ImageDetails]: + """Return information about all images + + Returns: + list[ImageDetails]: a list of dictionaries with details about images + """ + images: list[str] = grub.version_list() + images_details: list[ImageDetails] = list() + for image_name in images: + images_details.append(get_details(image_name)) + + return images_details + + +def get_running_image() -> str: + """Find currently running image name + + Returns: + str: image name + """ + running_image: str = '' + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + running_image: str = running_image_result.groupdict().get( + 'image_version', '') + # we need to have a fallback for live systems + if not running_image: + running_image: str = version.get_version() + + return running_image + + +def get_default_image(root_dir: str = '') -> str: + """Get default boot entry + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + Returns: + str: a version name + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = grub.vars_read(vars_file) + default_uuid: str = vars_current.get('default', '') + if default_uuid: + images_list: list[str] = grub.version_list(root_dir) + for image_name in images_list: + if default_uuid == grub.gen_version_uuid(image_name): + return image_name + return '' + else: + return '' + + +def validate_name(image_name: str) -> bool: + """Validate image name + + Args: + image_name (str): suggested image name + + Returns: + bool: validation result + """ + regex_filter = re_compile(r'^[\w\.+-]{1,32}$') + if regex_filter.match(image_name): + return True + return False + + +def is_live_boot() -> bool: + """Detect live booted system + + Returns: + bool: True if the system currently booted in live mode + """ + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + boot_type: str = running_image_result.groupdict().get('boot_type', '') + if boot_type == 'live': + return True + return False + +def is_running_as_container() -> bool: + if Path('/.dockerenv').exists(): + return True + return False diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py new file mode 100644 index 000000000..13b99fa69 --- /dev/null +++ b/python/vyos/system/raid.py @@ -0,0 +1,115 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +"""RAID related functions""" + +from pathlib import Path +from shutil import copy +from dataclasses import dataclass + +from vyos.utils.process import cmd +from vyos.system import disk + + +@dataclass +class RaidDetails: + """RAID type""" + name: str + level: str + members: list[str] + disks: list[disk.DiskDetails] + + +def raid_create(raid_members: list[str], + raid_name: str = 'md0', + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + if Path('/sys/firmware/efi').exists(): + for part in raid_members: + drive: str = disk.partition_parent(part) + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + else: + for part in raid_members: + drive: str = disk.partition_parent(part) + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + for part in raid_members: + command: str = f'mdadm --zero-superblock {part}' + cmd(command) + command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + cmd(command) + + raid = RaidDetails( + name = f'/dev/{raid_name}', + level = raid_level, + members = raid_members, + disks = [disk.from_partition(m) for m in raid_members] + ) + + return raid + +def update_initramfs() -> None: + """Update initramfs""" + mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm' + copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script) + p = Path(mdadm_script) + p.write_text(p.read_text().replace('$((COUNT + 1))', '20')) + command: str = 'update-initramfs -u' + cmd(command) + +def update_default(target_dir: str) -> None: + """Update /etc/default/mdadm to start MD monitoring daemon at boot + """ + source_mdadm_config = '/etc/default/mdadm' + target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm') + target_mdadm_config_dir = Path(target_mdadm_config).parent + Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True) + s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false', + 'START_DAEMON=true') + Path(target_mdadm_config).write_text(s) + +def get_uuid(device: str) -> str: + """Get UUID of a device""" + command: str = f'tune2fs -l {device}' + l = cmd(command).splitlines() + uuid = next((x for x in l if x.startswith('Filesystem UUID')), '') + return uuid.split(':')[1].strip() if uuid else '' + +def get_uuids(raid_details: RaidDetails) -> tuple[str]: + """Get UUIDs of RAID members + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + + Returns: + tuple[str]: root_disk uuid, root_md uuid + """ + raid_name: str = raid_details.name + root_partition: str = raid_details.members[0] + uuid_root_disk: str = get_uuid(root_partition) + uuid_root_md: str = get_uuid(raid_name) + return uuid_root_disk, uuid_root_md diff --git a/python/vyos/template.py b/python/vyos/template.py index c778d0de8..1e683b605 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -579,10 +579,10 @@ def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'): return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name) @register_filter('nft_default_rule') -def nft_default_rule(fw_conf, fw_name, ipv6=False): +def nft_default_rule(fw_conf, fw_name, family): output = ['counter'] default_action = fw_conf['default_action'] - family = 'ipv6' if ipv6 else 'ipv4' + #family = 'ipv6' if ipv6 else 'ipv4' if 'enable_default_log' in fw_conf: action_suffix = default_action[:1].upper() @@ -592,7 +592,7 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False): output.append(f'{default_action}') if 'default_jump_target' in fw_conf: target = fw_conf['default_jump_target'] - def_suffix = '6' if ipv6 else '' + def_suffix = '6' if family == 'ipv6' else '' output.append(f'NAME{def_suffix}_{target}') output.append(f'comment "{fw_name} default-action {default_action}"') diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 9a8a1ff7d..c02f0071e 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""): return result -def bytes_to_human(bytes, initial_exponent=0, precision=2): +def bytes_to_human(bytes, initial_exponent=0, precision=2, + int_below_exponent=0): """ Converts a value in bytes to a human-readable size string like 640 KB The initial_exponent parameter is the exponent of 2, @@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2): # log2 is a float, while range checking requires an int exponent = int(log2(bytes)) + if exponent < int_below_exponent: + precision = 0 if exponent < 10: value = bytes diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 667a2464b..9f27a7fb9 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -134,6 +134,12 @@ def chmod_755(path): S_IROTH | S_IXOTH chmod(path, bitmask) +def chmod_2775(path): + """ user/group permissions with set-group-id bit set """ + from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH + + bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH + chmod(path, bitmask) def makedir(path, user=None, group=None): if os.path.exists(path): diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 5fffa62f8..74099b502 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -13,6 +13,8 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +from typing import Callable + def print_error(str='', end='\n'): """ Print `str` to stderr, terminated with `end`. @@ -62,3 +64,36 @@ def ask_yes_no(question, default=False) -> bool: stdout.write("Please respond with yes/y or no/n\n") except EOFError: stdout.write("\nPlease respond with yes/y or no/n\n") + +def is_interactive(): + """Try to determine if the routine was called from an interactive shell.""" + import os, sys + return os.getenv('TERM', default=False) and sys.stderr.isatty() and sys.stdout.isatty() + +def is_dumb_terminal(): + """Check if the current TTY is dumb, so that we can disable advanced terminal features.""" + import os + return os.getenv('TERM') in ['vt100', 'dumb'] + +def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', + list_format: Callable = None,) -> str: + """Select an entry from a list + + Args: + l (list): a list of entries + list_msg (str): a message to print before listing the entries + prompt_msg (str): a message to print as prompt for selection + + Returns: + str: a selected entry + """ + en = list(enumerate(l, 1)) + print(list_msg) + for i, e in en: + if list_format: + print(f'\t{i}: {list_format(e)}') + else: + print(f'\t{i}: {e}') + select = ask_input(prompt_msg, numeric_only=True, + valid_responses=range(1, len(l)+1)) + return next(filter(lambda x: x[0] == select, en))[1] diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 5d19f256b..2a0808fca 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -61,14 +61,17 @@ def get_vrf_members(vrf: str) -> list: """ import json from vyos.utils.process import cmd - if not interface_exists(vrf): - raise ValueError(f'VRF "{vrf}" does not exist!') - output = cmd(f'ip --json --brief link show master {vrf}') - answer = json.loads(output) interfaces = [] - for data in answer: - if 'ifname' in data: - interfaces.append(data.get('ifname')) + try: + if not interface_exists(vrf): + raise ValueError(f'VRF "{vrf}" does not exist!') + output = cmd(f'ip --json --brief link show vrf {vrf}') + answer = json.loads(output) + for data in answer: + if 'ifname' in data: + interfaces.append(data.get('ifname')) + except: + pass return interfaces def get_interface_vrf(interface): @@ -483,3 +486,36 @@ def get_vxlan_vlan_tunnels(interface: str) -> list: os_configured_vlan_ids.append(str(vlanStart)) return os_configured_vlan_ids + +def get_vxlan_vni_filter(interface: str) -> list: + """ Return a list of strings with VNIs configured in the Kernel""" + from json import loads + from vyos.utils.process import cmd + + if not interface.startswith('vxlan'): + raise ValueError('Only applicable for VXLAN interfaces!') + + # Determine current OS Kernel configured VNI filters in VXLAN interface + # + # $ bridge -j vni show dev vxlan1 + # [{"ifname":"vxlan1","vnis":[{"vni":100},{"vni":200},{"vni":300,"vniEnd":399}]}] + # + # Example output: ['10010', '10020', '10021', '10022'] + os_configured_vnis = [] + tmp = loads(cmd(f'bridge --json vni show dev {interface}')) + if tmp: + for tunnel in tmp[0].get('vnis', {}): + vniStart = tunnel['vni'] + if 'vniEnd' in tunnel: + vniEnd = tunnel['vniEnd'] + # Build a real list for user VNIs + vni_list = list(range(vniStart, vniEnd +1)) + # Convert list of integers to list or strings + os_configured_vnis.extend(map(str, vni_list)) + # Proceed with next tunnel - this one is complete + continue + + # Add single tunel id - not part of a range + os_configured_vnis.append(str(vniStart)) + + return os_configured_vnis |