summaryrefslogtreecommitdiff
path: root/tests/unittests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r--tests/unittests/test_util.py143
1 files changed, 110 insertions, 33 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 3e079131..33c191a9 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,13 +1,21 @@
+from __future__ import print_function
+
+import logging
import os
+import shutil
import stat
+import tempfile
+
+import six
import yaml
-from mocker import MockerTestCase
+from cloudinit import importer, util
from . import helpers
-import unittest
-from cloudinit import importer
-from cloudinit import util
+try:
+ from unittest import mock
+except ImportError:
+ import mock
class FakeSelinux(object):
@@ -29,7 +37,7 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestGetCfgOptionListOrStr(unittest.TestCase):
+class TestGetCfgOptionListOrStr(helpers.TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
@@ -61,10 +69,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):
self.assertEqual([], result)
-class TestWriteFile(MockerTestCase):
+class TestWriteFile(helpers.TestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_basic_usage(self):
"""Verify basic usage with default args."""
@@ -79,7 +88,7 @@ class TestWriteFile(MockerTestCase):
create_contents = f.read()
self.assertEqual(contents, create_contents)
file_stat = os.stat(path)
- self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
def test_dir_is_created_if_required(self):
"""Verifiy that directories are created is required."""
@@ -97,12 +106,12 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- util.write_file(path, contents, mode=0666)
+ util.write_file(path, contents, mode=0o666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
file_stat = os.stat(path)
- self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
def test_custom_omode(self):
"""Verify custom omode works properly."""
@@ -111,7 +120,7 @@ class TestWriteFile(MockerTestCase):
# Create file first with basic content
with open(path, "wb") as f:
- f.write("LINE1\n")
+ f.write(b"LINE1\n")
util.write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
@@ -126,23 +135,24 @@ class TestWriteFile(MockerTestCase):
with open(my_file, "w") as fp:
fp.write("My Content")
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock('selinux')
-
fake_se = FakeSelinux(my_file)
- self.mocker.result(fake_se)
- self.mocker.replay()
- with util.SeLinuxGuard(my_file) as is_on:
- self.assertTrue(is_on)
+
+ with mock.patch.object(importer, 'import_module',
+ return_value=fake_se) as mockobj:
+ with util.SeLinuxGuard(my_file) as is_on:
+ self.assertTrue(is_on)
+
self.assertEqual(1, len(fake_se.restored))
self.assertEqual(my_file, fake_se.restored[0])
+ mockobj.assert_called_once_with('selinux')
-class TestDeleteDirContents(MockerTestCase):
+
+class TestDeleteDirContents(helpers.TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
@@ -157,7 +167,7 @@ class TestDeleteDirContents(MockerTestCase):
def test_deletes_files(self):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -185,7 +195,7 @@ class TestDeleteDirContents(MockerTestCase):
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -196,7 +206,7 @@ class TestDeleteDirContents(MockerTestCase):
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
with open(file_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
os.symlink(file_name, link_name)
util.delete_dir_contents(self.tmp)
@@ -204,20 +214,20 @@ class TestDeleteDirContents(MockerTestCase):
self.assertDirEmpty(self.tmp)
-class TestKeyValStrings(unittest.TestCase):
+class TestKeyValStrings(helpers.TestCase):
def test_keyval_str_to_dict(self):
expected = {'1': 'one', '2': 'one+one', 'ro': True}
cmdline = "1=one ro 2=one+one"
self.assertEqual(expected, util.keyval_str_to_dict(cmdline))
-class TestGetCmdline(unittest.TestCase):
+class TestGetCmdline(helpers.TestCase):
def test_cmdline_reads_debug_env(self):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
-class TestLoadYaml(unittest.TestCase):
+class TestLoadYaml(helpers.TestCase):
mydefault = "7b03a8ebace993d806255121073fed52"
def test_simple(self):
@@ -246,8 +256,8 @@ class TestLoadYaml(unittest.TestCase):
self.mydefault)
def test_python_unicode(self):
- # complex type of python/unicde is explicitly allowed
- myobj = {'1': unicode("FOOBAR")}
+ # complex type of python/unicode is explicitly allowed
+ myobj = {'1': six.text_type("FOOBAR")}
safe_yaml = yaml.dump(myobj)
self.assertEqual(util.load_yaml(blob=safe_yaml,
default=self.mydefault),
@@ -314,17 +324,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
class TestReadDMIData(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def _write_key(self, key, content):
"""Mocks the sys path found on Linux systems."""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
- dmi_key = "/sys/class/dmi/id/{}".format(key)
+ dmi_key = "/sys/class/dmi/id/{0}".format(key)
util.write_file(dmi_key, content)
def _no_syspath(self, key, content):
@@ -332,7 +342,8 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
In order to test a missing sys path and call outs to dmidecode, this
function fakes the results of dmidecode to test the results.
"""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
self.real_which = util.which
self.real_subp = util.subp
@@ -366,4 +377,70 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
self.assertFalse(None, util.read_dmi_data("key"))
+class TestMultiLog(helpers.FilesystemMockingTestCase):
+
+ def _createConsole(self, root):
+ os.mkdir(os.path.join(root, 'dev'))
+ open(os.path.join(root, 'dev', 'console'), 'a').close()
+
+ def setUp(self):
+ super(TestMultiLog, self).setUp()
+ self.root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.root)
+ self.patchOS(self.root)
+ self.patchUtils(self.root)
+ self.patchOpen(self.root)
+ self.stdout = six.StringIO()
+ self.stderr = six.StringIO()
+ self.patchStdoutAndStderr(self.stdout, self.stderr)
+
+ def test_stderr_used_by_default(self):
+ logged_string = 'test stderr output'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, self.stderr.getvalue())
+
+ def test_stderr_not_used_if_false(self):
+ util.multi_log('should not see this', stderr=False)
+ self.assertEqual('', self.stderr.getvalue())
+
+ def test_logs_go_to_console_by_default(self):
+ self._createConsole(self.root)
+ logged_string = 'something very important'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, open('/dev/console').read())
+
+ def test_logs_dont_go_to_stdout_if_console_exists(self):
+ self._createConsole(self.root)
+ util.multi_log('something')
+ self.assertEqual('', self.stdout.getvalue())
+
+ def test_logs_go_to_stdout_if_console_does_not_exist(self):
+ logged_string = 'something very important'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, self.stdout.getvalue())
+
+ def test_logs_go_to_log_if_given(self):
+ log = mock.MagicMock()
+ logged_string = 'something very important'
+ util.multi_log(logged_string, log=log)
+ self.assertEqual([((mock.ANY, logged_string), {})],
+ log.log.call_args_list)
+
+ def test_newlines_stripped_from_log_call(self):
+ log = mock.MagicMock()
+ expected_string = 'something very important'
+ util.multi_log('{0}\n'.format(expected_string), log=log)
+ self.assertEqual((mock.ANY, expected_string), log.log.call_args[0])
+
+ def test_log_level_defaults_to_debug(self):
+ log = mock.MagicMock()
+ util.multi_log('message', log=log)
+ self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0])
+
+ def test_given_log_level_used(self):
+ log = mock.MagicMock()
+ log_level = mock.Mock()
+ util.multi_log('message', log=log, log_level=log_level)
+ self.assertEqual((log_level, mock.ANY), log.log.call_args[0])
+
# vi: ts=4 expandtab