summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_userdata.py145
1 files changed, 91 insertions, 54 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 8eb7b259..eeddde7d 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -1,107 +1,144 @@
"""Tests for handling of userdata within cloud init"""
-import logging
import StringIO
+import logging
+import os
+import shutil
+import tempfile
+
from email.mime.base import MIMEBase
from mocker import MockerTestCase
-import cloudinit
-from cloudinit.DataSource import DataSource
-
+from cloudinit import helpers
+from cloudinit import log
+from cloudinit import sources
+from cloudinit import stages
+from cloudinit import util
-instance_id = "i-testing"
+INSTANCE_ID = "i-testing"
-class FakeDataSource(DataSource):
+class FakeDataSource(sources.DataSource):
def __init__(self, userdata):
- DataSource.__init__(self)
- self.metadata = {'instance-id': instance_id}
+ sources.DataSource.__init__(self, {}, None, None)
+ self.metadata = {'instance-id': INSTANCE_ID}
self.userdata_raw = userdata
-class TestConsumeUserData(MockerTestCase):
+# FIXME: these tests shouldn't be checking log output??
+# Weirddddd...
+
- _log_handler = None
- _log = None
- log_file = None
+class TestConsumeUserData(MockerTestCase):
def setUp(self):
+ MockerTestCase.setUp(self)
+ # Replace the write so no actual files
+ # get written out...
self.mock_write = self.mocker.replace("cloudinit.util.write_file",
passthrough=False)
- self.mock_write(self.get_ipath("cloud_config"), "", 0600)
- self.capture_log()
+ self._log = None
+ self._log_file = None
+ self._log_handler = None
def tearDown(self):
- self._log.removeHandler(self._log_handler)
-
- @staticmethod
- def get_ipath(name):
- return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
- cloudinit.pathmap[name])
-
- def capture_log(self):
- self.log_file = StringIO.StringIO()
- self._log_handler = logging.StreamHandler(self.log_file)
- self._log_handler.setLevel(logging.DEBUG)
- self._log = logging.getLogger(cloudinit.logger_name)
+ MockerTestCase.tearDown(self)
+ if self._log_handler and self._log:
+ self._log.removeHandler(self._log_handler)
+
+ def capture_log(self, lvl=logging.DEBUG):
+ log_file = StringIO.StringIO()
+ self._log_handler = logging.StreamHandler(log_file)
+ self._log_handler.setLevel(lvl)
+ self._log = log.getLogger()
self._log.addHandler(self._log_handler)
+ return log_file
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning"""
+ ci = stages.Init()
+ data = "arbitrary text\n"
+ ci.datasource = FakeDataSource(data)
+
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
- ci = cloudinit.CloudInit()
- ci.datasource = FakeDataSource("arbitrary text\n")
- ci.consume_userdata()
- self.assertEqual(
- "Unhandled non-multipart userdata starting 'arbitrary text...'\n",
- self.log_file.getvalue())
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume()
+ self.assertIn(
+ "Unhandled non-multipart (text/x-not-multipart) userdata:",
+ log_file.getvalue())
def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored without warning"""
- self.mocker.replay()
- ci = cloudinit.CloudInit()
+ """Mime message of type text/plain is ignored but shows warning"""
+ ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume()
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+
+
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
+ ci.datasource = FakeDataSource(script)
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
- ci = cloudinit.CloudInit()
- ci.datasource = FakeDataSource(script)
- ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume()
+ self.assertEqual("", log_file.getvalue())
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
- self.mock_write(outpath, script, 0700)
- self.mocker.replay()
- ci = cloudinit.CloudInit()
message = MIMEBase("text", "x-shellscript")
message.set_payload(script)
ci.datasource = FakeDataSource(message.as_string())
- ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mock_write(outpath, script, 0700)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume()
+ self.assertEqual("", log_file.getvalue())
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script"""
+ ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
- outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
- self.mock_write(outpath, script, 0700)
- self.mocker.replay()
- ci = cloudinit.CloudInit()
message = MIMEBase("text", "plain")
message.set_payload(script)
ci.datasource = FakeDataSource(message.as_string())
- ci.consume_userdata()
- self.assertEqual("", self.log_file.getvalue())
+
+ outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write(outpath, script, 0700)
+ self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ self.mocker.replay()
+
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume()
+ self.assertEqual("", log_file.getvalue())