diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test__init__.py | 195 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 151 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ca_certs.py (renamed from tests/unittests/test_handler_ca_certs.py) | 0 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 2 |
4 files changed, 347 insertions, 1 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py new file mode 100644 index 00000000..e157fa77 --- /dev/null +++ b/tests/unittests/test__init__.py @@ -0,0 +1,195 @@ +from mocker import MockerTestCase, ANY, ARGS, KWARGS +import os + +from cloudinit import (partwalker_handle_handler, handler_handle_part, + handler_register) +from cloudinit.util import write_file, logexc + + +class TestPartwalkerHandleHandler(MockerTestCase): + def setUp(self): + self.data = { + "handlercount": 0, + "frequency": "?", + "handlerdir": "?", + "handlers": [], + "data": None} + + self.expected_module_name = "part-handler-%03d" % ( + self.data["handlercount"],) + expected_file_name = "%s.py" % self.expected_module_name + expected_file_fullname = os.path.join(self.data["handlerdir"], + expected_file_name) + self.module_fake = "fake module handle" + self.ctype = None + self.filename = None + self.payload = "dummy payload" + + # Mock the write_file function + write_file_mock = self.mocker.replace(write_file, passthrough=False) + write_file_mock(expected_file_fullname, self.payload, 0600) + + def test_no_errors(self): + """Payload gets written to file and added to C{pdata}.""" + # Mock the __import__ builtin + import_mock = self.mocker.replace("__builtin__.__import__") + import_mock(self.expected_module_name) + self.mocker.result(self.module_fake) + # Mock the handle_register function + handle_reg_mock = self.mocker.replace(handler_register, + passthrough=False) + handle_reg_mock(self.module_fake, self.data["handlers"], + self.data["data"], self.data["frequency"]) + # Activate mocks + self.mocker.replay() + + partwalker_handle_handler(self.data, self.ctype, self.filename, + self.payload) + + self.assertEqual(1, self.data["handlercount"]) + + def test_import_error(self): + """Module import errors are logged. No handler added to C{pdata}""" + # Mock the __import__ builtin + import_mock = self.mocker.replace("__builtin__.__import__") + import_mock(self.expected_module_name) + self.mocker.throw(ImportError()) + # Mock log function + logexc_mock = self.mocker.replace(logexc, passthrough=False) + logexc_mock(ANY) + # Mock the print_exc function + print_exc_mock = self.mocker.replace("traceback.print_exc", + passthrough=False) + print_exc_mock(ARGS, KWARGS) + # Activate mocks + self.mocker.replay() + + partwalker_handle_handler(self.data, self.ctype, self.filename, + self.payload) + + self.assertEqual(0, self.data["handlercount"]) + + def test_attribute_error(self): + """Attribute errors are logged. No handler added to C{pdata}""" + # Mock the __import__ builtin + import_mock = self.mocker.replace("__builtin__.__import__") + import_mock(self.expected_module_name) + self.mocker.result(self.module_fake) + # Mock the handle_register function + handle_reg_mock = self.mocker.replace(handler_register, + passthrough=False) + handle_reg_mock(self.module_fake, self.data["handlers"], + self.data["data"], self.data["frequency"]) + self.mocker.throw(AttributeError()) + # Mock log function + logexc_mock = self.mocker.replace(logexc, passthrough=False) + logexc_mock(ANY) + # Mock the print_exc function + print_exc_mock = self.mocker.replace("traceback.print_exc", + passthrough=False) + print_exc_mock(ARGS, KWARGS) + # Activate mocks + self.mocker.replay() + + partwalker_handle_handler(self.data, self.ctype, self.filename, + self.payload) + + self.assertEqual(0, self.data["handlercount"]) + + +class TestHandlerHandlePart(MockerTestCase): + def setUp(self): + self.data = "fake data" + self.ctype = "fake ctype" + self.filename = "fake filename" + self.payload = "fake payload" + self.frequency = "once-per-instance" + + def test_normal_version_1(self): + """ + C{handle_part} is called without C{frequency} for + C{handler_version} == 1. + """ + # Build a mock part-handler module + mod_mock = self.mocker.mock() + getattr(mod_mock, "frequency") + self.mocker.result("once-per-instance") + getattr(mod_mock, "handler_version") + self.mocker.result(1) + mod_mock.handle_part(self.data, self.ctype, self.filename, + self.payload) + self.mocker.replay() + + handler_handle_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) + + def test_normal_version_2(self): + """ + C{handle_part} is called with C{frequency} for + C{handler_version} == 2. + """ + # Build a mock part-handler module + mod_mock = self.mocker.mock() + getattr(mod_mock, "frequency") + self.mocker.result("once-per-instance") + getattr(mod_mock, "handler_version") + self.mocker.result(2) + mod_mock.handle_part(self.data, self.ctype, self.filename, + self.payload, self.frequency) + self.mocker.replay() + + handler_handle_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) + + def test_modfreq_per_always(self): + """ + C{handle_part} is called regardless of frequency if nofreq is always. + """ + self.frequency = "once" + # Build a mock part-handler module + mod_mock = self.mocker.mock() + getattr(mod_mock, "frequency") + self.mocker.result("always") + getattr(mod_mock, "handler_version") + self.mocker.result(1) + mod_mock.handle_part(self.data, self.ctype, self.filename, + self.payload) + self.mocker.replay() + + handler_handle_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) + + def test_no_handle_when_modfreq_once(self): + """C{handle_part} is not called if frequency is once""" + self.frequency = "once" + # Build a mock part-handler module + mod_mock = self.mocker.mock() + getattr(mod_mock, "frequency") + self.mocker.result("once-per-instance") + self.mocker.replay() + + handler_handle_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) + + def test_exception_is_caught(self): + """Exceptions within C{handle_part} are caught and logged.""" + # Build a mock part-handler module + mod_mock = self.mocker.mock() + getattr(mod_mock, "frequency") + self.mocker.result("once-per-instance") + getattr(mod_mock, "handler_version") + self.mocker.result(1) + mod_mock.handle_part(self.data, self.ctype, self.filename, + self.payload) + self.mocker.throw(Exception()) + # Mock log function + logexc_mock = self.mocker.replace(logexc, passthrough=False) + logexc_mock(ANY) + # Mock the print_exc function + print_exc_mock = self.mocker.replace("traceback.print_exc", + passthrough=False) + print_exc_mock(ARGS, KWARGS) + self.mocker.replay() + + handler_handle_part(mod_mock, self.data, self.ctype, self.filename, + self.payload, self.frequency) diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py new file mode 100644 index 00000000..d0e121d6 --- /dev/null +++ b/tests/unittests/test_datasource/test_maas.py @@ -0,0 +1,151 @@ +from tempfile import mkdtemp +from shutil import rmtree +import os +from StringIO import StringIO +from copy import copy +from cloudinit.DataSourceMaaS import ( + MaasSeedDirNone, + MaasSeedDirMalformed, + read_maas_seed_dir, + read_maas_seed_url, +) +from mocker import MockerTestCase + + +class TestMaasDataSource(MockerTestCase): + + def setUp(self): + super(TestMaasDataSource, self).setUp() + # Make a temp directoy for tests to use. + self.tmp = mkdtemp(prefix="unittest_") + + def tearDown(self): + super(TestMaasDataSource, self).tearDown() + # Clean up temp directory + rmtree(self.tmp) + + def test_seed_dir_valid(self): + """Verify a valid seeddir is read as such""" + + data = {'instance-id': 'i-valid01', + 'local-hostname': 'valid01-hostname', + 'user-data': 'valid01-userdata'} + + my_d = os.path.join(self.tmp, "valid") + populate_dir(my_d, data) + + (userdata, metadata) = read_maas_seed_dir(my_d) + + self.assertEqual(userdata, data['user-data']) + for key in ('instance-id', 'local-hostname'): + self.assertEqual(data[key], metadata[key]) + + # verify that 'userdata' is not returned as part of the metadata + self.assertFalse(('user-data' in metadata)) + + def test_seed_dir_valid_extra(self): + """Verify extra files do not affect seed_dir validity """ + + data = {'instance-id': 'i-valid-extra', + 'local-hostname': 'valid-extra-hostname', + 'user-data': 'valid-extra-userdata', 'foo': 'bar'} + + my_d = os.path.join(self.tmp, "valid_extra") + populate_dir(my_d, data) + + (userdata, metadata) = read_maas_seed_dir(my_d) + + self.assertEqual(userdata, data['user-data']) + for key in ('instance-id', 'local-hostname'): + self.assertEqual(data[key], metadata[key]) + + # additional files should not just appear as keys in metadata atm + self.assertFalse(('foo' in metadata)) + + def test_seed_dir_invalid(self): + """Verify that invalid seed_dir raises MaasSeedDirMalformed""" + + valid = {'instance-id': 'i-instanceid', + 'local-hostname': 'test-hostname', 'user-data': ''} + + my_based = os.path.join(self.tmp, "valid_extra") + + # missing 'userdata' file + my_d = "%s-01" % my_based + invalid_data = copy(valid) + del invalid_data['local-hostname'] + populate_dir(my_d, invalid_data) + self.assertRaises(MaasSeedDirMalformed, read_maas_seed_dir, my_d) + + # missing 'instance-id' + my_d = "%s-02" % my_based + invalid_data = copy(valid) + del invalid_data['instance-id'] + populate_dir(my_d, invalid_data) + self.assertRaises(MaasSeedDirMalformed, read_maas_seed_dir, my_d) + + def test_seed_dir_none(self): + """Verify that empty seed_dir raises MaasSeedDirNone""" + + my_d = os.path.join(self.tmp, "valid_empty") + self.assertRaises(MaasSeedDirNone, read_maas_seed_dir, my_d) + + def test_seed_dir_missing(self): + """Verify that missing seed_dir raises MaasSeedDirNone""" + self.assertRaises(MaasSeedDirNone, read_maas_seed_dir, + os.path.join(self.tmp, "nonexistantdirectory")) + + def test_seed_url_valid(self): + """Verify that valid seed_url is read as such""" + valid = {'meta-data/instance-id': 'i-instanceid', + 'meta-data/local-hostname': 'test-hostname', + 'user-data': 'foodata'} + + my_seed = "http://example.com/xmeta" + my_ver = "1999-99-99" + my_headers = {'header1': 'value1', 'header2': 'value2'} + + def my_headers_cb(url): + return(my_headers) + + mock_request = self.mocker.replace("urllib2.Request", + passthrough=False) + mock_urlopen = self.mocker.replace("urllib2.urlopen", + passthrough=False) + + for (key, val) in valid.iteritems(): + mock_request("%s/%s/%s" % (my_seed, my_ver, key), + data=None, headers=my_headers) + self.mocker.nospec() + self.mocker.result("fake-request-%s" % key) + mock_urlopen("fake-request-%s" % key, timeout=None) + self.mocker.result(StringIO(val)) + + self.mocker.replay() + + (userdata, metadata) = read_maas_seed_url(my_seed, + header_cb=my_headers_cb, version=my_ver) + + self.assertEqual("foodata", userdata) + self.assertEqual(metadata['instance-id'], + valid['meta-data/instance-id']) + self.assertEqual(metadata['local-hostname'], + valid['meta-data/local-hostname']) + + def test_seed_url_invalid(self): + """Verify that invalid seed_url raises MaasSeedDirMalformed""" + pass + + def test_seed_url_missing(self): + """Verify seed_url with no found entries raises MaasSeedDirNone""" + pass + + +def populate_dir(seed_dir, files): + os.mkdir(seed_dir) + for (name, content) in files.iteritems(): + with open(os.path.join(seed_dir, name), "w") as fp: + fp.write(content) + fp.close() + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 21d2442f..21d2442f 100644 --- a/tests/unittests/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index d8da8bc9..ca96bc60 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -28,7 +28,7 @@ class TestMergeDict(TestCase): def test_merge_does_not_override(self): """Test that candidate doesn't override source.""" source = {"key1": "value1", "key2": "value2"} - candidate = {"key2": "value2", "key2": "NEW VALUE"} + candidate = {"key1": "value2", "key2": "NEW VALUE"} result = mergedict(source, candidate) self.assertEqual(source, result) |