diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test_userdata.py | 145 |
1 files changed, 91 insertions, 54 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 8eb7b259..eeddde7d 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -1,107 +1,144 @@ """Tests for handling of userdata within cloud init""" -import logging import StringIO +import logging +import os +import shutil +import tempfile + from email.mime.base import MIMEBase from mocker import MockerTestCase -import cloudinit -from cloudinit.DataSource import DataSource - +from cloudinit import helpers +from cloudinit import log +from cloudinit import sources +from cloudinit import stages +from cloudinit import util -instance_id = "i-testing" +INSTANCE_ID = "i-testing" -class FakeDataSource(DataSource): +class FakeDataSource(sources.DataSource): def __init__(self, userdata): - DataSource.__init__(self) - self.metadata = {'instance-id': instance_id} + sources.DataSource.__init__(self, {}, None, None) + self.metadata = {'instance-id': INSTANCE_ID} self.userdata_raw = userdata -class TestConsumeUserData(MockerTestCase): +# FIXME: these tests shouldn't be checking log output?? +# Weirddddd... + - _log_handler = None - _log = None - log_file = None +class TestConsumeUserData(MockerTestCase): def setUp(self): + MockerTestCase.setUp(self) + # Replace the write so no actual files + # get written out... self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False) - self.mock_write(self.get_ipath("cloud_config"), "", 0600) - self.capture_log() + self._log = None + self._log_file = None + self._log_handler = None def tearDown(self): - self._log.removeHandler(self._log_handler) - - @staticmethod - def get_ipath(name): - return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id, - cloudinit.pathmap[name]) - - def capture_log(self): - self.log_file = StringIO.StringIO() - self._log_handler = logging.StreamHandler(self.log_file) - self._log_handler.setLevel(logging.DEBUG) - self._log = logging.getLogger(cloudinit.logger_name) + MockerTestCase.tearDown(self) + if self._log_handler and self._log: + self._log.removeHandler(self._log_handler) + + def capture_log(self, lvl=logging.DEBUG): + log_file = StringIO.StringIO() + self._log_handler = logging.StreamHandler(log_file) + self._log_handler.setLevel(lvl) + self._log = log.getLogger() self._log.addHandler(self._log_handler) + return log_file def test_unhandled_type_warning(self): """Raw text without magic is ignored but shows warning""" + ci = stages.Init() + data = "arbitrary text\n" + ci.datasource = FakeDataSource(data) + + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() - ci = cloudinit.CloudInit() - ci.datasource = FakeDataSource("arbitrary text\n") - ci.consume_userdata() - self.assertEqual( - "Unhandled non-multipart userdata starting 'arbitrary text...'\n", - self.log_file.getvalue()) + + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume() + self.assertIn( + "Unhandled non-multipart (text/x-not-multipart) userdata:", + log_file.getvalue()) def test_mime_text_plain(self): - """Mime message of type text/plain is ignored without warning""" - self.mocker.replay() - ci = cloudinit.CloudInit() + """Mime message of type text/plain is ignored but shows warning""" + ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string()) - ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume() + self.assertIn( + "Unhandled unknown content-type (text/plain)", + log_file.getvalue()) + + def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" + ci.datasource = FakeDataSource(script) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mock_write(outpath, script, 0700) self.mocker.replay() - ci = cloudinit.CloudInit() - ci.datasource = FakeDataSource(script) - ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume() + self.assertEqual("", log_file.getvalue()) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" - self.mock_write(outpath, script, 0700) - self.mocker.replay() - ci = cloudinit.CloudInit() message = MIMEBase("text", "x-shellscript") message.set_payload(script) ci.datasource = FakeDataSource(message.as_string()) - ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mock_write(outpath, script, 0700) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume() + self.assertEqual("", log_file.getvalue()) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script""" + ci = stages.Init() script = "#!/bin/sh\necho hello\n" - outpath = cloudinit.get_ipath_cur("scripts") + "/part-001" - self.mock_write(outpath, script, 0700) - self.mocker.replay() - ci = cloudinit.CloudInit() message = MIMEBase("text", "plain") message.set_payload(script) ci.datasource = FakeDataSource(message.as_string()) - ci.consume_userdata() - self.assertEqual("", self.log_file.getvalue()) + + outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") + self.mock_write(outpath, script, 0700) + self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + self.mocker.replay() + + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume() + self.assertEqual("", log_file.getvalue()) |