summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/config_mgmt.py3
-rw-r--r--python/vyos/configdep.py53
-rw-r--r--python/vyos/ethtool.py116
-rw-r--r--python/vyos/firewall.py22
-rw-r--r--python/vyos/kea.py27
-rw-r--r--python/vyos/nat.py11
-rw-r--r--python/vyos/qos/base.py16
-rw-r--r--python/vyos/remote.py16
-rw-r--r--python/vyos/system/grub.py36
-rw-r--r--python/vyos/tpm.py98
10 files changed, 287 insertions, 111 deletions
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index ff078649d..28ccee769 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -132,6 +132,9 @@ class ConfigMgmt:
{}).get('source_address', '')
if config.exists(['system', 'host-name']):
self.hostname = config.return_value(['system', 'host-name'])
+ if config.exists(['system', 'domain-name']):
+ tmp = config.return_value(['system', 'domain-name'])
+ self.hostname += f'.{tmp}'
else:
self.hostname = 'vyos'
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index 64727d355..73bd9ea96 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -1,4 +1,4 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -33,7 +33,14 @@ if typing.TYPE_CHECKING:
dependency_dir = os.path.join(directories['data'],
'config-mode-dependencies')
-dependent_func: dict[str, list[typing.Callable]] = {}
+local_dependent_func: dict[str, list[typing.Callable]] = {}
+
+DEBUG = False
+FORCE_LOCAL = False
+
+def debug_print(s: str):
+ if DEBUG:
+ print(s)
def canon_name(name: str) -> str:
return os.path.splitext(name)[0].replace('-', '_')
@@ -45,6 +52,26 @@ def canon_name_of_path(path: str) -> str:
def caller_name() -> str:
return stack()[2].filename
+def name_of(f: typing.Callable) -> str:
+ return f.__name__
+
+def names_of(l: list[typing.Callable]) -> list[str]:
+ return [name_of(f) for f in l]
+
+def remove_redundant(l: list[typing.Callable]) -> list[typing.Callable]:
+ names = set()
+ for e in reversed(l):
+ _ = l.remove(e) if name_of(e) in names else names.add(name_of(e))
+
+def append_uniq(l: list[typing.Callable], e: typing.Callable):
+ """Append an element, removing earlier occurrences
+
+ The list of dependencies is generally short and traversing the list on
+ each append is preferable to the cost of redundant script invocation.
+ """
+ l.append(e)
+ remove_redundant(l)
+
def read_dependency_dict(dependency_dir: str = dependency_dir) -> dict:
res = {}
for dep_file in os.listdir(dependency_dir):
@@ -95,16 +122,30 @@ def set_dependents(case: str, config: 'Config',
tagnode: typing.Optional[str] = None):
d = get_dependency_dict(config)
k = canon_name_of_path(caller_name())
- l = dependent_func.setdefault(k, [])
+ tag_ext = f'_{tagnode}' if tagnode is not None else ''
+ if hasattr(config, 'dependent_func') and not FORCE_LOCAL:
+ dependent_func = getattr(config, 'dependent_func')
+ l = dependent_func.setdefault('vyos_configd', [])
+ else:
+ dependent_func = local_dependent_func
+ l = dependent_func.setdefault(k, [])
for target in d[k][case]:
func = def_closure(target, config, tagnode)
- l.append(func)
+ func.__name__ = f'{target}{tag_ext}'
+ append_uniq(l, func)
+ debug_print(f'set_dependents: caller {k}, dependents {names_of(l)}')
-def call_dependents():
+def call_dependents(dependent_func: dict = None):
k = canon_name_of_path(caller_name())
- l = dependent_func.get(k, [])
+ if dependent_func is None or FORCE_LOCAL:
+ dependent_func = local_dependent_func
+ l = dependent_func.get(k, [])
+ else:
+ l = dependent_func.get('vyos_configd', [])
+ debug_print(f'call_dependents: caller {k}, dependents {names_of(l)}')
while l:
f = l.pop(0)
+ debug_print(f'calling: {f.__name__}')
f()
def called_as_dependent() -> bool:
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index ba638b280..473c98d0c 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -1,4 +1,4 @@
-# Copyright 2021-2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2021-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -16,6 +16,7 @@
import os
import re
+from json import loads
from vyos.utils.process import popen
# These drivers do not support using ethtool to change the speed, duplex, or
@@ -31,16 +32,24 @@ class Ethtool:
"""
# dictionary containing driver featurs, it will be populated on demand and
# the content will look like:
- # {
- # 'tls-hw-tx-offload': {'fixed': True, 'enabled': False},
- # 'tx-checksum-fcoe-crc': {'fixed': True, 'enabled': False},
- # 'tx-checksum-ip-generic': {'fixed': False, 'enabled': True},
- # 'tx-checksum-ipv4': {'fixed': True, 'enabled': False},
- # 'tx-checksum-ipv6': {'fixed': True, 'enabled': False},
- # 'tx-checksum-sctp': {'fixed': True, 'enabled': False},
- # 'tx-checksumming': {'fixed': False, 'enabled': True},
- # 'tx-esp-segmentation': {'fixed': True, 'enabled': False},
- # }
+ # [{'esp-hw-offload': {'active': False, 'fixed': True, 'requested': False},
+ # 'esp-tx-csum-hw-offload': {'active': False,
+ # 'fixed': True,
+ # 'requested': False},
+ # 'fcoe-mtu': {'active': False, 'fixed': True, 'requested': False},
+ # 'generic-receive-offload': {'active': True,
+ # 'fixed': False,
+ # 'requested': True},
+ # 'generic-segmentation-offload': {'active': True,
+ # 'fixed': False,
+ # 'requested': True},
+ # 'highdma': {'active': True, 'fixed': False, 'requested': True},
+ # 'ifname': 'eth0',
+ # 'l2-fwd-offload': {'active': False, 'fixed': True, 'requested': False},
+ # 'large-receive-offload': {'active': False,
+ # 'fixed': False,
+ # 'requested': False},
+ # ...
_features = { }
# dictionary containing available interface speed and duplex settings
# {
@@ -49,13 +58,11 @@ class Ethtool:
# '1000': {'full': ''}
# }
_speed_duplex = {'auto': {'auto': ''}}
- _ring_buffers = { }
- _ring_buffers_max = { }
+ _ring_buffer = None
_driver_name = None
_auto_negotiation = False
_auto_negotiation_supported = None
- _flow_control = False
- _flow_control_enabled = None
+ _flow_control = None
_eee = False
_eee_enabled = None
@@ -97,51 +104,19 @@ class Ethtool:
tmp = line.split()[-1]
self._auto_negotiation = bool(tmp == 'on')
- # Now populate features dictionaty
- out, _ = popen(f'ethtool --show-features {ifname}')
- # skip the first line, it only says: "Features for eth0":
- for line in out.splitlines()[1:]:
- if ":" in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- fixed = bool('fixed' in value)
- if fixed:
- value = value.split()[0].strip()
- self._features[key.strip()] = {
- 'enabled' : bool(value == 'on'),
- 'fixed' : fixed
- }
-
- out, _ = popen(f'ethtool --show-ring {ifname}')
- # We are only interested in line 2-5 which contains the device maximum
- # ringbuffers
- for line in out.splitlines()[2:6]:
- if ':' in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- key = key.lower().replace(' ', '_')
- # T3645: ethtool version used on Debian Bullseye changed the
- # output format from 0 -> n/a. As we are only interested in the
- # tx/rx keys we do not care about RX Mini/Jumbo.
- if value.isdigit():
- self._ring_buffers_max[key] = value
- # Now we wan't to get the current RX/TX ringbuffer values - used for
- for line in out.splitlines()[7:11]:
- if ':' in line:
- key, value = [s.strip() for s in line.strip().split(":", 1)]
- key = key.lower().replace(' ', '_')
- # T3645: ethtool version used on Debian Bullseye changed the
- # output format from 0 -> n/a. As we are only interested in the
- # tx/rx keys we do not care about RX Mini/Jumbo.
- if value.isdigit():
- self._ring_buffers[key] = value
+ # Now populate driver features
+ out, _ = popen(f'ethtool --json --show-features {ifname}')
+ self._features = loads(out)
+
+ # Get information about NIC ring buffers
+ out, _ = popen(f'ethtool --json --show-ring {ifname}')
+ self._ring_buffer = loads(out)
# Get current flow control settings, but this is not supported by
# all NICs (e.g. vmxnet3 does not support is)
- out, _ = popen(f'ethtool --show-pause {ifname}')
- if len(out.splitlines()) > 1:
- self._flow_control = True
- # read current flow control setting, this returns:
- # ['Autonegotiate:', 'on']
- self._flow_control_enabled = out.splitlines()[1].split()[-1]
+ out, err = popen(f'ethtool --json --show-pause {ifname}')
+ if not bool(err):
+ self._flow_control = loads(out)
# Get current Energy Efficient Ethernet (EEE) settings, but this is
# not supported by all NICs (e.g. vmxnet3 does not support is)
@@ -150,7 +125,7 @@ class Ethtool:
self._eee = True
# read current EEE setting, this returns:
# EEE status: disabled || EEE status: enabled - inactive || EEE status: enabled - active
- self._eee_enabled = bool('enabled' in out.splitlines()[2])
+ self._eee_enabled = bool('enabled' in out.splitlines()[1])
def check_auto_negotiation_supported(self):
""" Check if the NIC supports changing auto-negotiation """
@@ -169,14 +144,12 @@ class Ethtool:
In case of a missing key, return "fixed = True and enabled = False"
"""
+ active = False
fixed = True
- enabled = False
- if feature in self._features:
- if 'enabled' in self._features[feature]:
- enabled = self._features[feature]['enabled']
- if 'fixed' in self._features[feature]:
- fixed = self._features[feature]['fixed']
- return enabled, fixed
+ if feature in self._features[0]:
+ active = bool(self._features[0][feature]['active'])
+ fixed = bool(self._features[0][feature]['fixed'])
+ return active, fixed
def get_generic_receive_offload(self):
return self._get_generic('generic-receive-offload')
@@ -201,14 +174,14 @@ class Ethtool:
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return self._ring_buffers_max.get(rx_tx, None)
+ return str(self._ring_buffer[0].get(f'{rx_tx}-max', None))
def get_ring_buffer(self, rx_tx):
# Configuration of RX/TX ring-buffers is not supported on every device,
# thus when it's impossible return None
if rx_tx not in ['rx', 'tx']:
ValueError('Ring-buffer type must be either "rx" or "tx"')
- return str(self._ring_buffers.get(rx_tx, None))
+ return str(self._ring_buffer[0].get(rx_tx, None))
def check_speed_duplex(self, speed, duplex):
""" Check if the passed speed and duplex combination is supported by
@@ -230,15 +203,14 @@ class Ethtool:
def check_flow_control(self):
""" Check if the NIC supports flow-control """
- if self.get_driver_name() in _drivers_without_speed_duplex_flow:
- return False
- return self._flow_control
+ return bool(self._flow_control)
def get_flow_control(self):
- if self._flow_control_enabled == None:
+ if self._flow_control == None:
raise ValueError('Interface does not support changing '\
'flow-control settings!')
- return self._flow_control_enabled
+
+ return 'on' if bool(self._flow_control[0]['autonegotiate']) else 'off'
def check_eee(self):
""" Check if the NIC supports eee """
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index eee11bd2d..e70b4f0d9 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -34,6 +34,24 @@ from vyos.utils.process import call
from vyos.utils.process import cmd
from vyos.utils.process import run
+# Conntrack
+
+def conntrack_required(conf):
+ required_nodes = ['nat', 'nat66', 'load-balancing wan']
+
+ for path in required_nodes:
+ if conf.exists(path):
+ return True
+
+ firewall = conf.get_config_dict(['firewall'], key_mangling=('-', '_'),
+ no_tag_node_value_mangle=True, get_first_key=True)
+
+ for rules, path in dict_search_recursive(firewall, 'rule'):
+ if any(('state' in rule_conf or 'connection_status' in rule_conf or 'offload_target' in rule_conf) for rule_conf in rules.values()):
+ return True
+
+ return False
+
# Domain Resolver
def fqdn_config_parse(firewall):
@@ -118,10 +136,10 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
if 'connection_status' in rule_conf and rule_conf['connection_status']:
status = rule_conf['connection_status']
if status['nat'] == 'destination':
- nat_status = '{dnat}'
+ nat_status = 'dnat'
output.append(f'ct status {nat_status}')
if status['nat'] == 'source':
- nat_status = '{snat}'
+ nat_status = 'snat'
output.append(f'ct status {nat_status}')
if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index 894ac9e9a..89ae7ca81 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -56,6 +56,8 @@ kea6_options = {
'captive_portal': 'v6-captive-portal'
}
+kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
+
def kea_parse_options(config):
options = []
@@ -113,6 +115,9 @@ def kea_parse_subnet(subnet, config):
if 'bootfile_server' in config['option']:
out['next-server'] = config['option']['bootfile_server']
+ if 'ignore_client_id' in config:
+ out['match-client-id'] = False
+
if 'lease' in config:
out['valid-lifetime'] = int(config['lease'])
out['max-valid-lifetime'] = int(config['lease'])
@@ -291,7 +296,9 @@ def kea6_parse_subnet(subnet, config):
return out
-def _ctrl_socket_command(path, command, args=None):
+def _ctrl_socket_command(inet, command, args=None):
+ path = kea_ctrl_socket.format(inet=inet)
+
if not os.path.exists(path):
return None
@@ -316,19 +323,25 @@ def _ctrl_socket_command(path, command, args=None):
return json.loads(result.decode('utf-8'))
def kea_get_leases(inet):
- ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket'
-
- leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all')
+ leases = _ctrl_socket_command(inet, f'lease{inet}-get-all')
if not leases or 'result' not in leases or leases['result'] != 0:
return []
return leases['arguments']['leases']
-def kea_get_active_config(inet):
- ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket'
+def kea_delete_lease(inet, ip_address):
+ args = {'ip-address': ip_address}
- config = _ctrl_socket_command(ctrl_socket, 'config-get')
+ result = _ctrl_socket_command(inet, f'lease{inet}-del', args)
+
+ if result and 'result' in result:
+ return result['result'] == 0
+
+ return False
+
+def kea_get_active_config(inet):
+ config = _ctrl_socket_command(inet, 'config-get')
if not config or 'result' not in config or config['result'] != 0:
return None
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 7215aac88..da2613b16 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -89,11 +89,14 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
if addr and is_ip_network(addr):
if not ipv6:
map_addr = dict_search_args(rule_conf, nat_type, 'address')
- if port:
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}')
+ if map_addr:
+ if port:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}')
+ else:
+ translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
+ ignore_type_addr = True
else:
- translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
- ignore_type_addr = True
+ translation_output.append(f'prefix to {addr}')
else:
translation_output.append(f'prefix to {addr}')
elif addr == 'masquerade':
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index a22039e52..47318122b 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -129,16 +129,13 @@ class QoSBase:
if tmp: default_tc += f' flows {tmp}'
tmp = dict_search('interval', config)
- if tmp: default_tc += f' interval {tmp}'
-
- tmp = dict_search('interval', config)
- if tmp: default_tc += f' interval {tmp}'
+ if tmp: default_tc += f' interval {tmp}ms'
tmp = dict_search('queue_limit', config)
if tmp: default_tc += f' limit {tmp}'
tmp = dict_search('target', config)
- if tmp: default_tc += f' target {tmp}'
+ if tmp: default_tc += f' target {tmp}ms'
default_tc += f' noecn'
@@ -331,15 +328,6 @@ class QoSBase:
filter_cmd += f' flowid {self._parent:x}:{cls:x}'
self._cmd(filter_cmd)
- else:
-
- filter_cmd += ' basic'
-
- cls = int(cls)
- filter_cmd += f' flowid {self._parent:x}:{cls:x}'
- self._cmd(filter_cmd)
-
-
# The police block allows limiting of the byte or packet rate of
# traffic matched by the filter it is attached to.
# https://man7.org/linux/man-pages/man8/tc-police.8.html
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 129e65772..d87fd24f6 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -43,6 +43,7 @@ from vyos.utils.io import print_error
from vyos.utils.misc import begin
from vyos.utils.process import cmd, rc_cmd
from vyos.version import get_version
+from vyos.base import Warning
CHUNK_SIZE = 8192
@@ -54,13 +55,16 @@ class InteractivePolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
print_error(f"Host '{hostname}' not found in known hosts.")
print_error('Fingerprint: ' + key.get_fingerprint().hex())
- if sys.stdin.isatty() and ask_yes_no('Do you wish to continue?'):
- if client._host_keys_filename\
- and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'):
- client._host_keys.add(hostname, key.get_name(), key)
- client.save_host_keys(client._host_keys_filename)
- else:
+ if not sys.stdin.isatty():
+ return
+ if not ask_yes_no('Do you wish to continue?'):
raise SSHException(f"Cannot connect to unknown host '{hostname}'.")
+ if client._host_keys_filename is None:
+ Warning('no \'known_hosts\' file; create to store keys permanently')
+ return
+ if ask_yes_no('Do you wish to permanently add this host/key pair to known_hosts file?'):
+ client._host_keys.add(hostname, key.get_name(), key)
+ client.save_host_keys(client._host_keys_filename)
class SourceAdapter(HTTPAdapter):
"""
diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py
index 2e8b20972..864ed65aa 100644
--- a/python/vyos/system/grub.py
+++ b/python/vyos/system/grub.py
@@ -17,6 +17,7 @@ import platform
from pathlib import Path
from re import MULTILINE, compile as re_compile
+from shutil import copy2
from typing import Union
from uuid import uuid5, NAMESPACE_URL, UUID
@@ -422,3 +423,38 @@ def set_kernel_cmdline_options(cmdline_options: str, version_name: str,
version_add(version_name=version_name, root_dir=root_dir,
boot_opts_config=cmdline_options)
+
+
+def sort_inodes(dir_path: str) -> None:
+ """Sort inodes for files inside a folder
+ Regenerate inodes for each file to get the same order for both inodes
+ and file names
+
+ GRUB iterates files by inodes, not alphabetically. Therefore, if we
+ want to read them in proper order, we need to sort inodes for all
+ config files in a folder.
+
+ Args:
+ dir_path (str): a path to directory
+ """
+ dir_content: list[Path] = sorted(Path(dir_path).iterdir())
+ temp_list_old: list[Path] = []
+ temp_list_new: list[Path] = []
+
+ # create a copy of all files, to get new inodes
+ for item in dir_content:
+ # skip directories
+ if item.is_dir():
+ continue
+ # create a new copy of file with a temporary name
+ copy_path = Path(f'{item.as_posix()}_tmp')
+ copy2(item, Path(copy_path))
+ temp_list_old.append(item)
+ temp_list_new.append(copy_path)
+
+ # delete old files and rename new ones
+ for item in temp_list_old:
+ item.unlink()
+ for item in temp_list_new:
+ new_name = Path(f'{item.as_posix()[0:-4]}')
+ item.rename(new_name)
diff --git a/python/vyos/tpm.py b/python/vyos/tpm.py
new file mode 100644
index 000000000..f120e10c4
--- /dev/null
+++ b/python/vyos/tpm.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2024 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+
+from vyos.util import rc_cmd
+
+default_pcrs = ['0','2','4','7']
+tpm_handle = 0x81000000
+
+def init_tpm(clear=False):
+ """
+ Initialize TPM
+ """
+ code, output = rc_cmd('tpm2_startup' + (' -c' if clear else ''))
+ if code != 0:
+ raise Exception('init_tpm: Failed to initialize TPM')
+
+def clear_tpm_key():
+ """
+ Clear existing key on TPM
+ """
+ code, output = rc_cmd(f'tpm2_evictcontrol -C o -c {tpm_handle}')
+ if code != 0:
+ raise Exception('clear_tpm_key: Failed to clear TPM key')
+
+def read_tpm_key(index=0, pcrs=default_pcrs):
+ """
+ Read existing key on TPM
+ """
+ with tempfile.TemporaryDirectory() as tpm_dir:
+ pcr_str = ",".join(pcrs)
+
+ tpm_key_file = os.path.join(tpm_dir, 'tpm_key.key')
+ code, output = rc_cmd(f'tpm2_unseal -c {tpm_handle + index} -p pcr:sha256:{pcr_str} -o {tpm_key_file}')
+ if code != 0:
+ raise Exception('read_tpm_key: Failed to read key from TPM')
+
+ with open(tpm_key_file, 'rb') as f:
+ tpm_key = f.read()
+
+ return tpm_key
+
+def write_tpm_key(key, index=0, pcrs=default_pcrs):
+ """
+ Saves key to TPM
+ """
+ with tempfile.TemporaryDirectory() as tpm_dir:
+ pcr_str = ",".join(pcrs)
+
+ policy_file = os.path.join(tpm_dir, 'policy.digest')
+ code, output = rc_cmd(f'tpm2_createpolicy --policy-pcr -l sha256:{pcr_str} -L {policy_file}')
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create policy digest')
+
+ primary_context_file = os.path.join(tpm_dir, 'primary.ctx')
+ code, output = rc_cmd(f'tpm2_createprimary -C e -g sha256 -G rsa -c {primary_context_file}')
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create primary key')
+
+ key_file = os.path.join(tpm_dir, 'crypt.key')
+ with open(key_file, 'wb') as f:
+ f.write(key)
+
+ public_obj = os.path.join(tpm_dir, 'obj.pub')
+ private_obj = os.path.join(tpm_dir, 'obj.key')
+ code, output = rc_cmd(
+ f'tpm2_create -g sha256 \
+ -u {public_obj} -r {private_obj} \
+ -C {primary_context_file} -L {policy_file} -i {key_file}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to create object')
+
+ load_context_file = os.path.join(tpm_dir, 'load.ctx')
+ code, output = rc_cmd(f'tpm2_load -C {primary_context_file} -u {public_obj} -r {private_obj} -c {load_context_file}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to load object')
+
+ code, output = rc_cmd(f'tpm2_evictcontrol -c {load_context_file} -C o {tpm_handle + index}')
+
+ if code != 0:
+ raise Exception('write_tpm_key: Failed to write object to TPM')