summaryrefslogtreecommitdiff
path: root/tests/unittests/test_gpg.py
blob: c3772e3fe21754b33497f8443fd525919748a041 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from unittest import mock

import pytest

from cloudinit import gpg, subp
from tests.unittests.helpers import CiTestCase

# Sample key listing in gpg's human-readable layout: keyring path header,
# primary key, fingerprint, uid and two subkeys.
TEST_KEY_HUMAN = """
/etc/apt/cloud-init.gpg.d/my_key.gpg
--------------------------------------------
pub   rsa4096 2021-10-22 [SC]
      3A3E F34D FDED B3B7 F3FD  F603 F83F 7712 9A5E BD85
uid           [ unknown] Brett Holman <brett.holman@canonical.com>
sub   rsa4096 2021-10-22 [A]
sub   rsa4096 2021-10-22 [E]
"""

# The same key in colon-delimited (machine-readable) layout. The trailing
# backslash on the uid record is a line continuation inside the literal, so
# the uid record is a single line in the resulting string.
TEST_KEY_MACHINE = """
tru::1:1635129362:0:3:1:5
pub:-:4096:1:F83F77129A5EBD85:1634912922:::-:::scESCA::::::23::0:
fpr:::::::::3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85:
uid:-::::1634912922::64F1F1D6FA96316752D635D7C6406C52C40713C7::Brett Holman \
<brett.holman@canonical.com>::::::::::0:
sub:-:4096:1:544B39C9A9141F04:1634912922::::::a::::::23:
fpr:::::::::8BD901490D6EC986D03D6F0D544B39C9A9141F04:
sub:-:4096:1:F45D9443F0A87092:1634912922::::::e::::::23:
fpr:::::::::8CCCB332317324F030A45B19F45D9443F0A87092:
"""

# Primary-key fingerprint exactly as it appears in TEST_KEY_HUMAN: two halves
# of five 4-character groups, separated by a double space.
TEST_KEY_FINGERPRINT_HUMAN = (
    "3A3E F34D FDED B3B7 F3FD  "
    "F603 F83F 7712 9A5E BD85"
)

# The same fingerprint with all spaces removed, as it appears in the first
# "fpr" record of TEST_KEY_MACHINE.
TEST_KEY_FINGERPRINT_MACHINE = TEST_KEY_FINGERPRINT_HUMAN.replace(" ", "")


class TestGPGCommands:
    """Check the subp invocations made by the cloudinit.gpg helpers."""

    def test_dearmor_bad_value(self):
        """dearmor must let ProcessExecutionError propagate to its caller
        rather than catching it internally.
        """
        failing_subp = mock.patch.object(
            subp, "subp", side_effect=subp.ProcessExecutionError
        )
        with failing_subp, pytest.raises(subp.ProcessExecutionError):
            gpg.dearmor("garbage key value")

    def test_gpg_list_args(self):
        """Verify the command used to list keys, with and without colons."""
        # Arguments shared by both output modes; only the tail differs.
        common_args = [
            "gpg",
            "--with-fingerprint",
            "--no-default-keyring",
            "--list-keys",
            "--keyring",
        ]
        machine_cmd = common_args + ["--with-colons", "key"]
        human_cmd = common_args + ["key"]

        with mock.patch.object(subp, "subp", return_value=("", "")) as m_subp:
            # Default output mode adds --with-colons.
            gpg.list("key")
            m_subp.assert_called_with(machine_cmd, capture=True)

            # human_output=True omits --with-colons.
            gpg.list("key", human_output=True)
            m_subp.assert_called_with(human_cmd, capture=True)

    def test_gpg_dearmor_args(self):
        """Verify the command used to dearmor keys."""
        with mock.patch.object(subp, "subp", return_value=("", "")) as m_subp:
            gpg.dearmor("key")
            m_subp.assert_called_with(
                ["gpg", "--dearmor"], data="key", decode=False
            )

    @mock.patch("cloudinit.gpg.time.sleep")
    @mock.patch("cloudinit.gpg.subp.subp")
    class TestReceiveKeys(CiTestCase):
        """Exercise gpg.recv_key with subp and time.sleep mocked out."""

        def test_retries_on_subp_exc(self, m_subp, m_sleep):
            """A failing receive is retried, sleeping between attempts."""
            failure = subp.ProcessExecutionError(
                stdout="", stderr="", exit_code=2, cmd=["mycmd"]
            )
            # Fail twice, then succeed on the third call.
            m_subp.side_effect = (failure, failure, ("", ""))

            gpg.recv_key("ABCD", "keyserver.example.com", retries=(1, 2, 4))

            # Only the first two retry delays are consumed.
            self.assertEqual(
                m_sleep.call_args_list, [mock.call(1), mock.call(2)]
            )

        def test_raises_error_after_retries(self, m_subp, m_sleep):
            """ValueError is raised when every attempt fails."""
            nap = 1
            keyid = "ABCD"
            keyserver = "keyserver.example.com"
            m_subp.side_effect = subp.ProcessExecutionError(
                stdout="", stderr="", exit_code=2, cmd=["mycmd"]
            )

            with self.assertRaises(ValueError) as raised:
                gpg.recv_key(keyid, keyserver, retries=(nap,))

            # The error text must name both the key and the keyserver.
            message = str(raised.exception)
            self.assertIn(keyid, message)
            self.assertIn(keyserver, message)
            m_sleep.assert_called_with(nap)

        def test_no_retries_on_none(self, m_subp, m_sleep):
            """With retries=None a failure raises without sleeping."""
            m_subp.side_effect = subp.ProcessExecutionError(
                stdout="", stderr="", exit_code=2, cmd=["mycmd"]
            )

            with self.assertRaises(ValueError):
                gpg.recv_key("ABCD", "keyserver.example.com", retries=None)

            m_sleep.assert_not_called()

        def test_expected_gpg_command(self, m_subp, m_sleep):
            """A successful receive runs gpg once with the expected args."""
            key = "DEADBEEF"
            keyserver = "keyserver.example.com"
            m_subp.return_value = ("", "")

            gpg.recv_key(key, keyserver, retries=(1, 2, 4))

            expected_cmd = [
                "gpg",
                "--no-tty",
                f"--keyserver={keyserver}",
                "--recv-keys",
                key,
            ]
            m_subp.assert_called_once_with(expected_cmd, capture=True)
            # Success on the first attempt means no back-off sleep.
            m_sleep.assert_not_called()