diff options
Diffstat (limited to 'cloudinit/tests')
-rw-r--r-- | cloudinit/tests/helpers.py | 54 | ||||
-rw-r--r-- | cloudinit/tests/test_subp.py | 61 | ||||
-rw-r--r-- | cloudinit/tests/test_util.py | 169 |
3 files changed, 265 insertions, 19 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py index 0080c729..999b1d7c 100644 --- a/cloudinit/tests/helpers.py +++ b/cloudinit/tests/helpers.py @@ -173,17 +173,15 @@ class CiTestCase(TestCase): dir = self.tmp_dir() return os.path.normpath(os.path.abspath(os.path.join(dir, path))) - def assertRaisesCodeEqual(self, expected, found): - """Handle centos6 having different context manager for assertRaises. - with assertRaises(Exception) as e: - raise Exception("BOO") - - centos6 will have e.exception as an integer. - anything nwere will have it as something with a '.code'""" - if isinstance(found, int): - self.assertEqual(expected, found) - else: - self.assertEqual(expected, found.code) + def sys_exit(self, code): + """Provide a wrapper around sys.exit for python 2.6 + + In 2.6, this code would produce 'cm.exception' with value int(2) + rather than the SystemExit that was raised by sys.exit(2). + with assertRaises(SystemExit) as cm: + sys.exit(2) + """ + raise SystemExit(code) class ResourceUsingTestCase(CiTestCase): @@ -285,10 +283,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def patchOS(self, new_root): patch_funcs = { os.path: [('isfile', 1), ('exists', 1), - ('islink', 1), ('isdir', 1)], + ('islink', 1), ('isdir', 1), ('lexists', 1)], os: [('listdir', 1), ('mkdir', 1), - ('lstat', 1), ('symlink', 2)], + ('lstat', 1), ('symlink', 2)] } + + if hasattr(os, 'scandir'): + # py27 does not have scandir + patch_funcs[os].append(('scandir', 1)) + for (mod, funcs) in patch_funcs.items(): for f, nargs in funcs: func = getattr(mod, f) @@ -411,6 +414,19 @@ except AttributeError: return decorator +try: + import jsonschema + assert jsonschema # avoid pyflakes error F401: import unused + _missing_jsonschema_dep = False +except ImportError: + _missing_jsonschema_dep = True + + +def skipUnlessJsonSchema(): + return skipIf( + _missing_jsonschema_dep, "No python-jsonschema dependency present.") + + # older versions of mock do not have the useful 'assert_not_called' if not hasattr(mock.Mock, 'assert_not_called'): def __mock_assert_not_called(mmock): @@ -422,12 +438,12 @@ if not hasattr(mock.Mock, 'assert_not_called'): mock.Mock.assert_not_called = __mock_assert_not_called -# older unittest2.TestCase (centos6) do not have assertRaisesRegex -# And setting assertRaisesRegex to assertRaisesRegexp causes -# https://github.com/PyCQA/pylint/issues/1653 . So the workaround. +# older unittest2.TestCase (centos6) have only the now-deprecated +# assertRaisesRegexp. Simple assignment makes pylint complain, about +# users of assertRaisesRegex so we use getattr to trick it. +# https://github.com/PyCQA/pylint/issues/1946 if not hasattr(unittest2.TestCase, 'assertRaisesRegex'): - def _tricky(*args, **kwargs): - return unittest2.TestCase.assertRaisesRegexp - unittest2.TestCase.assertRaisesRegex = _tricky + unittest2.TestCase.assertRaisesRegex = ( + getattr(unittest2.TestCase, 'assertRaisesRegexp')) # vi: ts=4 expandtab diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py new file mode 100644 index 00000000..448097d3 --- /dev/null +++ b/cloudinit/tests/test_subp.py @@ -0,0 +1,61 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloudinit.subp utility functions""" + +from cloudinit import subp +from cloudinit.tests.helpers import CiTestCase + + +class TestPrependBaseCommands(CiTestCase): + + with_logs = True + + def test_prepend_base_command_errors_on_neither_string_nor_list(self): + """Raise an error for each command which is not a string or list.""" + orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']] + with self.assertRaises(TypeError) as context_manager: + subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual( + "Invalid basecmd config. These commands are not a string or" + " list:\n1\n{'not': 'gonna work'}", + str(context_manager.exception)) + + def test_prepend_base_command_warns_on_non_base_string_commands(self): + """Warn on each non-base for commands of type string.""" + orig_commands = [ + 'ls', 'basecmd list', 'touch /blah', 'basecmd install x'] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual( + 'WARNING: Non-basecmd commands in basecmd config:\n' + 'ls\ntouch /blah\n', + self.logs.getvalue()) + self.assertEqual(orig_commands, fixed_commands) + + def test_prepend_base_command_prepends_on_non_base_list_commands(self): + """Prepend 'basecmd' for each non-basecmd command of type list.""" + orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'], + ['basecmd', 'install', 'x']] + expected = [['basecmd', 'ls'], ['basecmd', 'list'], + ['basecmd', 'basecmda', '/blah'], + ['basecmd', 'install', 'x']] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual('', self.logs.getvalue()) + self.assertEqual(expected, fixed_commands) + + def test_prepend_base_command_removes_first_item_when_none(self): + """Remove the first element of a non-basecmd when it is None.""" + orig_commands = [[None, 'ls'], ['basecmd', 'list'], + [None, 'touch', '/blah'], + ['basecmd', 'install', 'x']] + expected = [['ls'], ['basecmd', 'list'], + ['touch', '/blah'], + ['basecmd', 'install', 'x']] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual('', self.logs.getvalue()) + self.assertEqual(expected, fixed_commands) + +# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py index ba6bf699..3f37dbb6 100644 --- a/cloudinit/tests/test_util.py +++ b/cloudinit/tests/test_util.py @@ -3,6 +3,7 @@ """Tests for cloudinit.util""" import logging +from textwrap import dedent import cloudinit.util as util @@ -16,6 +17,25 @@ MOUNT_INFO = [ ] +class FakeCloud(object): + + def __init__(self, hostname, fqdn): + self.hostname = hostname + self.fqdn = fqdn + self.calls = [] + + def get_hostname(self, fqdn=None, metadata_only=None): + myargs = {} + if fqdn is not None: + myargs['fqdn'] = fqdn + if metadata_only is not None: + myargs['metadata_only'] = metadata_only + self.calls.append(myargs) + if fqdn: + return self.fqdn + return self.hostname + + class TestUtil(CiTestCase): def test_parse_mount_info_no_opts_no_arg(self): @@ -44,3 +64,152 @@ class TestUtil(CiTestCase): m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime') is_rw = util.mount_is_read_write('/') self.assertEqual(is_rw, False) + + +class TestShellify(CiTestCase): + + def test_input_dict_raises_type_error(self): + self.assertRaisesRegex( + TypeError, 'Input.*was.*dict.*xpected', + util.shellify, {'mykey': 'myval'}) + + def test_input_str_raises_type_error(self): + self.assertRaisesRegex( + TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar") + + def test_value_with_int_raises_type_error(self): + self.assertRaisesRegex( + TypeError, 'shellify.*int', util.shellify, ["foo", 1]) + + def test_supports_strings_and_lists(self): + self.assertEqual( + '\n'.join(["#!/bin/sh", "echo hi mom", "'echo' 'hi dad'", + "'echo' 'hi' 'sis'", ""]), + util.shellify(["echo hi mom", ["echo", "hi dad"], + ('echo', 'hi', 'sis')])) + + +class TestGetHostnameFqdn(CiTestCase): + + def test_get_hostname_fqdn_from_only_cfg_fqdn(self): + """When cfg only has the fqdn key, derive hostname and fqdn from it.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={'fqdn': 'myhost.domain.com'}, cloud=None) + self.assertEqual('myhost', hostname) + self.assertEqual('myhost.domain.com', fqdn) + + def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self): + """When cfg has both fqdn and hostname keys, return them.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None) + self.assertEqual('other', hostname) + self.assertEqual('myhost.domain.com', fqdn) + + def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self): + """When cfg has only hostname key which represents a fqdn, use that.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={'hostname': 'myhost.domain.com'}, cloud=None) + self.assertEqual('myhost', hostname) + self.assertEqual('myhost.domain.com', fqdn) + + def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self): + """When cfg has a hostname without a '.' query cloud.get_hostname.""" + mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + hostname, fqdn = util.get_hostname_fqdn( + cfg={'hostname': 'myhost'}, cloud=mycloud) + self.assertEqual('myhost', hostname) + self.assertEqual('cloudhost.mycloud.com', fqdn) + self.assertEqual( + [{'fqdn': True, 'metadata_only': False}], mycloud.calls) + + def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self): + """When cfg has neither hostname nor fqdn cloud.get_hostname.""" + mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud) + self.assertEqual('cloudhost', hostname) + self.assertEqual('cloudhost.mycloud.com', fqdn) + self.assertEqual( + [{'fqdn': True, 'metadata_only': False}, + {'metadata_only': False}], mycloud.calls) + + def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self): + """Calls to cloud.get_hostname pass the metadata_only parameter.""" + mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + hostname, fqdn = util.get_hostname_fqdn( + cfg={}, cloud=mycloud, metadata_only=True) + self.assertEqual( + [{'fqdn': True, 'metadata_only': True}, + {'metadata_only': True}], mycloud.calls) + + +class TestBlkid(CiTestCase): + ids = { + "id01": "1111-1111", + "id02": "22222222-2222", + "id03": "33333333-3333", + "id04": "44444444-4444", + "id05": "55555555-5555-5555-5555-555555555555", + "id06": "66666666-6666-6666-6666-666666666666", + "id07": "52894610484658920398", + "id08": "86753098675309867530", + "id09": "99999999-9999-9999-9999-999999999999", + } + + blkid_out = dedent("""\ + /dev/loop0: TYPE="squashfs" + /dev/loop1: TYPE="squashfs" + /dev/loop2: TYPE="squashfs" + /dev/loop3: TYPE="squashfs" + /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}" + /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}" + /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}" + /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """ + """TYPE="zfs_member" PARTUUID="{id09}" + /dev/loop4: TYPE="squashfs" + """) + + maxDiff = None + + def _get_expected(self): + return ({ + "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"}, + "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"}, + "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"}, + "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"}, + "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"}, + "/dev/sda1": {"DEVNAME": "/dev/sda1", "TYPE": "vfat", + "UUID": self.ids["id01"], + "PARTUUID": self.ids["id02"]}, + "/dev/sda2": {"DEVNAME": "/dev/sda2", "TYPE": "ext4", + "UUID": self.ids["id03"], + "PARTUUID": self.ids["id04"]}, + "/dev/sda3": {"DEVNAME": "/dev/sda3", "TYPE": "ext4", + "UUID": self.ids["id05"], + "PARTUUID": self.ids["id06"]}, + "/dev/sda4": {"DEVNAME": "/dev/sda4", "TYPE": "zfs_member", + "LABEL": "default", + "UUID": self.ids["id07"], + "UUID_SUB": self.ids["id08"], + "PARTUUID": self.ids["id09"]}, + }) + + @mock.patch("cloudinit.util.subp") + def test_functional_blkid(self, m_subp): + m_subp.return_value = ( + self.blkid_out.format(**self.ids), "") + self.assertEqual(self._get_expected(), util.blkid()) + m_subp.assert_called_with(["blkid", "-o", "full"], capture=True, + decode="replace") + + @mock.patch("cloudinit.util.subp") + def test_blkid_no_cache_uses_no_cache(self, m_subp): + """blkid should turn off cache if disable_cache is true.""" + m_subp.return_value = ( + self.blkid_out.format(**self.ids), "") + self.assertEqual(self._get_expected(), + util.blkid(disable_cache=True)) + m_subp.assert_called_with(["blkid", "-o", "full", "-c", "/dev/null"], + capture=True, decode="replace") + + +# vi: ts=4 expandtab |