summaryrefslogtreecommitdiff
path: root/tests/unittests/test_data.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r--tests/unittests/test_data.py241
1 files changed, 169 insertions, 72 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index fd6bd8a1..9c1ec1d4 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -1,11 +1,19 @@
"""Tests for handling of userdata within cloud init."""
-import StringIO
-
import gzip
import logging
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from six import BytesIO, StringIO
+from email import encoders
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
@@ -16,13 +24,15 @@ from cloudinit import log
from cloudinit.settings import (PER_INSTANCE)
from cloudinit import sources
from cloudinit import stages
+from cloudinit import user_data as ud
from cloudinit import util
-INSTANCE_ID = "i-testing"
-
from . import helpers
+INSTANCE_ID = "i-testing"
+
+
class FakeDataSource(sources.DataSource):
def __init__(self, userdata=None, vendordata=None):
@@ -32,28 +42,45 @@ class FakeDataSource(sources.DataSource):
self.vendordata_raw = vendordata
+def count_messages(root):
+ am = 0
+ for m in root.walk():
+ if ud.is_skippable(m):
+ continue
+ am += 1
+ return am
+
+
+def gzip_text(text):
+ contents = BytesIO()
+ f = gzip.GzipFile(fileobj=contents, mode='wb')
+ f.write(util.encode_text(text))
+ f.flush()
+ f.close()
+ return contents.getvalue()
+
+
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
+ super(TestConsumeUserData, self).setUp()
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
+ helpers.FilesystemMockingTestCase.tearDown(self)
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def capture_log(self, lvl=logging.DEBUG):
- log_file = StringIO.StringIO()
+ log_file = StringIO()
self._log_handler = logging.StreamHandler(log_file)
self._log_handler.setLevel(lvl)
self._log = log.getLogger()
@@ -71,7 +98,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
ci = stages.Init()
ci.datasource = FakeDataSource(blob)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -99,7 +127,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -138,7 +167,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -184,7 +214,8 @@ c: d
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -214,7 +245,8 @@ name: user
run:
- z
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -249,7 +281,8 @@ vendor_data:
enabled: True
prefix: /bin/true
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -309,7 +342,8 @@ p: 1
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
@@ -335,28 +369,22 @@ p: 1
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled non-multipart (text/x-not-multipart) userdata:",
+ log_file.getvalue())
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled non-multipart (text/x-not-multipart) userdata:",
- log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_mime_gzip_compressed(self):
"""Tests that individual message gzip encoding works."""
def gzip_part(text):
- contents = StringIO.StringIO()
- f = gzip.GzipFile(fileobj=contents, mode='w')
- f.write(str(text))
- f.flush()
- f.close()
- return MIMEApplication(contents.getvalue(), 'gzip')
+ return MIMEApplication(gzip_text(text), 'gzip')
base_content1 = '''
#cloud-config
@@ -374,7 +402,8 @@ c: 4
message.attach(gzip_part(base_content2))
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -392,19 +421,17 @@ c: 4
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
- ci.datasource = FakeDataSource(message.as_string())
-
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
+ ci.datasource = FakeDataSource(message.as_string().encode())
+
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script."""
@@ -413,16 +440,17 @@ c: 4
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script."""
@@ -433,16 +461,17 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script."""
@@ -453,13 +482,81 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(outpath, script, 0700)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
+
+ def test_mime_application_octet_stream(self):
+ """Mime type application/octet-stream is ignored but shows warning."""
+ ci = stages.Init()
+ message = MIMEBase("application", "octet-stream")
+ message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
+ encoders.encode_base64(message)
+ ci.datasource = FakeDataSource(message.as_string().encode())
+
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (application/octet-stream)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
+
+ def test_cloud_config_archive(self):
+ non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
+ data = [{'content': '#cloud-config\npassword: gocubs\n'},
+ {'content': '#cloud-config\nlocale: chicago\n'},
+ {'content': non_decodable}]
+ message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(message)
+
+ fs = {}
+
+ def fsstore(filename, content, mode=0o0644, omode="wb"):
+ fs[filename] = content
+
+ # consuming the user-data provided should write 'cloud_config' file
+ # which will have our yaml in it.
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ mockobj.side_effect = fsstore
+ ci.fetch()
+ ci.consume_data()
+
+ cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
+ self.assertEqual(cfg.get('password'), 'gocubs')
+ self.assertEqual(cfg.get('locale'), 'chicago')
+
+
+class TestUDProcess(helpers.ResourceUsingTestCase):
+
+ def test_bytes_in_userdata(self):
+ msg = b'#cloud-config\napt_update: True\n'
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)
+
+ def test_string_in_userdata(self):
+ msg = '#cloud-config\napt_update: True\n'
+
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)
+
+ def test_compressed_in_userdata(self):
+ msg = gzip_text('#cloud-config\napt_update: True\n')
+
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)