summaryrefslogtreecommitdiff
path: root/tests/unittests/test_gpg.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_gpg.py')
-rw-r--r--tests/unittests/test_gpg.py139
1 files changed, 139 insertions, 0 deletions
diff --git a/tests/unittests/test_gpg.py b/tests/unittests/test_gpg.py
new file mode 100644
index 00000000..c3772e3f
--- /dev/null
+++ b/tests/unittests/test_gpg.py
@@ -0,0 +1,139 @@
+from unittest import mock
+
+import pytest
+
+from cloudinit import gpg, subp
+from tests.unittests.helpers import CiTestCase
+
+TEST_KEY_HUMAN = """
+/etc/apt/cloud-init.gpg.d/my_key.gpg
+--------------------------------------------
+pub rsa4096 2021-10-22 [SC]
+ 3A3E F34D FDED B3B7 F3FD F603 F83F 7712 9A5E BD85
+uid [ unknown] Brett Holman <brett.holman@canonical.com>
+sub rsa4096 2021-10-22 [A]
+sub rsa4096 2021-10-22 [E]
+"""
+
+TEST_KEY_MACHINE = """
+tru::1:1635129362:0:3:1:5
+pub:-:4096:1:F83F77129A5EBD85:1634912922:::-:::scESCA::::::23::0:
+fpr:::::::::3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85:
+uid:-::::1634912922::64F1F1D6FA96316752D635D7C6406C52C40713C7::Brett Holman \
+<brett.holman@canonical.com>::::::::::0:
+sub:-:4096:1:544B39C9A9141F04:1634912922::::::a::::::23:
+fpr:::::::::8BD901490D6EC986D03D6F0D544B39C9A9141F04:
+sub:-:4096:1:F45D9443F0A87092:1634912922::::::e::::::23:
+fpr:::::::::8CCCB332317324F030A45B19F45D9443F0A87092:
+"""
+
+TEST_KEY_FINGERPRINT_HUMAN = (
+ "3A3E F34D FDED B3B7 F3FD F603 F83F 7712 9A5E BD85"
+)
+
+TEST_KEY_FINGERPRINT_MACHINE = "3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85"
+
+
+class TestGPGCommands:
+ def test_dearmor_bad_value(self):
+ """This exception is handled by the callee. Ensure it is not caught
+ internally.
+ """
+ with mock.patch.object(
+ subp, "subp", side_effect=subp.ProcessExecutionError
+ ):
+ with pytest.raises(subp.ProcessExecutionError):
+ gpg.dearmor("garbage key value")
+
+ def test_gpg_list_args(self):
+ """Verify correct command gets called to list keys"""
+ no_colons = [
+ "gpg",
+ "--with-fingerprint",
+ "--no-default-keyring",
+ "--list-keys",
+ "--keyring",
+ "key",
+ ]
+ colons = [
+ "gpg",
+ "--with-fingerprint",
+ "--no-default-keyring",
+ "--list-keys",
+ "--keyring",
+ "--with-colons",
+ "key",
+ ]
+ with mock.patch.object(subp, "subp", return_value=("", "")) as m_subp:
+ gpg.list("key")
+ assert mock.call(colons, capture=True) == m_subp.call_args
+
+ gpg.list("key", human_output=True)
+ test_calls = mock.call((no_colons), capture=True)
+ assert test_calls == m_subp.call_args
+
+ def test_gpg_dearmor_args(self):
+ """Verify correct command gets called to dearmor keys"""
+ with mock.patch.object(subp, "subp", return_value=("", "")) as m_subp:
+ gpg.dearmor("key")
+ test_call = mock.call(
+ ["gpg", "--dearmor"], data="key", decode=False
+ )
+ assert test_call == m_subp.call_args
+
+ @mock.patch("cloudinit.gpg.time.sleep")
+ @mock.patch("cloudinit.gpg.subp.subp")
+ class TestReceiveKeys(CiTestCase):
+ """Test the recv_key method."""
+
+ def test_retries_on_subp_exc(self, m_subp, m_sleep):
+ """retry should be done on gpg receive keys failure."""
+ retries = (1, 2, 4)
+ my_exc = subp.ProcessExecutionError(
+ stdout="", stderr="", exit_code=2, cmd=["mycmd"]
+ )
+ m_subp.side_effect = (my_exc, my_exc, ("", ""))
+ gpg.recv_key("ABCD", "keyserver.example.com", retries=retries)
+ self.assertEqual(
+ [mock.call(1), mock.call(2)], m_sleep.call_args_list
+ )
+
+ def test_raises_error_after_retries(self, m_subp, m_sleep):
+ """If the final run fails, error should be raised."""
+ naplen = 1
+ keyid, keyserver = ("ABCD", "keyserver.example.com")
+ m_subp.side_effect = subp.ProcessExecutionError(
+ stdout="", stderr="", exit_code=2, cmd=["mycmd"]
+ )
+ with self.assertRaises(ValueError) as rcm:
+ gpg.recv_key(keyid, keyserver, retries=(naplen,))
+ self.assertIn(keyid, str(rcm.exception))
+ self.assertIn(keyserver, str(rcm.exception))
+ m_sleep.assert_called_with(naplen)
+
+ def test_no_retries_on_none(self, m_subp, m_sleep):
+ """retry should not be done if retries is None."""
+ m_subp.side_effect = subp.ProcessExecutionError(
+ stdout="", stderr="", exit_code=2, cmd=["mycmd"]
+ )
+ with self.assertRaises(ValueError):
+ gpg.recv_key("ABCD", "keyserver.example.com", retries=None)
+ m_sleep.assert_not_called()
+
+ def test_expected_gpg_command(self, m_subp, m_sleep):
+ """Verify gpg is called with expected args."""
+ key, keyserver = ("DEADBEEF", "keyserver.example.com")
+ retries = (1, 2, 4)
+ m_subp.return_value = ("", "")
+ gpg.recv_key(key, keyserver, retries=retries)
+ m_subp.assert_called_once_with(
+ [
+ "gpg",
+ "--no-tty",
+ "--keyserver=%s" % keyserver,
+ "--recv-keys",
+ key,
+ ],
+ capture=True,
+ )
+ m_sleep.assert_not_called()