diff options
-rw-r--r-- | tests/unittests/test_userdata.py | 50 |
1 files changed, 46 insertions, 4 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 5cd50f4f..b227616c 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -2,11 +2,13 @@ import StringIO +import gzip import logging import os from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication from cloudinit import handlers from cloudinit import helpers as c_helpers @@ -177,7 +179,7 @@ p: 1 ci.datasource = FakeDataSource(data) mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() @@ -188,6 +190,46 @@ p: 1 "Unhandled non-multipart (text/x-not-multipart) userdata:", log_file.getvalue()) + def test_mime_gzip_compressed(self): + """Tests that individual message gzip encoding works.""" + + def gzip_part(text): + contents = StringIO.StringIO() + f = gzip.GzipFile(fileobj=contents, mode='w') + f.write(str(text)) + f.flush() + f.close() + return MIMEApplication(contents.getvalue(), 'gzip') + + base_content1 = ''' +#cloud-config +a: 2 +''' + + base_content2 = ''' +#cloud-config +b: 3 +c: 4 +''' + + message = MIMEMultipart('test') + message.attach(gzip_part(base_content1)) + message.attach(gzip_part(base_content2)) + ci = stages.Init() + ci.datasource = FakeDataSource(str(message)) + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + ci.fetch() + ci.consume_userdata() + contents = util.load_file(ci.paths.get_ipath("cloud_config")) + contents = util.load_yaml(contents) + self.assertTrue(isinstance(contents, dict)) + self.assertEquals(3, len(contents)) + self.assertEquals(2, contents['a']) + self.assertEquals(3, contents['b']) + self.assertEquals(4, contents['c']) + def test_mime_text_plain(self): """Mime message of type text/plain is ignored but shows warning.""" ci = stages.Init() @@ -196,7 +238,7 @@ p: 1 ci.datasource = FakeDataSource(message.as_string()) mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() @@ -215,7 +257,7 @@ p: 1 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) mock_write(outpath, script, 0700) self.mocker.replay() @@ -235,7 +277,7 @@ p: 1 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) mock_write(outpath, script, 0700) self.mocker.replay() |