summaryrefslogtreecommitdiff
path: root/tests/unittests/distros/test_generic.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/distros/test_generic.py')
-rw-r--r--tests/unittests/distros/test_generic.py383
1 files changed, 383 insertions, 0 deletions
diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py
new file mode 100644
index 00000000..93c5395c
--- /dev/null
+++ b/tests/unittests/distros/test_generic.py
@@ -0,0 +1,383 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+import shutil
+import tempfile
+from unittest import mock
+
+import pytest
+
+from cloudinit import distros, util
+from tests.unittests import helpers
+
+unknown_arch_info = {
+ "arches": ["default"],
+ "failsafe": {
+ "primary": "http://fs-primary-default",
+ "security": "http://fs-security-default",
+ },
+}
+
+package_mirrors = [
+ {
+ "arches": ["i386", "amd64"],
+ "failsafe": {
+ "primary": "http://fs-primary-intel",
+ "security": "http://fs-security-intel",
+ },
+ "search": {
+ "primary": [
+ "http://%(ec2_region)s.ec2/",
+ "http://%(availability_zone)s.clouds/",
+ ],
+ "security": [
+ "http://security-mirror1-intel",
+ "http://security-mirror2-intel",
+ ],
+ },
+ },
+ {
+ "arches": ["armhf", "armel"],
+ "failsafe": {
+ "primary": "http://fs-primary-arm",
+ "security": "http://fs-security-arm",
+ },
+ },
+ unknown_arch_info,
+]
+
+gpmi = distros._get_package_mirror_info
+gapmi = distros._get_arch_package_mirror_info
+
+
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestGenericDistro, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def _write_load_sudoers(self, _user, rules):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ os.makedirs(os.path.join(self.tmp, "etc"))
+ os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d"))
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.write_sudo_rules("harlowja", rules)
+ contents = util.load_file(d.ci_sudoers_fn)
+ return contents
+
+ def _count_in(self, lines_look_for, text_content):
+ found_amount = 0
+ for e in lines_look_for:
+ for line in text_content.splitlines():
+ line = line.strip()
+ if line == e:
+ found_amount += 1
+ return found_amount
+
+ def test_sudoers_ensure_rules(self):
+ rules = "ALL=(ALL:ALL) ALL"
+ contents = self._write_load_sudoers("harlowja", rules)
+ expected = ["harlowja ALL=(ALL:ALL) ALL"]
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_rules_list(self):
+ rules = [
+ "ALL=(ALL:ALL) ALL",
+ "B-ALL=(ALL:ALL) ALL",
+ "C-ALL=(ALL:ALL) ALL",
+ ]
+ contents = self._write_load_sudoers("harlowja", rules)
+ expected = [
+ "harlowja ALL=(ALL:ALL) ALL",
+ "harlowja B-ALL=(ALL:ALL) ALL",
+ "harlowja C-ALL=(ALL:ALL) ALL",
+ ]
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEqual(2, contents.count("josh"))
+
+ def test_sudoers_ensure_only_one_includedir(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ for char in ["#", "@"]:
+ util.write_file("/etc/sudoers", "{}includedir /b".format(char))
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertEqual(1, contents.count("includedir /b"))
+
+ def test_arch_package_mirror_info_unknown(self):
+ """for an unknown arch, we should get back that with arch 'default'."""
+ arch_mirrors = gapmi(package_mirrors, arch="unknown")
+ self.assertEqual(unknown_arch_info, arch_mirrors)
+
+ def test_arch_package_mirror_info_known(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ self.assertEqual(package_mirrors[0], arch_mirrors)
+
+ def test_systemd_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs("/run/systemd/system")
+ self.assertTrue(d.uses_systemd())
+
+ def test_systemd_not_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ self.assertFalse(d.uses_systemd())
+
+ def test_systemd_symlink(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs("/run/systemd")
+ os.symlink("/", "/run/systemd/system")
+ self.assertFalse(d.uses_systemd())
+
+ @mock.patch("cloudinit.distros.debian.read_system_locale")
+ def test_get_locale_ubuntu(self, m_locale):
+ """Test ubuntu distro returns locale set to C.UTF-8"""
+ m_locale.return_value = "C.UTF-8"
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ locale = d.get_locale()
+ self.assertEqual("C.UTF-8", locale)
+
+ def test_get_locale_rhel(self):
+ """Test rhel distro returns NotImplementedError exception"""
+ cls = distros.fetch("rhel")
+ d = cls("rhel", {}, None)
+ with self.assertRaises(NotImplementedError):
+ d.get_locale()
+
+ def test_expire_passwd_uses_chpasswd(self):
+ """Test ubuntu.expire_passwd uses the passwd command."""
+ for d_name in ("ubuntu", "rhel"):
+ cls = distros.fetch(d_name)
+ d = cls(d_name, {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(["passwd", "--expire", "myuser"])
+
+ def test_expire_passwd_freebsd_uses_pw_command(self):
+ """Test FreeBSD.expire_passwd uses the pw command."""
+ cls = distros.fetch("freebsd")
+ d = cls("freebsd", {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(
+ ["pw", "usermod", "myuser", "-p", "01-Jan-1970"]
+ )
+
+
+class TestGetPackageMirrors:
+ def return_first(self, mlist):
+ if not mlist:
+ return None
+ return mlist[0]
+
+ def return_second(self, mlist):
+ if not mlist:
+ return None
+
+ return mlist[1] if len(mlist) > 1 else None
+
+ def return_none(self, _mlist):
+ return None
+
+ def return_last(self, mlist):
+ if not mlist:
+ return None
+ return mlist[-1]
+
+ @pytest.mark.parametrize(
+ "allow_ec2_mirror, platform_type, mirrors",
+ [
+ (
+ True,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ True,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ ],
+ )
+ def test_get_package_mirror_info_az_ec2(
+ self, allow_ec2_mirror, platform_type, mirrors
+ ):
+ flag_path = (
+ "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ )
+ with mock.patch(flag_path, allow_ec2_mirror):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(
+ availability_zone="us-east-1a", platform_type=platform_type
+ )
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == mirrors[0]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_second,
+ )
+ assert results == mirrors[1]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_none,
+ )
+ assert results == package_mirrors[0]["failsafe"]
+
+ def test_get_package_mirror_info_az_non_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone="nova.cloudvendor")
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
+ )
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror2-intel",
+ }
+
+ def test_get_package_mirror_info_none(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone=None)
+
+ # because both search entries here replacement based on
+ # availability-zone, the filter will be called with an empty list and
+ # failsafe should be taken.
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
+ )
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ }
+
+
+# vi: ts=4 expandtab