summaryrefslogtreecommitdiff
path: root/tests/unittests/test_userdata.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_userdata.py')
-rw-r--r--tests/unittests/test_userdata.py107
1 files changed, 88 insertions, 19 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 82a4c555..fdfe2542 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -7,14 +7,17 @@ import os
from email.mime.base import MIMEBase
-from mocker import MockerTestCase
-
+from cloudinit import handlers
+from cloudinit import helpers as c_helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
+from cloudinit import util
INSTANCE_ID = "i-testing"
+from tests.unittests import helpers
+
class FakeDataSource(sources.DataSource):
@@ -26,22 +29,16 @@ class FakeDataSource(sources.DataSource):
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
-
-
-class TestConsumeUserData(MockerTestCase):
+class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- MockerTestCase.setUp(self)
- # Replace the write so no actual files
- # get written out...
- self.mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
+ helpers.FilesystemMockingTestCase.setUp(self)
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- MockerTestCase.tearDown(self)
+ helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
@@ -53,13 +50,77 @@ class TestConsumeUserData(MockerTestCase):
self._log.addHandler(self._log_handler)
return log_file
+ def test_merging_cloud_config(self):
+ blob = '''
+#cloud-config
+a: b
+e: f
+run:
+ - b
+ - c
+'''
+ message1 = MIMEBase("text", "cloud-config")
+ message1['Merge-Type'] = 'dict()+list(extend)+str(append)'
+ message1.set_payload(blob)
+
+ blob2 = '''
+#cloud-config
+a: e
+e: g
+run:
+ - stuff
+ - morestuff
+'''
+ message2 = MIMEBase("text", "cloud-config")
+ message2['X-Merge-Type'] = 'dict()+list(extend)+str()'
+ message2.set_payload(blob2)
+
+ blob3 = '''
+#cloud-config
+e:
+ - 1
+ - 2
+ - 3
+p: 1
+'''
+ message3 = MIMEBase("text", "cloud-config")
+ message3['Merge-Type'] = 'dict()+list()+str()'
+ message3.set_payload(blob3)
+
+ messages = [message1, message2, message3]
+
+ paths = c_helpers.Paths({}, ds=FakeDataSource(''))
+ cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
+
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
+ None)
+ for i, m in enumerate(messages):
+ headers = dict(m)
+ fn = "part-%s" % (i + 1)
+ payload = m.get_payload(decode=True)
+ cloud_cfg.handle_part(None, headers['Content-Type'],
+ fn, payload, None, headers)
+ cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
+ None)
+ contents = util.load_file(paths.get_ipath('cloud_config'))
+ contents = util.load_yaml(contents)
+ self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
+ self.assertEquals(contents['a'], 'be')
+ self.assertEquals(contents['e'], 'fg')
+ self.assertEquals(contents['p'], 1)
+
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -76,7 +137,9 @@ class TestConsumeUserData(MockerTestCase):
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -93,8 +156,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -111,8 +176,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -129,8 +196,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(outpath, script, 0700)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(outpath, script, 0700)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)