summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/configverify.py17
-rw-r--r--python/vyos/ethtool.py88
-rwxr-xr-xpython/vyos/firewall.py27
-rw-r--r--python/vyos/frr.py2
-rw-r--r--python/vyos/ifconfig/bond.py3
-rw-r--r--python/vyos/ifconfig/ethernet.py3
-rw-r--r--python/vyos/ifconfig/interface.py69
-rw-r--r--python/vyos/ifconfig/wireguard.py110
-rw-r--r--python/vyos/nat.py10
-rw-r--r--python/vyos/system/grub.py2
-rw-r--r--python/vyos/utils/boot.py6
-rw-r--r--python/vyos/utils/convert.py62
-rw-r--r--python/vyos/utils/system.py8
13 files changed, 290 insertions, 117 deletions
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 59b67300d..92996f2ee 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -520,3 +520,20 @@ def verify_pki_dh_parameters(config: dict, dh_name: str, min_key_size: int=0):
dh_bits = dh_numbers.p.bit_length()
if dh_bits < min_key_size:
raise ConfigError(f'Minimum DH key-size is {min_key_size} bits!')
+
+def verify_eapol(config: dict):
+ """
+ Common helper function used by interface implementations to perform
+ recurring validation of EAPoL configuration.
+ """
+ if 'eapol' not in config:
+ return
+
+ if 'certificate' not in config['eapol']:
+ raise ConfigError('Certificate must be specified when using EAPoL!')
+
+ verify_pki_certificate(config, config['eapol']['certificate'], no_password_protected=True)
+
+ if 'ca_certificate' in config['eapol']:
+ for ca_cert in config['eapol']['ca_certificate']:
+ verify_pki_ca_certificate(config, ca_cert)
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index 80bb56fa2..21272cc5b 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -56,11 +56,8 @@ class Ethtool:
# '100' : {'full': '', 'half': ''},
# '1000': {'full': ''}
# }
- _speed_duplex = {'auto': {'auto': ''}}
_ring_buffer = None
_driver_name = None
- _auto_negotiation = False
- _auto_negotiation_supported = None
_flow_control = None
def __init__(self, ifname):
@@ -74,56 +71,51 @@ class Ethtool:
self._driver_name = driver.group(1)
# Build a dictinary of supported link-speed and dupley settings.
- out, _ = popen(f'ethtool {ifname}')
- reading = False
- pattern = re.compile(r'\d+base.*')
- for line in out.splitlines()[1:]:
- line = line.lstrip()
- if 'Supported link modes:' in line:
- reading = True
- if 'Supported pause frame use:' in line:
- reading = False
- if reading:
- for block in line.split():
- if pattern.search(block):
- speed = block.split('base')[0]
- duplex = block.split('/')[-1].lower()
- if speed not in self._speed_duplex:
- self._speed_duplex.update({ speed : {}})
- if duplex not in self._speed_duplex[speed]:
- self._speed_duplex[speed].update({ duplex : ''})
- if 'Supports auto-negotiation:' in line:
- # Split the following string: Auto-negotiation: off
- # we are only interested in off or on
- tmp = line.split()[-1]
- self._auto_negotiation_supported = bool(tmp == 'Yes')
- # Only read in if Auto-negotiation is supported
- if self._auto_negotiation_supported and 'Auto-negotiation:' in line:
- # Split the following string: Auto-negotiation: off
- # we are only interested in off or on
- tmp = line.split()[-1]
- self._auto_negotiation = bool(tmp == 'on')
+ # [ {
+ # "ifname": "eth0",
+ # "supported-ports": [ "TP" ],
+ # "supported-link-modes": [ "10baseT/Half","10baseT/Full","100baseT/Half","100baseT/Full","1000baseT/Full" ],
+ # "supported-pause-frame-use": "Symmetric",
+ # "supports-auto-negotiation": true,
+ # "supported-fec-modes": [ ],
+ # "advertised-link-modes": [ "10baseT/Half","10baseT/Full","100baseT/Half","100baseT/Full","1000baseT/Full" ],
+ # "advertised-pause-frame-use": "Symmetric",
+ # "advertised-auto-negotiation": true,
+ # "advertised-fec-modes": [ ],
+ # "speed": 1000,
+ # "duplex": "Full",
+ # "auto-negotiation": false,
+ # "port": "Twisted Pair",
+ # "phyad": 1,
+ # "transceiver": "internal",
+ # "supports-wake-on": "pumbg",
+ # "wake-on": "g",
+ # "current-message-level": 7,
+ # "link-detected": true
+ # } ]
+ out, _ = popen(f'ethtool --json {ifname}')
+ self._base_settings = loads(out)[0]
# Now populate driver features
out, _ = popen(f'ethtool --json --show-features {ifname}')
- self._features = loads(out)
+ self._features = loads(out)[0]
# Get information about NIC ring buffers
out, _ = popen(f'ethtool --json --show-ring {ifname}')
- self._ring_buffer = loads(out)
+ self._ring_buffer = loads(out)[0]
# Get current flow control settings, but this is not supported by
# all NICs (e.g. vmxnet3 does not support is)
out, err = popen(f'ethtool --json --show-pause {ifname}')
if not bool(err):
- self._flow_control = loads(out)
+ self._flow_control = loads(out)[0]
def check_auto_negotiation_supported(self):
""" Check if the NIC supports changing auto-negotiation """
- return self._auto_negotiation_supported
+ return self._base_settings['supports-auto-negotiation']
def get_auto_negotiation(self):
- return self._auto_negotiation_supported and self._auto_negotiation
+ return self._base_settings['supports-auto-negotiation'] and self._base_settings['auto-negotiation']
def get_driver_name(self):
return self._driver_name
@@ -137,9 +129,9 @@ class Ethtool:
"""
active = False
fixed = True
- if feature in self._features[0]:
- active = bool(self._features[0][feature]['active'])
- fixed = bool(self._features[0][feature]['fixed'])
+ if feature in self._features:
+ active = bool(self._features[feature]['active'])
+ fixed = bool(self._features[feature]['fixed'])
return active, fixed
def get_generic_receive_offload(self):
@@ -165,14 +157,14 @@ class Ethtool:
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return str(self._ring_buffer[0].get(f'{rx_tx}-max', None))
+ return str(self._ring_buffer.get(f'{rx_tx}-max', None))
def get_ring_buffer(self, rx_tx):
# Configuration of RX/TX ring-buffers is not supported on every device,
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return str(self._ring_buffer[0].get(rx_tx, None))
+ return str(self._ring_buffer.get(rx_tx, None))
def check_speed_duplex(self, speed, duplex):
""" Check if the passed speed and duplex combination is supported by
@@ -184,12 +176,16 @@ class Ethtool:
if duplex not in ['auto', 'full', 'half']:
raise ValueError(f'Value "{duplex}" for duplex is invalid!')
+ if speed == 'auto' and duplex == 'auto':
+ return True
+
if self.get_driver_name() in _drivers_without_speed_duplex_flow:
return False
- if speed in self._speed_duplex:
- if duplex in self._speed_duplex[speed]:
- return True
+ # ['10baset/half', '10baset/full', '100baset/half', '100baset/full', '1000baset/full']
+ tmp = [x.lower() for x in self._base_settings['supported-link-modes']]
+ if f'{speed}baset/{duplex}' in tmp:
+ return True
return False
def check_flow_control(self):
@@ -201,4 +197,4 @@ class Ethtool:
raise ValueError('Interface does not support changing '\
'flow-control settings!')
- return 'on' if bool(self._flow_control[0]['autonegotiate']) else 'off'
+ return 'on' if bool(self._flow_control['autonegotiate']) else 'off'
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index b9439d42b..34d0b73f6 100755
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -156,6 +156,20 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
proto = '{tcp, udp}'
output.append(f'meta l4proto {operator} {proto}')
+ if 'ethernet_type' in rule_conf:
+ ether_type_mapping = {
+ '802.1q': '8021q',
+ '802.1ad': '8021ad',
+ 'ipv6': 'ip6',
+ 'ipv4': 'ip',
+ 'arp': 'arp'
+ }
+ ether_type = rule_conf['ethernet_type']
+ operator = '!=' if ether_type.startswith('!') else ''
+ ether_type = ether_type.lstrip('!')
+ ether_type = ether_type_mapping.get(ether_type, ether_type)
+ output.append(f'ether type {operator} {ether_type}')
+
for side in ['destination', 'source']:
if side in rule_conf:
prefix = side[0]
@@ -487,6 +501,19 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output.append(f'vlan id {rule_conf["vlan"]["id"]}')
if 'priority' in rule_conf['vlan']:
output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}')
+ if 'ethernet_type' in rule_conf['vlan']:
+ ether_type_mapping = {
+ '802.1q': '8021q',
+ '802.1ad': '8021ad',
+ 'ipv6': 'ip6',
+ 'ipv4': 'ip',
+ 'arp': 'arp'
+ }
+ ether_type = rule_conf['vlan']['ethernet_type']
+ operator = '!=' if ether_type.startswith('!') else ''
+ ether_type = ether_type.lstrip('!')
+ ether_type = ether_type_mapping.get(ether_type, ether_type)
+ output.append(f'vlan type {operator} {ether_type}')
if 'log' in rule_conf:
action = rule_conf['action'] if 'action' in rule_conf else 'accept'
diff --git a/python/vyos/frr.py b/python/vyos/frr.py
index e7743e9d5..6fb81803f 100644
--- a/python/vyos/frr.py
+++ b/python/vyos/frr.py
@@ -87,7 +87,7 @@ LOG.addHandler(ch)
LOG.addHandler(ch2)
_frr_daemons = ['zebra', 'staticd', 'bgpd', 'ospfd', 'ospf6d', 'ripd', 'ripngd',
- 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd']
+ 'isisd', 'pimd', 'pim6d', 'ldpd', 'eigrpd', 'babeld', 'bfdd', 'fabricd']
path_vtysh = '/usr/bin/vtysh'
path_frr_reload = '/usr/lib/frr/frr-reload.py'
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index b8ea90049..8ba481728 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -504,3 +504,6 @@ class BondIf(Interface):
# call base class first
super().update(config)
+
+ # enable/disable EAPoL (Extensible Authentication Protocol over Local Area Network)
+ self.set_eapol()
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 8d96c863f..61da7b74b 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -452,3 +452,6 @@ class EthernetIf(Interface):
# call base class last
super().update(config)
+
+ # enable/disable EAPoL (Extensible Authentication Protocol over Local Area Network)
+ self.set_eapol()
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 72d3d3afe..002d3da9e 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -32,6 +32,12 @@ from vyos.configdict import list_diff
from vyos.configdict import dict_merge
from vyos.configdict import get_vlan_ids
from vyos.defaults import directories
+from vyos.pki import find_chain
+from vyos.pki import encode_certificate
+from vyos.pki import load_certificate
+from vyos.pki import wrap_private_key
+from vyos.template import is_ipv4
+from vyos.template import is_ipv6
from vyos.template import render
from vyos.utils.network import mac2eui64
from vyos.utils.dict import dict_search
@@ -41,9 +47,8 @@ from vyos.utils.network import get_vrf_tableid
from vyos.utils.network import is_netns_interface
from vyos.utils.process import is_systemd_service_active
from vyos.utils.process import run
-from vyos.template import is_ipv4
-from vyos.template import is_ipv6
from vyos.utils.file import read_file
+from vyos.utils.file import write_file
from vyos.utils.network import is_intf_addr_assigned
from vyos.utils.network import is_ipv6_link_local
from vyos.utils.assertion import assert_boolean
@@ -52,7 +57,6 @@ from vyos.utils.assertion import assert_mac
from vyos.utils.assertion import assert_mtu
from vyos.utils.assertion import assert_positive
from vyos.utils.assertion import assert_range
-
from vyos.ifconfig.control import Control
from vyos.ifconfig.vrrp import VRRP
from vyos.ifconfig.operational import Operational
@@ -377,6 +381,9 @@ class Interface(Control):
>>> i = Interface('eth0')
>>> i.remove()
"""
+ # Stop WPA supplicant if EAPoL was in use
+ if is_systemd_service_active(f'wpa_supplicant-wired@{self.ifname}'):
+ self._cmd(f'systemctl stop wpa_supplicant-wired@{self.ifname}')
# remove all assigned IP addresses from interface - this is a bit redundant
# as the kernel will remove all addresses on interface deletion, but we
@@ -1522,6 +1529,61 @@ class Interface(Control):
return None
self.set_interface('per_client_thread', enable)
+ def set_eapol(self) -> None:
+ """ Take care about EAPoL supplicant daemon """
+
+ # XXX: wpa_supplicant works on the source interface
+ cfg_dir = '/run/wpa_supplicant'
+ wpa_supplicant_conf = f'{cfg_dir}/{self.ifname}.conf'
+ eapol_action='stop'
+
+ if 'eapol' in self.config:
+ # The default is a fallback to hw_id which is not present for any interface
+ # other then an ethernet interface. Thus we emulate hw_id by reading back the
+ # Kernel assigned MAC address
+ if 'hw_id' not in self.config:
+ self.config['hw_id'] = read_file(f'/sys/class/net/{self.ifname}/address')
+ render(wpa_supplicant_conf, 'ethernet/wpa_supplicant.conf.j2', self.config)
+
+ cert_file_path = os.path.join(cfg_dir, f'{self.ifname}_cert.pem')
+ cert_key_path = os.path.join(cfg_dir, f'{self.ifname}_cert.key')
+
+ cert_name = self.config['eapol']['certificate']
+ pki_cert = self.config['pki']['certificate'][cert_name]
+
+ loaded_pki_cert = load_certificate(pki_cert['certificate'])
+ loaded_ca_certs = {load_certificate(c['certificate'])
+ for c in self.config['pki']['ca'].values()} if 'ca' in self.config['pki'] else {}
+
+ cert_full_chain = find_chain(loaded_pki_cert, loaded_ca_certs)
+
+ write_file(cert_file_path,
+ '\n'.join(encode_certificate(c) for c in cert_full_chain))
+ write_file(cert_key_path, wrap_private_key(pki_cert['private']['key']))
+
+ if 'ca_certificate' in self.config['eapol']:
+ ca_cert_file_path = os.path.join(cfg_dir, f'{self.ifname}_ca.pem')
+ ca_chains = []
+
+ for ca_cert_name in self.config['eapol']['ca_certificate']:
+ pki_ca_cert = self.config['pki']['ca'][ca_cert_name]
+ loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
+ ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
+ ca_chains.append(
+ '\n'.join(encode_certificate(c) for c in ca_full_chain))
+
+ write_file(ca_cert_file_path, '\n'.join(ca_chains))
+
+ eapol_action='reload-or-restart'
+
+ # start/stop WPA supplicant service
+ self._cmd(f'systemctl {eapol_action} wpa_supplicant-wired@{self.ifname}')
+
+ if 'eapol' not in self.config:
+ # delete configuration on interface removal
+ if os.path.isfile(wpa_supplicant_conf):
+ os.unlink(wpa_supplicant_conf)
+
def update(self, config):
""" General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
@@ -1609,7 +1671,6 @@ class Interface(Control):
tmp = get_interface_config(config['ifname'])
if 'master' in tmp and tmp['master'] != bridge_if:
self.set_vrf('')
-
else:
self.set_vrf(config.get('vrf', ''))
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index 5b5f25323..9030b1302 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -26,6 +26,7 @@ from vyos.ifconfig import Interface
from vyos.ifconfig import Operational
from vyos.template import is_ipv6
+
class WireGuardOperational(Operational):
def _dump(self):
"""Dump wireguard data in a python friendly way."""
@@ -54,7 +55,17 @@ class WireGuardOperational(Operational):
}
else:
# We are entering a peer
- device, public_key, preshared_key, endpoint, allowed_ips, latest_handshake, transfer_rx, transfer_tx, persistent_keepalive = items
+ (
+ device,
+ public_key,
+ preshared_key,
+ endpoint,
+ allowed_ips,
+ latest_handshake,
+ transfer_rx,
+ transfer_tx,
+ persistent_keepalive,
+ ) = items
if allowed_ips == '(none)':
allowed_ips = []
else:
@@ -72,75 +83,78 @@ class WireGuardOperational(Operational):
def show_interface(self):
from vyos.config import Config
+
c = Config()
wgdump = self._dump().get(self.config['ifname'], None)
- c.set_level(["interfaces", "wireguard", self.config['ifname']])
- description = c.return_effective_value(["description"])
- ips = c.return_effective_values(["address"])
+ c.set_level(['interfaces', 'wireguard', self.config['ifname']])
+ description = c.return_effective_value(['description'])
+ ips = c.return_effective_values(['address'])
- answer = "interface: {}\n".format(self.config['ifname'])
- if (description):
- answer += " description: {}\n".format(description)
- if (ips):
- answer += " address: {}\n".format(", ".join(ips))
+ answer = 'interface: {}\n'.format(self.config['ifname'])
+ if description:
+ answer += ' description: {}\n'.format(description)
+ if ips:
+ answer += ' address: {}\n'.format(', '.join(ips))
- answer += " public key: {}\n".format(wgdump['public_key'])
- answer += " private key: (hidden)\n"
- answer += " listening port: {}\n".format(wgdump['listen_port'])
- answer += "\n"
+ answer += ' public key: {}\n'.format(wgdump['public_key'])
+ answer += ' private key: (hidden)\n'
+ answer += ' listening port: {}\n'.format(wgdump['listen_port'])
+ answer += '\n'
- for peer in c.list_effective_nodes(["peer"]):
+ for peer in c.list_effective_nodes(['peer']):
if wgdump['peers']:
- pubkey = c.return_effective_value(["peer", peer, "public_key"])
+ pubkey = c.return_effective_value(['peer', peer, 'public-key'])
if pubkey in wgdump['peers']:
wgpeer = wgdump['peers'][pubkey]
- answer += " peer: {}\n".format(peer)
- answer += " public key: {}\n".format(pubkey)
+ answer += ' peer: {}\n'.format(peer)
+ answer += ' public key: {}\n'.format(pubkey)
""" figure out if the tunnel is recently active or not """
- status = "inactive"
- if (wgpeer['latest_handshake'] is None):
+ status = 'inactive'
+ if wgpeer['latest_handshake'] is None:
""" no handshake ever """
- status = "inactive"
+ status = 'inactive'
else:
if int(wgpeer['latest_handshake']) > 0:
- delta = timedelta(seconds=int(
- time.time() - wgpeer['latest_handshake']))
- answer += " latest handshake: {}\n".format(delta)
- if (time.time() - int(wgpeer['latest_handshake']) < (60*5)):
+ delta = timedelta(
+ seconds=int(time.time() - wgpeer['latest_handshake'])
+ )
+ answer += ' latest handshake: {}\n'.format(delta)
+ if time.time() - int(wgpeer['latest_handshake']) < (60 * 5):
""" Five minutes and the tunnel is still active """
- status = "active"
+ status = 'active'
else:
""" it's been longer than 5 minutes """
- status = "inactive"
+ status = 'inactive'
elif int(wgpeer['latest_handshake']) == 0:
""" no handshake ever """
- status = "inactive"
- answer += " status: {}\n".format(status)
+ status = 'inactive'
+ answer += ' status: {}\n'.format(status)
if wgpeer['endpoint'] is not None:
- answer += " endpoint: {}\n".format(wgpeer['endpoint'])
+ answer += ' endpoint: {}\n'.format(wgpeer['endpoint'])
if wgpeer['allowed_ips'] is not None:
- answer += " allowed ips: {}\n".format(
- ",".join(wgpeer['allowed_ips']).replace(",", ", "))
+ answer += ' allowed ips: {}\n'.format(
+ ','.join(wgpeer['allowed_ips']).replace(',', ', ')
+ )
if wgpeer['transfer_rx'] > 0 or wgpeer['transfer_tx'] > 0:
- rx_size = size(
- wgpeer['transfer_rx'], system=alternative)
- tx_size = size(
- wgpeer['transfer_tx'], system=alternative)
- answer += " transfer: {} received, {} sent\n".format(
- rx_size, tx_size)
+ rx_size = size(wgpeer['transfer_rx'], system=alternative)
+ tx_size = size(wgpeer['transfer_tx'], system=alternative)
+ answer += ' transfer: {} received, {} sent\n'.format(
+ rx_size, tx_size
+ )
if wgpeer['persistent_keepalive'] is not None:
- answer += " persistent keepalive: every {} seconds\n".format(
- wgpeer['persistent_keepalive'])
+ answer += ' persistent keepalive: every {} seconds\n'.format(
+ wgpeer['persistent_keepalive']
+ )
answer += '\n'
- return answer + super().formated_stats()
+ return answer
@Interface.register
@@ -151,27 +165,29 @@ class WireGuardIf(Interface):
**Interface.definition,
**{
'section': 'wireguard',
- 'prefixes': ['wg', ],
+ 'prefixes': [
+ 'wg',
+ ],
'bridgeable': False,
- }
+ },
}
def get_mac(self):
- """ Get a synthetic MAC address. """
+ """Get a synthetic MAC address."""
return self.get_mac_synthetic()
def update(self, config):
- """ General helper function which works on a dictionary retrived by
+ """General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
interface setup code and provide a single point of entry when workin
- on any interface. """
+ on any interface."""
tmp_file = NamedTemporaryFile('w')
tmp_file.write(config['private_key'])
tmp_file.flush()
# Wireguard base command is identical for every peer
- base_cmd = 'wg set {ifname}'
+ base_cmd = 'wg set {ifname}'
if 'port' in config:
base_cmd += ' listen-port {port}'
if 'fwmark' in config:
@@ -201,7 +217,7 @@ class WireGuardIf(Interface):
cmd += f' preshared-key {psk_file}'
# Persistent keepalive is optional
- if 'persistent_keepalive'in peer_config:
+ if 'persistent_keepalive' in peer_config:
cmd += ' persistent-keepalive {persistent_keepalive}'
# Multiple allowed-ip ranges can be defined - ensure we are always
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 4fe21ef13..29f8e961b 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -199,7 +199,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if group_name[0] == '!':
operator = '!='
group_name = group_name[1:]
- output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}')
+ if ipv6:
+ output.append(f'{ip_prefix} {prefix}addr {operator} @A6_{group_name}')
+ else:
+ output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}')
# Generate firewall group domain-group
elif 'domain_group' in group and not (ignore_type_addr and target == nat_type):
group_name = group['domain_group']
@@ -214,7 +217,10 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if group_name[0] == '!':
operator = '!='
group_name = group_name[1:]
- output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}')
+ if ipv6:
+ output.append(f'{ip_prefix} {prefix}addr {operator} @N6_{group_name}')
+ else:
+ output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}')
if 'mac_group' in group:
group_name = group['mac_group']
operator = ''
diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py
index daddb799a..de8303ee2 100644
--- a/python/vyos/system/grub.py
+++ b/python/vyos/system/grub.py
@@ -82,7 +82,7 @@ def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS', chro
f'{chroot_cmd} grub-install --no-floppy --recheck --target={efi_installation_arch}-efi \
--force-extra-removable --boot-directory={boot_dir} \
--efi-directory={efi_dir} --bootloader-id="{id}" \
- --no-uefi-secure-boot'
+ --uefi-secure-boot'
)
diff --git a/python/vyos/utils/boot.py b/python/vyos/utils/boot.py
index 3aecbec64..708bef14d 100644
--- a/python/vyos/utils/boot.py
+++ b/python/vyos/utils/boot.py
@@ -1,4 +1,4 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -33,3 +33,7 @@ def boot_configuration_success() -> bool:
if int(res) == 0:
return True
return False
+
+def is_uefi_system() -> bool:
+ efi_fw_dir = '/sys/firmware/efi'
+ return os.path.exists(efi_fw_dir) and os.path.isdir(efi_fw_dir)
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 41e65081f..dd4266f57 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -12,41 +12,72 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+import re
+
+# Define the number of seconds in each time unit
+time_units = {
+ 'y': 60 * 60 * 24 * 365.25, # year
+ 'w': 60 * 60 * 24 * 7, # week
+ 'd': 60 * 60 * 24, # day
+ 'h': 60 * 60, # hour
+ 'm': 60, # minute
+ 's': 1 # second
+}
+
+
+def human_to_seconds(time_str):
+ """ Converts a human-readable interval such as 1w4d18h35m59s
+ to number of seconds
+ """
+
+ time_patterns = {
+ 'y': r'(\d+)\s*y',
+ 'w': r'(\d+)\s*w',
+ 'd': r'(\d+)\s*d',
+ 'h': r'(\d+)\s*h',
+ 'm': r'(\d+)\s*m',
+ 's': r'(\d+)\s*s'
+ }
+
+ total_seconds = 0
+
+ for unit, pattern in time_patterns.items():
+ match = re.search(pattern, time_str)
+ if match:
+ value = int(match.group(1))
+ total_seconds += value * time_units[unit]
+
+ return int(total_seconds)
+
def seconds_to_human(s, separator=""):
""" Converts number of seconds passed to a human-readable
interval such as 1w4d18h35m59s
"""
s = int(s)
-
- year = 60 * 60 * 24 * 365.25
- week = 60 * 60 * 24 * 7
- day = 60 * 60 * 24
- hour = 60 * 60
-
result = []
- years = s // year
+ years = s // time_units['y']
if years > 0:
result.append(f'{int(years)}y')
- s = int(s % year)
+ s = int(s % time_units['y'])
- weeks = s // week
+ weeks = s // time_units['w']
if weeks > 0:
result.append(f'{weeks}w')
- s = s % week
+ s = s % time_units['w']
- days = s // day
+ days = s // time_units['d']
if days > 0:
result.append(f'{days}d')
- s = s % day
+ s = s % time_units['d']
- hours = s // hour
+ hours = s // time_units['h']
if hours > 0:
result.append(f'{hours}h')
- s = s % hour
+ s = s % time_units['h']
- minutes = s // 60
+ minutes = s // time_units['m']
if minutes > 0:
result.append(f'{minutes}m')
s = s % 60
@@ -57,6 +88,7 @@ def seconds_to_human(s, separator=""):
return separator.join(result)
+
def bytes_to_human(bytes, initial_exponent=0, precision=2,
int_below_exponent=0):
""" Converts a value in bytes to a human-readable size string like 640 KB
diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py
index fca93d118..7b12efb14 100644
--- a/python/vyos/utils/system.py
+++ b/python/vyos/utils/system.py
@@ -139,3 +139,11 @@ def get_load_averages():
res[15] = float(matches["fifteen"]) / core_count
return res
+
+def get_secure_boot_state() -> bool:
+ from vyos.utils.process import cmd
+ from vyos.utils.boot import is_uefi_system
+ if not is_uefi_system():
+ return False
+ tmp = cmd('mokutil --sb-state')
+ return bool('enabled' in tmp)