summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/gpg.py52
-rw-r--r--cloudinit/tests/test_gpg.py54
2 files changed, 96 insertions, 10 deletions
diff --git a/cloudinit/gpg.py b/cloudinit/gpg.py
index d58d73e0..7fe17a2e 100644
--- a/cloudinit/gpg.py
+++ b/cloudinit/gpg.py
@@ -10,6 +10,8 @@
from cloudinit import log as logging
from cloudinit import util
+import time
+
LOG = logging.getLogger(__name__)
@@ -25,16 +27,46 @@ def export_armour(key):
return armour
-def recv_key(key, keyserver):
- """Receive gpg key from the specified keyserver"""
- LOG.debug('Receive gpg key "%s"', key)
- try:
- util.subp(["gpg", "--keyserver", keyserver, "--recv", key],
- capture=True)
- except util.ProcessExecutionError as error:
- raise ValueError(('Failed to import key "%s" '
- 'from server "%s" - error %s') %
- (key, keyserver, error))
+def recv_key(key, keyserver, retries=(1, 1)):
+ """Receive gpg key from the specified keyserver.
+
+ Retries are done by default because keyservers can be unreliable.
+ Additionally, there is no way to determine the difference between
+ a non-existant key and a failure. In both cases gpg (at least 2.2.4)
+ exits with status 2 and stderr: "keyserver receive failed: No data"
+ It is assumed that a key provided to cloud-init exists on the keyserver
+ so re-trying makes better sense than failing.
+
+ @param key: a string key fingerprint (as passed to gpg --recv-keys).
+ @param keyserver: the keyserver to request keys from.
+ @param retries: an iterable of sleep lengths for retries.
+ Use None to indicate no retries."""
+ LOG.debug("Importing key '%s' from keyserver '%s'", key, keyserver)
+ cmd = ["gpg", "--keyserver=%s" % keyserver, "--recv-keys", key]
+ if retries is None:
+ retries = []
+ trynum = 0
+ error = None
+ sleeps = iter(retries)
+ while True:
+ trynum += 1
+ try:
+ util.subp(cmd, capture=True)
+ LOG.debug("Imported key '%s' from keyserver '%s' on try %d",
+ key, keyserver, trynum)
+ return
+ except util.ProcessExecutionError as e:
+ error = e
+ try:
+ naplen = next(sleeps)
+ LOG.debug(
+ "Import failed with exit code %d, will try again in %ss",
+ error.exit_code, naplen)
+ time.sleep(naplen)
+ except StopIteration:
+ raise ValueError(
+ ("Failed to import key '%s' from keyserver '%s' "
+ "after %d tries: %s") % (key, keyserver, trynum, error))
def delete_key(key):
diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py
new file mode 100644
index 00000000..0562b966
--- /dev/null
+++ b/cloudinit/tests/test_gpg.py
@@ -0,0 +1,54 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Test gpg module."""
+
+from cloudinit import gpg
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+import mock
+
+
+@mock.patch("cloudinit.gpg.time.sleep")
+@mock.patch("cloudinit.gpg.util.subp")
+class TestReceiveKeys(CiTestCase):
+ """Test the recv_key method."""
+
+ def test_retries_on_subp_exc(self, m_subp, m_sleep):
+ """retry should be done on gpg receive keys failure."""
+ retries = (1, 2, 4)
+ my_exc = util.ProcessExecutionError(
+ stdout='', stderr='', exit_code=2, cmd=['mycmd'])
+ m_subp.side_effect = (my_exc, my_exc, ('', ''))
+ gpg.recv_key("ABCD", "keyserver.example.com", retries=retries)
+ self.assertEqual([mock.call(1), mock.call(2)], m_sleep.call_args_list)
+
+ def test_raises_error_after_retries(self, m_subp, m_sleep):
+ """If the final run fails, error should be raised."""
+ naplen = 1
+ keyid, keyserver = ("ABCD", "keyserver.example.com")
+ m_subp.side_effect = util.ProcessExecutionError(
+ stdout='', stderr='', exit_code=2, cmd=['mycmd'])
+ with self.assertRaises(ValueError) as rcm:
+ gpg.recv_key(keyid, keyserver, retries=(naplen,))
+ self.assertIn(keyid, str(rcm.exception))
+ self.assertIn(keyserver, str(rcm.exception))
+ m_sleep.assert_called_with(naplen)
+
+ def test_no_retries_on_none(self, m_subp, m_sleep):
+ """retry should not be done if retries is None."""
+ m_subp.side_effect = util.ProcessExecutionError(
+ stdout='', stderr='', exit_code=2, cmd=['mycmd'])
+ with self.assertRaises(ValueError):
+ gpg.recv_key("ABCD", "keyserver.example.com", retries=None)
+ m_sleep.assert_not_called()
+
+ def test_expected_gpg_command(self, m_subp, m_sleep):
+ """Verify gpg is called with expected args."""
+ key, keyserver = ("DEADBEEF", "keyserver.example.com")
+ retries = (1, 2, 4)
+ m_subp.return_value = ('', '')
+ gpg.recv_key(key, keyserver, retries=retries)
+ m_subp.assert_called_once_with(
+ ['gpg', '--keyserver=%s' % keyserver, '--recv-keys', key],
+ capture=True)
+ m_sleep.assert_not_called()