From 1750c756e0d4a63747ffdcd0040958622e3caf58 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 24 Jul 2013 00:34:58 -0700 Subject: Add test for mime gzipped message segments. --- tests/unittests/test_userdata.py | 52 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 0ebb0484..dc252b5d 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -2,10 +2,14 @@ import StringIO +import gzip import logging +import mocker import os from email.mime.base import MIMEBase +from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication from cloudinit import handlers from cloudinit import helpers as c_helpers @@ -118,7 +122,7 @@ p: 1 ci.datasource = FakeDataSource(data) mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() @@ -129,6 +133,46 @@ p: 1 "Unhandled non-multipart (text/x-not-multipart) userdata:", log_file.getvalue()) + def test_mime_gzip_compressed(self): + """Tests that individual message gzip encoding works.""" + + def gzip_part(text): + contents = StringIO.StringIO() + f = gzip.GzipFile(fileobj=contents, mode='w') + f.write(str(text)) + f.flush() + f.close() + return MIMEApplication(contents.getvalue(), 'gzip') + + base_content1 = ''' +#cloud-config +a: 2 +''' + + base_content2 = ''' +#cloud-config +b: 3 +c: 4 +''' + + message = MIMEMultipart('test') + message.attach(gzip_part(base_content1)) + message.attach(gzip_part(base_content2)) + ci = stages.Init() + ci.datasource = FakeDataSource(str(message)) + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + ci.fetch() + ci.consume_userdata() + contents = util.load_file(ci.paths.get_ipath("cloud_config")) + contents = util.load_yaml(contents) + self.assertTrue(isinstance(contents, dict)) + self.assertEquals(3, len(contents)) + self.assertEquals(2, contents['a']) + self.assertEquals(3, contents['b']) + self.assertEquals(4, contents['c']) + def test_mime_text_plain(self): """Mime message of type text/plain is ignored but shows warning.""" ci = stages.Init() @@ -137,7 +181,7 @@ p: 1 ci.datasource = FakeDataSource(message.as_string()) mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() @@ -156,7 +200,7 @@ p: 1 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) mock_write(outpath, script, 0700) self.mocker.replay() @@ -176,7 +220,7 @@ p: 1 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + passthrough=False) mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) mock_write(outpath, script, 0700) self.mocker.replay() -- cgit v1.2.3