diff options
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r-- | tests/unittests/test_data.py | 143 |
1 files changed, 79 insertions, 64 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index fd6bd8a1..7598837e 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -1,10 +1,17 @@ """Tests for handling of userdata within cloud init.""" -import StringIO - import gzip import logging import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock + +from six import BytesIO, StringIO from email.mime.application import MIMEApplication from email.mime.base import MIMEBase @@ -43,17 +50,16 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): self._log_handler = None def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) if self._log_handler and self._log: self._log.removeHandler(self._log_handler) + helpers.FilesystemMockingTestCase.tearDown(self) def _patchIn(self, root): - self.restore() self.patchOS(root) self.patchUtils(root) def capture_log(self, lvl=logging.DEBUG): - log_file = StringIO.StringIO() + log_file = StringIO() self._log_handler = logging.StreamHandler(log_file) self._log_handler.setLevel(lvl) self._log = log.getLogger() @@ -71,7 +77,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): ci = stages.Init() ci.datasource = FakeDataSource(blob) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -99,7 +106,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -138,7 +146,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -184,7 +193,8 @@ c: d ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -214,7 +224,8 @@ name: user run: - z ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -249,7 +260,8 @@ vendor_data: enabled: True prefix: /bin/true ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -309,7 +321,8 @@ p: 1 paths = c_helpers.Paths({}, ds=FakeDataSource('')) cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, @@ -335,25 +348,25 @@ p: 1 data = "arbitrary text\n" ci.datasource = FakeDataSource(data) - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled non-multipart (text/x-not-multipart) userdata:", + log_file.getvalue()) + + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertIn( - "Unhandled non-multipart (text/x-not-multipart) userdata:", - log_file.getvalue()) def test_mime_gzip_compressed(self): """Tests that individual message gzip encoding works.""" def gzip_part(text): - contents = StringIO.StringIO() - f = gzip.GzipFile(fileobj=contents, mode='w') - f.write(str(text)) + contents = BytesIO() + f = gzip.GzipFile(fileobj=contents, mode='wb') + f.write(util.encode_text(text)) f.flush() f.close() return MIMEApplication(contents.getvalue(), 'gzip') @@ -374,7 +387,8 @@ c: 4 message.attach(gzip_part(base_content2)) ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -394,17 +408,15 @@ c: 4 message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string()) - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() - - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertIn( - "Unhandled unknown content-type (text/plain)", - log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled unknown content-type (text/plain)", + log_file.getvalue()) + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script.""" @@ -413,16 +425,17 @@ c: 4 ci.datasource = FakeDataSource(script) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - mock_write(outpath, script, 0700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script.""" @@ -433,16 +446,17 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - mock_write(outpath, script, 0700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -453,13 +467,14 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(outpath, script, 0700) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) |