summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/ethtool.py114
-rw-r--r--python/vyos/firewall.py4
-rw-r--r--python/vyos/nat.py11
-rw-r--r--python/vyos/qos/base.py16
-rw-r--r--python/vyos/remote.py16
-rw-r--r--python/vyos/tpm.py98
6 files changed, 162 insertions, 97 deletions
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index f20fa452e..473c98d0c 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -1,4 +1,4 @@
-# Copyright 2021-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2021-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -16,6 +16,7 @@
import os
import re
+from json import loads
from vyos.utils.process import popen
# These drivers do not support using ethtool to change the speed, duplex, or
@@ -31,16 +32,24 @@ class Ethtool:
"""
# dictionary containing driver featurs, it will be populated on demand and
# the content will look like:
- # {
- # 'tls-hw-tx-offload': {'fixed': True, 'enabled': False},
- # 'tx-checksum-fcoe-crc': {'fixed': True, 'enabled': False},
- # 'tx-checksum-ip-generic': {'fixed': False, 'enabled': True},
- # 'tx-checksum-ipv4': {'fixed': True, 'enabled': False},
- # 'tx-checksum-ipv6': {'fixed': True, 'enabled': False},
- # 'tx-checksum-sctp': {'fixed': True, 'enabled': False},
- # 'tx-checksumming': {'fixed': False, 'enabled': True},
- # 'tx-esp-segmentation': {'fixed': True, 'enabled': False},
- # }
+ # [{'esp-hw-offload': {'active': False, 'fixed': True, 'requested': False},
+ # 'esp-tx-csum-hw-offload': {'active': False,
+ # 'fixed': True,
+ # 'requested': False},
+ # 'fcoe-mtu': {'active': False, 'fixed': True, 'requested': False},
+ # 'generic-receive-offload': {'active': True,
+ # 'fixed': False,
+ # 'requested': True},
+ # 'generic-segmentation-offload': {'active': True,
+ # 'fixed': False,
+ # 'requested': True},
+ # 'highdma': {'active': True, 'fixed': False, 'requested': True},
+ # 'ifname': 'eth0',
+ # 'l2-fwd-offload': {'active': False, 'fixed': True, 'requested': False},
+ # 'large-receive-offload': {'active': False,
+ # 'fixed': False,
+ # 'requested': False},
+ # ...
_features = { }
# dictionary containing available interface speed and duplex settings
# {
@@ -49,13 +58,11 @@ class Ethtool:
# '1000': {'full': ''}
# }
_speed_duplex = {'auto': {'auto': ''}}
- _ring_buffers = { }
- _ring_buffers_max = { }
+ _ring_buffer = None
_driver_name = None
_auto_negotiation = False
_auto_negotiation_supported = None
- _flow_control = False
- _flow_control_enabled = None
+ _flow_control = None
_eee = False
_eee_enabled = None
@@ -97,51 +104,19 @@ class Ethtool:
tmp = line.split()[-1]
self._auto_negotiation = bool(tmp == 'on')
- # Now populate features dictionaty
- out, _ = popen(f'ethtool --show-features {ifname}')
- # skip the first line, it only says: "Features for eth0":
- for line in out.splitlines()[1:]:
- if ":" in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- fixed = bool('fixed' in value)
- if fixed:
- value = value.split()[0].strip()
- self._features[key.strip()] = {
- 'enabled' : bool(value == 'on'),
- 'fixed' : fixed
- }
-
- out, _ = popen(f'ethtool --show-ring {ifname}')
- # We are only interested in line 2-5 which contains the device maximum
- # ringbuffers
- for line in out.splitlines()[2:6]:
- if ':' in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- key = key.lower().replace(' ', '_')
- # T3645: ethtool version used on Debian Bullseye changed the
- # output format from 0 -> n/a. As we are only interested in the
- # tx/rx keys we do not care about RX Mini/Jumbo.
- if value.isdigit():
- self._ring_buffers_max[key] = value
- # Now we wan't to get the current RX/TX ringbuffer values - used for
- for line in out.splitlines()[7:11]:
- if ':' in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- key = key.lower().replace(' ', '_')
- # T3645: ethtool version used on Debian Bullseye changed the
- # output format from 0 -> n/a. As we are only interested in the
- # tx/rx keys we do not care about RX Mini/Jumbo.
- if value.isdigit():
- self._ring_buffers[key] = value
+ # Now populate driver features
+ out, _ = popen(f'ethtool --json --show-features {ifname}')
+ self._features = loads(out)
+
+ # Get information about NIC ring buffers
+ out, _ = popen(f'ethtool --json --show-ring {ifname}')
+ self._ring_buffer = loads(out)
# Get current flow control settings, but this is not supported by
# all NICs (e.g. vmxnet3 does not support is)
- out, _ = popen(f'ethtool --show-pause {ifname}')
- if len(out.splitlines()) > 1:
- self._flow_control = True
- # read current flow control setting, this returns:
- # ['Autonegotiate:', 'on']
- self._flow_control_enabled = out.splitlines()[1].split()[-1]
+ out, err = popen(f'ethtool --json --show-pause {ifname}')
+ if not bool(err):
+ self._flow_control = loads(out)
# Get current Energy Efficient Ethernet (EEE) settings, but this is
# not supported by all NICs (e.g. vmxnet3 does not support is)
@@ -169,14 +144,12 @@ class Ethtool:
In case of a missing key, return "fixed = True and enabled = False"
"""
+ active = False
fixed = True
- enabled = False
- if feature in self._features:
- if 'enabled' in self._features[feature]:
- enabled = self._features[feature]['enabled']
- if 'fixed' in self._features[feature]:
- fixed = self._features[feature]['fixed']
- return enabled, fixed
+ if feature in self._features[0]:
+ active = bool(self._features[0][feature]['active'])
+ fixed = bool(self._features[0][feature]['fixed'])
+ return active, fixed
def get_generic_receive_offload(self):
return self._get_generic('generic-receive-offload')
@@ -201,14 +174,14 @@ class Ethtool:
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return self._ring_buffers_max.get(rx_tx, None)
+ return str(self._ring_buffer[0].get(f'{rx_tx}-max', None))
def get_ring_buffer(self, rx_tx):
# Configuration of RX/TX ring-buffers is not supported on every device,
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return str(self._ring_buffers.get(rx_tx, None))
+ return str(self._ring_buffer[0].get(rx_tx, None))
def check_speed_duplex(self, speed, duplex):
""" Check if the passed speed and duplex combination is supported by
@@ -230,15 +203,14 @@ class Ethtool:
def check_flow_control(self):
""" Check if the NIC supports flow-control """
- if self.get_driver_name() in _drivers_without_speed_duplex_flow:
- return False
- return self._flow_control
+ return bool(self._flow_control)
def get_flow_control(self):
- if self._flow_control_enabled == None:
+ if self._flow_control == None:
raise ValueError('Interface does not support changing '\
'flow-control settings!')
- return self._flow_control_enabled
+
+ return 'on' if bool(self._flow_control[0]['autonegotiate']) else 'off'
def check_eee(self):
""" Check if the NIC supports eee """
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 49e095946..e70b4f0d9 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -136,10 +136,10 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'connection_status' in rule_conf and rule_conf['connection_status']:
status = rule_conf['connection_status']
if status['nat'] == 'destination':
- nat_status = '{dnat}'
+ nat_status = 'dnat'
output.append(f'ct status {nat_status}')
if status['nat'] == 'source':
- nat_status = '{snat}'
+ nat_status = 'snat'
output.append(f'ct status {nat_status}')
if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 7215aac88..da2613b16 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -89,11 +89,14 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if addr and is_ip_network(addr):
if not ipv6:
map_addr = dict_search_args(rule_conf, nat_type, 'address')
- if port:
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}')
+ if map_addr:
+ if port:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}')
+ else:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
+ ignore_type_addr = True
else:
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
- ignore_type_addr = True
+ translation_output.append(f'prefix to {addr}')
else:
translation_output.append(f'prefix to {addr}')
elif addr == 'masquerade':
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index a22039e52..47318122b 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -129,16 +129,13 @@ class QoSBase:
if tmp: default_tc += f' flows {tmp}'
tmp = dict_search('interval', config)
- if tmp: default_tc += f' interval {tmp}'
-
- tmp = dict_search('interval', config)
- if tmp: default_tc += f' interval {tmp}'
+ if tmp: default_tc += f' interval {tmp}ms'
tmp = dict_search('queue_limit', config)
if tmp: default_tc += f' limit {tmp}'
tmp = dict_search('target', config)
- if tmp: default_tc += f' target {tmp}'
+ if tmp: default_tc += f' target {tmp}ms'
default_tc += f' noecn'
@@ -331,15 +328,6 @@ class QoSBase:
filter_cmd += f' flowid {self._parent:x}:{cls:x}'
self._cmd(filter_cmd)
- else:
-
- filter_cmd += ' basic'
-
- cls = int(cls)
- filter_cmd += f' flowid {self._parent:x}:{cls:x}'
- self._cmd(filter_cmd)
-
-
# The police block allows limiting of the byte or packet rate of
# traffic matched by the filter it is attached to.
# https://man7.org/linux/man-pages/man8/tc-police.8.html
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 129e65772..d87fd24f6 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -43,6 +43,7 @@ from vyos.utils.io import print_error
from vyos.utils.misc import begin
from vyos.utils.process import cmd, rc_cmd
from vyos.version import get_version
+from vyos.base import Warning
CHUNK_SIZE = 8192
@@ -54,13 +55,16 @@ class InteractivePolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
print_error(f"Host '{hostname}' not found in known hosts.")
print_error('Fingerprint: ' + key.get_fingerprint().hex())
- if sys.stdin.isatty() and ask_yes_no('Do you wish to continue?'):
- if client._host_keys_filename\
- and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'):
- client._host_keys.add(hostname, key.get_name(), key)
- client.save_host_keys(client._host_keys_filename)
- else:
+ if not sys.stdin.isatty():
+ return
+ if not ask_yes_no('Do you wish to continue?'):
raise SSHException(f"Cannot connect to unknown host '{hostname}'.")
+ if client._host_keys_filename is None:
+ Warning('no \'known_hosts\' file; create to store keys permanently')
+ return
+ if ask_yes_no('Do you wish to permanently add this host/key pair to known_hosts file?'):
+ client._host_keys.add(hostname, key.get_name(), key)
+ client.save_host_keys(client._host_keys_filename)
class SourceAdapter(HTTPAdapter):
"""
diff --git a/python/vyos/tpm.py b/python/vyos/tpm.py
new file mode 100644
index 000000000..f120e10c4
--- /dev/null
+++ b/python/vyos/tpm.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+
+from vyos.util import rc_cmd
+
+default_pcrs = ['0','2','4','7']
+tpm_handle = 0x81000000
+
+def init_tpm(clear=False):
+ """
+ Initialize TPM
+ """
+ code, output = rc_cmd('tpm2_startup' + (' -c' if clear else ''))
+ if code != 0:
+ raise Exception('init_tpm: Failed to initialize TPM')
+
+def clear_tpm_key():
+ """
+ Clear existing key on TPM
+ """
+ code, output = rc_cmd(f'tpm2_evictcontrol -C o -c {tpm_handle}')
+ if code != 0:
+ raise Exception('clear_tpm_key: Failed to clear TPM key')
+
+def read_tpm_key(index=0, pcrs=default_pcrs):
+ """
+ Read existing key on TPM
+ """
+ with tempfile.TemporaryDirectory() as tpm_dir:
+ pcr_str = ",".join(pcrs)
+
+ tpm_key_file = os.path.join(tpm_dir, 'tpm_key.key')
+ code, output = rc_cmd(f'tpm2_unseal -c {tpm_handle + index} -p pcr:sha256:{pcr_str} -o {tpm_key_file}')
+ if code != 0:
+ raise Exception('read_tpm_key: Failed to read key from TPM')
+
+ with open(tpm_key_file, 'rb') as f:
+ tpm_key = f.read()
+
+ return tpm_key
+
+def write_tpm_key(key, index=0, pcrs=default_pcrs):
+ """
+ Saves key to TPM
+ """
+ with tempfile.TemporaryDirectory() as tpm_dir:
+ pcr_str = ",".join(pcrs)
+
+ policy_file = os.path.join(tpm_dir, 'policy.digest')
+ code, output = rc_cmd(f'tpm2_createpolicy --policy-pcr -l sha256:{pcr_str} -L {policy_file}')
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create policy digest')
+
+ primary_context_file = os.path.join(tpm_dir, 'primary.ctx')
+ code, output = rc_cmd(f'tpm2_createprimary -C e -g sha256 -G rsa -c {primary_context_file}')
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create primary key')
+
+ key_file = os.path.join(tpm_dir, 'crypt.key')
+ with open(key_file, 'wb') as f:
+ f.write(key)
+
+ public_obj = os.path.join(tpm_dir, 'obj.pub')
+ private_obj = os.path.join(tpm_dir, 'obj.key')
+ code, output = rc_cmd(
+ f'tpm2_create -g sha256 \
+ -u {public_obj} -r {private_obj} \
+ -C {primary_context_file} -L {policy_file} -i {key_file}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create object')
+
+ load_context_file = os.path.join(tpm_dir, 'load.ctx')
+ code, output = rc_cmd(f'tpm2_load -C {primary_context_file} -u {public_obj} -r {private_obj} -c {load_context_file}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to load object')
+
+ code, output = rc_cmd(f'tpm2_evictcontrol -c {load_context_file} -C o {tpm_handle + index}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to write object to TPM')