diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/test__init__.py | 162 | ||||
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 54 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 64 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ca_certs.py | 103 | ||||
-rw-r--r-- | tests/unittests/test_userdata.py | 135 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 123 |
6 files changed, 366 insertions, 275 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 4f60f0ea..af18955d 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,18 +1,42 @@ -from mocker import MockerTestCase, ANY, ARGS, KWARGS +import StringIO +import logging import os +import sys + +from mocker import MockerTestCase, ANY, ARGS, KWARGS + +from cloudinit import handlers +from cloudinit import helpers +from cloudinit import importer +from cloudinit import log +from cloudinit import settings +from cloudinit import url_helper +from cloudinit import util + + +class FakeModule(handlers.Handler): + def __init__(self): + handlers.Handler.__init__(self, settings.PER_ALWAYS) + self.types = [] + + def list_types(self): + return self.types + + def _handle_part(self, data, ctype, filename, payload, frequency): + pass -from cloudinit import (partwalker_handle_handler, handler_handle_part, - handler_register, get_cmdline_url) -from cloudinit.util import write_file, logexc, readurl +class TestWalkerHandleHandler(MockerTestCase): -class TestPartwalkerHandleHandler(MockerTestCase): def setUp(self): + + MockerTestCase.setUp(self) + self.data = { "handlercount": 0, - "frequency": "?", - "handlerdir": "?", - "handlers": [], + "frequency": "", + "handlerdir": self.makeDir(), + "handlers": helpers.ContentHandlers(), "data": None} self.expected_module_name = "part-handler-%03d" % ( @@ -20,179 +44,138 @@ class TestPartwalkerHandleHandler(MockerTestCase): expected_file_name = "%s.py" % self.expected_module_name expected_file_fullname = os.path.join(self.data["handlerdir"], expected_file_name) - self.module_fake = "fake module handle" + self.module_fake = FakeModule() self.ctype = None self.filename = None self.payload = "dummy payload" # Mock the write_file function - write_file_mock = self.mocker.replace(write_file, passthrough=False) + write_file_mock = self.mocker.replace(util.write_file, passthrough=False) write_file_mock(expected_file_fullname, self.payload, 0600) def test_no_errors(self): """Payload gets written to file and added to C{pdata}.""" - # Mock the __import__ builtin - import_mock = self.mocker.replace("__builtin__.__import__") + import_mock = self.mocker.replace(importer.import_module, passthrough=False) import_mock(self.expected_module_name) self.mocker.result(self.module_fake) - # Mock the handle_register function - handle_reg_mock = self.mocker.replace(handler_register, - passthrough=False) - handle_reg_mock(self.module_fake, self.data["handlers"], - self.data["data"], self.data["frequency"]) - # Activate mocks self.mocker.replay() - - partwalker_handle_handler(self.data, self.ctype, self.filename, - self.payload) - + + handlers.walker_handle_handler(self.data, self.ctype, self.filename, + self.payload) + self.assertEqual(1, self.data["handlercount"]) - + def test_import_error(self): """Module import errors are logged. No handler added to C{pdata}""" - # Mock the __import__ builtin - import_mock = self.mocker.replace("__builtin__.__import__") + import_mock = self.mocker.replace(importer.import_module, passthrough=False) import_mock(self.expected_module_name) self.mocker.throw(ImportError()) - # Mock log function - logexc_mock = self.mocker.replace(logexc, passthrough=False) - logexc_mock(ANY) - # Mock the print_exc function - print_exc_mock = self.mocker.replace("traceback.print_exc", - passthrough=False) - print_exc_mock(ARGS, KWARGS) - # Activate mocks self.mocker.replay() - partwalker_handle_handler(self.data, self.ctype, self.filename, - self.payload) + handlers.walker_handle_handler(self.data, self.ctype, self.filename, + self.payload) self.assertEqual(0, self.data["handlercount"]) def test_attribute_error(self): """Attribute errors are logged. No handler added to C{pdata}""" - # Mock the __import__ builtin - import_mock = self.mocker.replace("__builtin__.__import__") + import_mock = self.mocker.replace(importer.import_module, passthrough=False) import_mock(self.expected_module_name) self.mocker.result(self.module_fake) - # Mock the handle_register function - handle_reg_mock = self.mocker.replace(handler_register, - passthrough=False) - handle_reg_mock(self.module_fake, self.data["handlers"], - self.data["data"], self.data["frequency"]) self.mocker.throw(AttributeError()) - # Mock log function - logexc_mock = self.mocker.replace(logexc, passthrough=False) - logexc_mock(ANY) - # Mock the print_exc function - print_exc_mock = self.mocker.replace("traceback.print_exc", - passthrough=False) - print_exc_mock(ARGS, KWARGS) - # Activate mocks self.mocker.replay() - partwalker_handle_handler(self.data, self.ctype, self.filename, - self.payload) + handlers.walker_handle_handler(self.data, self.ctype, self.filename, + self.payload) self.assertEqual(0, self.data["handlercount"]) class TestHandlerHandlePart(MockerTestCase): + def setUp(self): self.data = "fake data" self.ctype = "fake ctype" self.filename = "fake filename" self.payload = "fake payload" - self.frequency = "once-per-instance" + self.frequency = settings.PER_INSTANCE def test_normal_version_1(self): """ C{handle_part} is called without C{frequency} for C{handler_version} == 1. """ - # Build a mock part-handler module mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") - self.mocker.result("once-per-instance") + self.mocker.result(settings.PER_INSTANCE) getattr(mod_mock, "handler_version") self.mocker.result(1) mod_mock.handle_part(self.data, self.ctype, self.filename, self.payload) self.mocker.replay() - handler_handle_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) def test_normal_version_2(self): """ C{handle_part} is called with C{frequency} for C{handler_version} == 2. """ - # Build a mock part-handler module mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") - self.mocker.result("once-per-instance") + self.mocker.result(settings.PER_INSTANCE) getattr(mod_mock, "handler_version") self.mocker.result(2) mod_mock.handle_part(self.data, self.ctype, self.filename, self.payload, self.frequency) self.mocker.replay() - handler_handle_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) def test_modfreq_per_always(self): """ C{handle_part} is called regardless of frequency if nofreq is always. """ self.frequency = "once" - # Build a mock part-handler module mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") - self.mocker.result("always") + self.mocker.result(settings.PER_ALWAYS) getattr(mod_mock, "handler_version") self.mocker.result(1) mod_mock.handle_part(self.data, self.ctype, self.filename, self.payload) self.mocker.replay() - handler_handle_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) def test_no_handle_when_modfreq_once(self): """C{handle_part} is not called if frequency is once""" self.frequency = "once" - # Build a mock part-handler module mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") - self.mocker.result("once-per-instance") + self.mocker.result(settings.PER_ONCE) self.mocker.replay() - handler_handle_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) def test_exception_is_caught(self): """Exceptions within C{handle_part} are caught and logged.""" - # Build a mock part-handler module mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") - self.mocker.result("once-per-instance") + self.mocker.result(settings.PER_INSTANCE) getattr(mod_mock, "handler_version") self.mocker.result(1) mod_mock.handle_part(self.data, self.ctype, self.filename, self.payload) self.mocker.throw(Exception()) - # Mock log function - logexc_mock = self.mocker.replace(logexc, passthrough=False) - logexc_mock(ANY) - # Mock the print_exc function - print_exc_mock = self.mocker.replace("traceback.print_exc", - passthrough=False) - print_exc_mock(ARGS, KWARGS) self.mocker.replay() - handler_handle_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) class TestCmdlineUrl(MockerTestCase): @@ -202,14 +185,13 @@ class TestCmdlineUrl(MockerTestCase): payload = "0" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(readurl, passthrough=False) + mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) mock_readurl(url) - self.mocker.result(payload) - + self.mocker.result(url_helper.UrlResponse(200, payload)) self.mocker.replay() self.assertEqual((key, url, None), - get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline)) + util.get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline)) def test_valid_content(self): url = "http://example.com/foo" @@ -217,14 +199,13 @@ class TestCmdlineUrl(MockerTestCase): payload = "xcloud-config\nmydata: foo\nbar: wark\n" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(readurl, passthrough=False) + mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) mock_readurl(url) - self.mocker.result(payload) - + self.mocker.result(url_helper.UrlResponse(200, payload)) self.mocker.replay() self.assertEqual((key, url, payload), - get_cmdline_url(names=[key], starts="xcloud-config", + util.get_cmdline_url(names=[key], starts="xcloud-config", cmdline=cmdline)) def test_no_key_found(self): @@ -232,11 +213,12 @@ class TestCmdlineUrl(MockerTestCase): key = "mykey" cmdline = "ro %s=%s bar=1" % (key, url) - self.mocker.replace(readurl, passthrough=False) + self.mocker.replace(url_helper.readurl, passthrough=False) + self.mocker.result(url_helper.UrlResponse(400)) self.mocker.replay() self.assertEqual((None, None, None), - get_cmdline_url(names=["does-not-appear"], + util.get_cmdline_url(names=["does-not-appear"], starts="#cloud-config", cmdline=cmdline)) # vi: ts=4 expandtab diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py new file mode 100644 index 00000000..84d85d4d --- /dev/null +++ b/tests/unittests/test_builtin_handlers.py @@ -0,0 +1,54 @@ +"""Tests of the built-in user data handlers""" + +import os + +from mocker import MockerTestCase + +from cloudinit import handlers +from cloudinit import helpers +from cloudinit import util + +from cloudinit.handlers import upstart_job + +from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE) + + +class TestBuiltins(MockerTestCase): + + def test_upstart_frequency_no_out(self): + c_root = self.makeDir() + up_root = self.makeDir() + paths = helpers.Paths({ + 'cloud_dir': c_root, + 'upstart_dir': up_root, + }) + freq = PER_ALWAYS + h = upstart_job.UpstartJobPartHandler(paths) + # No files should be written out when + # the frequency is ! per-instance + h.handle_part('', handlers.CONTENT_START, + None, None, None) + h.handle_part('blah', 'text/upstart-job', + 'test.conf', 'blah', freq) + h.handle_part('', handlers.CONTENT_END, + None, None, None) + self.assertEquals(0, len(os.listdir(up_root))) + + def test_upstart_frequency_single(self): + c_root = self.makeDir() + up_root = self.makeDir() + paths = helpers.Paths({ + 'cloud_dir': c_root, + 'upstart_dir': up_root, + }) + freq = PER_INSTANCE + h = upstart_job.UpstartJobPartHandler(paths) + # No files should be written out when + # the frequency is ! per-instance + h.handle_part('', handlers.CONTENT_START, + None, None, None) + h.handle_part('blah', 'text/upstart-job', + 'test.conf', 'blah', freq) + h.handle_part('', handlers.CONTENT_END, + None, None, None) + self.assertEquals(1, len(os.listdir(up_root))) diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 7659dd03..261c410a 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -1,14 +1,11 @@ -from tempfile import mkdtemp -from shutil import rmtree import os from StringIO import StringIO from copy import copy -from cloudinit.DataSourceMAAS import ( - MAASSeedDirNone, - MAASSeedDirMalformed, - read_maas_seed_dir, - read_maas_seed_url, -) + +from cloudinit import util +from cloudinit import url_helper +from cloudinit.sources import DataSourceMAAS + from mocker import MockerTestCase @@ -17,12 +14,7 @@ class TestMAASDataSource(MockerTestCase): def setUp(self): super(TestMAASDataSource, self).setUp() # Make a temp directoy for tests to use. - self.tmp = mkdtemp(prefix="unittest_") - - def tearDown(self): - super(TestMAASDataSource, self).tearDown() - # Clean up temp directory - rmtree(self.tmp) + self.tmp = self.makeDir() def test_seed_dir_valid(self): """Verify a valid seeddir is read as such""" @@ -35,7 +27,7 @@ class TestMAASDataSource(MockerTestCase): my_d = os.path.join(self.tmp, "valid") populate_dir(my_d, data) - (userdata, metadata) = read_maas_seed_dir(my_d) + (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d) self.assertEqual(userdata, data['user-data']) for key in ('instance-id', 'local-hostname'): @@ -54,7 +46,7 @@ class TestMAASDataSource(MockerTestCase): my_d = os.path.join(self.tmp, "valid_extra") populate_dir(my_d, data) - (userdata, metadata) = read_maas_seed_dir(my_d) + (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d) self.assertEqual(userdata, data['user-data']) for key in ('instance-id', 'local-hostname'): @@ -76,24 +68,28 @@ class TestMAASDataSource(MockerTestCase): invalid_data = copy(valid) del invalid_data['local-hostname'] populate_dir(my_d, invalid_data) - self.assertRaises(MAASSeedDirMalformed, read_maas_seed_dir, my_d) + self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed, + DataSourceMAAS.read_maas_seed_dir, my_d) # missing 'instance-id' my_d = "%s-02" % my_based invalid_data = copy(valid) del invalid_data['instance-id'] populate_dir(my_d, invalid_data) - self.assertRaises(MAASSeedDirMalformed, read_maas_seed_dir, my_d) + self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed, + DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_none(self): """Verify that empty seed_dir raises MAASSeedDirNone""" my_d = os.path.join(self.tmp, "valid_empty") - self.assertRaises(MAASSeedDirNone, read_maas_seed_dir, my_d) + self.assertRaises(DataSourceMAAS.MAASSeedDirNone, + DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_missing(self): """Verify that missing seed_dir raises MAASSeedDirNone""" - self.assertRaises(MAASSeedDirNone, read_maas_seed_dir, + self.assertRaises(DataSourceMAAS.MAASSeedDirNone, + DataSourceMAAS.read_maas_seed_dir, os.path.join(self.tmp, "nonexistantdirectory")) def test_seed_url_valid(self): @@ -102,30 +98,30 @@ class TestMAASDataSource(MockerTestCase): 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', 'user-data': 'foodata'} - + valid_order = [ + 'meta-data/local-hostname', + 'meta-data/instance-id', + 'meta-data/public-keys', + 'user-data', + ] my_seed = "http://example.com/xmeta" my_ver = "1999-99-99" my_headers = {'header1': 'value1', 'header2': 'value2'} def my_headers_cb(url): - return(my_headers) + return my_headers - mock_request = self.mocker.replace("urllib2.Request", - passthrough=False) - mock_urlopen = self.mocker.replace("urllib2.urlopen", + mock_request = self.mocker.replace(url_helper.readurl, passthrough=False) - for (key, val) in valid.iteritems(): - mock_request("%s/%s/%s" % (my_seed, my_ver, key), - data=None, headers=my_headers) - self.mocker.nospec() - self.mocker.result("fake-request-%s" % key) - mock_urlopen("fake-request-%s" % key, timeout=None) - self.mocker.result(StringIO(val)) - + for key in valid_order: + url = "%s/%s/%s" % (my_seed, my_ver, key) + mock_request(url, headers=my_headers, timeout=None) + resp = valid.get(key) + self.mocker.result(url_helper.UrlResponse(200, resp)) self.mocker.replay() - (userdata, metadata) = read_maas_seed_url(my_seed, + (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed, header_cb=my_headers_cb, version=my_ver) self.assertEqual("foodata", userdata) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 21d2442f..1f96e992 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,9 +1,12 @@ from mocker import MockerTestCase -from cloudinit.util import write_file, delete_dir_contents -from cloudinit.CloudConfig.cc_ca_certs import ( - handle, update_ca_certs, add_ca_certs, remove_default_ca_certs) -from logging import getLogger +from cloudinit import util +from cloudinit import cloud +from cloudinit import helpers + +from cloudinit.config import cc_ca_certs + +import logging class TestNoConfig(MockerTestCase): @@ -11,36 +14,37 @@ class TestNoConfig(MockerTestCase): super(TestNoConfig, self).setUp() self.name = "ca-certs" self.cloud_init = None - self.log = getLogger("TestNoConfig") + self.log = logging.getLogger("TestNoConfig") self.args = [] def test_no_config(self): """ Test that nothing is done if no ca-certs configuration is provided. """ - config = {"unknown-key": "value"} - - self.mocker.replace(write_file, passthrough=False) - self.mocker.replace(update_ca_certs, passthrough=False) + config = util.get_builtin_cfg() + self.mocker.replace(util.write_file, passthrough=False) + self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, self.args) class TestConfig(MockerTestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "ca-certs" - self.cloud_init = None - self.log = getLogger("TestNoConfig") + self.paths = None + self.cloud = cloud.Cloud(None, self.paths, None, None, None) + self.log = logging.getLogger("TestNoConfig") self.args = [] # Mock out the functions that actually modify the system - self.mock_add = self.mocker.replace(add_ca_certs, passthrough=False) - self.mock_update = self.mocker.replace(update_ca_certs, + self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, passthrough=False) + self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) - self.mock_remove = self.mocker.replace(remove_default_ca_certs, + self.mock_remove = self.mocker.replace(cc_ca_certs.remove_default_ca_certs, passthrough=False) + # Order must be correct self.mocker.order() @@ -55,7 +59,7 @@ class TestConfig(MockerTestCase): self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty""" @@ -65,37 +69,37 @@ class TestConfig(MockerTestCase): self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs""" config = {"ca-certs": {"trusted": ["CERT1"]}} - self.mock_add(["CERT1"]) + self.mock_add(self.paths, ["CERT1"]) self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_multiple_trusted(self): """Test that multiple certs get passed to add_ca_certs""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - self.mock_add(["CERT1", "CERT2"]) + self.mock_add(self.paths, ["CERT1", "CERT2"]) self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_remove_default_ca_certs(self): """Test remove_defaults works as expected""" config = {"ca-certs": {"remove-defaults": True}} - self.mock_remove() + self.mock_remove(self.paths) self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False""" @@ -104,72 +108,85 @@ class TestConfig(MockerTestCase): self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - self.mock_remove() - self.mock_add(["CERT1"]) + self.mock_remove(self.paths) + self.mock_add(self.paths, ["CERT1"]) self.mock_update() self.mocker.replay() - handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) class TestAddCaCerts(MockerTestCase): + + def setUp(self): + super(TestAddCaCerts, self).setUp() + self.paths = helpers.Paths({ + 'cloud_dir': self.makeDir() + }) + def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" - self.mocker.replace(write_file, passthrough=False) + self.mocker.replace(util.write_file, passthrough=False) self.mocker.replay() - - add_ca_certs([]) + cc_ca_certs.add_ca_certs(self.paths, []) def test_single_cert(self): """Test adding a single certificate to the trusted CAs""" cert = "CERT1\nLINE2\nLINE3" - mock_write = self.mocker.replace(write_file, passthrough=False) + mock_write = self.mocker.replace(util.write_file, passthrough=False) mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", cert, mode=0644) mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="a") + "\ncloud-init-ca-certs.crt", omode="ab") self.mocker.replay() - add_ca_certs([cert]) + cc_ca_certs.add_ca_certs(self.paths, [cert]) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs""" certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"] expected_cert_file = "\n".join(certs) - mock_write = self.mocker.replace(write_file, passthrough=False) + mock_write = self.mocker.replace(util.write_file, passthrough=False) mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", expected_cert_file, mode=0644) mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="a") + "\ncloud-init-ca-certs.crt", omode="ab") self.mocker.replay() - add_ca_certs(certs) + cc_ca_certs.add_ca_certs(self.paths, certs) class TestUpdateCaCerts(MockerTestCase): def test_commands(self): - mock_check_call = self.mocker.replace("subprocess.check_call", + mock_check_call = self.mocker.replace(util.subp, passthrough=False) - mock_check_call(["update-ca-certificates"]) + mock_check_call(["update-ca-certificates"], capture=False) self.mocker.replay() - update_ca_certs() + cc_ca_certs.update_ca_certs() class TestRemoveDefaultCaCerts(MockerTestCase): + + def setUp(self): + super(TestRemoveDefaultCaCerts, self).setUp() + self.paths = helpers.Paths({ + 'cloud_dir': self.makeDir() + }) + def test_commands(self): - mock_delete_dir_contents = self.mocker.replace(delete_dir_contents, + mock_delete_dir_contents = self.mocker.replace(util.delete_dir_contents, passthrough=False) - mock_write = self.mocker.replace(write_file, passthrough=False) - mock_subp = self.mocker.replace("cloudinit.util.subp", + mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_subp = self.mocker.replace(util.subp, passthrough=False) mock_delete_dir_contents("/usr/share/ca-certificates/") @@ -179,4 +196,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase): "ca-certificates ca-certificates/trust_new_crts select no") self.mocker.replay() - remove_default_ca_certs() + cc_ca_certs.remove_default_ca_certs(self.paths) diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 8eb7b259..861642b6 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -1,107 +1,144 @@ """Tests for handling of userdata within cloud init""" -import logging import StringIO +import logging +import os +import shutil +import tempfile + from email.mime.base import MIMEBase from mocker import MockerTestCase -import cloudinit -from cloudinit.DataSource import DataSource - +from cloudinit import helpers +from cloudinit import log +from cloudinit import sources +from cloudinit import stages +from cloudinit import util -instance_id = "i-testing" +INSTANCE_ID = "i-testing" -class FakeDataSource(DataSource): +class FakeDataSource(sources.DataSource): def __init__(self, userdata): - DataSource.__init__(self) - self.metadata = {'instance-id': instance_id} + sources.DataSource.__init__(self, {}, None, None) + self.metadata = {'instance-id': INSTANCE_ID} self.userdata_raw = userdata -class TestConsumeUserData(MockerTestCase): +# FIXME: these tests shouldn't be checking log output?? +# Weirddddd... + - _log_handler = None - _log = None - log_file = None +class TestConsumeUserData(MockerTestCase): def setUp(self): + MockerTestCase.setUp(self) + # Replace the write so no actual files + # get written out... self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False) - self.mock_write(self.get_ipath("cloud_config"), "", 0600) - self.capture_log() + self._log = None + self._log_file = None + self._log_handler = None def tearDown(self): - self._log.removeHandler(self._log_handler) - - @staticmethod - def get_ipath(name): - return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id, - cloudinit.pathmap[name]) - - def capture_log(self): - self.log_file = StringIO.StringIO() - self._log_handler = logging.StreamHandler(self.log_file) - self._log_handler.setLevel(logging.DEBUG) - self._log = logging.getLogger(cloudinit.logger_name) + MockerTestCase.tearDown(self) + if self._log_handler and self._log: + self._log.removeHandler(self._log_handler) + + def capture_log(self, lvl=logging.DEBUG): + log_file = StringIO.StringIO() + self._log_handler = logging.StreamHandler(log_file) + self._log_handler.setLevel(lvl) + self._log = log.getLogger() self._log.addHandler(self._log_handler) + return log_file def test_unhandled_type_warning(self): """Raw text without magic is ignored but shows warning""" + ci = stages.Init() + data = "arbitrary text\n" + ci.datasource = FakeDataSource(data) + + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() - ci = cloudinit.CloudInit() - ci.datasource = FakeDataSource("arbitrary text\n") + + log_file = self.capture_log(logging.WARNING) + ci.fetch() ci.consume_userdata() - self.assertEqual( - "Unhandled non-multipart userdata starting 'arbitrary text...'\n", - self.log_file.getvalue()) + self.assertIn( + "Unhandled non-multipart (text/x-not-multipart) userdata:", + log_file.getvalue()) def test_mime_text_plain(self): - """Mime message of type text/plain is ignored without warning""" - self.mocker.replay() - ci = cloudinit.CloudInit() + """Mime message of type text/plain is ignored but shows warning""" + ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string()) + + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + self.assertIn( + "Unhandled unknown content-type (text/plain)", + log_file.getvalue()) + def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" + ci.datasource = FakeDataSource(script) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mock_write(outpath, script, 0700) self.mocker.replay() - ci = cloudinit.CloudInit() - ci.datasource = FakeDataSource(script) + + log_file = self.capture_log(logging.WARNING) + ci.fetch() ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + self.assertEqual("", log_file.getvalue()) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" - self.mock_write(outpath, script, 0700) - self.mocker.replay() - ci = cloudinit.CloudInit() message = MIMEBase("text", "x-shellscript") message.set_payload(script) ci.datasource = FakeDataSource(message.as_string()) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mock_write(outpath, script, 0700) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + self.assertEqual("", log_file.getvalue()) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" - self.mock_write(outpath, script, 0700) - self.mocker.replay() - ci = cloudinit.CloudInit() message = MIMEBase("text", "plain") message.set_payload(script) ci.datasource = FakeDataSource(message.as_string()) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(outpath, script, 0700) + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + self.assertEqual("", log_file.getvalue()) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index e8f5885c..93979f06 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,28 +1,45 @@ -from unittest import TestCase -from mocker import MockerTestCase -from tempfile import mkdtemp -from shutil import rmtree import os import stat -from cloudinit.util import (mergedict, get_cfg_option_list_or_str, write_file, - delete_dir_contents, get_cmdline, - keyval_str_to_dict) +from unittest import TestCase +from mocker import MockerTestCase + +from cloudinit import util +from cloudinit import importer + + +class FakeSelinux(object): + + def __init__(self, match_what): + self.match_what = match_what + self.restored = [] + + def matchpathcon(self, path, mode): + if path == self.match_what: + return + else: + raise OSError("No match!") + def is_selinux_enabled(self): + return True -class TestMergeDict(TestCase): + def restorecon(self, path, recursive): + self.restored.append(path) + + +class TestMergeDict(MockerTestCase): def test_simple_merge(self): """Test simple non-conflict merge.""" source = {"key1": "value1"} candidate = {"key2": "value2"} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual({"key1": "value1", "key2": "value2"}, result) def test_nested_merge(self): """Test nested merge.""" source = {"key1": {"key1.1": "value1.1"}} candidate = {"key1": {"key1.2": "value1.2"}} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual( {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result) @@ -30,42 +47,42 @@ class TestMergeDict(TestCase): """Test that candidate doesn't override source.""" source = {"key1": "value1", "key2": "value2"} candidate = {"key1": "value2", "key2": "NEW VALUE"} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(source, result) def test_empty_candidate(self): """Test empty candidate doesn't change source.""" source = {"key": "value"} candidate = {} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(source, result) def test_empty_source(self): """Test empty source is replaced by candidate.""" source = {} candidate = {"key": "value"} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(candidate, result) def test_non_dict_candidate(self): """Test non-dict candidate is discarded.""" source = {"key": "value"} candidate = "not a dict" - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(source, result) def test_non_dict_source(self): """Test non-dict source is not modified with a dict candidate.""" source = "not a dict" candidate = {"key": "value"} - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(source, result) def test_neither_dict(self): """Test if neither candidate or source is dict source wins.""" source = "source" candidate = "candidate" - result = mergedict(source, candidate) + result = util.mergedict(source, candidate) self.assertEqual(source, result) @@ -73,51 +90,45 @@ class TestGetCfgOptionListOrStr(TestCase): def test_not_found_no_default(self): """None is returned if key is not found and no default given.""" config = {} - result = get_cfg_option_list_or_str(config, "key") - self.assertIsNone(result) + result = util.get_cfg_option_list(config, "key") + self.assertEqual(None, result) def test_not_found_with_default(self): """Default is returned if key is not found.""" config = {} - result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"]) + result = util.get_cfg_option_list(config, "key", default=["DEFAULT"]) self.assertEqual(["DEFAULT"], result) def test_found_with_default(self): """Default is not returned if key is found.""" config = {"key": ["value1"]} - result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"]) + result = util.get_cfg_option_list(config, "key", default=["DEFAULT"]) self.assertEqual(["value1"], result) def test_found_convert_to_list(self): """Single string is converted to one element list.""" config = {"key": "value1"} - result = get_cfg_option_list_or_str(config, "key") + result = util.get_cfg_option_list(config, "key") self.assertEqual(["value1"], result) def test_value_is_none(self): """If value is None empty list is returned.""" config = {"key": None} - result = get_cfg_option_list_or_str(config, "key") + result = util.get_cfg_option_list(config, "key") self.assertEqual([], result) class TestWriteFile(MockerTestCase): def setUp(self): super(TestWriteFile, self).setUp() - # Make a temp directoy for tests to use. - self.tmp = mkdtemp(prefix="unittest_") - - def tearDown(self): - super(TestWriteFile, self).tearDown() - # Clean up temp directory - rmtree(self.tmp) + self.tmp = self.makeDir(prefix="unittest_") def test_basic_usage(self): """Verify basic usage with default args.""" path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - write_file(path, contents) + util.write_file(path, contents) self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) @@ -133,7 +144,7 @@ class TestWriteFile(MockerTestCase): path = os.path.join(dirname, "NewFile.txt") contents = "Hey there" - write_file(path, contents) + util.write_file(path, contents) self.assertTrue(os.path.isdir(dirname)) self.assertTrue(os.path.isfile(path)) @@ -143,7 +154,7 @@ class TestWriteFile(MockerTestCase): path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - write_file(path, contents, mode=0666) + util.write_file(path, contents, mode=0666) self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) @@ -158,7 +169,7 @@ class TestWriteFile(MockerTestCase): # Create file first with basic content with open(path, "wb") as f: f.write("LINE1\n") - write_file(path, contents, omode="a") + util.write_file(path, contents, omode="a") self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) @@ -167,36 +178,30 @@ class TestWriteFile(MockerTestCase): self.assertEqual("LINE1\nHey there", create_contents) def test_restorecon_if_possible_is_called(self): - """Make sure the restorecon_if_possible is called correctly.""" - path = os.path.join(self.tmp, "NewFile.txt") - contents = "Hey there" - - # Mock out the restorecon_if_possible call to test if it's called. - mock_restorecon = self.mocker.replace( - "cloudinit.util.restorecon_if_possible", passthrough=False) - mock_restorecon(path) + """Make sure the selinux guard is called correctly.""" + import_mock = self.mocker.replace(importer.import_module, + passthrough=False) + import_mock('selinux') + fake_se = FakeSelinux('/etc/hosts') + self.mocker.result(fake_se) self.mocker.replay() + with util.SeLinuxGuard("/etc/hosts") as is_on: + self.assertTrue(is_on) + self.assertEqual(1, len(fake_se.restored)) + self.assertEqual('/etc/hosts', fake_se.restored[0]) - write_file(path, contents) - -class TestDeleteDirContents(TestCase): +class TestDeleteDirContents(MockerTestCase): def setUp(self): super(TestDeleteDirContents, self).setUp() - # Make a temp directoy for tests to use. - self.tmp = mkdtemp(prefix="unittest_") - - def tearDown(self): - super(TestDeleteDirContents, self).tearDown() - # Clean up temp directory - rmtree(self.tmp) + self.tmp = self.makeDir(prefix="unittest_") def assertDirEmpty(self, dirname): self.assertEqual([], os.listdir(dirname)) def test_does_not_delete_dir(self): """Ensure directory itself is not deleted.""" - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertTrue(os.path.isdir(self.tmp)) self.assertDirEmpty(self.tmp) @@ -206,7 +211,7 @@ class TestDeleteDirContents(TestCase): with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f: f.write("DELETE ME") - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertDirEmpty(self.tmp) @@ -214,7 +219,7 @@ class TestDeleteDirContents(TestCase): """Empty directories should be deleted.""" os.mkdir(os.path.join(self.tmp, "new_dir")) - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertDirEmpty(self.tmp) @@ -223,7 +228,7 @@ class TestDeleteDirContents(TestCase): os.mkdir(os.path.join(self.tmp, "new_dir")) os.mkdir(os.path.join(self.tmp, "new_dir", "new_subdir")) - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertDirEmpty(self.tmp) @@ -234,7 +239,7 @@ class TestDeleteDirContents(TestCase): with open(f_name, "wb") as f: f.write("DELETE ME") - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertDirEmpty(self.tmp) @@ -246,7 +251,7 @@ class TestDeleteDirContents(TestCase): f.write("DELETE ME") os.symlink(file_name, link_name) - delete_dir_contents(self.tmp) + util.delete_dir_contents(self.tmp) self.assertDirEmpty(self.tmp) @@ -255,12 +260,12 @@ class TestKeyValStrings(TestCase): def test_keyval_str_to_dict(self): expected = {'1': 'one', '2': 'one+one', 'ro': True} cmdline = "1=one ro 2=one+one" - self.assertEqual(expected, keyval_str_to_dict(cmdline)) + self.assertEqual(expected, util.keyval_str_to_dict(cmdline)) class TestGetCmdline(TestCase): def test_cmdline_reads_debug_env(self): os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123' - self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], get_cmdline()) + self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) # vi: ts=4 expandtab |