import pytest from unittest import mock from cloudinit import gpg from cloudinit import subp from tests.unittests.helpers import CiTestCase TEST_KEY_HUMAN = ''' /etc/apt/cloud-init.gpg.d/my_key.gpg -------------------------------------------- pub rsa4096 2021-10-22 [SC] 3A3E F34D FDED B3B7 F3FD F603 F83F 7712 9A5E BD85 uid [ unknown] Brett Holman sub rsa4096 2021-10-22 [A] sub rsa4096 2021-10-22 [E] ''' TEST_KEY_MACHINE = ''' tru::1:1635129362:0:3:1:5 pub:-:4096:1:F83F77129A5EBD85:1634912922:::-:::scESCA::::::23::0: fpr:::::::::3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85: uid:-::::1634912922::64F1F1D6FA96316752D635D7C6406C52C40713C7::Brett Holman \ ::::::::::0: sub:-:4096:1:544B39C9A9141F04:1634912922::::::a::::::23: fpr:::::::::8BD901490D6EC986D03D6F0D544B39C9A9141F04: sub:-:4096:1:F45D9443F0A87092:1634912922::::::e::::::23: fpr:::::::::8CCCB332317324F030A45B19F45D9443F0A87092: ''' TEST_KEY_FINGERPRINT_HUMAN = \ '3A3E F34D FDED B3B7 F3FD F603 F83F 7712 9A5E BD85' TEST_KEY_FINGERPRINT_MACHINE = \ '3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85' class TestGPGCommands: def test_dearmor_bad_value(self): """This exception is handled by the callee. Ensure it is not caught internally. """ with mock.patch.object( subp, 'subp', side_effect=subp.ProcessExecutionError): with pytest.raises(subp.ProcessExecutionError): gpg.dearmor('garbage key value') def test_gpg_list_args(self): """Verify correct command gets called to list keys """ no_colons = [ 'gpg', '--with-fingerprint', '--no-default-keyring', '--list-keys', '--keyring', 'key'] colons = [ 'gpg', '--with-fingerprint', '--no-default-keyring', '--list-keys', '--keyring', '--with-colons', 'key'] with mock.patch.object(subp, 'subp', return_value=('', '')) as m_subp: gpg.list('key') assert mock.call(colons, capture=True) == m_subp.call_args gpg.list('key', human_output=True) test_calls = mock.call((no_colons), capture=True) assert test_calls == m_subp.call_args def test_gpg_dearmor_args(self): """Verify correct command gets called to dearmor keys """ with mock.patch.object(subp, 'subp', return_value=('', '')) as m_subp: gpg.dearmor('key') test_call = mock.call( ["gpg", "--dearmor"], data='key', decode=False) assert test_call == m_subp.call_args @mock.patch("cloudinit.gpg.time.sleep") @mock.patch("cloudinit.gpg.subp.subp") class TestReceiveKeys(CiTestCase): """Test the recv_key method.""" def test_retries_on_subp_exc(self, m_subp, m_sleep): """retry should be done on gpg receive keys failure.""" retries = (1, 2, 4) my_exc = subp.ProcessExecutionError( stdout='', stderr='', exit_code=2, cmd=['mycmd']) m_subp.side_effect = (my_exc, my_exc, ('', '')) gpg.recv_key("ABCD", "keyserver.example.com", retries=retries) self.assertEqual( [mock.call(1), mock.call(2)], m_sleep.call_args_list) def test_raises_error_after_retries(self, m_subp, m_sleep): """If the final run fails, error should be raised.""" naplen = 1 keyid, keyserver = ("ABCD", "keyserver.example.com") m_subp.side_effect = subp.ProcessExecutionError( stdout='', stderr='', exit_code=2, cmd=['mycmd']) with self.assertRaises(ValueError) as rcm: gpg.recv_key(keyid, keyserver, retries=(naplen,)) self.assertIn(keyid, str(rcm.exception)) self.assertIn(keyserver, str(rcm.exception)) m_sleep.assert_called_with(naplen) def test_no_retries_on_none(self, m_subp, m_sleep): """retry should not be done if retries is None.""" m_subp.side_effect = subp.ProcessExecutionError( stdout='', stderr='', exit_code=2, cmd=['mycmd']) with self.assertRaises(ValueError): gpg.recv_key("ABCD", "keyserver.example.com", retries=None) m_sleep.assert_not_called() def test_expected_gpg_command(self, m_subp, m_sleep): """Verify gpg is called with expected args.""" key, keyserver = ("DEADBEEF", "keyserver.example.com") retries = (1, 2, 4) m_subp.return_value = ('', '') gpg.recv_key(key, keyserver, retries=retries) m_subp.assert_called_once_with( ['gpg', '--no-tty', '--keyserver=%s' % keyserver, '--recv-keys', key], capture=True) m_sleep.assert_not_called()