summaryrefslogtreecommitdiff
path: root/tests/unittests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r--tests/unittests/test_util.py234
1 files changed, 205 insertions, 29 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 35e92445..37a984ac 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,13 +1,21 @@
+from __future__ import print_function
+
+import logging
import os
+import shutil
import stat
+import tempfile
+
+import six
import yaml
-from mocker import MockerTestCase
+from cloudinit import importer, util
from . import helpers
-import unittest
-from cloudinit import importer
-from cloudinit import util
+try:
+ from unittest import mock
+except ImportError:
+ import mock
class FakeSelinux(object):
@@ -29,7 +37,7 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestGetCfgOptionListOrStr(unittest.TestCase):
+class TestGetCfgOptionListOrStr(helpers.TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
@@ -61,10 +69,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):
self.assertEqual([], result)
-class TestWriteFile(MockerTestCase):
+class TestWriteFile(helpers.TestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_basic_usage(self):
"""Verify basic usage with default args."""
@@ -79,7 +88,7 @@ class TestWriteFile(MockerTestCase):
create_contents = f.read()
self.assertEqual(contents, create_contents)
file_stat = os.stat(path)
- self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
def test_dir_is_created_if_required(self):
"""Verifiy that directories are created is required."""
@@ -97,12 +106,12 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- util.write_file(path, contents, mode=0666)
+ util.write_file(path, contents, mode=0o666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
file_stat = os.stat(path)
- self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
def test_custom_omode(self):
"""Verify custom omode works properly."""
@@ -111,7 +120,7 @@ class TestWriteFile(MockerTestCase):
# Create file first with basic content
with open(path, "wb") as f:
- f.write("LINE1\n")
+ f.write(b"LINE1\n")
util.write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
@@ -126,23 +135,24 @@ class TestWriteFile(MockerTestCase):
with open(my_file, "w") as fp:
fp.write("My Content")
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock('selinux')
-
fake_se = FakeSelinux(my_file)
- self.mocker.result(fake_se)
- self.mocker.replay()
- with util.SeLinuxGuard(my_file) as is_on:
- self.assertTrue(is_on)
+
+ with mock.patch.object(importer, 'import_module',
+ return_value=fake_se) as mockobj:
+ with util.SeLinuxGuard(my_file) as is_on:
+ self.assertTrue(is_on)
+
self.assertEqual(1, len(fake_se.restored))
self.assertEqual(my_file, fake_se.restored[0])
+ mockobj.assert_called_once_with('selinux')
+
-class TestDeleteDirContents(MockerTestCase):
+class TestDeleteDirContents(helpers.TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
@@ -157,7 +167,7 @@ class TestDeleteDirContents(MockerTestCase):
def test_deletes_files(self):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -185,7 +195,7 @@ class TestDeleteDirContents(MockerTestCase):
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -196,7 +206,7 @@ class TestDeleteDirContents(MockerTestCase):
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
with open(file_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
os.symlink(file_name, link_name)
util.delete_dir_contents(self.tmp)
@@ -204,20 +214,20 @@ class TestDeleteDirContents(MockerTestCase):
self.assertDirEmpty(self.tmp)
-class TestKeyValStrings(unittest.TestCase):
+class TestKeyValStrings(helpers.TestCase):
def test_keyval_str_to_dict(self):
expected = {'1': 'one', '2': 'one+one', 'ro': True}
cmdline = "1=one ro 2=one+one"
self.assertEqual(expected, util.keyval_str_to_dict(cmdline))
-class TestGetCmdline(unittest.TestCase):
+class TestGetCmdline(helpers.TestCase):
def test_cmdline_reads_debug_env(self):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
-class TestLoadYaml(unittest.TestCase):
+class TestLoadYaml(helpers.TestCase):
mydefault = "7b03a8ebace993d806255121073fed52"
def test_simple(self):
@@ -246,8 +256,8 @@ class TestLoadYaml(unittest.TestCase):
self.mydefault)
def test_python_unicode(self):
- # complex type of python/unicde is explicitly allowed
- myobj = {'1': unicode("FOOBAR")}
+ # complex type of python/unicode is explicitly allowed
+ myobj = {'1': six.text_type("FOOBAR")}
safe_yaml = yaml.dump(myobj)
self.assertEqual(util.load_yaml(blob=safe_yaml,
default=self.mydefault),
@@ -310,4 +320,170 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
expected = ('none', 'tmpfs', '/run/lock')
self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+
+class TestReadDMIData(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestReadDMIData, self).setUp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.patchOS(self.new_root)
+ self.patchUtils(self.new_root)
+
+ def _create_sysfs_parent_directory(self):
+ util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
+
+ def _create_sysfs_file(self, key, content):
+ """Mocks the sys path found on Linux systems."""
+ self._create_sysfs_parent_directory()
+ dmi_key = "/sys/class/dmi/id/{0}".format(key)
+ util.write_file(dmi_key, content)
+
+ def _configure_dmidecode_return(self, key, content, error=None):
+ """
+ In order to test a missing sys path and call outs to dmidecode, this
+ function fakes the results of dmidecode to test the results.
+ """
+ def _dmidecode_subp(cmd):
+ if cmd[-1] != key:
+ raise util.ProcessExecutionError()
+ return (content, error)
+
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'which', lambda _: True))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', _dmidecode_subp))
+
+ def patch_mapping(self, new_mapping):
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.util.DMIDECODE_TO_DMI_SYS_MAPPING',
+ new_mapping))
+
+ def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self):
+ self.patch_mapping({'mapped-key': 'mapped-value'})
+ expected_dmi_value = 'sys-used-correctly'
+ self._create_sysfs_file('mapped-value', expected_dmi_value)
+ self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong')
+ self.assertEqual(expected_dmi_value, util.read_dmi_data('mapped-key'))
+
+ def test_dmidecode_used_if_no_sysfs_file_on_disk(self):
+ self.patch_mapping({})
+ self._create_sysfs_parent_directory()
+ expected_dmi_value = 'dmidecode-used'
+ self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
+ self.assertEqual(expected_dmi_value,
+ util.read_dmi_data('use-dmidecode'))
+
+ def test_none_returned_if_neither_source_has_data(self):
+ self.patch_mapping({})
+ self._configure_dmidecode_return('key', 'value')
+ self.assertEqual(None, util.read_dmi_data('expect-fail'))
+
+ def test_none_returned_if_dmidecode_not_in_path(self):
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'which', lambda _: False))
+ self.patch_mapping({})
+ self.assertEqual(None, util.read_dmi_data('expect-fail'))
+
+ def test_dots_returned_instead_of_foxfox(self):
+ # uninitialized dmi values show as \xff, return those as .
+ my_len = 32
+ dmi_value = b'\xff' * my_len + b'\n'
+ expected = ""
+ dmi_key = 'system-product-name'
+ sysfs_key = 'product_name'
+ self._create_sysfs_file(sysfs_key, dmi_value)
+ self.assertEqual(expected, util.read_dmi_data(dmi_key))
+
+
+class TestMultiLog(helpers.FilesystemMockingTestCase):
+
+ def _createConsole(self, root):
+ os.mkdir(os.path.join(root, 'dev'))
+ open(os.path.join(root, 'dev', 'console'), 'a').close()
+
+ def setUp(self):
+ super(TestMultiLog, self).setUp()
+ self.root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.root)
+ self.patchOS(self.root)
+ self.patchUtils(self.root)
+ self.patchOpen(self.root)
+ self.stdout = six.StringIO()
+ self.stderr = six.StringIO()
+ self.patchStdoutAndStderr(self.stdout, self.stderr)
+
+ def test_stderr_used_by_default(self):
+ logged_string = 'test stderr output'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, self.stderr.getvalue())
+
+ def test_stderr_not_used_if_false(self):
+ util.multi_log('should not see this', stderr=False)
+ self.assertEqual('', self.stderr.getvalue())
+
+ def test_logs_go_to_console_by_default(self):
+ self._createConsole(self.root)
+ logged_string = 'something very important'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, open('/dev/console').read())
+
+ def test_logs_dont_go_to_stdout_if_console_exists(self):
+ self._createConsole(self.root)
+ util.multi_log('something')
+ self.assertEqual('', self.stdout.getvalue())
+
+ def test_logs_go_to_stdout_if_console_does_not_exist(self):
+ logged_string = 'something very important'
+ util.multi_log(logged_string)
+ self.assertEqual(logged_string, self.stdout.getvalue())
+
+ def test_logs_go_to_log_if_given(self):
+ log = mock.MagicMock()
+ logged_string = 'something very important'
+ util.multi_log(logged_string, log=log)
+ self.assertEqual([((mock.ANY, logged_string), {})],
+ log.log.call_args_list)
+
+ def test_newlines_stripped_from_log_call(self):
+ log = mock.MagicMock()
+ expected_string = 'something very important'
+ util.multi_log('{0}\n'.format(expected_string), log=log)
+ self.assertEqual((mock.ANY, expected_string), log.log.call_args[0])
+
+ def test_log_level_defaults_to_debug(self):
+ log = mock.MagicMock()
+ util.multi_log('message', log=log)
+ self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0])
+
+ def test_given_log_level_used(self):
+ log = mock.MagicMock()
+ log_level = mock.Mock()
+ util.multi_log('message', log=log, log_level=log_level)
+ self.assertEqual((log_level, mock.ANY), log.log.call_args[0])
+
+
+class TestMessageFromString(helpers.TestCase):
+
+ def test_unicode_not_messed_up(self):
+ roundtripped = util.message_from_string(u'\n').as_string()
+ self.assertNotIn('\x00', roundtripped)
+
+
+class TestReadSeeded(helpers.TestCase):
+ def setUp(self):
+ super(TestReadSeeded, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def test_unicode_not_messed_up(self):
+ ud = b"userdatablob"
+ helpers.populate_dir(
+ self.tmp, {'meta-data': "key1: val1", 'user-data': ud})
+ sdir = self.tmp + os.path.sep
+ (found_md, found_ud) = util.read_seeded(sdir)
+
+ self.assertEqual(found_md, {'key1': 'val1'})
+ self.assertEqual(found_ud, ud)
+
# vi: ts=4 expandtab