diff options
Diffstat (limited to 'python/vyos')
-rw-r--r-- | python/vyos/configverify.py | 17 | ||||
-rw-r--r-- | python/vyos/ethtool.py | 88 | ||||
-rwxr-xr-x | python/vyos/firewall.py | 27 | ||||
-rw-r--r-- | python/vyos/frr.py | 2 | ||||
-rw-r--r-- | python/vyos/ifconfig/bond.py | 3 | ||||
-rw-r--r-- | python/vyos/ifconfig/ethernet.py | 3 | ||||
-rw-r--r-- | python/vyos/ifconfig/interface.py | 69 | ||||
-rw-r--r-- | python/vyos/ifconfig/wireguard.py | 110 | ||||
-rw-r--r-- | python/vyos/nat.py | 10 | ||||
-rw-r--r-- | python/vyos/system/grub.py | 2 | ||||
-rw-r--r-- | python/vyos/utils/boot.py | 6 | ||||
-rw-r--r-- | python/vyos/utils/convert.py | 62 | ||||
-rw-r--r-- | python/vyos/utils/system.py | 8 |
13 files changed, 290 insertions, 117 deletions
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index 59b67300d..92996f2ee 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -520,3 +520,20 @@ def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0): dh_bits = dh_numbers.p.bit_length() if dh_bits < min_key_size: raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!') + +def verify_eapol(config: dict): + """ + Common helper function used by interface implementations to perform + recurring validation of EAPoL configuration. + """ + if 'eapol' not in config: + return + + if 'certificate' not in config['eapol']: + raise ConfigError('Certificate must be specified when using EAPoL!') + + verify_pki_certificate(config, config['eapol']['certificate'], no_password_protected=True) + + if 'ca_certificate' in config['eapol']: + for ca_cert in config['eapol']['ca_certificate']: + verify_pki_ca_certificate(config, ca_cert) diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py index 80bb56fa2..21272cc5b 100644 --- a/python/vyos/ethtool.py +++ b/python/vyos/ethtool.py @@ -56,11 +56,8 @@ class Ethtool: # '100' : {'full': '', 'half': ''}, # '1000': {'full': ''} # } - _speed_duplex = {'auto': {'auto': ''}} _ring_buffer = None _driver_name = None - _auto_negotiation = False - _auto_negotiation_supported = None _flow_control = None def __init__(self, ifname): @@ -74,56 +71,51 @@ class Ethtool: self._driver_name = driver.group(1) # Build a dictinary of supported link-speed and dupley settings. - out, _ = popen(f'ethtool {ifname}') - reading = False - pattern = re.compile(r'\d+base.*') - for line in out.splitlines()[1:]: - line = line.lstrip() - if 'Supported link modes:' in line: - reading = True - if 'Supported pause frame use:' in line: - reading = False - if reading: - for block in line.split(): - if pattern.search(block): - speed = block.split('base')[0] - duplex = block.split('/')[-1].lower() - if speed not in self._speed_duplex: - self._speed_duplex.update({ speed : {}}) - if duplex not in self._speed_duplex[speed]: - self._speed_duplex[speed].update({ duplex : ''}) - if 'Supports auto-negotiation:' in line: - # Split the following string: Auto-negotiation: off - # we are only interested in off or on - tmp = line.split()[-1] - self._auto_negotiation_supported = bool(tmp == 'Yes') - # Only read in if Auto-negotiation is supported - if self._auto_negotiation_supported and 'Auto-negotiation:' in line: - # Split the following string: Auto-negotiation: off - # we are only interested in off or on - tmp = line.split()[-1] - self._auto_negotiation = bool(tmp == 'on') + # [ { + # "ifname": "eth0", + # "supported-ports": [ "TP" ], + # "supported-link-modes": [ "10baseT/Half","10baseT/Full","100baseT/Half","100baseT/Full","1000baseT/Full" ], + # "supported-pause-frame-use": "Symmetric", + # "supports-auto-negotiation": true, + # "supported-fec-modes": [ ], + # "advertised-link-modes": [ "10baseT/Half","10baseT/Full","100baseT/Half","100baseT/Full","1000baseT/Full" ], + # "advertised-pause-frame-use": "Symmetric", + # "advertised-auto-negotiation": true, + # "advertised-fec-modes": [ ], + # "speed": 1000, + # "duplex": "Full", + # "auto-negotiation": false, + # "port": "Twisted Pair", + # "phyad": 1, + # "transceiver": "internal", + # "supports-wake-on": "pumbg", + # "wake-on": "g", + # "current-message-level": 7, + # "link-detected": true + # } ] + out, _ = popen(f'ethtool --json {ifname}') + self._base_settings = loads(out)[0] # Now populate driver features out, _ = popen(f'ethtool --json --show-features {ifname}') - self._features = loads(out) + self._features = loads(out)[0] # Get information about NIC ring buffers out, _ = popen(f'ethtool --json --show-ring {ifname}') - self._ring_buffer = loads(out) + self._ring_buffer = loads(out)[0] # Get current flow control settings, but this is not supported by # all NICs (e.g. vmxnet3 does not support is) out, err = popen(f'ethtool --json --show-pause {ifname}') if not bool(err): - self._flow_control = loads(out) + self._flow_control = loads(out)[0] def check_auto_negotiation_supported(self): """ Check if the NIC supports changing auto-negotiation """ - return self._auto_negotiation_supported + return self._base_settings['supports-auto-negotiation'] def get_auto_negotiation(self): - return self._auto_negotiation_supported and self._auto_negotiation + return self._base_settings['supports-auto-negotiation'] and self._base_settings['auto-negotiation'] def get_driver_name(self): return self._driver_name @@ -137,9 +129,9 @@ class Ethtool: """ active = False fixed = True - if feature in self._features[0]: - active = bool(self._features[0][feature]['active']) - fixed = bool(self._features[0][feature]['fixed']) + if feature in self._features: + active = bool(self._features[feature]['active']) + fixed = bool(self._features[feature]['fixed']) return active, fixed def get_generic_receive_offload(self): @@ -165,14 +157,14 @@ class Ethtool: # thus when it's impossible return None if rx_tx not in ['rx', 'tx']: ValueError('Ring-buffer type must be either "rx" or "tx"') - return str(self._ring_buffer[0].get(f'{rx_tx}-max', None)) + return str(self._ring_buffer.get(f'{rx_tx}-max', None)) def get_ring_buffer(self, rx_tx): # Configuration of RX/TX ring-buffers is not supported on every device, # thus when it's impossible return None if rx_tx not in ['rx', 'tx']: ValueError('Ring-buffer type must be either "rx" or "tx"') - return str(self._ring_buffer[0].get(rx_tx, None)) + return str(self._ring_buffer.get(rx_tx, None)) def check_speed_duplex(self, speed, duplex): """ Check if the passed speed and duplex combination is supported by @@ -184,12 +176,16 @@ class Ethtool: if duplex not in ['auto', 'full', 'half']: raise ValueError(f'Value "{duplex}" for duplex is invalid!') + if speed == 'auto' and duplex == 'auto': + return True + if self.get_driver_name() in _drivers_without_speed_duplex_flow: return False - if speed in self._speed_duplex: - if duplex in self._speed_duplex[speed]: - return True + # ['10baset/half', '10baset/full', '100baset/half', '100baset/full', '1000baset/full'] + tmp = [x.lower() for x in self._base_settings['supported-link-modes']] + if f'{speed}baset/{duplex}' in tmp: + return True return False def check_flow_control(self): @@ -201,4 +197,4 @@ class Ethtool: raise ValueError('Interface does not support changing '\ 'flow-control settings!') - return 'on' if bool(self._flow_control[0]['autonegotiate']) else 'off' + return 'on' if bool(self._flow_control['autonegotiate']) else 'off' diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index b9439d42b..34d0b73f6 100755 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -156,6 +156,20 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name): proto = '{tcp, udp}' output.append(f'meta l4proto {operator} {proto}') + if 'ethernet_type' in rule_conf: + ether_type_mapping = { + '802.1q': '8021q', + '802.1ad': '8021ad', + 'ipv6': 'ip6', + 'ipv4': 'ip', + 'arp': 'arp' + } + ether_type = rule_conf['ethernet_type'] + operator = '!=' if ether_type.startswith('!') else '' + ether_type = ether_type.lstrip('!') + ether_type = ether_type_mapping.get(ether_type, ether_type) + output.append(f'ether type {operator} {ether_type}') + for side in ['destination', 'source']: if side in rule_conf: prefix = side[0] @@ -487,6 +501,19 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name): output.append(f'vlan id {rule_conf["vlan"]["id"]}') if 'priority' in rule_conf['vlan']: output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}') + if 'ethernet_type' in rule_conf['vlan']: + ether_type_mapping = { + '802.1q': '8021q', + '802.1ad': '8021ad', + 'ipv6': 'ip6', + 'ipv4': 'ip', + 'arp': 'arp' + } + ether_type = rule_conf['vlan']['ethernet_type'] + operator = '!=' if ether_type.startswith('!') else '' + ether_type = ether_type.lstrip('!') + ether_type = ether_type_mapping.get(ether_type, ether_type) + output.append(f'vlan type {operator} {ether_type}') if 'log' in rule_conf: action = rule_conf['action'] if 'action' in rule_conf else 'accept' diff --git a/python/vyos/frr.py b/python/vyos/frr.py index e7743e9d5..6fb81803f 100644 --- a/python/vyos/frr.py +++ b/python/vyos/frr.py @@ -87,7 +87,7 @@ LOG.addHandler(ch) LOG.addHandler(ch2) _frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd', - 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd'] + 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd', 'fabricd'] path_vtysh = '/usr/bin/vtysh' path_frr_reload = '/usr/lib/frr/frr-reload.py' diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py index b8ea90049..8ba481728 100644 --- a/python/vyos/ifconfig/bond.py +++ b/python/vyos/ifconfig/bond.py @@ -504,3 +504,6 @@ class BondIf(Interface): # call base class first super().update(config) + + # enable/disable EAPoL (Extensible Authentication Protocol over Local Area Network) + self.set_eapol() diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py index 8d96c863f..61da7b74b 100644 --- a/python/vyos/ifconfig/ethernet.py +++ b/python/vyos/ifconfig/ethernet.py @@ -452,3 +452,6 @@ class EthernetIf(Interface): # call base class last super().update(config) + + # enable/disable EAPoL (Extensible Authentication Protocol over Local Area Network) + self.set_eapol() diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index 72d3d3afe..002d3da9e 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -32,6 +32,12 @@ from vyos.configdict import list_diff from vyos.configdict import dict_merge from vyos.configdict import get_vlan_ids from vyos.defaults import directories +from vyos.pki import find_chain +from vyos.pki import encode_certificate +from vyos.pki import load_certificate +from vyos.pki import wrap_private_key +from vyos.template import is_ipv4 +from vyos.template import is_ipv6 from vyos.template import render from vyos.utils.network import mac2eui64 from vyos.utils.dict import dict_search @@ -41,9 +47,8 @@ from vyos.utils.network import get_vrf_tableid from vyos.utils.network import is_netns_interface from vyos.utils.process import is_systemd_service_active from vyos.utils.process import run -from vyos.template import is_ipv4 -from vyos.template import is_ipv6 from vyos.utils.file import read_file +from vyos.utils.file import write_file from vyos.utils.network import is_intf_addr_assigned from vyos.utils.network import is_ipv6_link_local from vyos.utils.assertion import assert_boolean @@ -52,7 +57,6 @@ from vyos.utils.assertion import assert_mac from vyos.utils.assertion import assert_mtu from vyos.utils.assertion import assert_positive from vyos.utils.assertion import assert_range - from vyos.ifconfig.control import Control from vyos.ifconfig.vrrp import VRRP from vyos.ifconfig.operational import Operational @@ -377,6 +381,9 @@ class Interface(Control): >>> i = Interface('eth0') >>> i.remove() """ + # Stop WPA supplicant if EAPoL was in use + if is_systemd_service_active(f'wpa_supplicant-wired@{self.ifname}'): + self._cmd(f'systemctl stop wpa_supplicant-wired@{self.ifname}') # remove all assigned IP addresses from interface - this is a bit redundant # as the kernel will remove all addresses on interface deletion, but we @@ -1522,6 +1529,61 @@ class Interface(Control): return None self.set_interface('per_client_thread', enable) + def set_eapol(self) -> None: + """ Take care about EAPoL supplicant daemon """ + + # XXX: wpa_supplicant works on the source interface + cfg_dir = '/run/wpa_supplicant' + wpa_supplicant_conf = f'{cfg_dir}/{self.ifname}.conf' + eapol_action='stop' + + if 'eapol' in self.config: + # The default is a fallback to hw_id which is not present for any interface + # other then an ethernet interface. Thus we emulate hw_id by reading back the + # Kernel assigned MAC address + if 'hw_id' not in self.config: + self.config['hw_id'] = read_file(f'/sys/class/net/{self.ifname}/address') + render(wpa_supplicant_conf, 'ethernet/wpa_supplicant.conf.j2', self.config) + + cert_file_path = os.path.join(cfg_dir, f'{self.ifname}_cert.pem') + cert_key_path = os.path.join(cfg_dir, f'{self.ifname}_cert.key') + + cert_name = self.config['eapol']['certificate'] + pki_cert = self.config['pki']['certificate'][cert_name] + + loaded_pki_cert = load_certificate(pki_cert['certificate']) + loaded_ca_certs = {load_certificate(c['certificate']) + for c in self.config['pki']['ca'].values()} if 'ca' in self.config['pki'] else {} + + cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs) + + write_file(cert_file_path, + '\n'.join(encode_certificate(c) for c in cert_full_chain)) + write_file(cert_key_path, wrap_private_key(pki_cert['private']['key'])) + + if 'ca_certificate' in self.config['eapol']: + ca_cert_file_path = os.path.join(cfg_dir, f'{self.ifname}_ca.pem') + ca_chains = [] + + for ca_cert_name in self.config['eapol']['ca_certificate']: + pki_ca_cert = self.config['pki']['ca'][ca_cert_name] + loaded_ca_cert = load_certificate(pki_ca_cert['certificate']) + ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs) + ca_chains.append( + '\n'.join(encode_certificate(c) for c in ca_full_chain)) + + write_file(ca_cert_file_path, '\n'.join(ca_chains)) + + eapol_action='reload-or-restart' + + # start/stop WPA supplicant service + self._cmd(f'systemctl {eapol_action} wpa_supplicant-wired@{self.ifname}') + + if 'eapol' not in self.config: + # delete configuration on interface removal + if os.path.isfile(wpa_supplicant_conf): + os.unlink(wpa_supplicant_conf) + def update(self, config): """ General helper function which works on a dictionary retrived by get_config_dict(). It's main intention is to consolidate the scattered @@ -1609,7 +1671,6 @@ class Interface(Control): tmp = get_interface_config(config['ifname']) if 'master' in tmp and tmp['master'] != bridge_if: self.set_vrf('') - else: self.set_vrf(config.get('vrf', '')) diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py index 5b5f25323..9030b1302 100644 --- a/python/vyos/ifconfig/wireguard.py +++ b/python/vyos/ifconfig/wireguard.py @@ -26,6 +26,7 @@ from vyos.ifconfig import Interface from vyos.ifconfig import Operational from vyos.template import is_ipv6 + class WireGuardOperational(Operational): def _dump(self): """Dump wireguard data in a python friendly way.""" @@ -54,7 +55,17 @@ class WireGuardOperational(Operational): } else: # We are entering a peer - device, public_key, preshared_key, endpoint, allowed_ips, latest_handshake, transfer_rx, transfer_tx, persistent_keepalive = items + ( + device, + public_key, + preshared_key, + endpoint, + allowed_ips, + latest_handshake, + transfer_rx, + transfer_tx, + persistent_keepalive, + ) = items if allowed_ips == '(none)': allowed_ips = [] else: @@ -72,75 +83,78 @@ class WireGuardOperational(Operational): def show_interface(self): from vyos.config import Config + c = Config() wgdump = self._dump().get(self.config['ifname'], None) - c.set_level(["interfaces", "wireguard", self.config['ifname']]) - description = c.return_effective_value(["description"]) - ips = c.return_effective_values(["address"]) + c.set_level(['interfaces', 'wireguard', self.config['ifname']]) + description = c.return_effective_value(['description']) + ips = c.return_effective_values(['address']) - answer = "interface: {}\n".format(self.config['ifname']) - if (description): - answer += " description: {}\n".format(description) - if (ips): - answer += " address: {}\n".format(", ".join(ips)) + answer = 'interface: {}\n'.format(self.config['ifname']) + if description: + answer += ' description: {}\n'.format(description) + if ips: + answer += ' address: {}\n'.format(', '.join(ips)) - answer += " public key: {}\n".format(wgdump['public_key']) - answer += " private key: (hidden)\n" - answer += " listening port: {}\n".format(wgdump['listen_port']) - answer += "\n" + answer += ' public key: {}\n'.format(wgdump['public_key']) + answer += ' private key: (hidden)\n' + answer += ' listening port: {}\n'.format(wgdump['listen_port']) + answer += '\n' - for peer in c.list_effective_nodes(["peer"]): + for peer in c.list_effective_nodes(['peer']): if wgdump['peers']: - pubkey = c.return_effective_value(["peer", peer, "public_key"]) + pubkey = c.return_effective_value(['peer', peer, 'public-key']) if pubkey in wgdump['peers']: wgpeer = wgdump['peers'][pubkey] - answer += " peer: {}\n".format(peer) - answer += " public key: {}\n".format(pubkey) + answer += ' peer: {}\n'.format(peer) + answer += ' public key: {}\n'.format(pubkey) """ figure out if the tunnel is recently active or not """ - status = "inactive" - if (wgpeer['latest_handshake'] is None): + status = 'inactive' + if wgpeer['latest_handshake'] is None: """ no handshake ever """ - status = "inactive" + status = 'inactive' else: if int(wgpeer['latest_handshake']) > 0: - delta = timedelta(seconds=int( - time.time() - wgpeer['latest_handshake'])) - answer += " latest handshake: {}\n".format(delta) - if (time.time() - int(wgpeer['latest_handshake']) < (60*5)): + delta = timedelta( + seconds=int(time.time() - wgpeer['latest_handshake']) + ) + answer += ' latest handshake: {}\n'.format(delta) + if time.time() - int(wgpeer['latest_handshake']) < (60 * 5): """ Five minutes and the tunnel is still active """ - status = "active" + status = 'active' else: """ it's been longer than 5 minutes """ - status = "inactive" + status = 'inactive' elif int(wgpeer['latest_handshake']) == 0: """ no handshake ever """ - status = "inactive" - answer += " status: {}\n".format(status) + status = 'inactive' + answer += ' status: {}\n'.format(status) if wgpeer['endpoint'] is not None: - answer += " endpoint: {}\n".format(wgpeer['endpoint']) + answer += ' endpoint: {}\n'.format(wgpeer['endpoint']) if wgpeer['allowed_ips'] is not None: - answer += " allowed ips: {}\n".format( - ",".join(wgpeer['allowed_ips']).replace(",", ", ")) + answer += ' allowed ips: {}\n'.format( + ','.join(wgpeer['allowed_ips']).replace(',', ', ') + ) if wgpeer['transfer_rx'] > 0 or wgpeer['transfer_tx'] > 0: - rx_size = size( - wgpeer['transfer_rx'], system=alternative) - tx_size = size( - wgpeer['transfer_tx'], system=alternative) - answer += " transfer: {} received, {} sent\n".format( - rx_size, tx_size) + rx_size = size(wgpeer['transfer_rx'], system=alternative) + tx_size = size(wgpeer['transfer_tx'], system=alternative) + answer += ' transfer: {} received, {} sent\n'.format( + rx_size, tx_size + ) if wgpeer['persistent_keepalive'] is not None: - answer += " persistent keepalive: every {} seconds\n".format( - wgpeer['persistent_keepalive']) + answer += ' persistent keepalive: every {} seconds\n'.format( + wgpeer['persistent_keepalive'] + ) answer += '\n' - return answer + super().formated_stats() + return answer @Interface.register @@ -151,27 +165,29 @@ class WireGuardIf(Interface): **Interface.definition, **{ 'section': 'wireguard', - 'prefixes': ['wg', ], + 'prefixes': [ + 'wg', + ], 'bridgeable': False, - } + }, } def get_mac(self): - """ Get a synthetic MAC address. """ + """Get a synthetic MAC address.""" return self.get_mac_synthetic() def update(self, config): - """ General helper function which works on a dictionary retrived by + """General helper function which works on a dictionary retrived by get_config_dict(). It's main intention is to consolidate the scattered interface setup code and provide a single point of entry when workin - on any interface. """ + on any interface.""" tmp_file = NamedTemporaryFile('w') tmp_file.write(config['private_key']) tmp_file.flush() # Wireguard base command is identical for every peer - base_cmd = 'wg set {ifname}' + base_cmd = 'wg set {ifname}' if 'port' in config: base_cmd += ' listen-port {port}' if 'fwmark' in config: @@ -201,7 +217,7 @@ class WireGuardIf(Interface): cmd += f' preshared-key {psk_file}' # Persistent keepalive is optional - if 'persistent_keepalive'in peer_config: + if 'persistent_keepalive' in peer_config: cmd += ' persistent-keepalive {persistent_keepalive}' # Multiple allowed-ip ranges can be defined - ensure we are always diff --git a/python/vyos/nat.py b/python/vyos/nat.py index 4fe21ef13..29f8e961b 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -199,7 +199,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False): if group_name[0] == '!': operator = '!=' group_name = group_name[1:] - output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}') + if ipv6: + output.append(f'{ip_prefix} {prefix}addr {operator} @A6_{group_name}') + else: + output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}') # Generate firewall group domain-group elif 'domain_group' in group and not (ignore_type_addr and target == nat_type): group_name = group['domain_group'] @@ -214,7 +217,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False): if group_name[0] == '!': operator = '!=' group_name = group_name[1:] - output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}') + if ipv6: + output.append(f'{ip_prefix} {prefix}addr {operator} @N6_{group_name}') + else: + output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py index daddb799a..de8303ee2 100644 --- a/python/vyos/system/grub.py +++ b/python/vyos/system/grub.py @@ -82,7 +82,7 @@ def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS', chro f'{chroot_cmd} grub-install --no-floppy --recheck --target={efi_installation_arch}-efi \ --force-extra-removable --boot-directory={boot_dir} \ --efi-directory={efi_dir} --bootloader-id="{id}" \ - --no-uefi-secure-boot' + --uefi-secure-boot' ) diff --git a/python/vyos/utils/boot.py b/python/vyos/utils/boot.py index 3aecbec64..708bef14d 100644 --- a/python/vyos/utils/boot.py +++ b/python/vyos/utils/boot.py @@ -1,4 +1,4 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -33,3 +33,7 @@ def boot_configuration_success() -> bool: if int(res) == 0: return True return False + +def is_uefi_system() -> bool: + efi_fw_dir = '/sys/firmware/efi' + return os.path.exists(efi_fw_dir) and os.path.isdir(efi_fw_dir) diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 41e65081f..dd4266f57 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -12,41 +12,72 @@ # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +import re + +# Define the number of seconds in each time unit +time_units = { + 'y': 60 * 60 * 24 * 365.25, # year + 'w': 60 * 60 * 24 * 7, # week + 'd': 60 * 60 * 24, # day + 'h': 60 * 60, # hour + 'm': 60, # minute + 's': 1 # second +} + + +def human_to_seconds(time_str): + """ Converts a human-readable interval such as 1w4d18h35m59s + to number of seconds + """ + + time_patterns = { + 'y': r'(\d+)\s*y', + 'w': r'(\d+)\s*w', + 'd': r'(\d+)\s*d', + 'h': r'(\d+)\s*h', + 'm': r'(\d+)\s*m', + 's': r'(\d+)\s*s' + } + + total_seconds = 0 + + for unit, pattern in time_patterns.items(): + match = re.search(pattern, time_str) + if match: + value = int(match.group(1)) + total_seconds += value * time_units[unit] + + return int(total_seconds) + def seconds_to_human(s, separator=""): """ Converts number of seconds passed to a human-readable interval such as 1w4d18h35m59s """ s = int(s) - - year = 60 * 60 * 24 * 365.25 - week = 60 * 60 * 24 * 7 - day = 60 * 60 * 24 - hour = 60 * 60 - result = [] - years = s // year + years = s // time_units['y'] if years > 0: result.append(f'{int(years)}y') - s = int(s % year) + s = int(s % time_units['y']) - weeks = s // week + weeks = s // time_units['w'] if weeks > 0: result.append(f'{weeks}w') - s = s % week + s = s % time_units['w'] - days = s // day + days = s // time_units['d'] if days > 0: result.append(f'{days}d') - s = s % day + s = s % time_units['d'] - hours = s // hour + hours = s // time_units['h'] if hours > 0: result.append(f'{hours}h') - s = s % hour + s = s % time_units['h'] - minutes = s // 60 + minutes = s // time_units['m'] if minutes > 0: result.append(f'{minutes}m') s = s % 60 @@ -57,6 +88,7 @@ def seconds_to_human(s, separator=""): return separator.join(result) + def bytes_to_human(bytes, initial_exponent=0, precision=2, int_below_exponent=0): """ Converts a value in bytes to a human-readable size string like 640 KB diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py index fca93d118..7b12efb14 100644 --- a/python/vyos/utils/system.py +++ b/python/vyos/utils/system.py @@ -139,3 +139,11 @@ def get_load_averages(): res[15] = float(matches["fifteen"]) / core_count return res + +def get_secure_boot_state() -> bool: + from vyos.utils.process import cmd + from vyos.utils.boot import is_uefi_system + if not is_uefi_system(): + return False + tmp = cmd('mokutil --sb-state') + return bool('enabled' in tmp) |