summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test__init__.py162
-rw-r--r--tests/unittests/test_builtin_handlers.py54
-rw-r--r--tests/unittests/test_datasource/test_maas.py64
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py103
-rw-r--r--tests/unittests/test_userdata.py135
-rw-r--r--tests/unittests/test_util.py123
6 files changed, 366 insertions, 275 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 4f60f0ea..af18955d 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,18 +1,42 @@
-from mocker import MockerTestCase, ANY, ARGS, KWARGS
+import StringIO
+import logging
import os
+import sys
+
+from mocker import MockerTestCase, ANY, ARGS, KWARGS
+
+from cloudinit import handlers
+from cloudinit import helpers
+from cloudinit import importer
+from cloudinit import log
+from cloudinit import settings
+from cloudinit import url_helper
+from cloudinit import util
+
+
+class FakeModule(handlers.Handler):
+ def __init__(self):
+ handlers.Handler.__init__(self, settings.PER_ALWAYS)
+ self.types = []
+
+ def list_types(self):
+ return self.types
+
+ def _handle_part(self, data, ctype, filename, payload, frequency):
+ pass
-from cloudinit import (partwalker_handle_handler, handler_handle_part,
- handler_register, get_cmdline_url)
-from cloudinit.util import write_file, logexc, readurl
+class TestWalkerHandleHandler(MockerTestCase):
-class TestPartwalkerHandleHandler(MockerTestCase):
def setUp(self):
+
+ MockerTestCase.setUp(self)
+
self.data = {
"handlercount": 0,
- "frequency": "?",
- "handlerdir": "?",
- "handlers": [],
+ "frequency": "",
+ "handlerdir": self.makeDir(),
+ "handlers": helpers.ContentHandlers(),
"data": None}
self.expected_module_name = "part-handler-%03d" % (
@@ -20,179 +44,138 @@ class TestPartwalkerHandleHandler(MockerTestCase):
expected_file_name = "%s.py" % self.expected_module_name
expected_file_fullname = os.path.join(self.data["handlerdir"],
expected_file_name)
- self.module_fake = "fake module handle"
+ self.module_fake = FakeModule()
self.ctype = None
self.filename = None
self.payload = "dummy payload"
# Mock the write_file function
- write_file_mock = self.mocker.replace(write_file, passthrough=False)
+ write_file_mock = self.mocker.replace(util.write_file, passthrough=False)
write_file_mock(expected_file_fullname, self.payload, 0600)
def test_no_errors(self):
"""Payload gets written to file and added to C{pdata}."""
- # Mock the __import__ builtin
- import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
- # Mock the handle_register function
- handle_reg_mock = self.mocker.replace(handler_register,
- passthrough=False)
- handle_reg_mock(self.module_fake, self.data["handlers"],
- self.data["data"], self.data["frequency"])
- # Activate mocks
self.mocker.replay()
-
- partwalker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
-
+
+ handlers.walker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
+
self.assertEqual(1, self.data["handlercount"])
-
+
def test_import_error(self):
"""Module import errors are logged. No handler added to C{pdata}"""
- # Mock the __import__ builtin
- import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.throw(ImportError())
- # Mock log function
- logexc_mock = self.mocker.replace(logexc, passthrough=False)
- logexc_mock(ANY)
- # Mock the print_exc function
- print_exc_mock = self.mocker.replace("traceback.print_exc",
- passthrough=False)
- print_exc_mock(ARGS, KWARGS)
- # Activate mocks
self.mocker.replay()
- partwalker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
+ handlers.walker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
self.assertEqual(0, self.data["handlercount"])
def test_attribute_error(self):
"""Attribute errors are logged. No handler added to C{pdata}"""
- # Mock the __import__ builtin
- import_mock = self.mocker.replace("__builtin__.__import__")
+ import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
- # Mock the handle_register function
- handle_reg_mock = self.mocker.replace(handler_register,
- passthrough=False)
- handle_reg_mock(self.module_fake, self.data["handlers"],
- self.data["data"], self.data["frequency"])
self.mocker.throw(AttributeError())
- # Mock log function
- logexc_mock = self.mocker.replace(logexc, passthrough=False)
- logexc_mock(ANY)
- # Mock the print_exc function
- print_exc_mock = self.mocker.replace("traceback.print_exc",
- passthrough=False)
- print_exc_mock(ARGS, KWARGS)
- # Activate mocks
self.mocker.replay()
- partwalker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
+ handlers.walker_handle_handler(self.data, self.ctype, self.filename,
+ self.payload)
self.assertEqual(0, self.data["handlercount"])
class TestHandlerHandlePart(MockerTestCase):
+
def setUp(self):
self.data = "fake data"
self.ctype = "fake ctype"
self.filename = "fake filename"
self.payload = "fake payload"
- self.frequency = "once-per-instance"
+ self.frequency = settings.PER_INSTANCE
def test_normal_version_1(self):
"""
C{handle_part} is called without C{frequency} for
C{handler_version} == 1.
"""
- # Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
- self.mocker.result("once-per-instance")
+ self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.replay()
- handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
def test_normal_version_2(self):
"""
C{handle_part} is called with C{frequency} for
C{handler_version} == 2.
"""
- # Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
- self.mocker.result("once-per-instance")
+ self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(2)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload, self.frequency)
self.mocker.replay()
- handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
def test_modfreq_per_always(self):
"""
C{handle_part} is called regardless of frequency if nofreq is always.
"""
self.frequency = "once"
- # Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
- self.mocker.result("always")
+ self.mocker.result(settings.PER_ALWAYS)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.replay()
- handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once"""
self.frequency = "once"
- # Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
- self.mocker.result("once-per-instance")
+ self.mocker.result(settings.PER_ONCE)
self.mocker.replay()
- handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
- # Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
- self.mocker.result("once-per-instance")
+ self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.throw(Exception())
- # Mock log function
- logexc_mock = self.mocker.replace(logexc, passthrough=False)
- logexc_mock(ANY)
- # Mock the print_exc function
- print_exc_mock = self.mocker.replace("traceback.print_exc",
- passthrough=False)
- print_exc_mock(ARGS, KWARGS)
self.mocker.replay()
- handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
+ self.payload, self.frequency)
class TestCmdlineUrl(MockerTestCase):
@@ -202,14 +185,13 @@ class TestCmdlineUrl(MockerTestCase):
payload = "0"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(readurl, passthrough=False)
+ mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
mock_readurl(url)
- self.mocker.result(payload)
-
+ self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
self.assertEqual((key, url, None),
- get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline))
+ util.get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline))
def test_valid_content(self):
url = "http://example.com/foo"
@@ -217,14 +199,13 @@ class TestCmdlineUrl(MockerTestCase):
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(readurl, passthrough=False)
+ mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
mock_readurl(url)
- self.mocker.result(payload)
-
+ self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
self.assertEqual((key, url, payload),
- get_cmdline_url(names=[key], starts="xcloud-config",
+ util.get_cmdline_url(names=[key], starts="xcloud-config",
cmdline=cmdline))
def test_no_key_found(self):
@@ -232,11 +213,12 @@ class TestCmdlineUrl(MockerTestCase):
key = "mykey"
cmdline = "ro %s=%s bar=1" % (key, url)
- self.mocker.replace(readurl, passthrough=False)
+ self.mocker.replace(url_helper.readurl, passthrough=False)
+ self.mocker.result(url_helper.UrlResponse(400))
self.mocker.replay()
self.assertEqual((None, None, None),
- get_cmdline_url(names=["does-not-appear"],
+ util.get_cmdline_url(names=["does-not-appear"],
starts="#cloud-config", cmdline=cmdline))
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
new file mode 100644
index 00000000..84d85d4d
--- /dev/null
+++ b/tests/unittests/test_builtin_handlers.py
@@ -0,0 +1,54 @@
+"""Tests of the built-in user data handlers"""
+
+import os
+
+from mocker import MockerTestCase
+
+from cloudinit import handlers
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.handlers import upstart_job
+
+from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
+
+
+class TestBuiltins(MockerTestCase):
+
+ def test_upstart_frequency_no_out(self):
+ c_root = self.makeDir()
+ up_root = self.makeDir()
+ paths = helpers.Paths({
+ 'cloud_dir': c_root,
+ 'upstart_dir': up_root,
+ })
+ freq = PER_ALWAYS
+ h = upstart_job.UpstartJobPartHandler(paths)
+ # No files should be written out when
+ # the frequency is ! per-instance
+ h.handle_part('', handlers.CONTENT_START,
+ None, None, None)
+ h.handle_part('blah', 'text/upstart-job',
+ 'test.conf', 'blah', freq)
+ h.handle_part('', handlers.CONTENT_END,
+ None, None, None)
+ self.assertEquals(0, len(os.listdir(up_root)))
+
+ def test_upstart_frequency_single(self):
+ c_root = self.makeDir()
+ up_root = self.makeDir()
+ paths = helpers.Paths({
+ 'cloud_dir': c_root,
+ 'upstart_dir': up_root,
+ })
+ freq = PER_INSTANCE
+ h = upstart_job.UpstartJobPartHandler(paths)
+ # No files should be written out when
+ # the frequency is ! per-instance
+ h.handle_part('', handlers.CONTENT_START,
+ None, None, None)
+ h.handle_part('blah', 'text/upstart-job',
+ 'test.conf', 'blah', freq)
+ h.handle_part('', handlers.CONTENT_END,
+ None, None, None)
+ self.assertEquals(1, len(os.listdir(up_root)))
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 7659dd03..261c410a 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,14 +1,11 @@
-from tempfile import mkdtemp
-from shutil import rmtree
import os
from StringIO import StringIO
from copy import copy
-from cloudinit.DataSourceMAAS import (
- MAASSeedDirNone,
- MAASSeedDirMalformed,
- read_maas_seed_dir,
- read_maas_seed_url,
-)
+
+from cloudinit import util
+from cloudinit import url_helper
+from cloudinit.sources import DataSourceMAAS
+
from mocker import MockerTestCase
@@ -17,12 +14,7 @@ class TestMAASDataSource(MockerTestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = mkdtemp(prefix="unittest_")
-
- def tearDown(self):
- super(TestMAASDataSource, self).tearDown()
- # Clean up temp directory
- rmtree(self.tmp)
+ self.tmp = self.makeDir()
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such"""
@@ -35,7 +27,7 @@ class TestMAASDataSource(MockerTestCase):
my_d = os.path.join(self.tmp, "valid")
populate_dir(my_d, data)
- (userdata, metadata) = read_maas_seed_dir(my_d)
+ (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
self.assertEqual(userdata, data['user-data'])
for key in ('instance-id', 'local-hostname'):
@@ -54,7 +46,7 @@ class TestMAASDataSource(MockerTestCase):
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
- (userdata, metadata) = read_maas_seed_dir(my_d)
+ (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
self.assertEqual(userdata, data['user-data'])
for key in ('instance-id', 'local-hostname'):
@@ -76,24 +68,28 @@ class TestMAASDataSource(MockerTestCase):
invalid_data = copy(valid)
del invalid_data['local-hostname']
populate_dir(my_d, invalid_data)
- self.assertRaises(MAASSeedDirMalformed, read_maas_seed_dir, my_d)
+ self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
+ DataSourceMAAS.read_maas_seed_dir, my_d)
# missing 'instance-id'
my_d = "%s-02" % my_based
invalid_data = copy(valid)
del invalid_data['instance-id']
populate_dir(my_d, invalid_data)
- self.assertRaises(MAASSeedDirMalformed, read_maas_seed_dir, my_d)
+ self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
+ DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
"""Verify that empty seed_dir raises MAASSeedDirNone"""
my_d = os.path.join(self.tmp, "valid_empty")
- self.assertRaises(MAASSeedDirNone, read_maas_seed_dir, my_d)
+ self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
+ DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises MAASSeedDirNone"""
- self.assertRaises(MAASSeedDirNone, read_maas_seed_dir,
+ self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
+ DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
@@ -102,30 +98,30 @@ class TestMAASDataSource(MockerTestCase):
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
'user-data': 'foodata'}
-
+ valid_order = [
+ 'meta-data/local-hostname',
+ 'meta-data/instance-id',
+ 'meta-data/public-keys',
+ 'user-data',
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
def my_headers_cb(url):
- return(my_headers)
+ return my_headers
- mock_request = self.mocker.replace("urllib2.Request",
- passthrough=False)
- mock_urlopen = self.mocker.replace("urllib2.urlopen",
+ mock_request = self.mocker.replace(url_helper.readurl,
passthrough=False)
- for (key, val) in valid.iteritems():
- mock_request("%s/%s/%s" % (my_seed, my_ver, key),
- data=None, headers=my_headers)
- self.mocker.nospec()
- self.mocker.result("fake-request-%s" % key)
- mock_urlopen("fake-request-%s" % key, timeout=None)
- self.mocker.result(StringIO(val))
-
+ for key in valid_order:
+ url = "%s/%s/%s" % (my_seed, my_ver, key)
+ mock_request(url, headers=my_headers, timeout=None)
+ resp = valid.get(key)
+ self.mocker.result(url_helper.UrlResponse(200, resp))
self.mocker.replay()
- (userdata, metadata) = read_maas_seed_url(my_seed,
+ (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
header_cb=my_headers_cb, version=my_ver)
self.assertEqual("foodata", userdata)
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 21d2442f..1f96e992 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,9 +1,12 @@
from mocker import MockerTestCase
-from cloudinit.util import write_file, delete_dir_contents
-from cloudinit.CloudConfig.cc_ca_certs import (
- handle, update_ca_certs, add_ca_certs, remove_default_ca_certs)
-from logging import getLogger
+from cloudinit import util
+from cloudinit import cloud
+from cloudinit import helpers
+
+from cloudinit.config import cc_ca_certs
+
+import logging
class TestNoConfig(MockerTestCase):
@@ -11,36 +14,37 @@ class TestNoConfig(MockerTestCase):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
self.cloud_init = None
- self.log = getLogger("TestNoConfig")
+ self.log = logging.getLogger("TestNoConfig")
self.args = []
def test_no_config(self):
"""
Test that nothing is done if no ca-certs configuration is provided.
"""
- config = {"unknown-key": "value"}
-
- self.mocker.replace(write_file, passthrough=False)
- self.mocker.replace(update_ca_certs, passthrough=False)
+ config = util.get_builtin_cfg()
+ self.mocker.replace(util.write_file, passthrough=False)
+ self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, self.args)
class TestConfig(MockerTestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
- self.cloud_init = None
- self.log = getLogger("TestNoConfig")
+ self.paths = None
+ self.cloud = cloud.Cloud(None, self.paths, None, None, None)
+ self.log = logging.getLogger("TestNoConfig")
self.args = []
# Mock out the functions that actually modify the system
- self.mock_add = self.mocker.replace(add_ca_certs, passthrough=False)
- self.mock_update = self.mocker.replace(update_ca_certs,
+ self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, passthrough=False)
+ self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
passthrough=False)
- self.mock_remove = self.mocker.replace(remove_default_ca_certs,
+ self.mock_remove = self.mocker.replace(cc_ca_certs.remove_default_ca_certs,
passthrough=False)
+
# Order must be correct
self.mocker.order()
@@ -55,7 +59,7 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty"""
@@ -65,37 +69,37 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(["CERT1"])
+ self.mock_add(self.paths, ["CERT1"])
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(["CERT1", "CERT2"])
+ self.mock_add(self.paths, ["CERT1", "CERT2"])
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected"""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove()
+ self.mock_remove(self.paths)
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False"""
@@ -104,72 +108,85 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False"""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove()
- self.mock_add(["CERT1"])
+ self.mock_remove(self.paths)
+ self.mock_add(self.paths, ["CERT1"])
self.mock_update()
self.mocker.replay()
- handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
class TestAddCaCerts(MockerTestCase):
+
+ def setUp(self):
+ super(TestAddCaCerts, self).setUp()
+ self.paths = helpers.Paths({
+ 'cloud_dir': self.makeDir()
+ })
+
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- self.mocker.replace(write_file, passthrough=False)
+ self.mocker.replace(util.write_file, passthrough=False)
self.mocker.replay()
-
- add_ca_certs([])
+ cc_ca_certs.add_ca_certs(self.paths, [])
def test_single_cert(self):
"""Test adding a single certificate to the trusted CAs"""
cert = "CERT1\nLINE2\nLINE3"
- mock_write = self.mocker.replace(write_file, passthrough=False)
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="a")
+ "\ncloud-init-ca-certs.crt", omode="ab")
self.mocker.replay()
- add_ca_certs([cert])
+ cc_ca_certs.add_ca_certs(self.paths, [cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs"""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
- mock_write = self.mocker.replace(write_file, passthrough=False)
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="a")
+ "\ncloud-init-ca-certs.crt", omode="ab")
self.mocker.replay()
- add_ca_certs(certs)
+ cc_ca_certs.add_ca_certs(self.paths, certs)
class TestUpdateCaCerts(MockerTestCase):
def test_commands(self):
- mock_check_call = self.mocker.replace("subprocess.check_call",
+ mock_check_call = self.mocker.replace(util.subp,
passthrough=False)
- mock_check_call(["update-ca-certificates"])
+ mock_check_call(["update-ca-certificates"], capture=False)
self.mocker.replay()
- update_ca_certs()
+ cc_ca_certs.update_ca_certs()
class TestRemoveDefaultCaCerts(MockerTestCase):
+
+ def setUp(self):
+ super(TestRemoveDefaultCaCerts, self).setUp()
+ self.paths = helpers.Paths({
+ 'cloud_dir': self.makeDir()
+ })
+
def test_commands(self):
- mock_delete_dir_contents = self.mocker.replace(delete_dir_contents,
+ mock_delete_dir_contents = self.mocker.replace(util.delete_dir_contents,
passthrough=False)
- mock_write = self.mocker.replace(write_file, passthrough=False)
- mock_subp = self.mocker.replace("cloudinit.util.subp",
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_subp = self.mocker.replace(util.subp,
passthrough=False)
mock_delete_dir_contents("/usr/share/ca-certificates/")
@@ -179,4 +196,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase):
"ca-certificates ca-certificates/trust_new_crts select no")
self.mocker.replay()
- remove_default_ca_certs()
+ cc_ca_certs.remove_default_ca_certs(self.paths)
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 8eb7b259..861642b6 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -1,107 +1,144 @@
"""Tests for handling of userdata within cloud init"""
-import logging
import StringIO
+import logging
+import os
+import shutil
+import tempfile
+
from email.mime.base import MIMEBase
from mocker import MockerTestCase
-import cloudinit
-from cloudinit.DataSource import DataSource
-
+from cloudinit import helpers
+from cloudinit import log
+from cloudinit import sources
+from cloudinit import stages
+from cloudinit import util
-instance_id = "i-testing"
+INSTANCE_ID = "i-testing"
-class FakeDataSource(DataSource):
+class FakeDataSource(sources.DataSource):
def __init__(self, userdata):
- DataSource.__init__(self)
- self.metadata = {'instance-id': instance_id}
+ sources.DataSource.__init__(self, {}, None, None)
+ self.metadata = {'instance-id': INSTANCE_ID}
self.userdata_raw = userdata
-class TestConsumeUserData(MockerTestCase):
+# FIXME: these tests shouldn't be checking log output??
+# Weirddddd...
+
- _log_handler = None
- _log = None
- log_file = None
+class TestConsumeUserData(MockerTestCase):
def setUp(self):
+ MockerTestCase.setUp(self)
+ # Replace the write so no actual files
+ # get written out...
self.mock_write = self.mocker.replace("cloudinit.util.write_file",
passthrough=False)
- self.mock_write(self.get_ipath("cloud_config"), "", 0600)
- self.capture_log()
+ self._log = None
+ self._log_file = None
+ self._log_handler = None
def tearDown(self):
- self._log.removeHandler(self._log_handler)
-
- @staticmethod
- def get_ipath(name):
- return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
- cloudinit.pathmap[name])
-
- def capture_log(self):
- self.log_file = StringIO.StringIO()
- self._log_handler = logging.StreamHandler(self.log_file)
- self._log_handler.setLevel(logging.DEBUG)
- self._log = logging.getLogger(cloudinit.logger_name)
+ MockerTestCase.tearDown(self)
+ if self._log_handler and self._log:
+ self._log.removeHandler(self._log_handler)
+
+ def capture_log(self, lvl=logging.DEBUG):
+ log_file = StringIO.StringIO()
+ self._log_handler = logging.StreamHandler(log_file)
+ self._log_handler.setLevel(lvl)
+ self._log = log.getLogger()
self._log.addHandler(self._log_handler)
+ return log_file
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning"""
+ ci = stages.Init()
+ data = "arbitrary text\n"
+ ci.datasource = FakeDataSource(data)
+
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
- ci = cloudinit.CloudInit()
- ci.datasource = FakeDataSource("arbitrary text\n")
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
ci.consume_userdata()
- self.assertEqual(
- "Unhandled non-multipart userdata starting 'arbitrary text...'\n",
- self.log_file.getvalue())
+ self.assertIn(
+ "Unhandled non-multipart (text/x-not-multipart) userdata:",
+ log_file.getvalue())
def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored without warning"""
- self.mocker.replay()
- ci = cloudinit.CloudInit()
+ """Mime message of type text/plain is ignored but shows warning"""
+ ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
+
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
+ ci.datasource = FakeDataSource(script)
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
- ci = cloudinit.CloudInit()
- ci.datasource = FakeDataSource(script)
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+ self.assertEqual("", log_file.getvalue())
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
- self.mock_write(outpath, script, 0700)
- self.mocker.replay()
- ci = cloudinit.CloudInit()
message = MIMEBase("text", "x-shellscript")
message.set_payload(script)
ci.datasource = FakeDataSource(message.as_string())
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mock_write(outpath, script, 0700)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+ self.assertEqual("", log_file.getvalue())
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
- self.mock_write(outpath, script, 0700)
- self.mocker.replay()
- ci = cloudinit.CloudInit()
message = MIMEBase("text", "plain")
message.set_payload(script)
ci.datasource = FakeDataSource(message.as_string())
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(outpath, script, 0700)
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+ self.assertEqual("", log_file.getvalue())
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index e8f5885c..93979f06 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,28 +1,45 @@
-from unittest import TestCase
-from mocker import MockerTestCase
-from tempfile import mkdtemp
-from shutil import rmtree
import os
import stat
-from cloudinit.util import (mergedict, get_cfg_option_list_or_str, write_file,
- delete_dir_contents, get_cmdline,
- keyval_str_to_dict)
+from unittest import TestCase
+from mocker import MockerTestCase
+
+from cloudinit import util
+from cloudinit import importer
+
+
+class FakeSelinux(object):
+
+ def __init__(self, match_what):
+ self.match_what = match_what
+ self.restored = []
+
+ def matchpathcon(self, path, mode):
+ if path == self.match_what:
+ return
+ else:
+ raise OSError("No match!")
+ def is_selinux_enabled(self):
+ return True
-class TestMergeDict(TestCase):
+ def restorecon(self, path, recursive):
+ self.restored.append(path)
+
+
+class TestMergeDict(MockerTestCase):
def test_simple_merge(self):
"""Test simple non-conflict merge."""
source = {"key1": "value1"}
candidate = {"key2": "value2"}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual({"key1": "value1", "key2": "value2"}, result)
def test_nested_merge(self):
"""Test nested merge."""
source = {"key1": {"key1.1": "value1.1"}}
candidate = {"key1": {"key1.2": "value1.2"}}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(
{"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
@@ -30,42 +47,42 @@ class TestMergeDict(TestCase):
"""Test that candidate doesn't override source."""
source = {"key1": "value1", "key2": "value2"}
candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(source, result)
def test_empty_candidate(self):
"""Test empty candidate doesn't change source."""
source = {"key": "value"}
candidate = {}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(source, result)
def test_empty_source(self):
"""Test empty source is replaced by candidate."""
source = {}
candidate = {"key": "value"}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(candidate, result)
def test_non_dict_candidate(self):
"""Test non-dict candidate is discarded."""
source = {"key": "value"}
candidate = "not a dict"
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(source, result)
def test_non_dict_source(self):
"""Test non-dict source is not modified with a dict candidate."""
source = "not a dict"
candidate = {"key": "value"}
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(source, result)
def test_neither_dict(self):
"""Test if neither candidate or source is dict source wins."""
source = "source"
candidate = "candidate"
- result = mergedict(source, candidate)
+ result = util.mergedict(source, candidate)
self.assertEqual(source, result)
@@ -73,51 +90,45 @@ class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
- result = get_cfg_option_list_or_str(config, "key")
- self.assertIsNone(result)
+ result = util.get_cfg_option_list(config, "key")
+ self.assertEqual(None, result)
def test_not_found_with_default(self):
"""Default is returned if key is not found."""
config = {}
- result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"])
+ result = util.get_cfg_option_list(config, "key", default=["DEFAULT"])
self.assertEqual(["DEFAULT"], result)
def test_found_with_default(self):
"""Default is not returned if key is found."""
config = {"key": ["value1"]}
- result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"])
+ result = util.get_cfg_option_list(config, "key", default=["DEFAULT"])
self.assertEqual(["value1"], result)
def test_found_convert_to_list(self):
"""Single string is converted to one element list."""
config = {"key": "value1"}
- result = get_cfg_option_list_or_str(config, "key")
+ result = util.get_cfg_option_list(config, "key")
self.assertEqual(["value1"], result)
def test_value_is_none(self):
"""If value is None empty list is returned."""
config = {"key": None}
- result = get_cfg_option_list_or_str(config, "key")
+ result = util.get_cfg_option_list(config, "key")
self.assertEqual([], result)
class TestWriteFile(MockerTestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
- # Make a temp directoy for tests to use.
- self.tmp = mkdtemp(prefix="unittest_")
-
- def tearDown(self):
- super(TestWriteFile, self).tearDown()
- # Clean up temp directory
- rmtree(self.tmp)
+ self.tmp = self.makeDir(prefix="unittest_")
def test_basic_usage(self):
"""Verify basic usage with default args."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- write_file(path, contents)
+ util.write_file(path, contents)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
@@ -133,7 +144,7 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(dirname, "NewFile.txt")
contents = "Hey there"
- write_file(path, contents)
+ util.write_file(path, contents)
self.assertTrue(os.path.isdir(dirname))
self.assertTrue(os.path.isfile(path))
@@ -143,7 +154,7 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- write_file(path, contents, mode=0666)
+ util.write_file(path, contents, mode=0666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
@@ -158,7 +169,7 @@ class TestWriteFile(MockerTestCase):
# Create file first with basic content
with open(path, "wb") as f:
f.write("LINE1\n")
- write_file(path, contents, omode="a")
+ util.write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
@@ -167,36 +178,30 @@ class TestWriteFile(MockerTestCase):
self.assertEqual("LINE1\nHey there", create_contents)
def test_restorecon_if_possible_is_called(self):
- """Make sure the restorecon_if_possible is called correctly."""
- path = os.path.join(self.tmp, "NewFile.txt")
- contents = "Hey there"
-
- # Mock out the restorecon_if_possible call to test if it's called.
- mock_restorecon = self.mocker.replace(
- "cloudinit.util.restorecon_if_possible", passthrough=False)
- mock_restorecon(path)
+ """Make sure the selinux guard is called correctly."""
+ import_mock = self.mocker.replace(importer.import_module,
+ passthrough=False)
+ import_mock('selinux')
+ fake_se = FakeSelinux('/etc/hosts')
+ self.mocker.result(fake_se)
self.mocker.replay()
+ with util.SeLinuxGuard("/etc/hosts") as is_on:
+ self.assertTrue(is_on)
+ self.assertEqual(1, len(fake_se.restored))
+ self.assertEqual('/etc/hosts', fake_se.restored[0])
- write_file(path, contents)
-
-class TestDeleteDirContents(TestCase):
+class TestDeleteDirContents(MockerTestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
- # Make a temp directoy for tests to use.
- self.tmp = mkdtemp(prefix="unittest_")
-
- def tearDown(self):
- super(TestDeleteDirContents, self).tearDown()
- # Clean up temp directory
- rmtree(self.tmp)
+ self.tmp = self.makeDir(prefix="unittest_")
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
def test_does_not_delete_dir(self):
"""Ensure directory itself is not deleted."""
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertTrue(os.path.isdir(self.tmp))
self.assertDirEmpty(self.tmp)
@@ -206,7 +211,7 @@ class TestDeleteDirContents(TestCase):
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
f.write("DELETE ME")
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
@@ -214,7 +219,7 @@ class TestDeleteDirContents(TestCase):
"""Empty directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
@@ -223,7 +228,7 @@ class TestDeleteDirContents(TestCase):
os.mkdir(os.path.join(self.tmp, "new_dir"))
os.mkdir(os.path.join(self.tmp, "new_dir", "new_subdir"))
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
@@ -234,7 +239,7 @@ class TestDeleteDirContents(TestCase):
with open(f_name, "wb") as f:
f.write("DELETE ME")
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
@@ -246,7 +251,7 @@ class TestDeleteDirContents(TestCase):
f.write("DELETE ME")
os.symlink(file_name, link_name)
- delete_dir_contents(self.tmp)
+ util.delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
@@ -255,12 +260,12 @@ class TestKeyValStrings(TestCase):
def test_keyval_str_to_dict(self):
expected = {'1': 'one', '2': 'one+one', 'ro': True}
cmdline = "1=one ro 2=one+one"
- self.assertEqual(expected, keyval_str_to_dict(cmdline))
+ self.assertEqual(expected, util.keyval_str_to_dict(cmdline))
class TestGetCmdline(TestCase):
def test_cmdline_reads_debug_env(self):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
- self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], get_cmdline())
+ self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
# vi: ts=4 expandtab