summaryrefslogtreecommitdiff
path: root/tests/unittests/test_userdata.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_userdata.py')
-rw-r--r--tests/unittests/test_userdata.py80
1 files changed, 71 insertions, 9 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 82a4c555..9e1fed7e 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -9,12 +9,17 @@ from email.mime.base import MIMEBase
from mocker import MockerTestCase
+from cloudinit import handlers
+from cloudinit import helpers as c_helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
+from cloudinit import util
INSTANCE_ID = "i-testing"
+from tests.unittests import helpers
+
class FakeDataSource(sources.DataSource):
@@ -26,22 +31,16 @@ class FakeDataSource(sources.DataSource):
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
-
-
-class TestConsumeUserData(MockerTestCase):
+class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- MockerTestCase.setUp(self)
- # Replace the write so no actual files
- # get written out...
- self.mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
+ helpers.FilesystemMockingTestCase.setUp(self)
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- MockerTestCase.tearDown(self)
+ helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
@@ -53,12 +52,71 @@ class TestConsumeUserData(MockerTestCase):
self._log.addHandler(self._log_handler)
return log_file
+ def test_merging_cloud_config(self):
+ blob = '''
+#cloud-config
+a: b
+e: f
+run:
+ - b
+ - c
+'''
+ message1 = MIMEBase("text", "cloud-config")
+ message1['Merge-Type'] = 'dict()+list(extend)+str(append)'
+ message1.set_payload(blob)
+
+ blob2 = '''
+#cloud-config
+a: e
+e: g
+run:
+ - stuff
+ - morestuff
+'''
+ message2 = MIMEBase("text", "cloud-config")
+ message2['Merge-Type'] = 'dict()+list(extend)+str()'
+ message2.set_payload(blob2)
+
+ blob3 = '''
+#cloud-config
+e:
+ - 1
+ - 2
+ - 3
+'''
+ message3 = MIMEBase("text", "cloud-config")
+ message3['Merge-Type'] = 'dict()+list()+str()'
+ message3.set_payload(blob3)
+
+ messages = [message1, message2, message3]
+
+ paths = c_helpers.Paths({}, ds=FakeDataSource(''))
+ cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
+
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, None)
+ for i, m in enumerate(messages):
+ headers = dict(m)
+ fn = "part-%s" % (i + 1)
+ payload = m.get_payload(decode=True)
+ cloud_cfg.handle_part(None, headers['Content-Type'],
+ fn, payload, None, headers)
+ cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None, None)
+ contents = util.load_file(paths.get_ipath('cloud_config'))
+ contents = util.load_yaml(contents)
+ self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
+ self.assertEquals(contents['a'], 'be')
+ self.assertEquals(contents['e'], 'fg')
+
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
@@ -76,6 +134,7 @@ class TestConsumeUserData(MockerTestCase):
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
@@ -93,6 +152,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
@@ -111,6 +171,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
@@ -129,6 +190,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(outpath, script, 0700)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()