summaryrefslogtreecommitdiff
path: root/cloudinit/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests')
-rw-r--r--cloudinit/tests/test_dmi.py154
-rw-r--r--cloudinit/tests/test_gpg.py3
-rw-r--r--cloudinit/tests/test_persistence.py127
-rw-r--r--cloudinit/tests/test_stages.py62
-rw-r--r--cloudinit/tests/test_upgrade.py45
-rw-r--r--cloudinit/tests/test_util.py80
6 files changed, 470 insertions, 1 deletions
diff --git a/cloudinit/tests/test_dmi.py b/cloudinit/tests/test_dmi.py
new file mode 100644
index 00000000..78a72122
--- /dev/null
+++ b/cloudinit/tests/test_dmi.py
@@ -0,0 +1,154 @@
+from cloudinit.tests import helpers
+from cloudinit import dmi
+from cloudinit import util
+from cloudinit import subp
+
+import os
+import tempfile
+import shutil
+from unittest import mock
+
+
+class TestReadDMIData(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestReadDMIData, self).setUp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.reRoot(self.new_root)
+ p = mock.patch("cloudinit.dmi.is_container", return_value=False)
+ self.addCleanup(p.stop)
+ self._m_is_container = p.start()
+ p = mock.patch("cloudinit.dmi.is_FreeBSD", return_value=False)
+ self.addCleanup(p.stop)
+ self._m_is_FreeBSD = p.start()
+
+ def _create_sysfs_parent_directory(self):
+ util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
+
+ def _create_sysfs_file(self, key, content):
+ """Mocks the sys path found on Linux systems."""
+ self._create_sysfs_parent_directory()
+ dmi_key = "/sys/class/dmi/id/{0}".format(key)
+ util.write_file(dmi_key, content)
+
+ def _configure_dmidecode_return(self, key, content, error=None):
+ """
+ In order to test a missing sys path and call outs to dmidecode, this
+ function fakes the results of dmidecode to test the results.
+ """
+ def _dmidecode_subp(cmd):
+ if cmd[-1] != key:
+ raise subp.ProcessExecutionError()
+ return (content, error)
+
+ self.patched_funcs.enter_context(
+ mock.patch("cloudinit.dmi.subp.which", side_effect=lambda _: True))
+ self.patched_funcs.enter_context(
+ mock.patch("cloudinit.dmi.subp.subp", side_effect=_dmidecode_subp))
+
+ def _configure_kenv_return(self, key, content, error=None):
+ """
+ In order to test a FreeBSD system call outs to kenv, this
+ function fakes the results of kenv to test the results.
+ """
+ def _kenv_subp(cmd):
+ if cmd[-1] != dmi.DMIDECODE_TO_KERNEL[key].freebsd:
+ raise subp.ProcessExecutionError()
+ return (content, error)
+
+ self.patched_funcs.enter_context(
+ mock.patch("cloudinit.dmi.subp.subp", side_effect=_kenv_subp))
+
+ def patch_mapping(self, new_mapping):
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.dmi.DMIDECODE_TO_KERNEL',
+ new_mapping))
+
+ def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self):
+ self.patch_mapping({'mapped-key': dmi.kdmi('mapped-value', None)})
+ expected_dmi_value = 'sys-used-correctly'
+ self._create_sysfs_file('mapped-value', expected_dmi_value)
+ self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong')
+ self.assertEqual(expected_dmi_value, dmi.read_dmi_data('mapped-key'))
+
+ def test_dmidecode_used_if_no_sysfs_file_on_disk(self):
+ self.patch_mapping({})
+ self._create_sysfs_parent_directory()
+ expected_dmi_value = 'dmidecode-used'
+ self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
+ with mock.patch("cloudinit.util.os.uname") as m_uname:
+ m_uname.return_value = ('x-sysname', 'x-nodename',
+ 'x-release', 'x-version', 'x86_64')
+ self.assertEqual(expected_dmi_value,
+ dmi.read_dmi_data('use-dmidecode'))
+
+ def test_dmidecode_not_used_on_arm(self):
+ self.patch_mapping({})
+ print("current =%s", subp)
+ self._create_sysfs_parent_directory()
+ dmi_val = 'from-dmidecode'
+ dmi_name = 'use-dmidecode'
+ self._configure_dmidecode_return(dmi_name, dmi_val)
+ print("now =%s", subp)
+
+ expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val}
+ found = {}
+ # we do not run the 'dmi-decode' binary on some arches
+ # verify that anything requested that is not in the sysfs dir
+ # will return None on those arches.
+ with mock.patch("cloudinit.util.os.uname") as m_uname:
+ for arch in expected:
+ m_uname.return_value = ('x-sysname', 'x-nodename',
+ 'x-release', 'x-version', arch)
+ print("now2 =%s", subp)
+ found[arch] = dmi.read_dmi_data(dmi_name)
+ self.assertEqual(expected, found)
+
+ def test_none_returned_if_neither_source_has_data(self):
+ self.patch_mapping({})
+ self._configure_dmidecode_return('key', 'value')
+ self.assertIsNone(dmi.read_dmi_data('expect-fail'))
+
+ def test_none_returned_if_dmidecode_not_in_path(self):
+ self.patched_funcs.enter_context(
+ mock.patch.object(subp, 'which', lambda _: False))
+ self.patch_mapping({})
+ self.assertIsNone(dmi.read_dmi_data('expect-fail'))
+
+ def test_empty_string_returned_instead_of_foxfox(self):
+ # uninitialized dmi values show as \xff, return empty string
+ my_len = 32
+ dmi_value = b'\xff' * my_len + b'\n'
+ expected = ""
+ dmi_key = 'system-product-name'
+ sysfs_key = 'product_name'
+ self._create_sysfs_file(sysfs_key, dmi_value)
+ self.assertEqual(expected, dmi.read_dmi_data(dmi_key))
+
+ def test_container_returns_none(self):
+ """In a container read_dmi_data should always return None."""
+
+ # first verify we get the value if not in container
+ self._m_is_container.return_value = False
+ key, val = ("system-product-name", "my_product")
+ self._create_sysfs_file('product_name', val)
+ self.assertEqual(val, dmi.read_dmi_data(key))
+
+ # then verify in container returns None
+ self._m_is_container.return_value = True
+ self.assertIsNone(dmi.read_dmi_data(key))
+
+ def test_container_returns_none_on_unknown(self):
+ """In a container even bogus keys return None."""
+ self._m_is_container.return_value = True
+ self._create_sysfs_file('product_name', "should-be-ignored")
+ self.assertIsNone(dmi.read_dmi_data("bogus"))
+ self.assertIsNone(dmi.read_dmi_data("system-product-name"))
+
+ def test_freebsd_uses_kenv(self):
+ """On a FreeBSD system, kenv is called."""
+ self._m_is_FreeBSD.return_value = True
+ key, val = ("system-product-name", "my_product")
+ self._configure_kenv_return(key, val)
+ self.assertEqual(dmi.read_dmi_data(key), val)
diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py
index f96f5372..311dfad6 100644
--- a/cloudinit/tests/test_gpg.py
+++ b/cloudinit/tests/test_gpg.py
@@ -49,6 +49,7 @@ class TestReceiveKeys(CiTestCase):
m_subp.return_value = ('', '')
gpg.recv_key(key, keyserver, retries=retries)
m_subp.assert_called_once_with(
- ['gpg', '--keyserver=%s' % keyserver, '--recv-keys', key],
+ ['gpg', '--no-tty',
+ '--keyserver=%s' % keyserver, '--recv-keys', key],
capture=True)
m_sleep.assert_not_called()
diff --git a/cloudinit/tests/test_persistence.py b/cloudinit/tests/test_persistence.py
new file mode 100644
index 00000000..ec1152a9
--- /dev/null
+++ b/cloudinit/tests/test_persistence.py
@@ -0,0 +1,127 @@
+# Copyright (C) 2020 Canonical Ltd.
+#
+# Author: Daniel Watkins <oddbloke@ubuntu.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+"""
+Tests for cloudinit.persistence.
+
+Per https://docs.python.org/3/library/pickle.html, only "classes that are
+defined at the top level of a module" can be pickled. This means that all of
+our ``CloudInitPickleMixin`` subclasses for testing must be defined at
+module-level (rather than being defined inline or dynamically in the body of
+test methods, as we would do without this constraint).
+
+``TestPickleMixin.test_subclasses`` iterates over a list of all of these
+classes, and tests that they round-trip through a pickle dump/load. As the
+interface we're testing is that ``_unpickle`` is called appropriately on
+subclasses, our subclasses define their assertions in their ``_unpickle``
+implementation. (This means that the assertions will not be executed if
+``_unpickle`` is not called at all; we have
+``TestPickleMixin.test_unpickle_called`` to ensure it is called.)
+
+To avoid manually maintaining a list of classes for parametrization we use a
+simple metaclass, ``_Collector``, to gather them up.
+"""
+
+import pickle
+from unittest import mock
+
+import pytest
+
+from cloudinit.persistence import CloudInitPickleMixin
+
+
+class _Collector(type):
+ """Any class using this as a metaclass will be stored in test_classes."""
+
+ test_classes = []
+
+ def __new__(cls, *args):
+ new_cls = super().__new__(cls, *args)
+ _Collector.test_classes.append(new_cls)
+ return new_cls
+
+
+class InstanceVersionNotUsed(CloudInitPickleMixin, metaclass=_Collector):
+ """Test that the class version is used over one set in instance state."""
+
+ _ci_pkl_version = 1
+
+ def __init__(self):
+ self._ci_pkl_version = 2
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ assert 1 == ci_pkl_version
+
+
+class MissingVersionHandled(CloudInitPickleMixin, metaclass=_Collector):
+ """Test that pickles without ``_ci_pkl_version`` are handled gracefully.
+
+ This is tested by overriding ``__getstate__`` so the dumped pickle of this
+ class will not have ``_ci_pkl_version`` included.
+ """
+
+ def __getstate__(self):
+ return self.__dict__
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ assert 0 == ci_pkl_version
+
+
+class OverridenVersionHonored(CloudInitPickleMixin, metaclass=_Collector):
+ """Test that the subclass's version is used."""
+
+ _ci_pkl_version = 1
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ assert 1 == ci_pkl_version
+
+
+class StateIsRestored(CloudInitPickleMixin, metaclass=_Collector):
+ """Instance state should be restored before ``_unpickle`` is called."""
+
+ def __init__(self):
+ self.some_state = "some state"
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ assert "some state" == self.some_state
+
+
+class UnpickleCanBeUnoverriden(CloudInitPickleMixin, metaclass=_Collector):
+ """Subclasses should not need to override ``_unpickle``."""
+
+
+class VersionDefaultsToZero(CloudInitPickleMixin, metaclass=_Collector):
+ """Test that the default version is 0."""
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ assert 0 == ci_pkl_version
+
+
+class VersionIsPoppedFromState(CloudInitPickleMixin, metaclass=_Collector):
+ """Test _ci_pkl_version is popped from state before being restored."""
+
+ def _unpickle(self, ci_pkl_version: int) -> None:
+ # `self._ci_pkl_version` returns the type's _ci_pkl_version if it isn't
+ # in instance state, so we need to explicitly check self.__dict__.
+ assert "_ci_pkl_version" not in self.__dict__
+
+
+class TestPickleMixin:
+ def test_unpickle_called(self):
+ """Test that self._unpickle is called on unpickle."""
+ with mock.patch.object(
+ CloudInitPickleMixin, "_unpickle"
+ ) as m_unpickle:
+ pickle.loads(pickle.dumps(CloudInitPickleMixin()))
+ assert 1 == m_unpickle.call_count
+
+ @pytest.mark.parametrize("cls", _Collector.test_classes)
+ def test_subclasses(self, cls):
+ """For each collected class, round-trip through pickle dump/load.
+
+ Assertions are implemented in ``cls._unpickle``, and so are evoked as
+ part of the pickle load.
+ """
+ pickle.loads(pickle.dumps(cls()))
diff --git a/cloudinit/tests/test_stages.py b/cloudinit/tests/test_stages.py
index d5c9c0e4..d2d1b37f 100644
--- a/cloudinit/tests/test_stages.py
+++ b/cloudinit/tests/test_stages.py
@@ -3,6 +3,9 @@
"""Tests related to cloudinit.stages module."""
import os
+import stat
+
+import pytest
from cloudinit import stages
from cloudinit import sources
@@ -341,4 +344,63 @@ class TestInit(CiTestCase):
self.init.distro.apply_network_config.assert_called_with(
net_cfg, bring_up=True)
+
+class TestInit_InitializeFilesystem:
+ """Tests for cloudinit.stages.Init._initialize_filesystem.
+
+ TODO: Expand these tests to cover all of _initialize_filesystem's behavior.
+ """
+
+ @pytest.yield_fixture
+ def init(self, paths):
+ """A fixture which yields a stages.Init instance with paths and cfg set
+
+ As it is replaced with a mock, consumers of this fixture can set
+ `init.cfg` if the default empty dict configuration is not appropriate.
+ """
+ with mock.patch(
+ "cloudinit.stages.Init.cfg", mock.PropertyMock(return_value={})
+ ):
+ with mock.patch("cloudinit.stages.util.ensure_dirs"):
+ init = stages.Init()
+ init._paths = paths
+ yield init
+
+ @mock.patch("cloudinit.stages.util.ensure_file")
+ def test_ensure_file_not_called_if_no_log_file_configured(
+ self, m_ensure_file, init
+ ):
+ """If no log file is configured, we should not ensure its existence."""
+ init.cfg = {}
+
+ init._initialize_filesystem()
+
+ assert 0 == m_ensure_file.call_count
+
+ def test_log_files_existence_is_ensured_if_configured(self, init, tmpdir):
+ """If a log file is configured, we should ensure its existence."""
+ log_file = tmpdir.join("cloud-init.log")
+ init.cfg = {"def_log_file": str(log_file)}
+
+ init._initialize_filesystem()
+
+ assert log_file.exists
+
+ def test_existing_file_permissions_are_not_modified(self, init, tmpdir):
+ """If the log file already exists, we should not modify its permissions
+
+ See https://bugs.launchpad.net/cloud-init/+bug/1900837.
+ """
+ # Use a mode that will never be made the default so this test will
+ # always be valid
+ mode = 0o606
+ log_file = tmpdir.join("cloud-init.log")
+ log_file.ensure()
+ log_file.chmod(mode)
+ init.cfg = {"def_log_file": str(log_file)}
+
+ init._initialize_filesystem()
+
+ assert mode == stat.S_IMODE(log_file.stat().mode)
+
# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_upgrade.py b/cloudinit/tests/test_upgrade.py
new file mode 100644
index 00000000..f79a2536
--- /dev/null
+++ b/cloudinit/tests/test_upgrade.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2020 Canonical Ltd.
+#
+# Author: Daniel Watkins <oddbloke@ubuntu.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Upgrade testing for cloud-init.
+
+This module tests cloud-init's behaviour across upgrades. Specifically, it
+specifies a set of invariants that the current codebase expects to be true (as
+tests in ``TestUpgrade``) and then checks that these hold true after unpickling
+``obj.pkl``s from previous versions of cloud-init; those pickles are stored in
+``tests/data/old_pickles/``.
+"""
+
+import operator
+import pathlib
+
+import pytest
+
+from cloudinit.stages import _pkl_load
+from cloudinit.tests.helpers import resourceLocation
+
+
+class TestUpgrade:
+ @pytest.fixture(
+ params=pathlib.Path(resourceLocation("old_pickles")).glob("*.pkl"),
+ scope="class",
+ ids=operator.attrgetter("name"),
+ )
+ def previous_obj_pkl(self, request):
+ """Load each pickle to memory once, then run all tests against it.
+
+ Test implementations _must not_ modify the ``previous_obj_pkl`` which
+ they are passed, as that will affect tests that run after them.
+ """
+ return _pkl_load(str(request.param))
+
+ def test_networking_set_on_distro(self, previous_obj_pkl):
+ """We always expect to have ``.networking`` on ``Distro`` objects."""
+ assert previous_obj_pkl.distro.networking is not None
+
+ def test_blacklist_drivers_set_on_networking(self, previous_obj_pkl):
+ """We always expect Networking.blacklist_drivers to be initialised."""
+ assert previous_obj_pkl.distro.networking.blacklist_drivers is None
diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py
index 096a3037..b7a302f1 100644
--- a/cloudinit/tests/test_util.py
+++ b/cloudinit/tests/test_util.py
@@ -730,6 +730,41 @@ class TestMountCb:
"""already_mounted_device_and_mountdict, but return only the device"""
return already_mounted_device_and_mountdict[0]
+ @pytest.mark.parametrize(
+ "mtype,expected",
+ [
+ # While the filesystem is called iso9660, the mount type is cd9660
+ ("iso9660", "cd9660"),
+ # vfat is generally called "msdos" on BSD
+ ("vfat", "msdos"),
+ # judging from man pages, only FreeBSD has this alias
+ ("msdosfs", "msdos"),
+ # Test happy path
+ ("ufs", "ufs")
+ ],
+ )
+ @mock.patch("cloudinit.util.is_Linux", autospec=True)
+ @mock.patch("cloudinit.util.is_BSD", autospec=True)
+ @mock.patch("cloudinit.util.subp.subp")
+ @mock.patch("cloudinit.temp_utils.tempdir", autospec=True)
+ def test_normalize_mtype_on_bsd(
+ self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected
+ ):
+ m_is_BSD.return_value = True
+ m_is_Linux.return_value = False
+ m_tmpdir.return_value.__enter__ = mock.Mock(
+ autospec=True, return_value="/tmp/fake"
+ )
+ m_tmpdir.return_value.__exit__ = mock.Mock(
+ autospec=True, return_value=True
+ )
+ callback = mock.Mock(autospec=True)
+
+ util.mount_cb('/dev/fake0', callback, mtype=mtype)
+ assert mock.call(
+ ["mount", "-o", "ro", "-t", expected, "/dev/fake0", "/tmp/fake"],
+ update_env=None) in m_subp.call_args_list
+
@pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])
def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):
with pytest.raises(TypeError):
@@ -771,4 +806,49 @@ class TestMountCb:
] == callback.call_args_list
+@mock.patch("cloudinit.util.write_file")
+class TestEnsureFile:
+ """Tests for ``cloudinit.util.ensure_file``."""
+
+ def test_parameters_passed_through(self, m_write_file):
+ """Test the parameters in the signature are passed to write_file."""
+ util.ensure_file(
+ mock.sentinel.path,
+ mode=mock.sentinel.mode,
+ preserve_mode=mock.sentinel.preserve_mode,
+ )
+
+ assert 1 == m_write_file.call_count
+ args, kwargs = m_write_file.call_args
+ assert (mock.sentinel.path,) == args
+ assert mock.sentinel.mode == kwargs["mode"]
+ assert mock.sentinel.preserve_mode == kwargs["preserve_mode"]
+
+ @pytest.mark.parametrize(
+ "kwarg,expected",
+ [
+ # Files should be world-readable by default
+ ("mode", 0o644),
+ # The previous behaviour of not preserving mode should be retained
+ ("preserve_mode", False),
+ ],
+ )
+ def test_defaults(self, m_write_file, kwarg, expected):
+ """Test that ensure_file defaults appropriately."""
+ util.ensure_file(mock.sentinel.path)
+
+ assert 1 == m_write_file.call_count
+ _args, kwargs = m_write_file.call_args
+ assert expected == kwargs[kwarg]
+
+ def test_static_parameters_are_passed(self, m_write_file):
+ """Test that the static write_files parameters are passed correctly."""
+ util.ensure_file(mock.sentinel.path)
+
+ assert 1 == m_write_file.call_count
+ _args, kwargs = m_write_file.call_args
+ assert "" == kwargs["content"]
+ assert "ab" == kwargs["omode"]
+
+
# vi: ts=4 expandtab