summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test__init__.py195
-rw-r--r--tests/unittests/test_datasource/test_maas.py151
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py (renamed from tests/unittests/test_handler_ca_certs.py)0
-rw-r--r--tests/unittests/test_util.py2
4 files changed, 347 insertions, 1 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
new file mode 100644
index 00000000..e157fa77
--- /dev/null
+++ b/tests/unittests/test__init__.py
@@ -0,0 +1,195 @@
+from mocker import MockerTestCase, ANY, ARGS, KWARGS
+import os
+
+from cloudinit import (partwalker_handle_handler, handler_handle_part,
+ handler_register)
+from cloudinit.util import write_file, logexc
+
+
+class TestPartwalkerHandleHandler(MockerTestCase):
+ def setUp(self):
+ self.data = {
+ "handlercount": 0,
+ "frequency": "?",
+ "handlerdir": "?",
+ "handlers": [],
+ "data": None}
+
+ self.expected_module_name = "part-handler-%03d" % (
+ self.data["handlercount"],)
+ expected_file_name = "%s.py" % self.expected_module_name
+ expected_file_fullname = os.path.join(self.data["handlerdir"],
+ expected_file_name)
+ self.module_fake = "fake module handle"
+ self.ctype = None
+ self.filename = None
+ self.payload = "dummy payload"
+
+ # Mock the write_file function
+ write_file_mock = self.mocker.replace(write_file, passthrough=False)
+ write_file_mock(expected_file_fullname, self.payload, 0600)
+
+ def test_no_errors(self):
+ """Payload gets written to file and added to C{pdata}."""
+ # Mock the __import__ builtin
+ import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock(self.expected_module_name)
+ self.mocker.result(self.module_fake)
+ # Mock the handle_register function
+ handle_reg_mock = self.mocker.replace(handler_register,
+ passthrough=False)
+ handle_reg_mock(self.module_fake, self.data["handlers"],
+ self.data["data"], self.data["frequency"])
+ # Activate mocks
+ self.mocker.replay()
+
+ partwalker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
+
+ self.assertEqual(1, self.data["handlercount"])
+
+ def test_import_error(self):
+ """Module import errors are logged. No handler added to C{pdata}"""
+ # Mock the __import__ builtin
+ import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock(self.expected_module_name)
+ self.mocker.throw(ImportError())
+ # Mock log function
+ logexc_mock = self.mocker.replace(logexc, passthrough=False)
+ logexc_mock(ANY)
+ # Mock the print_exc function
+ print_exc_mock = self.mocker.replace("traceback.print_exc",
+ passthrough=False)
+ print_exc_mock(ARGS, KWARGS)
+ # Activate mocks
+ self.mocker.replay()
+
+ partwalker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
+
+ self.assertEqual(0, self.data["handlercount"])
+
+ def test_attribute_error(self):
+ """Attribute errors are logged. No handler added to C{pdata}"""
+ # Mock the __import__ builtin
+ import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock(self.expected_module_name)
+ self.mocker.result(self.module_fake)
+ # Mock the handle_register function
+ handle_reg_mock = self.mocker.replace(handler_register,
+ passthrough=False)
+ handle_reg_mock(self.module_fake, self.data["handlers"],
+ self.data["data"], self.data["frequency"])
+ self.mocker.throw(AttributeError())
+ # Mock log function
+ logexc_mock = self.mocker.replace(logexc, passthrough=False)
+ logexc_mock(ANY)
+ # Mock the print_exc function
+ print_exc_mock = self.mocker.replace("traceback.print_exc",
+ passthrough=False)
+ print_exc_mock(ARGS, KWARGS)
+ # Activate mocks
+ self.mocker.replay()
+
+ partwalker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
+
+ self.assertEqual(0, self.data["handlercount"])
+
+
+class TestHandlerHandlePart(MockerTestCase):
+ def setUp(self):
+ self.data = "fake data"
+ self.ctype = "fake ctype"
+ self.filename = "fake filename"
+ self.payload = "fake payload"
+ self.frequency = "once-per-instance"
+
+ def test_normal_version_1(self):
+ """
+ C{handle_part} is called without C{frequency} for
+ C{handler_version} == 1.
+ """
+ # Build a mock part-handler module
+ mod_mock = self.mocker.mock()
+ getattr(mod_mock, "frequency")
+ self.mocker.result("once-per-instance")
+ getattr(mod_mock, "handler_version")
+ self.mocker.result(1)
+ mod_mock.handle_part(self.data, self.ctype, self.filename,
+ self.payload)
+ self.mocker.replay()
+
+ handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
+
+ def test_normal_version_2(self):
+ """
+ C{handle_part} is called with C{frequency} for
+ C{handler_version} == 2.
+ """
+ # Build a mock part-handler module
+ mod_mock = self.mocker.mock()
+ getattr(mod_mock, "frequency")
+ self.mocker.result("once-per-instance")
+ getattr(mod_mock, "handler_version")
+ self.mocker.result(2)
+ mod_mock.handle_part(self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
+ self.mocker.replay()
+
+ handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
+
+ def test_modfreq_per_always(self):
+ """
+ C{handle_part} is called regardless of frequency if nofreq is always.
+ """
+ self.frequency = "once"
+ # Build a mock part-handler module
+ mod_mock = self.mocker.mock()
+ getattr(mod_mock, "frequency")
+ self.mocker.result("always")
+ getattr(mod_mock, "handler_version")
+ self.mocker.result(1)
+ mod_mock.handle_part(self.data, self.ctype, self.filename,
+ self.payload)
+ self.mocker.replay()
+
+ handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
+
+ def test_no_handle_when_modfreq_once(self):
+ """C{handle_part} is not called if frequency is once"""
+ self.frequency = "once"
+ # Build a mock part-handler module
+ mod_mock = self.mocker.mock()
+ getattr(mod_mock, "frequency")
+ self.mocker.result("once-per-instance")
+ self.mocker.replay()
+
+ handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
+
+ def test_exception_is_caught(self):
+ """Exceptions within C{handle_part} are caught and logged."""
+ # Build a mock part-handler module
+ mod_mock = self.mocker.mock()
+ getattr(mod_mock, "frequency")
+ self.mocker.result("once-per-instance")
+ getattr(mod_mock, "handler_version")
+ self.mocker.result(1)
+ mod_mock.handle_part(self.data, self.ctype, self.filename,
+ self.payload)
+ self.mocker.throw(Exception())
+ # Mock log function
+ logexc_mock = self.mocker.replace(logexc, passthrough=False)
+ logexc_mock(ANY)
+ # Mock the print_exc function
+ print_exc_mock = self.mocker.replace("traceback.print_exc",
+ passthrough=False)
+ print_exc_mock(ARGS, KWARGS)
+ self.mocker.replay()
+
+ handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
new file mode 100644
index 00000000..d0e121d6
--- /dev/null
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -0,0 +1,151 @@
+from tempfile import mkdtemp
+from shutil import rmtree
+import os
+from StringIO import StringIO
+from copy import copy
+from cloudinit.DataSourceMaaS import (
+ MaasSeedDirNone,
+ MaasSeedDirMalformed,
+ read_maas_seed_dir,
+ read_maas_seed_url,
+)
+from mocker import MockerTestCase
+
+
+class TestMaasDataSource(MockerTestCase):
+
+ def setUp(self):
+ super(TestMaasDataSource, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = mkdtemp(prefix="unittest_")
+
+ def tearDown(self):
+ super(TestMaasDataSource, self).tearDown()
+ # Clean up temp directory
+ rmtree(self.tmp)
+
+ def test_seed_dir_valid(self):
+ """Verify a valid seeddir is read as such"""
+
+ data = {'instance-id': 'i-valid01',
+ 'local-hostname': 'valid01-hostname',
+ 'user-data': 'valid01-userdata'}
+
+ my_d = os.path.join(self.tmp, "valid")
+ populate_dir(my_d, data)
+
+ (userdata, metadata) = read_maas_seed_dir(my_d)
+
+ self.assertEqual(userdata, data['user-data'])
+ for key in ('instance-id', 'local-hostname'):
+ self.assertEqual(data[key], metadata[key])
+
+ # verify that 'userdata' is not returned as part of the metadata
+ self.assertFalse(('user-data' in metadata))
+
+ def test_seed_dir_valid_extra(self):
+ """Verify extra files do not affect seed_dir validity """
+
+ data = {'instance-id': 'i-valid-extra',
+ 'local-hostname': 'valid-extra-hostname',
+ 'user-data': 'valid-extra-userdata', 'foo': 'bar'}
+
+ my_d = os.path.join(self.tmp, "valid_extra")
+ populate_dir(my_d, data)
+
+ (userdata, metadata) = read_maas_seed_dir(my_d)
+
+ self.assertEqual(userdata, data['user-data'])
+ for key in ('instance-id', 'local-hostname'):
+ self.assertEqual(data[key], metadata[key])
+
+ # additional files should not just appear as keys in metadata atm
+ self.assertFalse(('foo' in metadata))
+
+ def test_seed_dir_invalid(self):
+ """Verify that invalid seed_dir raises MaasSeedDirMalformed"""
+
+ valid = {'instance-id': 'i-instanceid',
+ 'local-hostname': 'test-hostname', 'user-data': ''}
+
+ my_based = os.path.join(self.tmp, "valid_extra")
+
+ # missing 'userdata' file
+ my_d = "%s-01" % my_based
+ invalid_data = copy(valid)
+ del invalid_data['local-hostname']
+ populate_dir(my_d, invalid_data)
+ self.assertRaises(MaasSeedDirMalformed, read_maas_seed_dir, my_d)
+
+ # missing 'instance-id'
+ my_d = "%s-02" % my_based
+ invalid_data = copy(valid)
+ del invalid_data['instance-id']
+ populate_dir(my_d, invalid_data)
+ self.assertRaises(MaasSeedDirMalformed, read_maas_seed_dir, my_d)
+
+ def test_seed_dir_none(self):
+ """Verify that empty seed_dir raises MaasSeedDirNone"""
+
+ my_d = os.path.join(self.tmp, "valid_empty")
+ self.assertRaises(MaasSeedDirNone, read_maas_seed_dir, my_d)
+
+ def test_seed_dir_missing(self):
+ """Verify that missing seed_dir raises MaasSeedDirNone"""
+ self.assertRaises(MaasSeedDirNone, read_maas_seed_dir,
+ os.path.join(self.tmp, "nonexistantdirectory"))
+
+ def test_seed_url_valid(self):
+ """Verify that valid seed_url is read as such"""
+ valid = {'meta-data/instance-id': 'i-instanceid',
+ 'meta-data/local-hostname': 'test-hostname',
+ 'user-data': 'foodata'}
+
+ my_seed = "http://example.com/xmeta"
+ my_ver = "1999-99-99"
+ my_headers = {'header1': 'value1', 'header2': 'value2'}
+
+ def my_headers_cb(url):
+ return(my_headers)
+
+ mock_request = self.mocker.replace("urllib2.Request",
+ passthrough=False)
+ mock_urlopen = self.mocker.replace("urllib2.urlopen",
+ passthrough=False)
+
+ for (key, val) in valid.iteritems():
+ mock_request("%s/%s/%s" % (my_seed, my_ver, key),
+ data=None, headers=my_headers)
+ self.mocker.nospec()
+ self.mocker.result("fake-request-%s" % key)
+ mock_urlopen("fake-request-%s" % key, timeout=None)
+ self.mocker.result(StringIO(val))
+
+ self.mocker.replay()
+
+ (userdata, metadata) = read_maas_seed_url(my_seed,
+ header_cb=my_headers_cb, version=my_ver)
+
+ self.assertEqual("foodata", userdata)
+ self.assertEqual(metadata['instance-id'],
+ valid['meta-data/instance-id'])
+ self.assertEqual(metadata['local-hostname'],
+ valid['meta-data/local-hostname'])
+
+ def test_seed_url_invalid(self):
+ """Verify that invalid seed_url raises MaasSeedDirMalformed"""
+ pass
+
+ def test_seed_url_missing(self):
+ """Verify seed_url with no found entries raises MaasSeedDirNone"""
+ pass
+
+
+def populate_dir(seed_dir, files):
+ os.mkdir(seed_dir)
+ for (name, content) in files.iteritems():
+ with open(os.path.join(seed_dir, name), "w") as fp:
+ fp.write(content)
+ fp.close()
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 21d2442f..21d2442f 100644
--- a/tests/unittests/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index d8da8bc9..ca96bc60 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -28,7 +28,7 @@ class TestMergeDict(TestCase):
def test_merge_does_not_override(self):
"""Test that candidate doesn't override source."""
source = {"key1": "value1", "key2": "value2"}
- candidate = {"key2": "value2", "key2": "NEW VALUE"}
+ candidate = {"key1": "value2", "key2": "NEW VALUE"}
result = mergedict(source, candidate)
self.assertEqual(source, result)