summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
authorcawamata <1749824+cawamata@users.noreply.github.com>2020-12-18 00:59:48 +0900
committerGitHub <noreply@github.com>2020-12-17 10:59:48 -0500
commita5484d02973e5710442c11e1dc6b1153695c9a59 (patch)
tree83cf9f51f770c1e37545942c8382db9951215d85 /tests/unittests/test_handler
parent913818553a8db236e20751c81dd0e2a27124617c (diff)
downloadvyos-cloud-init-a5484d02973e5710442c11e1dc6b1153695c9a59.tar.gz
vyos-cloud-init-a5484d02973e5710442c11e1dc6b1153695c9a59.zip
cc_ca_certs: add RHEL support (#633)
This refactors cc_ca_certs to support non-ca-certificates distros, and adds RHEL support.
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py292
1 files changed, 182 insertions, 110 deletions
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index a16430d5..6e3831ed 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -47,12 +47,20 @@ class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
- distro = self._fetch_distro('ubuntu')
self.paths = None
- self.cloud = cloud.Cloud(None, self.paths, None, distro, None)
self.log = logging.getLogger("TestNoConfig")
self.args = []
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def _get_cloud(self, kind):
+ distro = self._fetch_distro(kind)
+ return cloud.Cloud(None, self.paths, None, distro, None)
+
+ def _mock_init(self):
self.mocks = ExitStack()
self.addCleanup(self.mocks.close)
@@ -64,11 +72,6 @@ class TestConfig(TestCase):
self.mock_remove = self.mocks.enter_context(
mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
- def _fetch_distro(self, kind):
- cls = distros.fetch(kind)
- paths = helpers.Paths({})
- return cls(kind, {}, paths)
-
def test_no_trusted_list(self):
"""
Test that no certificates are written if the 'trusted' key is not
@@ -76,71 +79,95 @@ class TestConfig(TestCase):
"""
config = {"ca-certs": {}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
+ self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
class TestAddCaCerts(TestCase):
@@ -154,11 +181,18 @@ class TestAddCaCerts(TestCase):
})
self.add_patch("cloudinit.config.cc_ca_certs.os.stat", "m_stat")
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- with mock.patch.object(util, 'write_file') as mockobj:
- cc_ca_certs.add_ca_certs([])
- self.assertEqual(mockobj.call_count, 0)
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_ca_certs.add_ca_certs(conf, [])
+ self.assertEqual(mockobj.call_count, 0)
def test_single_cert_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -168,20 +202,28 @@ class TestAddCaCerts(TestCase):
ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
- cc_ca_certs.add_ca_certs([cert])
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf", expected, omode="wb")])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
+
+ cc_ca_certs.add_ca_certs(conf, [cert])
+
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ expected, omode="wb")])
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -190,24 +232,31 @@ class TestAddCaCerts(TestCase):
ca_certs_content = "line1\nline2\nline3"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
+
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- cc_ca_certs.add_ca_certs([cert])
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode="wb")])
+ cc_ca_certs.add_ca_certs(conf, [cert])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ "%s\n%s\n" % (ca_certs_content,
+ conf['ca_cert_filename']),
+ omode="wb")])
+
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
def test_single_cert_to_empty_existing_ca_file(self):
"""Test adding a single certificate to the trusted CAs
@@ -216,15 +265,22 @@ class TestAddCaCerts(TestCase):
expected = "cloud-init-ca-certs.crt\n"
- with mock.patch.object(util, 'write_file', autospec=True) as m_write:
- self.m_stat.return_value.st_size = 0
+ self.m_stat.return_value.st_size = 0
+
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(util, 'write_file',
+ autospec=True) as m_write:
- cc_ca_certs.add_ca_certs([cert])
+ cc_ca_certs.add_ca_certs(conf, [cert])
- m_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf", expected, omode="wb")])
+ m_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ m_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ expected, omode="wb")])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
@@ -232,32 +288,41 @@ class TestAddCaCerts(TestCase):
expected_cert_file = "\n".join(certs)
ca_certs_content = "line1\nline2\nline3"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
- cc_ca_certs.add_ca_certs(certs)
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode='wb')])
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ cc_ca_certs.add_ca_certs(conf, certs)
+
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ expected_cert_file, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ "%s\n%s\n" % (ca_certs_content,
+ conf['ca_cert_filename']),
+ omode='wb')])
+
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
class TestUpdateCaCerts(unittest.TestCase):
def test_commands(self):
- with mock.patch.object(subp, 'subp') as mockobj:
- cc_ca_certs.update_ca_certs()
- mockobj.assert_called_once_with(
- ["update-ca-certificates"], capture=False)
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(subp, 'subp') as mockobj:
+ cc_ca_certs.update_ca_certs(conf)
+ mockobj.assert_called_once_with(
+ conf['ca_cert_update_cmd'], capture=False)
class TestRemoveDefaultCaCerts(TestCase):
@@ -271,24 +336,31 @@ class TestRemoveDefaultCaCerts(TestCase):
})
def test_commands(self):
- with ExitStack() as mocks:
- mock_delete = mocks.enter_context(
- mock.patch.object(util, 'delete_dir_contents'))
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp'))
-
- cc_ca_certs.remove_default_ca_certs('ubuntu')
-
- mock_delete.assert_has_calls([
- mock.call("/usr/share/ca-certificates/"),
- mock.call("/etc/ssl/certs/")])
-
- mock_write.assert_called_once_with(
- "/etc/ca-certificates.conf", "", mode=0o644)
-
- mock_subp.assert_called_once_with(
- ('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+
+ with ExitStack() as mocks:
+ mock_delete = mocks.enter_context(
+ mock.patch.object(util, 'delete_dir_contents'))
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_subp = mocks.enter_context(
+ mock.patch.object(subp, 'subp'))
+
+ cc_ca_certs.remove_default_ca_certs(distro_name, conf)
+
+ mock_delete.assert_has_calls([
+ mock.call(conf['ca_cert_path']),
+ mock.call(conf['ca_cert_system_path'])])
+
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_called_once_with(
+ conf['ca_cert_config'], "", mode=0o644)
+
+ if distro_name in ['debian', 'ubuntu']:
+ mock_subp.assert_called_once_with(
+ ('debconf-set-selections', '-'),
+ "ca-certificates \
+ca-certificates/trust_new_crts select no")
# vi: ts=4 expandtab