diff options
Diffstat (limited to 'cloudinit/tests')
| -rw-r--r-- | cloudinit/tests/test_dmi.py | 154 | ||||
| -rw-r--r-- | cloudinit/tests/test_gpg.py | 3 | ||||
| -rw-r--r-- | cloudinit/tests/test_persistence.py | 127 | ||||
| -rw-r--r-- | cloudinit/tests/test_stages.py | 62 | ||||
| -rw-r--r-- | cloudinit/tests/test_upgrade.py | 45 | ||||
| -rw-r--r-- | cloudinit/tests/test_util.py | 80 | 
6 files changed, 470 insertions, 1 deletions
| diff --git a/cloudinit/tests/test_dmi.py b/cloudinit/tests/test_dmi.py new file mode 100644 index 00000000..78a72122 --- /dev/null +++ b/cloudinit/tests/test_dmi.py @@ -0,0 +1,154 @@ +from cloudinit.tests import helpers +from cloudinit import dmi +from cloudinit import util +from cloudinit import subp + +import os +import tempfile +import shutil +from unittest import mock + + +class TestReadDMIData(helpers.FilesystemMockingTestCase): + +    def setUp(self): +        super(TestReadDMIData, self).setUp() +        self.new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.new_root) +        self.reRoot(self.new_root) +        p = mock.patch("cloudinit.dmi.is_container", return_value=False) +        self.addCleanup(p.stop) +        self._m_is_container = p.start() +        p = mock.patch("cloudinit.dmi.is_FreeBSD", return_value=False) +        self.addCleanup(p.stop) +        self._m_is_FreeBSD = p.start() + +    def _create_sysfs_parent_directory(self): +        util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id')) + +    def _create_sysfs_file(self, key, content): +        """Mocks the sys path found on Linux systems.""" +        self._create_sysfs_parent_directory() +        dmi_key = "/sys/class/dmi/id/{0}".format(key) +        util.write_file(dmi_key, content) + +    def _configure_dmidecode_return(self, key, content, error=None): +        """ +        In order to test a missing sys path and call outs to dmidecode, this +        function fakes the results of dmidecode to test the results. +        """ +        def _dmidecode_subp(cmd): +            if cmd[-1] != key: +                raise subp.ProcessExecutionError() +            return (content, error) + +        self.patched_funcs.enter_context( +            mock.patch("cloudinit.dmi.subp.which", side_effect=lambda _: True)) +        self.patched_funcs.enter_context( +            mock.patch("cloudinit.dmi.subp.subp", side_effect=_dmidecode_subp)) + +    def _configure_kenv_return(self, key, content, error=None): +        """ +        In order to test a FreeBSD system call outs to kenv, this +        function fakes the results of kenv to test the results. +        """ +        def _kenv_subp(cmd): +            if cmd[-1] != dmi.DMIDECODE_TO_KERNEL[key].freebsd: +                raise subp.ProcessExecutionError() +            return (content, error) + +        self.patched_funcs.enter_context( +            mock.patch("cloudinit.dmi.subp.subp", side_effect=_kenv_subp)) + +    def patch_mapping(self, new_mapping): +        self.patched_funcs.enter_context( +            mock.patch('cloudinit.dmi.DMIDECODE_TO_KERNEL', +                       new_mapping)) + +    def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self): +        self.patch_mapping({'mapped-key': dmi.kdmi('mapped-value', None)}) +        expected_dmi_value = 'sys-used-correctly' +        self._create_sysfs_file('mapped-value', expected_dmi_value) +        self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong') +        self.assertEqual(expected_dmi_value, dmi.read_dmi_data('mapped-key')) + +    def test_dmidecode_used_if_no_sysfs_file_on_disk(self): +        self.patch_mapping({}) +        self._create_sysfs_parent_directory() +        expected_dmi_value = 'dmidecode-used' +        self._configure_dmidecode_return('use-dmidecode', expected_dmi_value) +        with mock.patch("cloudinit.util.os.uname") as m_uname: +            m_uname.return_value = ('x-sysname', 'x-nodename', +                                    'x-release', 'x-version', 'x86_64') +            self.assertEqual(expected_dmi_value, +                             dmi.read_dmi_data('use-dmidecode')) + +    def test_dmidecode_not_used_on_arm(self): +        self.patch_mapping({}) +        print("current =%s", subp) +        self._create_sysfs_parent_directory() +        dmi_val = 'from-dmidecode' +        dmi_name = 'use-dmidecode' +        self._configure_dmidecode_return(dmi_name, dmi_val) +        print("now =%s", subp) + +        expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val} +        found = {} +        # we do not run the 'dmi-decode' binary on some arches +        # verify that anything requested that is not in the sysfs dir +        # will return None on those arches. +        with mock.patch("cloudinit.util.os.uname") as m_uname: +            for arch in expected: +                m_uname.return_value = ('x-sysname', 'x-nodename', +                                        'x-release', 'x-version', arch) +                print("now2 =%s", subp) +                found[arch] = dmi.read_dmi_data(dmi_name) +        self.assertEqual(expected, found) + +    def test_none_returned_if_neither_source_has_data(self): +        self.patch_mapping({}) +        self._configure_dmidecode_return('key', 'value') +        self.assertIsNone(dmi.read_dmi_data('expect-fail')) + +    def test_none_returned_if_dmidecode_not_in_path(self): +        self.patched_funcs.enter_context( +            mock.patch.object(subp, 'which', lambda _: False)) +        self.patch_mapping({}) +        self.assertIsNone(dmi.read_dmi_data('expect-fail')) + +    def test_empty_string_returned_instead_of_foxfox(self): +        # uninitialized dmi values show as \xff, return empty string +        my_len = 32 +        dmi_value = b'\xff' * my_len + b'\n' +        expected = "" +        dmi_key = 'system-product-name' +        sysfs_key = 'product_name' +        self._create_sysfs_file(sysfs_key, dmi_value) +        self.assertEqual(expected, dmi.read_dmi_data(dmi_key)) + +    def test_container_returns_none(self): +        """In a container read_dmi_data should always return None.""" + +        # first verify we get the value if not in container +        self._m_is_container.return_value = False +        key, val = ("system-product-name", "my_product") +        self._create_sysfs_file('product_name', val) +        self.assertEqual(val, dmi.read_dmi_data(key)) + +        # then verify in container returns None +        self._m_is_container.return_value = True +        self.assertIsNone(dmi.read_dmi_data(key)) + +    def test_container_returns_none_on_unknown(self): +        """In a container even bogus keys return None.""" +        self._m_is_container.return_value = True +        self._create_sysfs_file('product_name', "should-be-ignored") +        self.assertIsNone(dmi.read_dmi_data("bogus")) +        self.assertIsNone(dmi.read_dmi_data("system-product-name")) + +    def test_freebsd_uses_kenv(self): +        """On a FreeBSD system, kenv is called.""" +        self._m_is_FreeBSD.return_value = True +        key, val = ("system-product-name", "my_product") +        self._configure_kenv_return(key, val) +        self.assertEqual(dmi.read_dmi_data(key), val) diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py index f96f5372..311dfad6 100644 --- a/cloudinit/tests/test_gpg.py +++ b/cloudinit/tests/test_gpg.py @@ -49,6 +49,7 @@ class TestReceiveKeys(CiTestCase):          m_subp.return_value = ('', '')          gpg.recv_key(key, keyserver, retries=retries)          m_subp.assert_called_once_with( -            ['gpg', '--keyserver=%s' % keyserver, '--recv-keys', key], +            ['gpg', '--no-tty', +             '--keyserver=%s' % keyserver, '--recv-keys', key],              capture=True)          m_sleep.assert_not_called() diff --git a/cloudinit/tests/test_persistence.py b/cloudinit/tests/test_persistence.py new file mode 100644 index 00000000..ec1152a9 --- /dev/null +++ b/cloudinit/tests/test_persistence.py @@ -0,0 +1,127 @@ +# Copyright (C) 2020 Canonical Ltd. +# +# Author: Daniel Watkins <oddbloke@ubuntu.com> +# +# This file is part of cloud-init. See LICENSE file for license information. +""" +Tests for cloudinit.persistence. + +Per https://docs.python.org/3/library/pickle.html, only "classes that are +defined at the top level of a module" can be pickled.  This means that all of +our ``CloudInitPickleMixin`` subclasses for testing must be defined at +module-level (rather than being defined inline or dynamically in the body of +test methods, as we would do without this constraint). + +``TestPickleMixin.test_subclasses`` iterates over a list of all of these +classes, and tests that they round-trip through a pickle dump/load.  As the +interface we're testing is that ``_unpickle`` is called appropriately on +subclasses, our subclasses define their assertions in their ``_unpickle`` +implementation.  (This means that the assertions will not be executed if +``_unpickle`` is not called at all; we have +``TestPickleMixin.test_unpickle_called`` to ensure it is called.) + +To avoid manually maintaining a list of classes for parametrization we use a +simple metaclass, ``_Collector``, to gather them up. +""" + +import pickle +from unittest import mock + +import pytest + +from cloudinit.persistence import CloudInitPickleMixin + + +class _Collector(type): +    """Any class using this as a metaclass will be stored in test_classes.""" + +    test_classes = [] + +    def __new__(cls, *args): +        new_cls = super().__new__(cls, *args) +        _Collector.test_classes.append(new_cls) +        return new_cls + + +class InstanceVersionNotUsed(CloudInitPickleMixin, metaclass=_Collector): +    """Test that the class version is used over one set in instance state.""" + +    _ci_pkl_version = 1 + +    def __init__(self): +        self._ci_pkl_version = 2 + +    def _unpickle(self, ci_pkl_version: int) -> None: +        assert 1 == ci_pkl_version + + +class MissingVersionHandled(CloudInitPickleMixin, metaclass=_Collector): +    """Test that pickles without ``_ci_pkl_version`` are handled gracefully. + +    This is tested by overriding ``__getstate__`` so the dumped pickle of this +    class will not have ``_ci_pkl_version`` included. +    """ + +    def __getstate__(self): +        return self.__dict__ + +    def _unpickle(self, ci_pkl_version: int) -> None: +        assert 0 == ci_pkl_version + + +class OverridenVersionHonored(CloudInitPickleMixin, metaclass=_Collector): +    """Test that the subclass's version is used.""" + +    _ci_pkl_version = 1 + +    def _unpickle(self, ci_pkl_version: int) -> None: +        assert 1 == ci_pkl_version + + +class StateIsRestored(CloudInitPickleMixin, metaclass=_Collector): +    """Instance state should be restored before ``_unpickle`` is called.""" + +    def __init__(self): +        self.some_state = "some state" + +    def _unpickle(self, ci_pkl_version: int) -> None: +        assert "some state" == self.some_state + + +class UnpickleCanBeUnoverriden(CloudInitPickleMixin, metaclass=_Collector): +    """Subclasses should not need to override ``_unpickle``.""" + + +class VersionDefaultsToZero(CloudInitPickleMixin, metaclass=_Collector): +    """Test that the default version is 0.""" + +    def _unpickle(self, ci_pkl_version: int) -> None: +        assert 0 == ci_pkl_version + + +class VersionIsPoppedFromState(CloudInitPickleMixin, metaclass=_Collector): +    """Test _ci_pkl_version is popped from state before being restored.""" + +    def _unpickle(self, ci_pkl_version: int) -> None: +        # `self._ci_pkl_version` returns the type's _ci_pkl_version if it isn't +        # in instance state, so we need to explicitly check self.__dict__. +        assert "_ci_pkl_version" not in self.__dict__ + + +class TestPickleMixin: +    def test_unpickle_called(self): +        """Test that self._unpickle is called on unpickle.""" +        with mock.patch.object( +            CloudInitPickleMixin, "_unpickle" +        ) as m_unpickle: +            pickle.loads(pickle.dumps(CloudInitPickleMixin())) +        assert 1 == m_unpickle.call_count + +    @pytest.mark.parametrize("cls", _Collector.test_classes) +    def test_subclasses(self, cls): +        """For each collected class, round-trip through pickle dump/load. + +        Assertions are implemented in ``cls._unpickle``, and so are evoked as +        part of the pickle load. +        """ +        pickle.loads(pickle.dumps(cls())) diff --git a/cloudinit/tests/test_stages.py b/cloudinit/tests/test_stages.py index d5c9c0e4..d2d1b37f 100644 --- a/cloudinit/tests/test_stages.py +++ b/cloudinit/tests/test_stages.py @@ -3,6 +3,9 @@  """Tests related to cloudinit.stages module."""  import os +import stat + +import pytest  from cloudinit import stages  from cloudinit import sources @@ -341,4 +344,63 @@ class TestInit(CiTestCase):          self.init.distro.apply_network_config.assert_called_with(              net_cfg, bring_up=True) + +class TestInit_InitializeFilesystem: +    """Tests for cloudinit.stages.Init._initialize_filesystem. + +    TODO: Expand these tests to cover all of _initialize_filesystem's behavior. +    """ + +    @pytest.yield_fixture +    def init(self, paths): +        """A fixture which yields a stages.Init instance with paths and cfg set + +        As it is replaced with a mock, consumers of this fixture can set +        `init.cfg` if the default empty dict configuration is not appropriate. +        """ +        with mock.patch( +            "cloudinit.stages.Init.cfg", mock.PropertyMock(return_value={}) +        ): +            with mock.patch("cloudinit.stages.util.ensure_dirs"): +                init = stages.Init() +                init._paths = paths +                yield init + +    @mock.patch("cloudinit.stages.util.ensure_file") +    def test_ensure_file_not_called_if_no_log_file_configured( +        self, m_ensure_file, init +    ): +        """If no log file is configured, we should not ensure its existence.""" +        init.cfg = {} + +        init._initialize_filesystem() + +        assert 0 == m_ensure_file.call_count + +    def test_log_files_existence_is_ensured_if_configured(self, init, tmpdir): +        """If a log file is configured, we should ensure its existence.""" +        log_file = tmpdir.join("cloud-init.log") +        init.cfg = {"def_log_file": str(log_file)} + +        init._initialize_filesystem() + +        assert log_file.exists + +    def test_existing_file_permissions_are_not_modified(self, init, tmpdir): +        """If the log file already exists, we should not modify its permissions + +        See https://bugs.launchpad.net/cloud-init/+bug/1900837. +        """ +        # Use a mode that will never be made the default so this test will +        # always be valid +        mode = 0o606 +        log_file = tmpdir.join("cloud-init.log") +        log_file.ensure() +        log_file.chmod(mode) +        init.cfg = {"def_log_file": str(log_file)} + +        init._initialize_filesystem() + +        assert mode == stat.S_IMODE(log_file.stat().mode) +  # vi: ts=4 expandtab diff --git a/cloudinit/tests/test_upgrade.py b/cloudinit/tests/test_upgrade.py new file mode 100644 index 00000000..f79a2536 --- /dev/null +++ b/cloudinit/tests/test_upgrade.py @@ -0,0 +1,45 @@ +# Copyright (C) 2020 Canonical Ltd. +# +# Author: Daniel Watkins <oddbloke@ubuntu.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +"""Upgrade testing for cloud-init. + +This module tests cloud-init's behaviour across upgrades.  Specifically, it +specifies a set of invariants that the current codebase expects to be true (as +tests in ``TestUpgrade``) and then checks that these hold true after unpickling +``obj.pkl``s from previous versions of cloud-init; those pickles are stored in +``tests/data/old_pickles/``. +""" + +import operator +import pathlib + +import pytest + +from cloudinit.stages import _pkl_load +from cloudinit.tests.helpers import resourceLocation + + +class TestUpgrade: +    @pytest.fixture( +        params=pathlib.Path(resourceLocation("old_pickles")).glob("*.pkl"), +        scope="class", +        ids=operator.attrgetter("name"), +    ) +    def previous_obj_pkl(self, request): +        """Load each pickle to memory once, then run all tests against it. + +        Test implementations _must not_ modify the ``previous_obj_pkl`` which +        they are passed, as that will affect tests that run after them. +        """ +        return _pkl_load(str(request.param)) + +    def test_networking_set_on_distro(self, previous_obj_pkl): +        """We always expect to have ``.networking`` on ``Distro`` objects.""" +        assert previous_obj_pkl.distro.networking is not None + +    def test_blacklist_drivers_set_on_networking(self, previous_obj_pkl): +        """We always expect Networking.blacklist_drivers to be initialised.""" +        assert previous_obj_pkl.distro.networking.blacklist_drivers is None diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py index 096a3037..b7a302f1 100644 --- a/cloudinit/tests/test_util.py +++ b/cloudinit/tests/test_util.py @@ -730,6 +730,41 @@ class TestMountCb:          """already_mounted_device_and_mountdict, but return only the device"""          return already_mounted_device_and_mountdict[0] +    @pytest.mark.parametrize( +        "mtype,expected", +        [ +            # While the filesystem is called iso9660, the mount type is cd9660 +            ("iso9660", "cd9660"), +            # vfat is generally called "msdos" on BSD +            ("vfat", "msdos"), +            # judging from man pages, only FreeBSD has this alias +            ("msdosfs", "msdos"), +            # Test happy path +            ("ufs", "ufs") +        ], +    ) +    @mock.patch("cloudinit.util.is_Linux", autospec=True) +    @mock.patch("cloudinit.util.is_BSD", autospec=True) +    @mock.patch("cloudinit.util.subp.subp") +    @mock.patch("cloudinit.temp_utils.tempdir", autospec=True) +    def test_normalize_mtype_on_bsd( +        self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected +    ): +        m_is_BSD.return_value = True +        m_is_Linux.return_value = False +        m_tmpdir.return_value.__enter__ = mock.Mock( +            autospec=True, return_value="/tmp/fake" +        ) +        m_tmpdir.return_value.__exit__ = mock.Mock( +            autospec=True, return_value=True +        ) +        callback = mock.Mock(autospec=True) + +        util.mount_cb('/dev/fake0', callback, mtype=mtype) +        assert mock.call( +            ["mount", "-o", "ro", "-t", expected, "/dev/fake0", "/tmp/fake"], +            update_env=None) in m_subp.call_args_list +      @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])      def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):          with pytest.raises(TypeError): @@ -771,4 +806,49 @@ class TestMountCb:          ] == callback.call_args_list +@mock.patch("cloudinit.util.write_file") +class TestEnsureFile: +    """Tests for ``cloudinit.util.ensure_file``.""" + +    def test_parameters_passed_through(self, m_write_file): +        """Test the parameters in the signature are passed to write_file.""" +        util.ensure_file( +            mock.sentinel.path, +            mode=mock.sentinel.mode, +            preserve_mode=mock.sentinel.preserve_mode, +        ) + +        assert 1 == m_write_file.call_count +        args, kwargs = m_write_file.call_args +        assert (mock.sentinel.path,) == args +        assert mock.sentinel.mode == kwargs["mode"] +        assert mock.sentinel.preserve_mode == kwargs["preserve_mode"] + +    @pytest.mark.parametrize( +        "kwarg,expected", +        [ +            # Files should be world-readable by default +            ("mode", 0o644), +            # The previous behaviour of not preserving mode should be retained +            ("preserve_mode", False), +        ], +    ) +    def test_defaults(self, m_write_file, kwarg, expected): +        """Test that ensure_file defaults appropriately.""" +        util.ensure_file(mock.sentinel.path) + +        assert 1 == m_write_file.call_count +        _args, kwargs = m_write_file.call_args +        assert expected == kwargs[kwarg] + +    def test_static_parameters_are_passed(self, m_write_file): +        """Test that the static write_files parameters are passed correctly.""" +        util.ensure_file(mock.sentinel.path) + +        assert 1 == m_write_file.call_count +        _args, kwargs = m_write_file.call_args +        assert "" == kwargs["content"] +        assert "ab" == kwargs["omode"] + +  # vi: ts=4 expandtab | 
