diff options
author | zsdc <taras@vyos.io> | 2022-03-25 20:58:01 +0200 |
---|---|---|
committer | zsdc <taras@vyos.io> | 2022-03-25 21:42:00 +0200 |
commit | 31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (patch) | |
tree | 349631a02467dae0158f6f663cc8aa8537974a97 /tests/unittests/distros | |
parent | 5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff) | |
parent | 8537237d80a48c8f0cbf8e66aa4826bbc882b022 (diff) | |
download | vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.tar.gz vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.zip |
T2117: Cloud-init updated to 22.1
Merged with 22.1 tag from the upstream Cloud-init repository.
Our modules were slightly modified for compatibility with the new
version.
Diffstat (limited to 'tests/unittests/distros')
22 files changed, 3362 insertions, 0 deletions
diff --git a/tests/unittests/distros/__init__.py b/tests/unittests/distros/__init__.py new file mode 100644 index 00000000..e66b9446 --- /dev/null +++ b/tests/unittests/distros/__init__.py @@ -0,0 +1,19 @@ +# This file is part of cloud-init. See LICENSE file for license information. +import copy + +from cloudinit import distros, helpers, settings + + +def _get_distro(dtype, system_info=None): + """Return a Distro class of distro 'dtype'. + + cfg is format of CFG_BUILTIN['system_info']. + + example: _get_distro("debian") + """ + if system_info is None: + system_info = copy.deepcopy(settings.CFG_BUILTIN["system_info"]) + system_info["distro"] = dtype + paths = helpers.Paths(system_info["paths"]) + distro_cls = distros.fetch(dtype) + return distro_cls(dtype, system_info, paths) diff --git a/tests/unittests/distros/test_arch.py b/tests/unittests/distros/test_arch.py new file mode 100644 index 00000000..5446295e --- /dev/null +++ b/tests/unittests/distros/test_arch.py @@ -0,0 +1,55 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit import util +from cloudinit.distros.arch import _render_network +from tests.unittests.helpers import CiTestCase, dir2dict + +from . import _get_distro + + +class TestArch(CiTestCase): + def test_get_distro(self): + distro = _get_distro("arch") + hostname = "myhostname" + hostfile = self.tmp_path("hostfile") + distro._write_hostname(hostname, hostfile) + self.assertEqual(hostname + "\n", util.load_file(hostfile)) + + +class TestRenderNetwork(CiTestCase): + def test_basic_static(self): + """Just the most basic static config. + + note 'lo' should not be rendered as an interface.""" + entries = { + "eth0": { + "auto": True, + "dns-nameservers": ["8.8.8.8"], + "bootproto": "static", + "address": "10.0.0.2", + "gateway": "10.0.0.1", + "netmask": "255.255.255.0", + }, + "lo": {"auto": True}, + } + target = self.tmp_dir() + devs = _render_network(entries, target=target) + files = dir2dict(target, prefix=target) + self.assertEqual(["eth0"], devs) + self.assertEqual( + { + "/etc/netctl/eth0": "\n".join( + [ + "Address=10.0.0.2/255.255.255.0", + "Connection=ethernet", + "DNS=('8.8.8.8')", + "Gateway=10.0.0.1", + "IP=static", + "Interface=eth0", + "", + ] + ), + "/etc/resolv.conf": "nameserver 8.8.8.8\n", + }, + files, + ) diff --git a/tests/unittests/distros/test_bsd_utils.py b/tests/unittests/distros/test_bsd_utils.py new file mode 100644 index 00000000..d6f0aeed --- /dev/null +++ b/tests/unittests/distros/test_bsd_utils.py @@ -0,0 +1,66 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import cloudinit.distros.bsd_utils as bsd_utils +from tests.unittests.helpers import CiTestCase, ExitStack, mock + +RC_FILE = """ +if something; then + do something here +fi +hostname={hostname} +""" + + +class TestBsdUtils(CiTestCase): + def setUp(self): + super().setUp() + patches = ExitStack() + self.addCleanup(patches.close) + + self.load_file = patches.enter_context( + mock.patch.object(bsd_utils.util, "load_file") + ) + + self.write_file = patches.enter_context( + mock.patch.object(bsd_utils.util, "write_file") + ) + + def test_get_rc_config_value(self): + self.load_file.return_value = "hostname=foo\n" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + self.load_file.assert_called_with("/etc/rc.conf") + + self.load_file.return_value = "hostname=foo" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + + self.load_file.return_value = 'hostname="foo"' + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + + self.load_file.return_value = "hostname='foo'" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + + self.load_file.return_value = "hostname='foo\"" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "'foo\"") + + self.load_file.return_value = "" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), None) + + self.load_file.return_value = RC_FILE.format(hostname="foo") + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + + def test_set_rc_config_value_unchanged(self): + # bsd_utils.set_rc_config_value('hostname', 'foo') + # self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n') + + self.load_file.return_value = RC_FILE.format(hostname="foo") + self.write_file.assert_not_called() + + def test_set_rc_config_value(self): + bsd_utils.set_rc_config_value("hostname", "foo") + self.write_file.assert_called_with("/etc/rc.conf", "hostname=foo\n") + + self.load_file.return_value = RC_FILE.format(hostname="foo") + bsd_utils.set_rc_config_value("hostname", "bar") + self.write_file.assert_called_with( + "/etc/rc.conf", RC_FILE.format(hostname="bar") + ) diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py new file mode 100644 index 00000000..ddb039bd --- /dev/null +++ b/tests/unittests/distros/test_create_users.py @@ -0,0 +1,282 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re + +from cloudinit import distros, ssh_util +from tests.unittests.helpers import CiTestCase, mock +from tests.unittests.util import abstract_to_concrete + + +@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +@mock.patch("cloudinit.distros.subp.subp") +class TestCreateUser(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestCreateUser, self).setUp() + self.dist = abstract_to_concrete(distros.Distro)( + name="test", cfg=None, paths=None + ) + + def _useradd2call(self, args): + # return a mock call for the useradd command in args + # with expected 'logstring'. + args = ["useradd"] + args + logcmd = [a for a in args] + for i in range(len(args)): + if args[i] in ("--password",): + logcmd[i + 1] = "REDACTED" + return mock.call(args, logstring=logcmd) + + def test_basic(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_no_home(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, no_create_home=True) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_system_user(self, m_subp, m_is_snappy): + # system user should have no home and get --system + user = "foouser" + self.dist.create_user(user, system=True) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "--system", "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_explicit_no_home_false(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, no_create_home=False) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_unlocked(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, lock_passwd=False) + self.assertEqual( + m_subp.call_args_list, [self._useradd2call([user, "-m"])] + ) + + def test_set_password(self, m_subp, m_is_snappy): + user = "foouser" + password = "passfoo" + self.dist.create_user(user, passwd=password) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "--password", password, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + @mock.patch("cloudinit.distros.util.is_group") + def test_group_added(self, m_is_group, m_subp, m_is_snappy): + m_is_group.return_value = False + user = "foouser" + self.dist.create_user(user, groups=["group1"]) + expected = [ + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", "group1", "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy): + ex_groups = ["existing_group"] + groups = ["group1", ex_groups[0]] + m_is_group.side_effect = lambda m: m in ex_groups + user = "foouser" + self.dist.create_user(user, groups=groups) + expected = [ + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", ",".join(groups), "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_create_groups_with_whitespace_string( + self, m_is_group, m_subp, m_is_snappy + ): + # groups supported as a comma delimeted string even with white space + m_is_group.return_value = False + user = "foouser" + self.dist.create_user(user, groups="group1, group2") + expected = [ + mock.call(["groupadd", "group1"]), + mock.call(["groupadd", "group2"]), + self._useradd2call([user, "--groups", "group1,group2", "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + def test_explicit_sudo_false(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, sudo=False) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_string( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys allows string and calls setup_user_keys.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys="mykey") + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["mykey"]), user) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_list( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys allows lists and calls setup_user_keys.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"]) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_integer( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys warns on non-iterable/string type.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys=-1) + m_setup_user_keys.assert_called_once_with(set([]), user) + match = re.match( + r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for" + " 'ssh_authorized_keys'.*", + self.logs.getvalue(), + re.DOTALL, + ) + self.assertIsNotNone( + match, "Missing ssh_authorized_keys invalid type warning" + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_no_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Log a warning when trying to redirect a user no cloud ssh keys.""" + user = "foouser" + self.dist.create_user(user, ssh_redirect_user="someuser") + self.assertIn( + "WARNING: Unable to disable SSH logins for foouser given " + "ssh_redirect_user: someuser. No cloud public-keys present.\n", + self.logs.getvalue(), + ) + m_setup_user_keys.assert_not_called() + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_with_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Disable ssh when ssh_redirect_user and cloud ssh keys are set.""" + user = "foouser" + self.dist.create_user( + user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"] + ) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) + m_setup_user_keys.assert_called_once_with( + set(["key1"]), "foouser", options=disable_prefix + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Do not disable ssh_authorized_keys when ssh_redirect_user is set.""" + user = "foouser" + self.dist.create_user( + user, + ssh_authorized_keys="auth1", + ssh_redirect_user="someuser", + cloud_public_ssh_keys=["key1"], + ) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) + self.assertEqual( + m_setup_user_keys.call_args_list, + [ + mock.call(set(["auth1"]), user), # not disabled + mock.call(set(["key1"]), "foouser", options=disable_prefix), + ], + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_usermod_if_no_passwd( + self, m_which, m_subp, m_is_snappy + ): + """Lock uses usermod --lock if no 'passwd' cmd available.""" + m_which.side_effect = lambda m: m in ("usermod",) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy): + """Lock with only passwd will use passwd.""" + m_which.side_effect = lambda m: m in ("passwd",) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_raises_runtime_if_no_commands( + self, m_which, m_subp, m_is_snappy + ): + """Lock with no commands available raises RuntimeError.""" + m_which.return_value = None + with self.assertRaises(RuntimeError): + self.dist.lock_passwd("bob") + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_debian.py b/tests/unittests/distros/test_debian.py new file mode 100644 index 00000000..c7c5932e --- /dev/null +++ b/tests/unittests/distros/test_debian.py @@ -0,0 +1,211 @@ +# This file is part of cloud-init. See LICENSE file for license information. +from itertools import count, cycle +from unittest import mock + +import pytest + +from cloudinit import distros, subp, util +from cloudinit.distros.debian import APT_GET_COMMAND, APT_GET_WRAPPER +from tests.unittests.helpers import FilesystemMockingTestCase + + +@mock.patch("cloudinit.distros.debian.subp.subp") +class TestDebianApplyLocale(FilesystemMockingTestCase): + def setUp(self): + super(TestDebianApplyLocale, self).setUp() + self.new_root = self.tmp_dir() + self.patchOS(self.new_root) + self.patchUtils(self.new_root) + self.spath = self.tmp_path("etc/default/locale", self.new_root) + cls = distros.fetch("debian") + self.distro = cls("debian", {}, None) + + def test_no_rerun(self, m_subp): + """If system has defined locale, no re-run is expected.""" + m_subp.return_value = (None, None) + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=%s\n" % locale, omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) + m_subp.assert_not_called() + + def test_no_regen_on_c_utf8(self, m_subp): + """If locale is set to C.UTF8, do not attempt to call locale-gen""" + m_subp.return_value = (None, None) + locale = "C.UTF-8" + util.write_file(self.spath, "LANG=%s\n" % "en_US.UTF-8", omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) + self.assertEqual( + [ + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ] + ], + [p[0][0] for p in m_subp.call_args_list], + ) + + def test_rerun_if_different(self, m_subp): + """If system has different locale, locale-gen should be called.""" + m_subp.return_value = (None, None) + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=fr_FR.UTF-8", omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) + self.assertEqual( + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) + + def test_rerun_if_no_file(self, m_subp): + """If system has no locale file, locale-gen should be called.""" + m_subp.return_value = (None, None) + locale = "en_US.UTF-8" + self.distro.apply_locale(locale, out_fn=self.spath) + self.assertEqual( + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) + + def test_rerun_on_unset_system_locale(self, m_subp): + """If system has unset locale, locale-gen should be called.""" + m_subp.return_value = (None, None) + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=", omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) + self.assertEqual( + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) + + def test_rerun_on_mismatched_keys(self, m_subp): + """If key is LC_ALL and system has only LANG, rerun is expected.""" + m_subp.return_value = (None, None) + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=", omode="w") + self.distro.apply_locale(locale, out_fn=self.spath, keyname="LC_ALL") + self.assertEqual( + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LC_ALL=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) + + def test_falseish_locale_raises_valueerror(self, m_subp): + """locale as None or "" is invalid and should raise ValueError.""" + + with self.assertRaises(ValueError) as ctext_m: + self.distro.apply_locale(None) + m_subp.assert_not_called() + + self.assertEqual( + "Failed to provide locale value.", str(ctext_m.exception) + ) + + with self.assertRaises(ValueError) as ctext_m: + self.distro.apply_locale("") + m_subp.assert_not_called() + self.assertEqual( + "Failed to provide locale value.", str(ctext_m.exception) + ) + + +@mock.patch.dict("os.environ", {}, clear=True) +@mock.patch("cloudinit.distros.debian.subp.which", return_value=True) +@mock.patch("cloudinit.distros.debian.subp.subp") +class TestPackageCommand: + distro = distros.fetch("debian")("debian", {}, None) + + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + return_value=True, + ) + def test_simple_command(self, m_apt_avail, m_subp, m_which): + self.distro.package_command("update") + apt_args = [APT_GET_WRAPPER["command"]] + apt_args.extend(APT_GET_COMMAND) + apt_args.append("update") + expected_call = { + "args": apt_args, + "capture": False, + "env": {"DEBIAN_FRONTEND": "noninteractive"}, + } + assert m_subp.call_args == mock.call(**expected_call) + + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=[False, False, True], + ) + @mock.patch("cloudinit.distros.debian.time.sleep") + def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which): + self.distro._wait_for_apt_command("stub", {"args": "stub2"}) + assert m_sleep.call_args_list == [mock.call(1), mock.call(1)] + assert m_subp.call_args_list == [mock.call(args="stub2")] + + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + return_value=False, + ) + @mock.patch("cloudinit.distros.debian.time.sleep") + @mock.patch("cloudinit.distros.debian.time.time", side_effect=count()) + def test_lock_wait_timeout( + self, m_time, m_sleep, m_apt_avail, m_subp, m_which + ): + with pytest.raises(TimeoutError): + self.distro._wait_for_apt_command("stub", "stub2", timeout=5) + assert m_subp.call_args_list == [] + + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=cycle([True, False]), + ) + @mock.patch("cloudinit.distros.debian.time.sleep") + def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which): + exception = subp.ProcessExecutionError( + exit_code=100, stderr="Could not get apt lock" + ) + m_subp.side_effect = [exception, exception, "return_thing"] + ret = self.distro._wait_for_apt_command("stub", {"args": "stub2"}) + assert ret == "return_thing" + + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=cycle([True, False]), + ) + @mock.patch("cloudinit.distros.debian.time.sleep") + @mock.patch("cloudinit.distros.debian.time.time", side_effect=count()) + def test_lock_exception_timeout( + self, m_time, m_sleep, m_apt_avail, m_subp, m_which + ): + m_subp.side_effect = subp.ProcessExecutionError( + exit_code=100, stderr="Could not get apt lock" + ) + with pytest.raises(TimeoutError): + self.distro._wait_for_apt_command( + "stub", {"args": "stub2"}, timeout=5 + ) diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py new file mode 100644 index 00000000..f0cd1b24 --- /dev/null +++ b/tests/unittests/distros/test_dragonflybsd.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + + +import cloudinit.util +from tests.unittests.helpers import mock + + +def test_find_dragonflybsd_part(): + assert cloudinit.util.find_dragonflybsd_part("/dev/vbd0s3") == "vbd0s3" + + +@mock.patch("cloudinit.util.is_DragonFlyBSD") +@mock.patch("cloudinit.subp.subp") +def test_parse_mount(mock_subp, m_is_DragonFlyBSD): + mount_out = """ +vbd0s3 on / (hammer2, local) +devfs on /dev (devfs, nosymfollow, local) +/dev/vbd0s0a on /boot (ufs, local) +procfs on /proc (procfs, local) +tmpfs on /var/run/shm (tmpfs, local) +""" + + mock_subp.return_value = (mount_out, "") + m_is_DragonFlyBSD.return_value = True + assert cloudinit.util.parse_mount("/") == ("vbd0s3", "hammer2", "/") diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py new file mode 100644 index 00000000..22be5098 --- /dev/null +++ b/tests/unittests/distros/test_freebsd.py @@ -0,0 +1,43 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os + +from cloudinit.util import find_freebsd_part, get_path_dev_freebsd +from tests.unittests.helpers import CiTestCase, mock + + +class TestDeviceLookUp(CiTestCase): + @mock.patch("cloudinit.subp.subp") + def test_find_freebsd_part_label(self, mock_subp): + glabel_out = """ +gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1 + label/rootfs N/A da0p2 + label/swap N/A da0p3 +""" + mock_subp.return_value = (glabel_out, "") + res = find_freebsd_part("/dev/label/rootfs") + self.assertEqual("da0p2", res) + + @mock.patch("cloudinit.subp.subp") + def test_find_freebsd_part_gpt(self, mock_subp): + glabel_out = """ + gpt/bootfs N/A vtbd0p1 +gptid/3f4cbe26-75da-11e8-a8f2-002590ec6166 N/A vtbd0p1 + gpt/swapfs N/A vtbd0p2 + gpt/rootfs N/A vtbd0p3 + iso9660/cidata N/A vtbd2 +""" + mock_subp.return_value = (glabel_out, "") + res = find_freebsd_part("/dev/gpt/rootfs") + self.assertEqual("vtbd0p3", res) + + def test_get_path_dev_freebsd_label(self): + mnt_list = """ +/dev/label/rootfs / ufs rw 1 1 +devfs /dev devfs rw,multilabel 0 0 +fdescfs /dev/fd fdescfs rw 0 0 +/dev/da1s1 /mnt/resource ufs rw 2 2 +""" + with mock.patch.object(os.path, "exists", return_value=True): + res = get_path_dev_freebsd("/etc", mnt_list) + self.assertIsNotNone(res) diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py new file mode 100644 index 00000000..93c5395c --- /dev/null +++ b/tests/unittests/distros/test_generic.py @@ -0,0 +1,383 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +import shutil +import tempfile +from unittest import mock + +import pytest + +from cloudinit import distros, util +from tests.unittests import helpers + +unknown_arch_info = { + "arches": ["default"], + "failsafe": { + "primary": "http://fs-primary-default", + "security": "http://fs-security-default", + }, +} + +package_mirrors = [ + { + "arches": ["i386", "amd64"], + "failsafe": { + "primary": "http://fs-primary-intel", + "security": "http://fs-security-intel", + }, + "search": { + "primary": [ + "http://%(ec2_region)s.ec2/", + "http://%(availability_zone)s.clouds/", + ], + "security": [ + "http://security-mirror1-intel", + "http://security-mirror2-intel", + ], + }, + }, + { + "arches": ["armhf", "armel"], + "failsafe": { + "primary": "http://fs-primary-arm", + "security": "http://fs-security-arm", + }, + }, + unknown_arch_info, +] + +gpmi = distros._get_package_mirror_info +gapmi = distros._get_arch_package_mirror_info + + +class TestGenericDistro(helpers.FilesystemMockingTestCase): + def setUp(self): + super(TestGenericDistro, self).setUp() + # Make a temp directoy for tests to use. + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + + def _write_load_sudoers(self, _user, rules): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + os.makedirs(os.path.join(self.tmp, "etc")) + os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d")) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + d.write_sudo_rules("harlowja", rules) + contents = util.load_file(d.ci_sudoers_fn) + return contents + + def _count_in(self, lines_look_for, text_content): + found_amount = 0 + for e in lines_look_for: + for line in text_content.splitlines(): + line = line.strip() + if line == e: + found_amount += 1 + return found_amount + + def test_sudoers_ensure_rules(self): + rules = "ALL=(ALL:ALL) ALL" + contents = self._write_load_sudoers("harlowja", rules) + expected = ["harlowja ALL=(ALL:ALL) ALL"] + self.assertEqual(len(expected), self._count_in(expected, contents)) + not_expected = [ + "harlowja A", + "harlowja L", + "harlowja L", + ] + self.assertEqual(0, self._count_in(not_expected, contents)) + + def test_sudoers_ensure_rules_list(self): + rules = [ + "ALL=(ALL:ALL) ALL", + "B-ALL=(ALL:ALL) ALL", + "C-ALL=(ALL:ALL) ALL", + ] + contents = self._write_load_sudoers("harlowja", rules) + expected = [ + "harlowja ALL=(ALL:ALL) ALL", + "harlowja B-ALL=(ALL:ALL) ALL", + "harlowja C-ALL=(ALL:ALL) ALL", + ] + self.assertEqual(len(expected), self._count_in(expected, contents)) + not_expected = [ + "harlowja A", + "harlowja L", + "harlowja L", + ] + self.assertEqual(0, self._count_in(not_expected, contents)) + + def test_sudoers_ensure_new(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + d.ensure_sudo_dir("/b") + contents = util.load_file("/etc/sudoers") + self.assertIn("includedir /b", contents) + self.assertTrue(os.path.isdir("/b")) + + def test_sudoers_ensure_append(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + util.write_file("/etc/sudoers", "josh, josh\n") + d.ensure_sudo_dir("/b") + contents = util.load_file("/etc/sudoers") + self.assertIn("includedir /b", contents) + self.assertTrue(os.path.isdir("/b")) + self.assertIn("josh", contents) + self.assertEqual(2, contents.count("josh")) + + def test_sudoers_ensure_only_one_includedir(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + for char in ["#", "@"]: + util.write_file("/etc/sudoers", "{}includedir /b".format(char)) + d.ensure_sudo_dir("/b") + contents = util.load_file("/etc/sudoers") + self.assertIn("includedir /b", contents) + self.assertTrue(os.path.isdir("/b")) + self.assertEqual(1, contents.count("includedir /b")) + + def test_arch_package_mirror_info_unknown(self): + """for an unknown arch, we should get back that with arch 'default'.""" + arch_mirrors = gapmi(package_mirrors, arch="unknown") + self.assertEqual(unknown_arch_info, arch_mirrors) + + def test_arch_package_mirror_info_known(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + self.assertEqual(package_mirrors[0], arch_mirrors) + + def test_systemd_in_use(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + os.makedirs("/run/systemd/system") + self.assertTrue(d.uses_systemd()) + + def test_systemd_not_in_use(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + self.assertFalse(d.uses_systemd()) + + def test_systemd_symlink(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + os.makedirs("/run/systemd") + os.symlink("/", "/run/systemd/system") + self.assertFalse(d.uses_systemd()) + + @mock.patch("cloudinit.distros.debian.read_system_locale") + def test_get_locale_ubuntu(self, m_locale): + """Test ubuntu distro returns locale set to C.UTF-8""" + m_locale.return_value = "C.UTF-8" + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + locale = d.get_locale() + self.assertEqual("C.UTF-8", locale) + + def test_get_locale_rhel(self): + """Test rhel distro returns NotImplementedError exception""" + cls = distros.fetch("rhel") + d = cls("rhel", {}, None) + with self.assertRaises(NotImplementedError): + d.get_locale() + + def test_expire_passwd_uses_chpasswd(self): + """Test ubuntu.expire_passwd uses the passwd command.""" + for d_name in ("ubuntu", "rhel"): + cls = distros.fetch(d_name) + d = cls(d_name, {}, None) + with mock.patch("cloudinit.subp.subp") as m_subp: + d.expire_passwd("myuser") + m_subp.assert_called_once_with(["passwd", "--expire", "myuser"]) + + def test_expire_passwd_freebsd_uses_pw_command(self): + """Test FreeBSD.expire_passwd uses the pw command.""" + cls = distros.fetch("freebsd") + d = cls("freebsd", {}, None) + with mock.patch("cloudinit.subp.subp") as m_subp: + d.expire_passwd("myuser") + m_subp.assert_called_once_with( + ["pw", "usermod", "myuser", "-p", "01-Jan-1970"] + ) + + +class TestGetPackageMirrors: + def return_first(self, mlist): + if not mlist: + return None + return mlist[0] + + def return_second(self, mlist): + if not mlist: + return None + + return mlist[1] if len(mlist) > 1 else None + + def return_none(self, _mlist): + return None + + def return_last(self, mlist): + if not mlist: + return None + return mlist[-1] + + @pytest.mark.parametrize( + "allow_ec2_mirror, platform_type, mirrors", + [ + ( + True, + "ec2", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + True, + "other", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + False, + "ec2", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + False, + "other", + [ + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror2-intel", + }, + ], + ), + ], + ) + def test_get_package_mirror_info_az_ec2( + self, allow_ec2_mirror, platform_type, mirrors + ): + flag_path = ( + "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + ) + with mock.patch(flag_path, allow_ec2_mirror): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock( + availability_zone="us-east-1a", platform_type=platform_type + ) + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, + ) + assert results == mirrors[0] + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_second, + ) + assert results == mirrors[1] + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_none, + ) + assert results == package_mirrors[0]["failsafe"] + + def test_get_package_mirror_info_az_non_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone="nova.cloudvendor") + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, + ) + assert results == { + "primary": "http://nova.cloudvendor.clouds/", + "security": "http://security-mirror1-intel", + } + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_last, + ) + assert results == { + "primary": "http://nova.cloudvendor.clouds/", + "security": "http://security-mirror2-intel", + } + + def test_get_package_mirror_info_none(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone=None) + + # because both search entries here replacement based on + # availability-zone, the filter will be called with an empty list and + # failsafe should be taken. + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, + ) + assert results == { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror1-intel", + } + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_last, + ) + assert results == { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror2-intel", + } + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_gentoo.py b/tests/unittests/distros/test_gentoo.py new file mode 100644 index 00000000..dadf5df5 --- /dev/null +++ b/tests/unittests/distros/test_gentoo.py @@ -0,0 +1,27 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit import atomic_helper, util +from tests.unittests.helpers import CiTestCase + +from . import _get_distro + + +class TestGentoo(CiTestCase): + def test_write_hostname(self): + distro = _get_distro("gentoo") + hostname = "myhostname" + hostfile = self.tmp_path("hostfile") + distro._write_hostname(hostname, hostfile) + self.assertEqual('hostname="myhostname"\n', util.load_file(hostfile)) + + def test_write_existing_hostname_with_comments(self): + distro = _get_distro("gentoo") + hostname = "myhostname" + contents = '#This is the hostname\nhostname="localhost"' + hostfile = self.tmp_path("hostfile") + atomic_helper.write_file(hostfile, contents, omode="w") + distro._write_hostname(hostname, hostfile) + self.assertEqual( + '#This is the hostname\nhostname="myhostname"\n', + util.load_file(hostfile), + ) diff --git a/tests/unittests/distros/test_hostname.py b/tests/unittests/distros/test_hostname.py new file mode 100644 index 00000000..2cbbb3e2 --- /dev/null +++ b/tests/unittests/distros/test_hostname.py @@ -0,0 +1,42 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import unittest + +from cloudinit.distros.parsers import hostname + +BASE_HOSTNAME = """ +# My super-duper-hostname + +blahblah + +""" +BASE_HOSTNAME = BASE_HOSTNAME.strip() + + +class TestHostnameHelper(unittest.TestCase): + def test_parse_same(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + self.assertEqual(str(hn).strip(), BASE_HOSTNAME) + self.assertEqual(hn.hostname, "blahblah") + + def test_no_adjust_hostname(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + prev_name = hn.hostname + hn.set_hostname("") + self.assertEqual(hn.hostname, prev_name) + + def test_adjust_hostname(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + prev_name = hn.hostname + self.assertEqual(prev_name, "blahblah") + hn.set_hostname("bbbbd") + self.assertEqual(hn.hostname, "bbbbd") + expected_out = """ +# My super-duper-hostname + +bbbbd +""" + self.assertEqual(str(hn).strip(), expected_out.strip()) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_hosts.py b/tests/unittests/distros/test_hosts.py new file mode 100644 index 00000000..faffd912 --- /dev/null +++ b/tests/unittests/distros/test_hosts.py @@ -0,0 +1,47 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import unittest + +from cloudinit.distros.parsers import hosts + +BASE_ETC = """ +# Example +127.0.0.1 localhost +192.168.1.10 foo.mydomain.org foo +192.168.1.10 bar.mydomain.org bar +146.82.138.7 master.debian.org master +209.237.226.90 www.opensource.org +""" +BASE_ETC = BASE_ETC.strip() + + +class TestHostsHelper(unittest.TestCase): + def test_parse(self): + eh = hosts.HostsConf(BASE_ETC) + self.assertEqual(eh.get_entry("127.0.0.1"), [["localhost"]]) + self.assertEqual( + eh.get_entry("192.168.1.10"), + [["foo.mydomain.org", "foo"], ["bar.mydomain.org", "bar"]], + ) + eh = str(eh) + self.assertTrue(eh.startswith("# Example")) + + def test_add(self): + eh = hosts.HostsConf(BASE_ETC) + eh.add_entry("127.0.0.0", "blah") + self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]]) + eh.add_entry("127.0.0.3", "blah", "blah2", "blah3") + self.assertEqual( + eh.get_entry("127.0.0.3"), [["blah", "blah2", "blah3"]] + ) + + def test_del(self): + eh = hosts.HostsConf(BASE_ETC) + eh.add_entry("127.0.0.0", "blah") + self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]]) + + eh.del_entries("127.0.0.0") + self.assertEqual(eh.get_entry("127.0.0.0"), []) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_init.py b/tests/unittests/distros/test_init.py new file mode 100644 index 00000000..8f3c8978 --- /dev/null +++ b/tests/unittests/distros/test_init.py @@ -0,0 +1,248 @@ +# Copyright (C) 2020 Canonical Ltd. +# +# Author: Daniel Watkins <oddbloke@ubuntu.com> +# +# This file is part of cloud-init. See LICENSE file for license information. +"""Tests for cloudinit/distros/__init__.py""" + +from unittest import mock + +import pytest + +from cloudinit.distros import LDH_ASCII_CHARS, _get_package_mirror_info + +# In newer versions of Python, these characters will be omitted instead +# of substituted because of security concerns. +# See https://bugs.python.org/issue43882 +SECURITY_URL_CHARS = "\n\r\t" + +# Define a set of characters we would expect to be replaced +INVALID_URL_CHARS = [ + chr(x) + for x in range(127) + if chr(x) not in LDH_ASCII_CHARS + SECURITY_URL_CHARS +] +for separator in [":", ".", "/", "#", "?", "@", "[", "]"]: + # Remove from the set characters that either separate hostname parts (":", + # "."), terminate hostnames ("/", "#", "?", "@"), or cause Python to be + # unable to parse URLs ("[", "]"). + INVALID_URL_CHARS.remove(separator) + + +class TestGetPackageMirrorInfo: + """ + Tests for cloudinit.distros._get_package_mirror_info. + + These supplement the tests in tests/unittests/test_distros/test_generic.py + which are more focused on testing a single production-like configuration. + These tests are more focused on specific aspects of the unit under test. + """ + + @pytest.mark.parametrize( + "mirror_info,expected", + [ + # Empty info gives empty return + ({}, {}), + # failsafe values used if present + ( + { + "failsafe": { + "primary": "http://value", + "security": "http://other", + } + }, + {"primary": "http://value", "security": "http://other"}, + ), + # search values used if present + ( + { + "search": { + "primary": ["http://value"], + "security": ["http://other"], + } + }, + {"primary": ["http://value"], "security": ["http://other"]}, + ), + # failsafe values used if search value not present + ( + { + "search": {"primary": ["http://value"]}, + "failsafe": {"security": "http://other"}, + }, + {"primary": ["http://value"], "security": "http://other"}, + ), + ], + ) + def test_get_package_mirror_info_failsafe(self, mirror_info, expected): + """ + Test the interaction between search and failsafe inputs + + (This doesn't test the case where the mirror_filter removes all search + options; test_failsafe_used_if_all_search_results_filtered_out covers + that.) + """ + assert expected == _get_package_mirror_info( + mirror_info, mirror_filter=lambda x: x + ) + + def test_failsafe_used_if_all_search_results_filtered_out(self): + """Test the failsafe option used if all search options eliminated.""" + mirror_info = { + "search": {"primary": ["http://value"]}, + "failsafe": {"primary": "http://other"}, + } + assert {"primary": "http://other"} == _get_package_mirror_info( + mirror_info, mirror_filter=lambda x: False + ) + + @pytest.mark.parametrize( + "allow_ec2_mirror, platform_type", [(True, "ec2")] + ) + @pytest.mark.parametrize( + "availability_zone,region,patterns,expected", + ( + # Test ec2_region alone + ( + "fk-fake-1f", + None, + ["http://EC2-%(ec2_region)s/ubuntu"], + ["http://ec2-fk-fake-1/ubuntu"], + ), + # Test availability_zone alone + ( + "fk-fake-1f", + None, + ["http://AZ-%(availability_zone)s/ubuntu"], + ["http://az-fk-fake-1f/ubuntu"], + ), + # Test region alone + ( + None, + "fk-fake-1", + ["http://RG-%(region)s/ubuntu"], + ["http://rg-fk-fake-1/ubuntu"], + ), + # Test that ec2_region is not available for non-matching AZs + ( + "fake-fake-1f", + None, + [ + "http://EC2-%(ec2_region)s/ubuntu", + "http://AZ-%(availability_zone)s/ubuntu", + ], + ["http://az-fake-fake-1f/ubuntu"], + ), + # Test that template order maintained + ( + None, + "fake-region", + [ + "http://RG-%(region)s-2/ubuntu", + "http://RG-%(region)s-1/ubuntu", + ], + [ + "http://rg-fake-region-2/ubuntu", + "http://rg-fake-region-1/ubuntu", + ], + ), + # Test that non-ASCII hostnames are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + ( + None, + "ТεЅТ̣", + ["http://www.IDNA-%(region)s.com/ubuntu"], + ["http://www.xn--idna--4kd53hh6aba3q.com/ubuntu"], + ), + # Test that non-ASCII hostnames with a port are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + ( + None, + "ТεЅТ̣", + ["http://www.IDNA-%(region)s.com:8080/ubuntu"], + ["http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu"], + ), + # Test that non-ASCII non-hostname parts of URLs are unchanged + ( + None, + "ТεЅТ̣", + ["http://www.example.com/%(region)s/ubuntu"], + ["http://www.example.com/ТεЅТ̣/ubuntu"], + ), + # Test that IPv4 addresses are unchanged + ( + None, + "fk-fake-1", + ["http://192.168.1.1:8080/%(region)s/ubuntu"], + ["http://192.168.1.1:8080/fk-fake-1/ubuntu"], + ), + # Test that IPv6 addresses are unchanged + ( + None, + "fk-fake-1", + ["http://[2001:67c:1360:8001::23]/%(region)s/ubuntu"], + ["http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu"], + ), + # Test that unparseable URLs are filtered out of the mirror list + ( + None, + "inv[lid", + [ + "http://%(region)s.in.hostname/should/be/filtered", + "http://but.not.in.the.path/%(region)s", + ], + ["http://but.not.in.the.path/inv[lid"], + ), + ( + None, + "-some-region-", + ["http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu"], + ["http://lead-ing.some-region.trail-ing.example.com/ubuntu"], + ), + ) + + tuple( + # Dynamically generate a test case for each non-LDH + # (Letters/Digits/Hyphen) ASCII character, testing that it is + # substituted with a hyphen + ( + None, + "fk{0}fake{0}1".format(invalid_char), + ["http://%(region)s/ubuntu"], + ["http://fk-fake-1/ubuntu"], + ) + for invalid_char in INVALID_URL_CHARS + ), + ) + def test_valid_substitution( + self, + allow_ec2_mirror, + platform_type, + availability_zone, + region, + patterns, + expected, + ): + """Test substitution works as expected.""" + flag_path = ( + "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + ) + + m_data_source = mock.Mock( + availability_zone=availability_zone, + region=region, + platform_type=platform_type, + ) + mirror_info = {"search": {"primary": patterns}} + + with mock.patch(flag_path, allow_ec2_mirror): + ret = _get_package_mirror_info( + mirror_info, + data_source=m_data_source, + mirror_filter=lambda x: x, + ) + print(allow_ec2_mirror) + print(platform_type) + print(availability_zone) + print(region) + print(patterns) + print(expected) + assert {"primary": expected} == ret diff --git a/tests/unittests/distros/test_manage_service.py b/tests/unittests/distros/test_manage_service.py new file mode 100644 index 00000000..9e64b35c --- /dev/null +++ b/tests/unittests/distros/test_manage_service.py @@ -0,0 +1,41 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from tests.unittests.helpers import CiTestCase, mock +from tests.unittests.util import MockDistro + + +class TestManageService(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestManageService, self).setUp() + self.dist = MockDistro() + + @mock.patch.object(MockDistro, "uses_systemd", return_value=False) + @mock.patch("cloudinit.distros.subp.subp") + def test_manage_service_systemctl_initcmd(self, m_subp, m_sysd): + self.dist.init_cmd = ["systemctl"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with( + ["systemctl", "start", "myssh"], capture=True + ) + + @mock.patch.object(MockDistro, "uses_systemd", return_value=False) + @mock.patch("cloudinit.distros.subp.subp") + def test_manage_service_service_initcmd(self, m_subp, m_sysd): + self.dist.init_cmd = ["service"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with(["service", "myssh", "start"], capture=True) + + @mock.patch.object(MockDistro, "uses_systemd", return_value=True) + @mock.patch("cloudinit.distros.subp.subp") + def test_manage_service_systemctl(self, m_subp, m_sysd): + self.dist.init_cmd = ["ignore"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with( + ["systemctl", "start", "myssh"], capture=True + ) + + +# vi: ts=4 sw=4 expandtab diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py new file mode 100644 index 00000000..0bc6dfbd --- /dev/null +++ b/tests/unittests/distros/test_netbsd.py @@ -0,0 +1,18 @@ +import unittest.mock as mock + +import pytest + +import cloudinit.distros.netbsd + + +@pytest.mark.parametrize("with_pkgin", (True, False)) +@mock.patch("cloudinit.distros.netbsd.os") +def test_init(m_os, with_pkgin): + print(with_pkgin) + m_os.path.exists.return_value = with_pkgin + cfg = {} + + distro = cloudinit.distros.netbsd.NetBSD("netbsd", cfg, None) + expectation = ["pkgin", "-y", "full-upgrade"] if with_pkgin else None + assert distro.pkg_cmd_upgrade_prefix == expectation + assert [mock.call("/usr/pkg/bin/pkgin")] == m_os.path.exists.call_args_list diff --git a/tests/unittests/distros/test_netconfig.py b/tests/unittests/distros/test_netconfig.py new file mode 100644 index 00000000..a25be481 --- /dev/null +++ b/tests/unittests/distros/test_netconfig.py @@ -0,0 +1,1013 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import copy +import os +import re +from io import StringIO +from textwrap import dedent +from unittest import mock + +from cloudinit import distros, helpers, safeyaml, settings, subp, util +from cloudinit.distros.parsers.sys_conf import SysConf +from tests.unittests.helpers import FilesystemMockingTestCase, dir2dict + +BASE_NET_CFG = """ +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5 + broadcast 192.168.1.0 + gateway 192.168.1.254 + netmask 255.255.255.0 + network 192.168.0.0 + +auto eth1 +iface eth1 inet dhcp +""" + +BASE_NET_CFG_FROM_V2 = """ +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5/24 + gateway 192.168.1.254 + +auto eth1 +iface eth1 inet dhcp +""" + +BASE_NET_CFG_IPV6 = """ +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5 + netmask 255.255.255.0 + network 192.168.0.0 + broadcast 192.168.1.0 + gateway 192.168.1.254 + +iface eth0 inet6 static + address 2607:f0d0:1002:0011::2 + netmask 64 + gateway 2607:f0d0:1002:0011::1 + +iface eth1 inet static + address 192.168.1.6 + netmask 255.255.255.0 + network 192.168.0.0 + broadcast 192.168.1.0 + gateway 192.168.1.254 + +iface eth1 inet6 static + address 2607:f0d0:1002:0011::3 + netmask 64 + gateway 2607:f0d0:1002:0011::1 +""" + +V1_NET_CFG = { + "config": [ + { + "name": "eth0", + "subnets": [ + { + "address": "192.168.1.5", + "broadcast": "192.168.1.0", + "gateway": "192.168.1.254", + "netmask": "255.255.255.0", + "type": "static", + } + ], + "type": "physical", + }, + { + "name": "eth1", + "subnets": [{"control": "auto", "type": "dhcp4"}], + "type": "physical", + }, + ], + "version": 1, +} + +V1_NET_CFG_WITH_DUPS = """\ +# same value in interface specific dns and global dns +# should produce single entry in network file +version: 1 +config: + - type: physical + name: eth0 + subnets: + - type: static + address: 192.168.0.102/24 + dns_nameservers: [1.2.3.4] + dns_search: [test.com] + interface: eth0 + - type: nameserver + address: [1.2.3.4] + search: [test.com] +""" + +V1_NET_CFG_OUTPUT = """\ +# This file is generated from information provided by the datasource. Changes +# to it will not persist across an instance reboot. To disable cloud-init's +# network configuration capabilities, write a file +# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: +# network: {config: disabled} +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5/24 + broadcast 192.168.1.0 + gateway 192.168.1.254 + +auto eth1 +iface eth1 inet dhcp +""" + +V1_NET_CFG_IPV6_OUTPUT = """\ +# This file is generated from information provided by the datasource. Changes +# to it will not persist across an instance reboot. To disable cloud-init's +# network configuration capabilities, write a file +# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: +# network: {config: disabled} +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet6 static + address 2607:f0d0:1002:0011::2/64 + gateway 2607:f0d0:1002:0011::1 + +auto eth1 +iface eth1 inet dhcp +""" + +V1_NET_CFG_IPV6 = { + "config": [ + { + "name": "eth0", + "subnets": [ + { + "address": "2607:f0d0:1002:0011::2", + "gateway": "2607:f0d0:1002:0011::1", + "netmask": "64", + "type": "static6", + } + ], + "type": "physical", + }, + { + "name": "eth1", + "subnets": [{"control": "auto", "type": "dhcp4"}], + "type": "physical", + }, + ], + "version": 1, +} + + +V1_TO_V2_NET_CFG_OUTPUT = """\ +# This file is generated from information provided by the datasource. Changes +# to it will not persist across an instance reboot. To disable cloud-init's +# network configuration capabilities, write a file +# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: +# network: {config: disabled} +network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.5/24 + gateway4: 192.168.1.254 + eth1: + dhcp4: true +""" + +V1_TO_V2_NET_CFG_IPV6_OUTPUT = """\ +# This file is generated from information provided by the datasource. Changes +# to it will not persist across an instance reboot. To disable cloud-init's +# network configuration capabilities, write a file +# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: +# network: {config: disabled} +network: + version: 2 + ethernets: + eth0: + addresses: + - 2607:f0d0:1002:0011::2/64 + gateway6: 2607:f0d0:1002:0011::1 + eth1: + dhcp4: true +""" + +V2_NET_CFG = { + "ethernets": { + "eth7": {"addresses": ["192.168.1.5/24"], "gateway4": "192.168.1.254"}, + "eth9": {"dhcp4": True}, + }, + "version": 2, +} + + +V2_TO_V2_NET_CFG_OUTPUT = """\ +# This file is generated from information provided by the datasource. Changes +# to it will not persist across an instance reboot. To disable cloud-init's +# network configuration capabilities, write a file +# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: +# network: {config: disabled} +network: + ethernets: + eth7: + addresses: + - 192.168.1.5/24 + gateway4: 192.168.1.254 + eth9: + dhcp4: true + version: 2 +""" + + +class WriteBuffer(object): + def __init__(self): + self.buffer = StringIO() + self.mode = None + self.omode = None + + def write(self, text): + self.buffer.write(text) + + def __str__(self): + return self.buffer.getvalue() + + +class TestNetCfgDistroBase(FilesystemMockingTestCase): + def setUp(self): + super(TestNetCfgDistroBase, self).setUp() + self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") + + def _get_distro(self, dname, renderers=None): + cls = distros.fetch(dname) + cfg = settings.CFG_BUILTIN + cfg["system_info"]["distro"] = dname + if renderers: + cfg["system_info"]["network"] = {"renderers": renderers} + paths = helpers.Paths({}) + return cls(dname, cfg.get("system_info"), paths) + + def assertCfgEquals(self, blob1, blob2): + b1 = dict(SysConf(blob1.strip().splitlines())) + b2 = dict(SysConf(blob2.strip().splitlines())) + self.assertEqual(b1, b2) + for (k, v) in b1.items(): + self.assertIn(k, b2) + for (k, v) in b2.items(): + self.assertIn(k, b1) + for (k, v) in b1.items(): + self.assertEqual(v, b2[k]) + + +class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroFreeBSD, self).setUp() + self.distro = self._get_distro("freebsd", renderers=["freebsd"]) + + def _apply_and_verify_freebsd( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.freebsd.available") as m_avail: + m_avail.return_value = True + with self.reRooted(tmpd) as tmpd: + util.ensure_dir("/etc") + util.ensure_file("/etc/rc.conf") + util.ensure_file("/etc/resolv.conf") + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + print("----------") + print(expected) + print("^^^^ expected | rendered VVVVVVV") + print(results[cfgpath]) + print("----------") + self.assertEqual( + set(expected.split("\n")), set(results[cfgpath].split("\n")) + ) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + @mock.patch("cloudinit.net.get_interfaces_by_mac") + def test_apply_network_config_freebsd_standard(self, ifaces_mac): + ifaces_mac.return_value = { + "00:15:5d:4c:73:00": "eth0", + } + rc_conf_expected = """\ +defaultrouter=192.168.1.254 +ifconfig_eth0='192.168.1.5 netmask 255.255.255.0' +ifconfig_eth1=DHCP +""" + + expected_cfgs = { + "/etc/rc.conf": rc_conf_expected, + "/etc/resolv.conf": "", + } + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + @mock.patch("cloudinit.net.get_interfaces_by_mac") + def test_apply_network_config_freebsd_ifrename(self, ifaces_mac): + ifaces_mac.return_value = { + "00:15:5d:4c:73:00": "vtnet0", + } + rc_conf_expected = """\ +ifconfig_vtnet0_name=eth0 +defaultrouter=192.168.1.254 +ifconfig_eth0='192.168.1.5 netmask 255.255.255.0' +ifconfig_eth1=DHCP +""" + + V1_NET_CFG_RENAME = copy.deepcopy(V1_NET_CFG) + V1_NET_CFG_RENAME["config"][0]["mac_address"] = "00:15:5d:4c:73:00" + + expected_cfgs = { + "/etc/rc.conf": rc_conf_expected, + "/etc/resolv.conf": "", + } + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG_RENAME, + expected_cfgs=expected_cfgs.copy(), + ) + + @mock.patch("cloudinit.net.get_interfaces_by_mac") + def test_apply_network_config_freebsd_nameserver(self, ifaces_mac): + ifaces_mac.return_value = { + "00:15:5d:4c:73:00": "eth0", + } + + V1_NET_CFG_DNS = copy.deepcopy(V1_NET_CFG) + ns = ["1.2.3.4"] + V1_NET_CFG_DNS["config"][0]["subnets"][0]["dns_nameservers"] = ns + expected_cfgs = {"/etc/resolv.conf": "nameserver 1.2.3.4\n"} + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG_DNS, + expected_cfgs=expected_cfgs.copy(), + ) + + +class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroUbuntuEni, self).setUp() + self.distro = self._get_distro("ubuntu", renderers=["eni"]) + + def eni_path(self): + return "/etc/network/interfaces.d/50-cloud-init.cfg" + + def _apply_and_verify_eni( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.eni.available") as m_avail: + m_avail.return_value = True + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + print("----------") + print(expected) + print("^^^^ expected | rendered VVVVVVV") + print(results[cfgpath]) + print("----------") + self.assertEqual(expected, results[cfgpath]) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def test_apply_network_config_eni_ub(self): + expected_cfgs = { + self.eni_path(): V1_NET_CFG_OUTPUT, + } + # ub_distro.apply_network_config(V1_NET_CFG, False) + self._apply_and_verify_eni( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_apply_network_config_ipv6_ub(self): + expected_cfgs = {self.eni_path(): V1_NET_CFG_IPV6_OUTPUT} + self._apply_and_verify_eni( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) + + +class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroUbuntuNetplan, self).setUp() + self.distro = self._get_distro("ubuntu", renderers=["netplan"]) + self.devlist = ["eth0", "lo"] + + def _apply_and_verify_netplan( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.netplan.available", return_value=True): + with mock.patch( + "cloudinit.net.netplan.get_devicelist", + return_value=self.devlist, + ): + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + print("----------") + print(expected) + print("^^^^ expected | rendered VVVVVVV") + print(results[cfgpath]) + print("----------") + self.assertEqual(expected, results[cfgpath]) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def netplan_path(self): + return "/etc/netplan/50-cloud-init.yaml" + + def test_apply_network_config_v1_to_netplan_ub(self): + expected_cfgs = { + self.netplan_path(): V1_TO_V2_NET_CFG_OUTPUT, + } + + # ub_distro.apply_network_config(V1_NET_CFG, False) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_apply_network_config_v1_ipv6_to_netplan_ub(self): + expected_cfgs = { + self.netplan_path(): V1_TO_V2_NET_CFG_IPV6_OUTPUT, + } + + # ub_distro.apply_network_config(V1_NET_CFG_IPV6, False) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_apply_network_config_v2_passthrough_ub(self): + expected_cfgs = { + self.netplan_path(): V2_TO_V2_NET_CFG_OUTPUT, + } + # ub_distro.apply_network_config(V2_NET_CFG, False) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V2_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + +class TestNetCfgDistroRedhat(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroRedhat, self).setUp() + self.distro = self._get_distro("rhel", renderers=["sysconfig"]) + + def ifcfg_path(self, ifname): + return "/etc/sysconfig/network-scripts/ifcfg-%s" % ifname + + def control_path(self): + return "/etc/sysconfig/network" + + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.sysconfig.available") as m_avail: + m_avail.return_value = True + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + self.assertCfgEquals(expected, results[cfgpath]) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def test_apply_network_config_rh(self): + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=none + DEFROUTE=yes + DEVICE=eth0 + GATEWAY=192.168.1.254 + IPADDR=192.168.1.5 + NETMASK=255.255.255.0 + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ + BOOTPROTO=dhcp + DEVICE=eth1 + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.control_path(): dedent( + """\ + NETWORKING=yes + """ + ), + } + # rh_distro.apply_network_config(V1_NET_CFG, False) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_apply_network_config_ipv6_rh(self): + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=none + DEFROUTE=yes + DEVICE=eth0 + IPV6ADDR=2607:f0d0:1002:0011::2/64 + IPV6INIT=yes + IPV6_AUTOCONF=no + IPV6_DEFAULTGW=2607:f0d0:1002:0011::1 + IPV6_FORCE_ACCEPT_RA=no + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ + BOOTPROTO=dhcp + DEVICE=eth1 + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.control_path(): dedent( + """\ + NETWORKING=yes + NETWORKING_IPV6=yes + IPV6_AUTOCONF=no + """ + ), + } + # rh_distro.apply_network_config(V1_NET_CFG_IPV6, False) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_vlan_render_unsupported(self): + """Render officially unsupported vlan names.""" + cfg = { + "version": 2, + "ethernets": { + "eth0": { + "addresses": ["192.10.1.2/24"], + "match": {"macaddress": "00:16:3e:60:7c:df"}, + } + }, + "vlans": { + "infra0": { + "addresses": ["10.0.1.2/16"], + "id": 1001, + "link": "eth0", + } + }, + } + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=none + DEVICE=eth0 + HWADDR=00:16:3e:60:7c:df + IPADDR=192.10.1.2 + NETMASK=255.255.255.0 + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.ifcfg_path("infra0"): dedent( + """\ + BOOTPROTO=none + DEVICE=infra0 + IPADDR=10.0.1.2 + NETMASK=255.255.0.0 + NM_CONTROLLED=no + ONBOOT=yes + PHYSDEV=eth0 + USERCTL=no + VLAN=yes + """ + ), + self.control_path(): dedent( + """\ + NETWORKING=yes + """ + ), + } + self._apply_and_verify( + self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs + ) + + def test_vlan_render(self): + cfg = { + "version": 2, + "ethernets": {"eth0": {"addresses": ["192.10.1.2/24"]}}, + "vlans": { + "eth0.1001": { + "addresses": ["10.0.1.2/16"], + "id": 1001, + "link": "eth0", + } + }, + } + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=none + DEVICE=eth0 + IPADDR=192.10.1.2 + NETMASK=255.255.255.0 + NM_CONTROLLED=no + ONBOOT=yes + TYPE=Ethernet + USERCTL=no + """ + ), + self.ifcfg_path("eth0.1001"): dedent( + """\ + BOOTPROTO=none + DEVICE=eth0.1001 + IPADDR=10.0.1.2 + NETMASK=255.255.0.0 + NM_CONTROLLED=no + ONBOOT=yes + PHYSDEV=eth0 + USERCTL=no + VLAN=yes + """ + ), + self.control_path(): dedent( + """\ + NETWORKING=yes + """ + ), + } + self._apply_and_verify( + self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs + ) + + +class TestNetCfgDistroOpensuse(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroOpensuse, self).setUp() + self.distro = self._get_distro("opensuse", renderers=["sysconfig"]) + + def ifcfg_path(self, ifname): + return "/etc/sysconfig/network/ifcfg-%s" % ifname + + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.sysconfig.available") as m_avail: + m_avail.return_value = True + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + self.assertCfgEquals(expected, results[cfgpath]) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def test_apply_network_config_opensuse(self): + """Opensuse uses apply_network_config and renders sysconfig""" + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=static + IPADDR=192.168.1.5 + NETMASK=255.255.255.0 + STARTMODE=auto + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ + BOOTPROTO=dhcp4 + STARTMODE=auto + """ + ), + } + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) + + def test_apply_network_config_ipv6_opensuse(self): + """Opensuse uses apply_network_config and renders sysconfig w/ipv6""" + expected_cfgs = { + self.ifcfg_path("eth0"): dedent( + """\ + BOOTPROTO=static + IPADDR6=2607:f0d0:1002:0011::2/64 + STARTMODE=auto + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ + BOOTPROTO=dhcp4 + STARTMODE=auto + """ + ), + } + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) + + +class TestNetCfgDistroArch(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroArch, self).setUp() + self.distro = self._get_distro("arch", renderers=["netplan"]) + + def _apply_and_verify( + self, + apply_fn, + config, + expected_cfgs=None, + bringup=False, + with_netplan=False, + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch( + "cloudinit.net.netplan.available", return_value=with_netplan + ): + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + print("----------") + print(expected) + print("^^^^ expected | rendered VVVVVVV") + print(results[cfgpath]) + print("----------") + self.assertEqual(expected, results[cfgpath]) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def netctl_path(self, iface): + return "/etc/netctl/%s" % iface + + def netplan_path(self): + return "/etc/netplan/50-cloud-init.yaml" + + def test_apply_network_config_v1_without_netplan(self): + # Note that this is in fact an invalid netctl config: + # "Address=None/None" + # But this is what the renderer has been writing out for a long time, + # and the test's purpose is to assert that the netctl renderer is + # still being used in absence of netplan, not the correctness of the + # rendered netctl config. + expected_cfgs = { + self.netctl_path("eth0"): dedent( + """\ + Address=192.168.1.5/255.255.255.0 + Connection=ethernet + DNS=() + Gateway=192.168.1.254 + IP=static + Interface=eth0 + """ + ), + self.netctl_path("eth1"): dedent( + """\ + Address=None/None + Connection=ethernet + DNS=() + Gateway= + IP=dhcp + Interface=eth1 + """ + ), + } + + # ub_distro.apply_network_config(V1_NET_CFG, False) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + with_netplan=False, + ) + + def test_apply_network_config_v1_with_netplan(self): + expected_cfgs = { + self.netplan_path(): dedent( + """\ + # generated by cloud-init + network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.5/24 + gateway4: 192.168.1.254 + eth1: + dhcp4: true + """ + ), + } + + with mock.patch( + "cloudinit.net.netplan.get_devicelist", return_value=[] + ): + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + with_netplan=True, + ) + + +class TestNetCfgDistroPhoton(TestNetCfgDistroBase): + def setUp(self): + super(TestNetCfgDistroPhoton, self).setUp() + self.distro = self._get_distro("photon", renderers=["networkd"]) + + def create_conf_dict(self, contents): + content_dict = {} + for line in contents: + if line: + line = line.strip() + if line and re.search(r"^\[(.+)\]$", line): + content_dict[line] = [] + key = line + elif line: + assert key + content_dict[key].append(line) + + return content_dict + + def compare_dicts(self, actual, expected): + for k, v in actual.items(): + self.assertEqual(sorted(expected[k]), sorted(v)) + + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): + if not expected_cfgs: + raise ValueError("expected_cfg must not be None") + + tmpd = None + with mock.patch("cloudinit.net.networkd.available") as m_avail: + m_avail.return_value = True + with self.reRooted(tmpd) as tmpd: + apply_fn(config, bringup) + + results = dir2dict(tmpd) + for cfgpath, expected in expected_cfgs.items(): + actual = self.create_conf_dict(results[cfgpath].splitlines()) + self.compare_dicts(actual, expected) + self.assertEqual(0o644, get_mode(cfgpath, tmpd)) + + def nwk_file_path(self, ifname): + return "/etc/systemd/network/10-cloud-init-%s.network" % ifname + + def net_cfg_1(self, ifname): + ret = ( + """\ + [Match] + Name=%s + [Network] + DHCP=no + [Address] + Address=192.168.1.5/24 + [Route] + Gateway=192.168.1.254""" + % ifname + ) + return ret + + def net_cfg_2(self, ifname): + ret = ( + """\ + [Match] + Name=%s + [Network] + DHCP=ipv4""" + % ifname + ) + return ret + + def test_photon_network_config_v1(self): + tmp = self.net_cfg_1("eth0").splitlines() + expected_eth0 = self.create_conf_dict(tmp) + + tmp = self.net_cfg_2("eth1").splitlines() + expected_eth1 = self.create_conf_dict(tmp) + + expected_cfgs = { + self.nwk_file_path("eth0"): expected_eth0, + self.nwk_file_path("eth1"): expected_eth1, + } + + self._apply_and_verify( + self.distro.apply_network_config, V1_NET_CFG, expected_cfgs.copy() + ) + + def test_photon_network_config_v2(self): + tmp = self.net_cfg_1("eth7").splitlines() + expected_eth7 = self.create_conf_dict(tmp) + + tmp = self.net_cfg_2("eth9").splitlines() + expected_eth9 = self.create_conf_dict(tmp) + + expected_cfgs = { + self.nwk_file_path("eth7"): expected_eth7, + self.nwk_file_path("eth9"): expected_eth9, + } + + self._apply_and_verify( + self.distro.apply_network_config, V2_NET_CFG, expected_cfgs.copy() + ) + + def test_photon_network_config_v1_with_duplicates(self): + expected = """\ + [Match] + Name=eth0 + [Network] + DHCP=no + DNS=1.2.3.4 + Domains=test.com + [Address] + Address=192.168.0.102/24""" + + net_cfg = safeyaml.load(V1_NET_CFG_WITH_DUPS) + + expected = self.create_conf_dict(expected.splitlines()) + expected_cfgs = { + self.nwk_file_path("eth0"): expected, + } + + self._apply_and_verify( + self.distro.apply_network_config, net_cfg, expected_cfgs.copy() + ) + + +def get_mode(path, target=None): + return os.stat(subp.target_path(target, path)).st_mode & 0o777 + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_networking.py b/tests/unittests/distros/test_networking.py new file mode 100644 index 00000000..274647cb --- /dev/null +++ b/tests/unittests/distros/test_networking.py @@ -0,0 +1,231 @@ +# See https://docs.pytest.org/en/stable/example +# /parametrize.html#parametrizing-conditional-raising +from contextlib import ExitStack as does_not_raise +from unittest import mock + +import pytest + +from cloudinit import net +from cloudinit.distros.networking import ( + BSDNetworking, + LinuxNetworking, + Networking, +) + + +@pytest.fixture +def generic_networking_cls(): + """Returns a direct Networking subclass which errors on /sys usage. + + This enables the direct testing of functionality only present on the + ``Networking`` super-class, and provides a check on accidentally using /sys + in that context. + """ + + class TestNetworking(Networking): + def is_physical(self, *args, **kwargs): + raise NotImplementedError + + def settle(self, *args, **kwargs): + raise NotImplementedError + + def try_set_link_up(self, *args, **kwargs): + raise NotImplementedError + + error = AssertionError("Unexpectedly used /sys in generic networking code") + with mock.patch( + "cloudinit.net.get_sys_class_path", + side_effect=error, + ): + yield TestNetworking + + +@pytest.fixture +def sys_class_net(tmpdir): + sys_class_net_path = tmpdir.join("sys/class/net") + sys_class_net_path.ensure_dir() + with mock.patch( + "cloudinit.net.get_sys_class_path", + return_value=sys_class_net_path.strpath + "/", + ): + yield sys_class_net_path + + +class TestBSDNetworkingIsPhysical: + def test_raises_notimplementederror(self): + with pytest.raises(NotImplementedError): + BSDNetworking().is_physical("eth0") + + +class TestLinuxNetworkingIsPhysical: + def test_returns_false_by_default(self, sys_class_net): + assert not LinuxNetworking().is_physical("eth0") + + def test_returns_false_if_devname_exists_but_not_physical( + self, sys_class_net + ): + devname = "eth0" + sys_class_net.join(devname).mkdir() + assert not LinuxNetworking().is_physical(devname) + + def test_returns_true_if_device_is_physical(self, sys_class_net): + devname = "eth0" + device_dir = sys_class_net.join(devname) + device_dir.mkdir() + device_dir.join("device").write("") + + assert LinuxNetworking().is_physical(devname) + + +class TestBSDNetworkingTrySetLinkUp: + def test_raises_notimplementederror(self): + with pytest.raises(NotImplementedError): + BSDNetworking().try_set_link_up("eth0") + + +@mock.patch("cloudinit.net.is_up") +@mock.patch("cloudinit.distros.networking.subp.subp") +class TestLinuxNetworkingTrySetLinkUp: + def test_calls_subp_return_true(self, m_subp, m_is_up): + devname = "eth0" + m_is_up.return_value = True + is_success = LinuxNetworking().try_set_link_up(devname) + + assert ( + mock.call(["ip", "link", "set", devname, "up"]) + == m_subp.call_args_list[-1] + ) + assert is_success + + def test_calls_subp_return_false(self, m_subp, m_is_up): + devname = "eth0" + m_is_up.return_value = False + is_success = LinuxNetworking().try_set_link_up(devname) + + assert ( + mock.call(["ip", "link", "set", devname, "up"]) + == m_subp.call_args_list[-1] + ) + assert not is_success + + +class TestBSDNetworkingSettle: + def test_settle_doesnt_error(self): + # This also implicitly tests that it doesn't use subp.subp + BSDNetworking().settle() + + +@pytest.mark.usefixtures("sys_class_net") +@mock.patch("cloudinit.distros.networking.util.udevadm_settle", autospec=True) +class TestLinuxNetworkingSettle: + def test_no_arguments(self, m_udevadm_settle): + LinuxNetworking().settle() + + assert [mock.call(exists=None)] == m_udevadm_settle.call_args_list + + def test_exists_argument(self, m_udevadm_settle): + LinuxNetworking().settle(exists="ens3") + + expected_path = net.sys_dev_path("ens3") + assert [ + mock.call(exists=expected_path) + ] == m_udevadm_settle.call_args_list + + +class TestNetworkingWaitForPhysDevs: + @pytest.fixture + def wait_for_physdevs_netcfg(self): + """This config is shared across all the tests in this class.""" + + def ethernet(mac, name, driver=None, device_id=None): + v2_cfg = {"set-name": name, "match": {"macaddress": mac}} + if driver: + v2_cfg["match"].update({"driver": driver}) + if device_id: + v2_cfg["match"].update({"device_id": device_id}) + + return v2_cfg + + physdevs = [ + ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", "0x1000"], + ["00:11:22:33:44:55", "ens3", "e1000", "0x1643"], + ] + netcfg = { + "version": 2, + "ethernets": {args[1]: ethernet(*args) for args in physdevs}, + } + return netcfg + + def test_skips_settle_if_all_present( + self, + generic_networking_cls, + wait_for_physdevs_netcfg, + ): + networking = generic_networking_cls() + with mock.patch.object( + networking, "get_interfaces_by_mac" + ) as m_get_interfaces_by_mac: + m_get_interfaces_by_mac.side_effect = iter( + [{"aa:bb:cc:dd:ee:ff": "eth0", "00:11:22:33:44:55": "ens3"}] + ) + with mock.patch.object( + networking, "settle", autospec=True + ) as m_settle: + networking.wait_for_physdevs(wait_for_physdevs_netcfg) + assert 0 == m_settle.call_count + + def test_calls_udev_settle_on_missing( + self, + generic_networking_cls, + wait_for_physdevs_netcfg, + ): + networking = generic_networking_cls() + with mock.patch.object( + networking, "get_interfaces_by_mac" + ) as m_get_interfaces_by_mac: + m_get_interfaces_by_mac.side_effect = iter( + [ + { + "aa:bb:cc:dd:ee:ff": "eth0" + }, # first call ens3 is missing + { + "aa:bb:cc:dd:ee:ff": "eth0", + "00:11:22:33:44:55": "ens3", + }, # second call has both + ] + ) + with mock.patch.object( + networking, "settle", autospec=True + ) as m_settle: + networking.wait_for_physdevs(wait_for_physdevs_netcfg) + m_settle.assert_called_with(exists="ens3") + + @pytest.mark.parametrize( + "strict,expectation", + [(True, pytest.raises(RuntimeError)), (False, does_not_raise())], + ) + def test_retrying_and_strict_behaviour( + self, + strict, + expectation, + generic_networking_cls, + wait_for_physdevs_netcfg, + ): + networking = generic_networking_cls() + with mock.patch.object( + networking, "get_interfaces_by_mac" + ) as m_get_interfaces_by_mac: + m_get_interfaces_by_mac.return_value = {} + + with mock.patch.object( + networking, "settle", autospec=True + ) as m_settle: + with expectation: + networking.wait_for_physdevs( + wait_for_physdevs_netcfg, strict=strict + ) + + assert ( + 5 * len(wait_for_physdevs_netcfg["ethernets"]) + == m_settle.call_count + ) diff --git a/tests/unittests/distros/test_opensuse.py b/tests/unittests/distros/test_opensuse.py new file mode 100644 index 00000000..4a4b266f --- /dev/null +++ b/tests/unittests/distros/test_opensuse.py @@ -0,0 +1,11 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from tests.unittests.helpers import CiTestCase + +from . import _get_distro + + +class TestopenSUSE(CiTestCase): + def test_get_distro(self): + distro = _get_distro("opensuse") + self.assertEqual(distro.osfamily, "suse") diff --git a/tests/unittests/distros/test_photon.py b/tests/unittests/distros/test_photon.py new file mode 100644 index 00000000..fed30c2b --- /dev/null +++ b/tests/unittests/distros/test_photon.py @@ -0,0 +1,68 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit import util +from tests.unittests.helpers import CiTestCase, mock + +from . import _get_distro + +SYSTEM_INFO = { + "paths": { + "cloud_dir": "/var/lib/cloud/", + "templates_dir": "/etc/cloud/templates/", + }, + "network": {"renderers": "networkd"}, +} + + +class TestPhoton(CiTestCase): + with_logs = True + distro = _get_distro("photon", SYSTEM_INFO) + expected_log_line = "Rely on PhotonOS default network config" + + def test_network_renderer(self): + self.assertEqual(self.distro._cfg["network"]["renderers"], "networkd") + + def test_get_distro(self): + self.assertEqual(self.distro.osfamily, "photon") + + @mock.patch("cloudinit.distros.photon.subp.subp") + def test_write_hostname(self, m_subp): + hostname = "myhostname" + hostfile = self.tmp_path("previous-hostname") + self.distro._write_hostname(hostname, hostfile) + self.assertEqual(hostname, util.load_file(hostfile)) + + ret = self.distro._read_hostname(hostfile) + self.assertEqual(ret, hostname) + + m_subp.return_value = (None, None) + hostfile += "hostfile" + self.distro._write_hostname(hostname, hostfile) + + m_subp.return_value = (hostname, None) + ret = self.distro._read_hostname(hostfile) + self.assertEqual(ret, hostname) + + self.logs.truncate(0) + m_subp.return_value = (None, "bla") + self.distro._write_hostname(hostname, None) + self.assertIn("Error while setting hostname", self.logs.getvalue()) + + @mock.patch("cloudinit.net.generate_fallback_config") + def test_fallback_netcfg(self, m_fallback_cfg): + + key = "disable_fallback_netcfg" + # Don't use fallback if no setting given + self.logs.truncate(0) + assert self.distro.generate_fallback_config() is None + self.assertIn(self.expected_log_line, self.logs.getvalue()) + + self.logs.truncate(0) + self.distro._cfg[key] = True + assert self.distro.generate_fallback_config() is None + self.assertIn(self.expected_log_line, self.logs.getvalue()) + + self.logs.truncate(0) + self.distro._cfg[key] = False + assert self.distro.generate_fallback_config() is not None + self.assertNotIn(self.expected_log_line, self.logs.getvalue()) diff --git a/tests/unittests/distros/test_resolv.py b/tests/unittests/distros/test_resolv.py new file mode 100644 index 00000000..65e78101 --- /dev/null +++ b/tests/unittests/distros/test_resolv.py @@ -0,0 +1,64 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re + +from cloudinit.distros.parsers import resolv_conf +from tests.unittests.helpers import TestCase + +BASE_RESOLVE = """ +; generated by /sbin/dhclient-script +search blah.yahoo.com yahoo.com +nameserver 10.15.44.14 +nameserver 10.15.30.92 +""" +BASE_RESOLVE = BASE_RESOLVE.strip() + + +class TestResolvHelper(TestCase): + def test_parse_same(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + rp_r = str(rp).strip() + self.assertEqual(BASE_RESOLVE, rp_r) + + def test_local_domain(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertIsNone(rp.local_domain) + + rp.local_domain = "bob" + self.assertEqual("bob", rp.local_domain) + self.assertIn("domain bob", str(rp)) + + def test_nameservers(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertIn("10.15.44.14", rp.nameservers) + self.assertIn("10.15.30.92", rp.nameservers) + rp.add_nameserver("10.2") + self.assertIn("10.2", rp.nameservers) + self.assertIn("nameserver 10.2", str(rp)) + self.assertNotIn("10.3", rp.nameservers) + self.assertEqual(len(rp.nameservers), 3) + rp.add_nameserver("10.2") + rp.add_nameserver("10.3") + self.assertNotIn("10.3", rp.nameservers) + + def test_search_domains(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertIn("yahoo.com", rp.search_domains) + self.assertIn("blah.yahoo.com", rp.search_domains) + rp.add_search_domain("bbb.y.com") + self.assertIn("bbb.y.com", rp.search_domains) + self.assertTrue(re.search(r"search(.*)bbb.y.com(.*)", str(rp))) + self.assertIn("bbb.y.com", rp.search_domains) + rp.add_search_domain("bbb.y.com") + self.assertEqual(len(rp.search_domains), 3) + rp.add_search_domain("bbb2.y.com") + self.assertEqual(len(rp.search_domains), 4) + rp.add_search_domain("bbb3.y.com") + self.assertEqual(len(rp.search_domains), 5) + rp.add_search_domain("bbb4.y.com") + self.assertEqual(len(rp.search_domains), 6) + self.assertRaises(ValueError, rp.add_search_domain, "bbb5.y.com") + self.assertEqual(len(rp.search_domains), 6) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_sles.py b/tests/unittests/distros/test_sles.py new file mode 100644 index 00000000..66b8b13d --- /dev/null +++ b/tests/unittests/distros/test_sles.py @@ -0,0 +1,11 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from tests.unittests.helpers import CiTestCase + +from . import _get_distro + + +class TestSLES(CiTestCase): + def test_get_distro(self): + distro = _get_distro("sles") + self.assertEqual(distro.osfamily, "suse") diff --git a/tests/unittests/distros/test_sysconfig.py b/tests/unittests/distros/test_sysconfig.py new file mode 100644 index 00000000..d0979e17 --- /dev/null +++ b/tests/unittests/distros/test_sysconfig.py @@ -0,0 +1,92 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re + +from cloudinit.distros.parsers.sys_conf import SysConf +from tests.unittests.helpers import TestCase + +# Lots of good examples @ +# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt + + +class TestSysConfHelper(TestCase): + # This function was added in 2.7, make it work for 2.6 + def assertRegMatches(self, text, regexp): + regexp = re.compile(regexp) + self.assertTrue( + regexp.search(text), + msg="%s must match %s!" % (text, regexp.pattern), + ) + + def test_parse_no_change(self): + contents = """# A comment +USESMBAUTH=no +KEYTABLE=/usr/lib/kbd/keytables/us.map +SHORTDATE=$(date +%y:%m:%d:%H:%M) +HOSTNAME=blahblah +NETMASK0=255.255.255.0 +# Inline comment +LIST=$LOGROOT/incremental-list +IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64' +ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256" +USEMD5=no""" + conf = SysConf(contents.splitlines()) + self.assertEqual(conf["HOSTNAME"], "blahblah") + self.assertEqual(conf["SHORTDATE"], "$(date +%y:%m:%d:%H:%M)") + # Should be unquoted + self.assertEqual( + conf["ETHTOOL_OPTS"], + "-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256", + ) + self.assertEqual(contents, str(conf)) + + def test_parse_shell_vars(self): + contents = "USESMBAUTH=$XYZ" + conf = SysConf(contents.splitlines()) + self.assertEqual(contents, str(conf)) + conf = SysConf("") + conf["B"] = "${ZZ}d apples" + # Should be quoted + self.assertEqual('B="${ZZ}d apples"', str(conf)) + conf = SysConf("") + conf["B"] = "$? d apples" + self.assertEqual('B="$? d apples"', str(conf)) + contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"' + conf = SysConf(contents.splitlines()) + self.assertEqual("IPMI_WATCHDOG_OPTIONS=timeout=60", str(conf)) + + def test_parse_adjust(self): + contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"' + conf = SysConf(contents.splitlines()) + # Should be unquoted + self.assertEqual( + "eth0-:0004::1/64 eth1-:0005::1/64", conf["IPV6TO4_ROUTING"] + ) + conf["IPV6TO4_ROUTING"] = "blah \tblah" + contents2 = str(conf).strip() + # Should be requoted due to whitespace + self.assertRegMatches( + contents2, r"IPV6TO4_ROUTING=[\']blah\s+blah[\']" + ) + + def test_parse_no_adjust_shell(self): + conf = SysConf("".splitlines()) + conf["B"] = " $(time)" + contents = str(conf) + self.assertEqual("B= $(time)", contents) + + def test_parse_empty(self): + contents = "" + conf = SysConf(contents.splitlines()) + self.assertEqual("", str(conf).strip()) + + def test_parse_add_new(self): + contents = "BLAH=b" + conf = SysConf(contents.splitlines()) + conf["Z"] = "d" + contents = str(conf) + self.assertIn("Z=d", contents) + self.assertIn("BLAH=b", contents) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_user_data_normalize.py b/tests/unittests/distros/test_user_data_normalize.py new file mode 100644 index 00000000..67ea024b --- /dev/null +++ b/tests/unittests/distros/test_user_data_normalize.py @@ -0,0 +1,365 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from unittest import mock + +from cloudinit import distros, helpers, settings +from cloudinit.distros import ug_util +from tests.unittests.helpers import TestCase + +bcfg = { + "name": "bob", + "plain_text_passwd": "ubuntu", + "home": "/home/ubuntu", + "shell": "/bin/bash", + "lock_passwd": True, + "gecos": "Ubuntu", + "groups": ["foo"], +} + + +class TestUGNormalize(TestCase): + def setUp(self): + super(TestUGNormalize, self).setUp() + self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") + + def _make_distro(self, dtype, def_user=None): + cfg = dict(settings.CFG_BUILTIN) + cfg["system_info"]["distro"] = dtype + paths = helpers.Paths(cfg["system_info"]["paths"]) + distro_cls = distros.fetch(dtype) + if def_user: + cfg["system_info"]["default_user"] = def_user.copy() + distro = distro_cls(dtype, cfg["system_info"], paths) + return distro + + def _norm(self, cfg, distro): + return ug_util.normalize_users_groups(cfg, distro) + + def test_group_dict(self): + distro = self._make_distro("ubuntu") + g = { + "groups": [ + {"ubuntu": ["foo", "bar"], "bob": "users"}, + "cloud-users", + {"bob": "users2"}, + ] + } + (_users, groups) = self._norm(g, distro) + self.assertIn("ubuntu", groups) + ub_members = groups["ubuntu"] + self.assertEqual(sorted(["foo", "bar"]), sorted(ub_members)) + self.assertIn("bob", groups) + b_members = groups["bob"] + self.assertEqual(sorted(["users", "users2"]), sorted(b_members)) + + def test_basic_groups(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "groups": ["bob"], + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", groups) + self.assertEqual({}, users) + + def test_csv_groups(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "groups": "bob,joe,steve", + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", groups) + self.assertIn("joe", groups) + self.assertIn("steve", groups) + self.assertEqual({}, users) + + def test_more_groups(self): + distro = self._make_distro("ubuntu") + ug_cfg = {"groups": ["bob", "joe", "steve"]} + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", groups) + self.assertIn("joe", groups) + self.assertIn("steve", groups) + self.assertEqual({}, users) + + def test_member_groups(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "groups": { + "bob": ["s"], + "joe": [], + "steve": [], + } + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", groups) + self.assertEqual(["s"], groups["bob"]) + self.assertEqual([], groups["joe"]) + self.assertIn("joe", groups) + self.assertIn("steve", groups) + self.assertEqual({}, users) + + def test_users_simple_dict(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = { + "users": { + "default": True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + ug_cfg = { + "users": { + "default": "yes", + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + ug_cfg = { + "users": { + "default": "1", + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + + def test_users_simple_dict_no(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = { + "users": { + "default": False, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEqual({}, users) + ug_cfg = { + "users": { + "default": "no", + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEqual({}, users) + + def test_users_simple_csv(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": "joe,bob", + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) + + def test_users_simple(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": ["joe", "bob"], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) + + def test_users_old_user(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = {"user": "zetta", "users": "default"} + (users, _groups) = self._norm(ug_cfg, distro) + self.assertNotIn("bob", users) # Bob is not the default now, zetta is + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + self.assertNotIn("default", users) + ug_cfg = {"user": "zetta", "users": "default, joe"} + (users, _groups) = self._norm(ug_cfg, distro) + self.assertNotIn("bob", users) # Bob is not the default now, zetta is + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + self.assertNotIn("default", users) + ug_cfg = {"user": "zetta", "users": ["bob", "joe"]} + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + ug_cfg = { + "user": "zetta", + "users": { + "bob": True, + "joe": True, + }, + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + ug_cfg = { + "user": "zetta", + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("zetta", users) + ug_cfg = {} + (users, groups) = self._norm(ug_cfg, distro) + self.assertEqual({}, users) + self.assertEqual({}, groups) + + def test_users_dict_default_additional(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = { + "users": [{"name": "default", "blah": True}], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + self.assertEqual( + ",".join(distro.get_default_user()["groups"]), + users["bob"]["groups"], + ) + self.assertEqual(True, users["bob"]["blah"]) + self.assertEqual(True, users["bob"]["default"]) + + def test_users_dict_extract(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = { + "users": [ + "default", + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + (name, config) = ug_util.extract_default(users) + self.assertEqual(name, "bob") + expected_config = {} + def_config = None + try: + def_config = distro.get_default_user() + except NotImplementedError: + pass + if not def_config: + def_config = {} + expected_config.update(def_config) + + # Ignore these for now + expected_config.pop("name", None) + expected_config.pop("groups", None) + config.pop("groups", None) + self.assertEqual(config, expected_config) + + def test_users_dict_default(self): + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = { + "users": [ + "default", + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("bob", users) + self.assertEqual( + ",".join(distro.get_default_user()["groups"]), + users["bob"]["groups"], + ) + self.assertEqual(True, users["bob"]["default"]) + + def test_users_dict_trans(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": [ + {"name": "joe", "tr-me": True}, + {"name": "bob"}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"tr_me": True, "default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) + + def test_users_dict(self): + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": [ + {"name": "joe"}, + {"name": "bob"}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) + + @mock.patch("cloudinit.subp.subp") + def test_create_snap_user(self, mock_subp): + mock_subp.side_effect = [ + ('{"username": "joe", "ssh-key-count": 1}\n', "") + ] + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": [ + {"name": "joe", "snapuser": "joe@joe.com"}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + for (user, config) in users.items(): + print("user=%s config=%s" % (user, config)) + username = distro.create_user(user, **config) + + snapcmd = ["snap", "create-user", "--sudoer", "--json", "joe@joe.com"] + mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd) + self.assertEqual(username, "joe") + + @mock.patch("cloudinit.subp.subp") + def test_create_snap_user_known(self, mock_subp): + mock_subp.side_effect = [ + ('{"username": "joe", "ssh-key-count": 1}\n', "") + ] + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": [ + {"name": "joe", "snapuser": "joe@joe.com", "known": True}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + for (user, config) in users.items(): + print("user=%s config=%s" % (user, config)) + username = distro.create_user(user, **config) + + snapcmd = [ + "snap", + "create-user", + "--sudoer", + "--json", + "--known", + "joe@joe.com", + ] + mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd) + self.assertEqual(username, "joe") + + @mock.patch("cloudinit.util.system_is_snappy") + @mock.patch("cloudinit.util.is_group") + @mock.patch("cloudinit.subp.subp") + def test_add_user_on_snappy_system( + self, mock_subp, mock_isgrp, mock_snappy + ): + mock_isgrp.return_value = False + mock_subp.return_value = True + mock_snappy.return_value = True + distro = self._make_distro("ubuntu") + ug_cfg = { + "users": [ + {"name": "joe", "groups": "users", "create_groups": True}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + for (user, config) in users.items(): + print("user=%s config=%s" % (user, config)) + distro.add_user(user, **config) + + groupcmd = ["groupadd", "users", "--extrausers"] + addcmd = ["useradd", "joe", "--extrausers", "--groups", "users", "-m"] + + mock_subp.assert_any_call(groupcmd) + mock_subp.assert_any_call(addcmd, logstring=addcmd) + + +# vi: ts=4 expandtab |