summaryrefslogtreecommitdiff
path: root/tests/unittests/distros
diff options
context:
space:
mode:
authorzdc <zdc@users.noreply.github.com>2022-03-26 15:41:59 +0200
committerGitHub <noreply@github.com>2022-03-26 15:41:59 +0200
commitaa60d48c2711cdcd9f88a4e5c77379adb0408231 (patch)
tree349631a02467dae0158f6f663cc8aa8537974a97 /tests/unittests/distros
parent5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff)
parent31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (diff)
downloadvyos-cloud-init-aa60d48c2711cdcd9f88a4e5c77379adb0408231.tar.gz
vyos-cloud-init-aa60d48c2711cdcd9f88a4e5c77379adb0408231.zip
Merge pull request #51 from zdc/T2117-sagitta-22.1
T2117: Cloud-init updated to 22.1
Diffstat (limited to 'tests/unittests/distros')
-rw-r--r--tests/unittests/distros/__init__.py19
-rw-r--r--tests/unittests/distros/test_arch.py55
-rw-r--r--tests/unittests/distros/test_bsd_utils.py66
-rw-r--r--tests/unittests/distros/test_create_users.py282
-rw-r--r--tests/unittests/distros/test_debian.py211
-rw-r--r--tests/unittests/distros/test_dragonflybsd.py25
-rw-r--r--tests/unittests/distros/test_freebsd.py43
-rw-r--r--tests/unittests/distros/test_generic.py383
-rw-r--r--tests/unittests/distros/test_gentoo.py27
-rw-r--r--tests/unittests/distros/test_hostname.py42
-rw-r--r--tests/unittests/distros/test_hosts.py47
-rw-r--r--tests/unittests/distros/test_init.py248
-rw-r--r--tests/unittests/distros/test_manage_service.py41
-rw-r--r--tests/unittests/distros/test_netbsd.py18
-rw-r--r--tests/unittests/distros/test_netconfig.py1013
-rw-r--r--tests/unittests/distros/test_networking.py231
-rw-r--r--tests/unittests/distros/test_opensuse.py11
-rw-r--r--tests/unittests/distros/test_photon.py68
-rw-r--r--tests/unittests/distros/test_resolv.py64
-rw-r--r--tests/unittests/distros/test_sles.py11
-rw-r--r--tests/unittests/distros/test_sysconfig.py92
-rw-r--r--tests/unittests/distros/test_user_data_normalize.py365
22 files changed, 3362 insertions, 0 deletions
diff --git a/tests/unittests/distros/__init__.py b/tests/unittests/distros/__init__.py
new file mode 100644
index 00000000..e66b9446
--- /dev/null
+++ b/tests/unittests/distros/__init__.py
@@ -0,0 +1,19 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+import copy
+
+from cloudinit import distros, helpers, settings
+
+
+def _get_distro(dtype, system_info=None):
+ """Return a Distro class of distro 'dtype'.
+
+ cfg is format of CFG_BUILTIN['system_info'].
+
+ example: _get_distro("debian")
+ """
+ if system_info is None:
+ system_info = copy.deepcopy(settings.CFG_BUILTIN["system_info"])
+ system_info["distro"] = dtype
+ paths = helpers.Paths(system_info["paths"])
+ distro_cls = distros.fetch(dtype)
+ return distro_cls(dtype, system_info, paths)
diff --git a/tests/unittests/distros/test_arch.py b/tests/unittests/distros/test_arch.py
new file mode 100644
index 00000000..5446295e
--- /dev/null
+++ b/tests/unittests/distros/test_arch.py
@@ -0,0 +1,55 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import util
+from cloudinit.distros.arch import _render_network
+from tests.unittests.helpers import CiTestCase, dir2dict
+
+from . import _get_distro
+
+
+class TestArch(CiTestCase):
+ def test_get_distro(self):
+ distro = _get_distro("arch")
+ hostname = "myhostname"
+ hostfile = self.tmp_path("hostfile")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual(hostname + "\n", util.load_file(hostfile))
+
+
+class TestRenderNetwork(CiTestCase):
+ def test_basic_static(self):
+ """Just the most basic static config.
+
+ note 'lo' should not be rendered as an interface."""
+ entries = {
+ "eth0": {
+ "auto": True,
+ "dns-nameservers": ["8.8.8.8"],
+ "bootproto": "static",
+ "address": "10.0.0.2",
+ "gateway": "10.0.0.1",
+ "netmask": "255.255.255.0",
+ },
+ "lo": {"auto": True},
+ }
+ target = self.tmp_dir()
+ devs = _render_network(entries, target=target)
+ files = dir2dict(target, prefix=target)
+ self.assertEqual(["eth0"], devs)
+ self.assertEqual(
+ {
+ "/etc/netctl/eth0": "\n".join(
+ [
+ "Address=10.0.0.2/255.255.255.0",
+ "Connection=ethernet",
+ "DNS=('8.8.8.8')",
+ "Gateway=10.0.0.1",
+ "IP=static",
+ "Interface=eth0",
+ "",
+ ]
+ ),
+ "/etc/resolv.conf": "nameserver 8.8.8.8\n",
+ },
+ files,
+ )
diff --git a/tests/unittests/distros/test_bsd_utils.py b/tests/unittests/distros/test_bsd_utils.py
new file mode 100644
index 00000000..d6f0aeed
--- /dev/null
+++ b/tests/unittests/distros/test_bsd_utils.py
@@ -0,0 +1,66 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import cloudinit.distros.bsd_utils as bsd_utils
+from tests.unittests.helpers import CiTestCase, ExitStack, mock
+
+RC_FILE = """
+if something; then
+ do something here
+fi
+hostname={hostname}
+"""
+
+
+class TestBsdUtils(CiTestCase):
+ def setUp(self):
+ super().setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.load_file = patches.enter_context(
+ mock.patch.object(bsd_utils.util, "load_file")
+ )
+
+ self.write_file = patches.enter_context(
+ mock.patch.object(bsd_utils.util, "write_file")
+ )
+
+ def test_get_rc_config_value(self):
+ self.load_file.return_value = "hostname=foo\n"
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+ self.load_file.assert_called_with("/etc/rc.conf")
+
+ self.load_file.return_value = "hostname=foo"
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+
+ self.load_file.return_value = 'hostname="foo"'
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+
+ self.load_file.return_value = "hostname='foo'"
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+
+ self.load_file.return_value = "hostname='foo\""
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "'foo\"")
+
+ self.load_file.return_value = ""
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), None)
+
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+
+ def test_set_rc_config_value_unchanged(self):
+ # bsd_utils.set_rc_config_value('hostname', 'foo')
+ # self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n')
+
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
+ self.write_file.assert_not_called()
+
+ def test_set_rc_config_value(self):
+ bsd_utils.set_rc_config_value("hostname", "foo")
+ self.write_file.assert_called_with("/etc/rc.conf", "hostname=foo\n")
+
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
+ bsd_utils.set_rc_config_value("hostname", "bar")
+ self.write_file.assert_called_with(
+ "/etc/rc.conf", RC_FILE.format(hostname="bar")
+ )
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py
new file mode 100644
index 00000000..ddb039bd
--- /dev/null
+++ b/tests/unittests/distros/test_create_users.py
@@ -0,0 +1,282 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit import distros, ssh_util
+from tests.unittests.helpers import CiTestCase, mock
+from tests.unittests.util import abstract_to_concrete
+
+
+@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
+@mock.patch("cloudinit.distros.subp.subp")
+class TestCreateUser(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestCreateUser, self).setUp()
+ self.dist = abstract_to_concrete(distros.Distro)(
+ name="test", cfg=None, paths=None
+ )
+
+ def _useradd2call(self, args):
+ # return a mock call for the useradd command in args
+ # with expected 'logstring'.
+ args = ["useradd"] + args
+ logcmd = [a for a in args]
+ for i in range(len(args)):
+ if args[i] in ("--password",):
+ logcmd[i + 1] = "REDACTED"
+ return mock.call(args, logstring=logcmd)
+
+ def test_basic(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_no_home(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, no_create_home=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_system_user(self, m_subp, m_is_snappy):
+ # system user should have no home and get --system
+ user = "foouser"
+ self.dist.create_user(user, system=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "--system", "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_explicit_no_home_false(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, no_create_home=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_unlocked(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, lock_passwd=False)
+ self.assertEqual(
+ m_subp.call_args_list, [self._useradd2call([user, "-m"])]
+ )
+
+ def test_set_password(self, m_subp, m_is_snappy):
+ user = "foouser"
+ password = "passfoo"
+ self.dist.create_user(user, passwd=password)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "--password", password, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_group_added(self, m_is_group, m_subp, m_is_snappy):
+ m_is_group.return_value = False
+ user = "foouser"
+ self.dist.create_user(user, groups=["group1"])
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", "group1", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
+ ex_groups = ["existing_group"]
+ groups = ["group1", ex_groups[0]]
+ m_is_group.side_effect = lambda m: m in ex_groups
+ user = "foouser"
+ self.dist.create_user(user, groups=groups)
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", ",".join(groups), "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_create_groups_with_whitespace_string(
+ self, m_is_group, m_subp, m_is_snappy
+ ):
+ # groups supported as a comma delimeted string even with white space
+ m_is_group.return_value = False
+ user = "foouser"
+ self.dist.create_user(user, groups="group1, group2")
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ mock.call(["groupadd", "group2"]),
+ self._useradd2call([user, "--groups", "group1,group2", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ def test_explicit_sudo_false(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, sudo=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_string(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys allows string and calls setup_user_keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys="mykey")
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["mykey"]), user)
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_list(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys allows lists and calls setup_user_keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"])
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user)
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_integer(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys warns on non-iterable/string type."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys=-1)
+ m_setup_user_keys.assert_called_once_with(set([]), user)
+ match = re.match(
+ r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for"
+ " 'ssh_authorized_keys'.*",
+ self.logs.getvalue(),
+ re.DOTALL,
+ )
+ self.assertIsNotNone(
+ match, "Missing ssh_authorized_keys invalid type warning"
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_no_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Log a warning when trying to redirect a user no cloud ssh keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_redirect_user="someuser")
+ self.assertIn(
+ "WARNING: Unable to disable SSH logins for foouser given "
+ "ssh_redirect_user: someuser. No cloud public-keys present.\n",
+ self.logs.getvalue(),
+ )
+ m_setup_user_keys.assert_not_called()
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_with_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
+ user = "foouser"
+ self.dist.create_user(
+ user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"]
+ )
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
+ m_setup_user_keys.assert_called_once_with(
+ set(["key1"]), "foouser", options=disable_prefix
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
+ user = "foouser"
+ self.dist.create_user(
+ user,
+ ssh_authorized_keys="auth1",
+ ssh_redirect_user="someuser",
+ cloud_public_ssh_keys=["key1"],
+ )
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
+ self.assertEqual(
+ m_setup_user_keys.call_args_list,
+ [
+ mock.call(set(["auth1"]), user), # not disabled
+ mock.call(set(["key1"]), "foouser", options=disable_prefix),
+ ],
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_usermod_if_no_passwd(
+ self, m_which, m_subp, m_is_snappy
+ ):
+ """Lock uses usermod --lock if no 'passwd' cmd available."""
+ m_which.side_effect = lambda m: m in ("usermod",)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy):
+ """Lock with only passwd will use passwd."""
+ m_which.side_effect = lambda m: m in ("passwd",)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_raises_runtime_if_no_commands(
+ self, m_which, m_subp, m_is_snappy
+ ):
+ """Lock with no commands available raises RuntimeError."""
+ m_which.return_value = None
+ with self.assertRaises(RuntimeError):
+ self.dist.lock_passwd("bob")
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_debian.py b/tests/unittests/distros/test_debian.py
new file mode 100644
index 00000000..c7c5932e
--- /dev/null
+++ b/tests/unittests/distros/test_debian.py
@@ -0,0 +1,211 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+from itertools import count, cycle
+from unittest import mock
+
+import pytest
+
+from cloudinit import distros, subp, util
+from cloudinit.distros.debian import APT_GET_COMMAND, APT_GET_WRAPPER
+from tests.unittests.helpers import FilesystemMockingTestCase
+
+
+@mock.patch("cloudinit.distros.debian.subp.subp")
+class TestDebianApplyLocale(FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestDebianApplyLocale, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.patchOS(self.new_root)
+ self.patchUtils(self.new_root)
+ self.spath = self.tmp_path("etc/default/locale", self.new_root)
+ cls = distros.fetch("debian")
+ self.distro = cls("debian", {}, None)
+
+ def test_no_rerun(self, m_subp):
+ """If system has defined locale, no re-run is expected."""
+ m_subp.return_value = (None, None)
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=%s\n" % locale, omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ m_subp.assert_not_called()
+
+ def test_no_regen_on_c_utf8(self, m_subp):
+ """If locale is set to C.UTF8, do not attempt to call locale-gen"""
+ m_subp.return_value = (None, None)
+ locale = "C.UTF-8"
+ util.write_file(self.spath, "LANG=%s\n" % "en_US.UTF-8", omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ]
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
+
+ def test_rerun_if_different(self, m_subp):
+ """If system has different locale, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=fr_FR.UTF-8", omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
+
+ def test_rerun_if_no_file(self, m_subp):
+ """If system has no locale file, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = "en_US.UTF-8"
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
+
+ def test_rerun_on_unset_system_locale(self, m_subp):
+ """If system has unset locale, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=", omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
+
+ def test_rerun_on_mismatched_keys(self, m_subp):
+ """If key is LC_ALL and system has only LANG, rerun is expected."""
+ m_subp.return_value = (None, None)
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=", omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath, keyname="LC_ALL")
+ self.assertEqual(
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LC_ALL=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
+
+ def test_falseish_locale_raises_valueerror(self, m_subp):
+ """locale as None or "" is invalid and should raise ValueError."""
+
+ with self.assertRaises(ValueError) as ctext_m:
+ self.distro.apply_locale(None)
+ m_subp.assert_not_called()
+
+ self.assertEqual(
+ "Failed to provide locale value.", str(ctext_m.exception)
+ )
+
+ with self.assertRaises(ValueError) as ctext_m:
+ self.distro.apply_locale("")
+ m_subp.assert_not_called()
+ self.assertEqual(
+ "Failed to provide locale value.", str(ctext_m.exception)
+ )
+
+
+@mock.patch.dict("os.environ", {}, clear=True)
+@mock.patch("cloudinit.distros.debian.subp.which", return_value=True)
+@mock.patch("cloudinit.distros.debian.subp.subp")
+class TestPackageCommand:
+ distro = distros.fetch("debian")("debian", {}, None)
+
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=True,
+ )
+ def test_simple_command(self, m_apt_avail, m_subp, m_which):
+ self.distro.package_command("update")
+ apt_args = [APT_GET_WRAPPER["command"]]
+ apt_args.extend(APT_GET_COMMAND)
+ apt_args.append("update")
+ expected_call = {
+ "args": apt_args,
+ "capture": False,
+ "env": {"DEBIAN_FRONTEND": "noninteractive"},
+ }
+ assert m_subp.call_args == mock.call(**expected_call)
+
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=[False, False, True],
+ )
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which):
+ self.distro._wait_for_apt_command("stub", {"args": "stub2"})
+ assert m_sleep.call_args_list == [mock.call(1), mock.call(1)]
+ assert m_subp.call_args_list == [mock.call(args="stub2")]
+
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=False,
+ )
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
+ def test_lock_wait_timeout(
+ self, m_time, m_sleep, m_apt_avail, m_subp, m_which
+ ):
+ with pytest.raises(TimeoutError):
+ self.distro._wait_for_apt_command("stub", "stub2", timeout=5)
+ assert m_subp.call_args_list == []
+
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]),
+ )
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which):
+ exception = subp.ProcessExecutionError(
+ exit_code=100, stderr="Could not get apt lock"
+ )
+ m_subp.side_effect = [exception, exception, "return_thing"]
+ ret = self.distro._wait_for_apt_command("stub", {"args": "stub2"})
+ assert ret == "return_thing"
+
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]),
+ )
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
+ def test_lock_exception_timeout(
+ self, m_time, m_sleep, m_apt_avail, m_subp, m_which
+ ):
+ m_subp.side_effect = subp.ProcessExecutionError(
+ exit_code=100, stderr="Could not get apt lock"
+ )
+ with pytest.raises(TimeoutError):
+ self.distro._wait_for_apt_command(
+ "stub", {"args": "stub2"}, timeout=5
+ )
diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py
new file mode 100644
index 00000000..f0cd1b24
--- /dev/null
+++ b/tests/unittests/distros/test_dragonflybsd.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+
+
+import cloudinit.util
+from tests.unittests.helpers import mock
+
+
+def test_find_dragonflybsd_part():
+ assert cloudinit.util.find_dragonflybsd_part("/dev/vbd0s3") == "vbd0s3"
+
+
+@mock.patch("cloudinit.util.is_DragonFlyBSD")
+@mock.patch("cloudinit.subp.subp")
+def test_parse_mount(mock_subp, m_is_DragonFlyBSD):
+ mount_out = """
+vbd0s3 on / (hammer2, local)
+devfs on /dev (devfs, nosymfollow, local)
+/dev/vbd0s0a on /boot (ufs, local)
+procfs on /proc (procfs, local)
+tmpfs on /var/run/shm (tmpfs, local)
+"""
+
+ mock_subp.return_value = (mount_out, "")
+ m_is_DragonFlyBSD.return_value = True
+ assert cloudinit.util.parse_mount("/") == ("vbd0s3", "hammer2", "/")
diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py
new file mode 100644
index 00000000..22be5098
--- /dev/null
+++ b/tests/unittests/distros/test_freebsd.py
@@ -0,0 +1,43 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+
+from cloudinit.util import find_freebsd_part, get_path_dev_freebsd
+from tests.unittests.helpers import CiTestCase, mock
+
+
+class TestDeviceLookUp(CiTestCase):
+ @mock.patch("cloudinit.subp.subp")
+ def test_find_freebsd_part_label(self, mock_subp):
+ glabel_out = """
+gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1
+ label/rootfs N/A da0p2
+ label/swap N/A da0p3
+"""
+ mock_subp.return_value = (glabel_out, "")
+ res = find_freebsd_part("/dev/label/rootfs")
+ self.assertEqual("da0p2", res)
+
+ @mock.patch("cloudinit.subp.subp")
+ def test_find_freebsd_part_gpt(self, mock_subp):
+ glabel_out = """
+ gpt/bootfs N/A vtbd0p1
+gptid/3f4cbe26-75da-11e8-a8f2-002590ec6166 N/A vtbd0p1
+ gpt/swapfs N/A vtbd0p2
+ gpt/rootfs N/A vtbd0p3
+ iso9660/cidata N/A vtbd2
+"""
+ mock_subp.return_value = (glabel_out, "")
+ res = find_freebsd_part("/dev/gpt/rootfs")
+ self.assertEqual("vtbd0p3", res)
+
+ def test_get_path_dev_freebsd_label(self):
+ mnt_list = """
+/dev/label/rootfs / ufs rw 1 1
+devfs /dev devfs rw,multilabel 0 0
+fdescfs /dev/fd fdescfs rw 0 0
+/dev/da1s1 /mnt/resource ufs rw 2 2
+"""
+ with mock.patch.object(os.path, "exists", return_value=True):
+ res = get_path_dev_freebsd("/etc", mnt_list)
+ self.assertIsNotNone(res)
diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py
new file mode 100644
index 00000000..93c5395c
--- /dev/null
+++ b/tests/unittests/distros/test_generic.py
@@ -0,0 +1,383 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+import shutil
+import tempfile
+from unittest import mock
+
+import pytest
+
+from cloudinit import distros, util
+from tests.unittests import helpers
+
+unknown_arch_info = {
+ "arches": ["default"],
+ "failsafe": {
+ "primary": "http://fs-primary-default",
+ "security": "http://fs-security-default",
+ },
+}
+
+package_mirrors = [
+ {
+ "arches": ["i386", "amd64"],
+ "failsafe": {
+ "primary": "http://fs-primary-intel",
+ "security": "http://fs-security-intel",
+ },
+ "search": {
+ "primary": [
+ "http://%(ec2_region)s.ec2/",
+ "http://%(availability_zone)s.clouds/",
+ ],
+ "security": [
+ "http://security-mirror1-intel",
+ "http://security-mirror2-intel",
+ ],
+ },
+ },
+ {
+ "arches": ["armhf", "armel"],
+ "failsafe": {
+ "primary": "http://fs-primary-arm",
+ "security": "http://fs-security-arm",
+ },
+ },
+ unknown_arch_info,
+]
+
+gpmi = distros._get_package_mirror_info
+gapmi = distros._get_arch_package_mirror_info
+
+
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestGenericDistro, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def _write_load_sudoers(self, _user, rules):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ os.makedirs(os.path.join(self.tmp, "etc"))
+ os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d"))
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.write_sudo_rules("harlowja", rules)
+ contents = util.load_file(d.ci_sudoers_fn)
+ return contents
+
+ def _count_in(self, lines_look_for, text_content):
+ found_amount = 0
+ for e in lines_look_for:
+ for line in text_content.splitlines():
+ line = line.strip()
+ if line == e:
+ found_amount += 1
+ return found_amount
+
+ def test_sudoers_ensure_rules(self):
+ rules = "ALL=(ALL:ALL) ALL"
+ contents = self._write_load_sudoers("harlowja", rules)
+ expected = ["harlowja ALL=(ALL:ALL) ALL"]
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_rules_list(self):
+ rules = [
+ "ALL=(ALL:ALL) ALL",
+ "B-ALL=(ALL:ALL) ALL",
+ "C-ALL=(ALL:ALL) ALL",
+ ]
+ contents = self._write_load_sudoers("harlowja", rules)
+ expected = [
+ "harlowja ALL=(ALL:ALL) ALL",
+ "harlowja B-ALL=(ALL:ALL) ALL",
+ "harlowja C-ALL=(ALL:ALL) ALL",
+ ]
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEqual(2, contents.count("josh"))
+
+ def test_sudoers_ensure_only_one_includedir(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ for char in ["#", "@"]:
+ util.write_file("/etc/sudoers", "{}includedir /b".format(char))
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertEqual(1, contents.count("includedir /b"))
+
+ def test_arch_package_mirror_info_unknown(self):
+ """for an unknown arch, we should get back that with arch 'default'."""
+ arch_mirrors = gapmi(package_mirrors, arch="unknown")
+ self.assertEqual(unknown_arch_info, arch_mirrors)
+
+ def test_arch_package_mirror_info_known(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ self.assertEqual(package_mirrors[0], arch_mirrors)
+
+ def test_systemd_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs("/run/systemd/system")
+ self.assertTrue(d.uses_systemd())
+
+ def test_systemd_not_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ self.assertFalse(d.uses_systemd())
+
+ def test_systemd_symlink(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs("/run/systemd")
+ os.symlink("/", "/run/systemd/system")
+ self.assertFalse(d.uses_systemd())
+
+ @mock.patch("cloudinit.distros.debian.read_system_locale")
+ def test_get_locale_ubuntu(self, m_locale):
+ """Test ubuntu distro returns locale set to C.UTF-8"""
+ m_locale.return_value = "C.UTF-8"
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ locale = d.get_locale()
+ self.assertEqual("C.UTF-8", locale)
+
+ def test_get_locale_rhel(self):
+ """Test rhel distro returns NotImplementedError exception"""
+ cls = distros.fetch("rhel")
+ d = cls("rhel", {}, None)
+ with self.assertRaises(NotImplementedError):
+ d.get_locale()
+
+ def test_expire_passwd_uses_chpasswd(self):
+ """Test ubuntu.expire_passwd uses the passwd command."""
+ for d_name in ("ubuntu", "rhel"):
+ cls = distros.fetch(d_name)
+ d = cls(d_name, {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(["passwd", "--expire", "myuser"])
+
+ def test_expire_passwd_freebsd_uses_pw_command(self):
+ """Test FreeBSD.expire_passwd uses the pw command."""
+ cls = distros.fetch("freebsd")
+ d = cls("freebsd", {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(
+ ["pw", "usermod", "myuser", "-p", "01-Jan-1970"]
+ )
+
+
+class TestGetPackageMirrors:
+ def return_first(self, mlist):
+ if not mlist:
+ return None
+ return mlist[0]
+
+ def return_second(self, mlist):
+ if not mlist:
+ return None
+
+ return mlist[1] if len(mlist) > 1 else None
+
+ def return_none(self, _mlist):
+ return None
+
+ def return_last(self, mlist):
+ if not mlist:
+ return None
+ return mlist[-1]
+
+ @pytest.mark.parametrize(
+ "allow_ec2_mirror, platform_type, mirrors",
+ [
+ (
+ True,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ True,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ ],
+ )
+ def test_get_package_mirror_info_az_ec2(
+ self, allow_ec2_mirror, platform_type, mirrors
+ ):
+ flag_path = (
+ "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ )
+ with mock.patch(flag_path, allow_ec2_mirror):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(
+ availability_zone="us-east-1a", platform_type=platform_type
+ )
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == mirrors[0]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_second,
+ )
+ assert results == mirrors[1]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_none,
+ )
+ assert results == package_mirrors[0]["failsafe"]
+
+ def test_get_package_mirror_info_az_non_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone="nova.cloudvendor")
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
+ )
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror2-intel",
+ }
+
+ def test_get_package_mirror_info_none(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone=None)
+
+ # because both search entries here replacement based on
+ # availability-zone, the filter will be called with an empty list and
+ # failsafe should be taken.
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
+ )
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ }
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_gentoo.py b/tests/unittests/distros/test_gentoo.py
new file mode 100644
index 00000000..dadf5df5
--- /dev/null
+++ b/tests/unittests/distros/test_gentoo.py
@@ -0,0 +1,27 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import atomic_helper, util
+from tests.unittests.helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestGentoo(CiTestCase):
+ def test_write_hostname(self):
+ distro = _get_distro("gentoo")
+ hostname = "myhostname"
+ hostfile = self.tmp_path("hostfile")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual('hostname="myhostname"\n', util.load_file(hostfile))
+
+ def test_write_existing_hostname_with_comments(self):
+ distro = _get_distro("gentoo")
+ hostname = "myhostname"
+ contents = '#This is the hostname\nhostname="localhost"'
+ hostfile = self.tmp_path("hostfile")
+ atomic_helper.write_file(hostfile, contents, omode="w")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual(
+ '#This is the hostname\nhostname="myhostname"\n',
+ util.load_file(hostfile),
+ )
diff --git a/tests/unittests/distros/test_hostname.py b/tests/unittests/distros/test_hostname.py
new file mode 100644
index 00000000..2cbbb3e2
--- /dev/null
+++ b/tests/unittests/distros/test_hostname.py
@@ -0,0 +1,42 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import unittest
+
+from cloudinit.distros.parsers import hostname
+
+BASE_HOSTNAME = """
+# My super-duper-hostname
+
+blahblah
+
+"""
+BASE_HOSTNAME = BASE_HOSTNAME.strip()
+
+
+class TestHostnameHelper(unittest.TestCase):
+ def test_parse_same(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ self.assertEqual(str(hn).strip(), BASE_HOSTNAME)
+ self.assertEqual(hn.hostname, "blahblah")
+
+ def test_no_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ hn.set_hostname("")
+ self.assertEqual(hn.hostname, prev_name)
+
+ def test_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ self.assertEqual(prev_name, "blahblah")
+ hn.set_hostname("bbbbd")
+ self.assertEqual(hn.hostname, "bbbbd")
+ expected_out = """
+# My super-duper-hostname
+
+bbbbd
+"""
+ self.assertEqual(str(hn).strip(), expected_out.strip())
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_hosts.py b/tests/unittests/distros/test_hosts.py
new file mode 100644
index 00000000..faffd912
--- /dev/null
+++ b/tests/unittests/distros/test_hosts.py
@@ -0,0 +1,47 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import unittest
+
+from cloudinit.distros.parsers import hosts
+
+BASE_ETC = """
+# Example
+127.0.0.1 localhost
+192.168.1.10 foo.mydomain.org foo
+192.168.1.10 bar.mydomain.org bar
+146.82.138.7 master.debian.org master
+209.237.226.90 www.opensource.org
+"""
+BASE_ETC = BASE_ETC.strip()
+
+
+class TestHostsHelper(unittest.TestCase):
+ def test_parse(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ self.assertEqual(eh.get_entry("127.0.0.1"), [["localhost"]])
+ self.assertEqual(
+ eh.get_entry("192.168.1.10"),
+ [["foo.mydomain.org", "foo"], ["bar.mydomain.org", "bar"]],
+ )
+ eh = str(eh)
+ self.assertTrue(eh.startswith("# Example"))
+
+ def test_add(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry("127.0.0.0", "blah")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]])
+ eh.add_entry("127.0.0.3", "blah", "blah2", "blah3")
+ self.assertEqual(
+ eh.get_entry("127.0.0.3"), [["blah", "blah2", "blah3"]]
+ )
+
+ def test_del(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry("127.0.0.0", "blah")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]])
+
+ eh.del_entries("127.0.0.0")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [])
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_init.py b/tests/unittests/distros/test_init.py
new file mode 100644
index 00000000..8f3c8978
--- /dev/null
+++ b/tests/unittests/distros/test_init.py
@@ -0,0 +1,248 @@
+# Copyright (C) 2020 Canonical Ltd.
+#
+# Author: Daniel Watkins <oddbloke@ubuntu.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Tests for cloudinit/distros/__init__.py"""
+
+from unittest import mock
+
+import pytest
+
+from cloudinit.distros import LDH_ASCII_CHARS, _get_package_mirror_info
+
+# In newer versions of Python, these characters will be omitted instead
+# of substituted because of security concerns.
+# See https://bugs.python.org/issue43882
+SECURITY_URL_CHARS = "\n\r\t"
+
+# Define a set of characters we would expect to be replaced
+INVALID_URL_CHARS = [
+ chr(x)
+ for x in range(127)
+ if chr(x) not in LDH_ASCII_CHARS + SECURITY_URL_CHARS
+]
+for separator in [":", ".", "/", "#", "?", "@", "[", "]"]:
+ # Remove from the set characters that either separate hostname parts (":",
+ # "."), terminate hostnames ("/", "#", "?", "@"), or cause Python to be
+ # unable to parse URLs ("[", "]").
+ INVALID_URL_CHARS.remove(separator)
+
+
+class TestGetPackageMirrorInfo:
+ """
+ Tests for cloudinit.distros._get_package_mirror_info.
+
+ These supplement the tests in tests/unittests/test_distros/test_generic.py
+ which are more focused on testing a single production-like configuration.
+ These tests are more focused on specific aspects of the unit under test.
+ """
+
+ @pytest.mark.parametrize(
+ "mirror_info,expected",
+ [
+ # Empty info gives empty return
+ ({}, {}),
+ # failsafe values used if present
+ (
+ {
+ "failsafe": {
+ "primary": "http://value",
+ "security": "http://other",
+ }
+ },
+ {"primary": "http://value", "security": "http://other"},
+ ),
+ # search values used if present
+ (
+ {
+ "search": {
+ "primary": ["http://value"],
+ "security": ["http://other"],
+ }
+ },
+ {"primary": ["http://value"], "security": ["http://other"]},
+ ),
+ # failsafe values used if search value not present
+ (
+ {
+ "search": {"primary": ["http://value"]},
+ "failsafe": {"security": "http://other"},
+ },
+ {"primary": ["http://value"], "security": "http://other"},
+ ),
+ ],
+ )
+ def test_get_package_mirror_info_failsafe(self, mirror_info, expected):
+ """
+ Test the interaction between search and failsafe inputs
+
+ (This doesn't test the case where the mirror_filter removes all search
+ options; test_failsafe_used_if_all_search_results_filtered_out covers
+ that.)
+ """
+ assert expected == _get_package_mirror_info(
+ mirror_info, mirror_filter=lambda x: x
+ )
+
+ def test_failsafe_used_if_all_search_results_filtered_out(self):
+ """Test the failsafe option used if all search options eliminated."""
+ mirror_info = {
+ "search": {"primary": ["http://value"]},
+ "failsafe": {"primary": "http://other"},
+ }
+ assert {"primary": "http://other"} == _get_package_mirror_info(
+ mirror_info, mirror_filter=lambda x: False
+ )
+
+ @pytest.mark.parametrize(
+ "allow_ec2_mirror, platform_type", [(True, "ec2")]
+ )
+ @pytest.mark.parametrize(
+ "availability_zone,region,patterns,expected",
+ (
+ # Test ec2_region alone
+ (
+ "fk-fake-1f",
+ None,
+ ["http://EC2-%(ec2_region)s/ubuntu"],
+ ["http://ec2-fk-fake-1/ubuntu"],
+ ),
+ # Test availability_zone alone
+ (
+ "fk-fake-1f",
+ None,
+ ["http://AZ-%(availability_zone)s/ubuntu"],
+ ["http://az-fk-fake-1f/ubuntu"],
+ ),
+ # Test region alone
+ (
+ None,
+ "fk-fake-1",
+ ["http://RG-%(region)s/ubuntu"],
+ ["http://rg-fk-fake-1/ubuntu"],
+ ),
+ # Test that ec2_region is not available for non-matching AZs
+ (
+ "fake-fake-1f",
+ None,
+ [
+ "http://EC2-%(ec2_region)s/ubuntu",
+ "http://AZ-%(availability_zone)s/ubuntu",
+ ],
+ ["http://az-fake-fake-1f/ubuntu"],
+ ),
+ # Test that template order maintained
+ (
+ None,
+ "fake-region",
+ [
+ "http://RG-%(region)s-2/ubuntu",
+ "http://RG-%(region)s-1/ubuntu",
+ ],
+ [
+ "http://rg-fake-region-2/ubuntu",
+ "http://rg-fake-region-1/ubuntu",
+ ],
+ ),
+ # Test that non-ASCII hostnames are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.IDNA-%(region)s.com/ubuntu"],
+ ["http://www.xn--idna--4kd53hh6aba3q.com/ubuntu"],
+ ),
+ # Test that non-ASCII hostnames with a port are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.IDNA-%(region)s.com:8080/ubuntu"],
+ ["http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu"],
+ ),
+ # Test that non-ASCII non-hostname parts of URLs are unchanged
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.example.com/%(region)s/ubuntu"],
+ ["http://www.example.com/ТεЅТ̣/ubuntu"],
+ ),
+ # Test that IPv4 addresses are unchanged
+ (
+ None,
+ "fk-fake-1",
+ ["http://192.168.1.1:8080/%(region)s/ubuntu"],
+ ["http://192.168.1.1:8080/fk-fake-1/ubuntu"],
+ ),
+ # Test that IPv6 addresses are unchanged
+ (
+ None,
+ "fk-fake-1",
+ ["http://[2001:67c:1360:8001::23]/%(region)s/ubuntu"],
+ ["http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu"],
+ ),
+ # Test that unparseable URLs are filtered out of the mirror list
+ (
+ None,
+ "inv[lid",
+ [
+ "http://%(region)s.in.hostname/should/be/filtered",
+ "http://but.not.in.the.path/%(region)s",
+ ],
+ ["http://but.not.in.the.path/inv[lid"],
+ ),
+ (
+ None,
+ "-some-region-",
+ ["http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu"],
+ ["http://lead-ing.some-region.trail-ing.example.com/ubuntu"],
+ ),
+ )
+ + tuple(
+ # Dynamically generate a test case for each non-LDH
+ # (Letters/Digits/Hyphen) ASCII character, testing that it is
+ # substituted with a hyphen
+ (
+ None,
+ "fk{0}fake{0}1".format(invalid_char),
+ ["http://%(region)s/ubuntu"],
+ ["http://fk-fake-1/ubuntu"],
+ )
+ for invalid_char in INVALID_URL_CHARS
+ ),
+ )
+ def test_valid_substitution(
+ self,
+ allow_ec2_mirror,
+ platform_type,
+ availability_zone,
+ region,
+ patterns,
+ expected,
+ ):
+ """Test substitution works as expected."""
+ flag_path = (
+ "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ )
+
+ m_data_source = mock.Mock(
+ availability_zone=availability_zone,
+ region=region,
+ platform_type=platform_type,
+ )
+ mirror_info = {"search": {"primary": patterns}}
+
+ with mock.patch(flag_path, allow_ec2_mirror):
+ ret = _get_package_mirror_info(
+ mirror_info,
+ data_source=m_data_source,
+ mirror_filter=lambda x: x,
+ )
+ print(allow_ec2_mirror)
+ print(platform_type)
+ print(availability_zone)
+ print(region)
+ print(patterns)
+ print(expected)
+ assert {"primary": expected} == ret
diff --git a/tests/unittests/distros/test_manage_service.py b/tests/unittests/distros/test_manage_service.py
new file mode 100644
index 00000000..9e64b35c
--- /dev/null
+++ b/tests/unittests/distros/test_manage_service.py
@@ -0,0 +1,41 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import CiTestCase, mock
+from tests.unittests.util import MockDistro
+
+
+class TestManageService(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestManageService, self).setUp()
+ self.dist = MockDistro()
+
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=False)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_systemctl_initcmd(self, m_subp, m_sysd):
+ self.dist.init_cmd = ["systemctl"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(
+ ["systemctl", "start", "myssh"], capture=True
+ )
+
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=False)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_service_initcmd(self, m_subp, m_sysd):
+ self.dist.init_cmd = ["service"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(["service", "myssh", "start"], capture=True)
+
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=True)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_systemctl(self, m_subp, m_sysd):
+ self.dist.init_cmd = ["ignore"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(
+ ["systemctl", "start", "myssh"], capture=True
+ )
+
+
+# vi: ts=4 sw=4 expandtab
diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py
new file mode 100644
index 00000000..0bc6dfbd
--- /dev/null
+++ b/tests/unittests/distros/test_netbsd.py
@@ -0,0 +1,18 @@
+import unittest.mock as mock
+
+import pytest
+
+import cloudinit.distros.netbsd
+
+
+@pytest.mark.parametrize("with_pkgin", (True, False))
+@mock.patch("cloudinit.distros.netbsd.os")
+def test_init(m_os, with_pkgin):
+ print(with_pkgin)
+ m_os.path.exists.return_value = with_pkgin
+ cfg = {}
+
+ distro = cloudinit.distros.netbsd.NetBSD("netbsd", cfg, None)
+ expectation = ["pkgin", "-y", "full-upgrade"] if with_pkgin else None
+ assert distro.pkg_cmd_upgrade_prefix == expectation
+ assert [mock.call("/usr/pkg/bin/pkgin")] == m_os.path.exists.call_args_list
diff --git a/tests/unittests/distros/test_netconfig.py b/tests/unittests/distros/test_netconfig.py
new file mode 100644
index 00000000..a25be481
--- /dev/null
+++ b/tests/unittests/distros/test_netconfig.py
@@ -0,0 +1,1013 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import os
+import re
+from io import StringIO
+from textwrap import dedent
+from unittest import mock
+
+from cloudinit import distros, helpers, safeyaml, settings, subp, util
+from cloudinit.distros.parsers.sys_conf import SysConf
+from tests.unittests.helpers import FilesystemMockingTestCase, dir2dict
+
+BASE_NET_CFG = """
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+ netmask 255.255.255.0
+ network 192.168.0.0
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+BASE_NET_CFG_FROM_V2 = """
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5/24
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+BASE_NET_CFG_IPV6 = """
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+iface eth0 inet6 static
+ address 2607:f0d0:1002:0011::2
+ netmask 64
+ gateway 2607:f0d0:1002:0011::1
+
+iface eth1 inet static
+ address 192.168.1.6
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+iface eth1 inet6 static
+ address 2607:f0d0:1002:0011::3
+ netmask 64
+ gateway 2607:f0d0:1002:0011::1
+"""
+
+V1_NET_CFG = {
+ "config": [
+ {
+ "name": "eth0",
+ "subnets": [
+ {
+ "address": "192.168.1.5",
+ "broadcast": "192.168.1.0",
+ "gateway": "192.168.1.254",
+ "netmask": "255.255.255.0",
+ "type": "static",
+ }
+ ],
+ "type": "physical",
+ },
+ {
+ "name": "eth1",
+ "subnets": [{"control": "auto", "type": "dhcp4"}],
+ "type": "physical",
+ },
+ ],
+ "version": 1,
+}
+
+V1_NET_CFG_WITH_DUPS = """\
+# same value in interface specific dns and global dns
+# should produce single entry in network file
+version: 1
+config:
+ - type: physical
+ name: eth0
+ subnets:
+ - type: static
+ address: 192.168.0.102/24
+ dns_nameservers: [1.2.3.4]
+ dns_search: [test.com]
+ interface: eth0
+ - type: nameserver
+ address: [1.2.3.4]
+ search: [test.com]
+"""
+
+V1_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5/24
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+V1_NET_CFG_IPV6_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet6 static
+ address 2607:f0d0:1002:0011::2/64
+ gateway 2607:f0d0:1002:0011::1
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+V1_NET_CFG_IPV6 = {
+ "config": [
+ {
+ "name": "eth0",
+ "subnets": [
+ {
+ "address": "2607:f0d0:1002:0011::2",
+ "gateway": "2607:f0d0:1002:0011::1",
+ "netmask": "64",
+ "type": "static6",
+ }
+ ],
+ "type": "physical",
+ },
+ {
+ "name": "eth1",
+ "subnets": [{"control": "auto", "type": "dhcp4"}],
+ "type": "physical",
+ },
+ ],
+ "version": 1,
+}
+
+
+V1_TO_V2_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth1:
+ dhcp4: true
+"""
+
+V1_TO_V2_NET_CFG_IPV6_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 2607:f0d0:1002:0011::2/64
+ gateway6: 2607:f0d0:1002:0011::1
+ eth1:
+ dhcp4: true
+"""
+
+V2_NET_CFG = {
+ "ethernets": {
+ "eth7": {"addresses": ["192.168.1.5/24"], "gateway4": "192.168.1.254"},
+ "eth9": {"dhcp4": True},
+ },
+ "version": 2,
+}
+
+
+V2_TO_V2_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ ethernets:
+ eth7:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth9:
+ dhcp4: true
+ version: 2
+"""
+
+
+class WriteBuffer(object):
+ def __init__(self):
+ self.buffer = StringIO()
+ self.mode = None
+ self.omode = None
+
+ def write(self, text):
+ self.buffer.write(text)
+
+ def __str__(self):
+ return self.buffer.getvalue()
+
+
+class TestNetCfgDistroBase(FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestNetCfgDistroBase, self).setUp()
+ self.add_patch("cloudinit.util.system_is_snappy", "m_snappy")
+
+ def _get_distro(self, dname, renderers=None):
+ cls = distros.fetch(dname)
+ cfg = settings.CFG_BUILTIN
+ cfg["system_info"]["distro"] = dname
+ if renderers:
+ cfg["system_info"]["network"] = {"renderers": renderers}
+ paths = helpers.Paths({})
+ return cls(dname, cfg.get("system_info"), paths)
+
+ def assertCfgEquals(self, blob1, blob2):
+ b1 = dict(SysConf(blob1.strip().splitlines()))
+ b2 = dict(SysConf(blob2.strip().splitlines()))
+ self.assertEqual(b1, b2)
+ for (k, v) in b1.items():
+ self.assertIn(k, b2)
+ for (k, v) in b2.items():
+ self.assertIn(k, b1)
+ for (k, v) in b1.items():
+ self.assertEqual(v, b2[k])
+
+
+class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroFreeBSD, self).setUp()
+ self.distro = self._get_distro("freebsd", renderers=["freebsd"])
+
+ def _apply_and_verify_freebsd(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.freebsd.available") as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ util.ensure_dir("/etc")
+ util.ensure_file("/etc/rc.conf")
+ util.ensure_file("/etc/resolv.conf")
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(
+ set(expected.split("\n")), set(results[cfgpath].split("\n"))
+ )
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
+ def test_apply_network_config_freebsd_standard(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ "00:15:5d:4c:73:00": "eth0",
+ }
+ rc_conf_expected = """\
+defaultrouter=192.168.1.254
+ifconfig_eth0='192.168.1.5 netmask 255.255.255.0'
+ifconfig_eth1=DHCP
+"""
+
+ expected_cfgs = {
+ "/etc/rc.conf": rc_conf_expected,
+ "/etc/resolv.conf": "",
+ }
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
+ def test_apply_network_config_freebsd_ifrename(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ "00:15:5d:4c:73:00": "vtnet0",
+ }
+ rc_conf_expected = """\
+ifconfig_vtnet0_name=eth0
+defaultrouter=192.168.1.254
+ifconfig_eth0='192.168.1.5 netmask 255.255.255.0'
+ifconfig_eth1=DHCP
+"""
+
+ V1_NET_CFG_RENAME = copy.deepcopy(V1_NET_CFG)
+ V1_NET_CFG_RENAME["config"][0]["mac_address"] = "00:15:5d:4c:73:00"
+
+ expected_cfgs = {
+ "/etc/rc.conf": rc_conf_expected,
+ "/etc/resolv.conf": "",
+ }
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG_RENAME,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
+ def test_apply_network_config_freebsd_nameserver(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ "00:15:5d:4c:73:00": "eth0",
+ }
+
+ V1_NET_CFG_DNS = copy.deepcopy(V1_NET_CFG)
+ ns = ["1.2.3.4"]
+ V1_NET_CFG_DNS["config"][0]["subnets"][0]["dns_nameservers"] = ns
+ expected_cfgs = {"/etc/resolv.conf": "nameserver 1.2.3.4\n"}
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG_DNS,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+
+class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroUbuntuEni, self).setUp()
+ self.distro = self._get_distro("ubuntu", renderers=["eni"])
+
+ def eni_path(self):
+ return "/etc/network/interfaces.d/50-cloud-init.cfg"
+
+ def _apply_and_verify_eni(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.eni.available") as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_eni_ub(self):
+ expected_cfgs = {
+ self.eni_path(): V1_NET_CFG_OUTPUT,
+ }
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify_eni(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_apply_network_config_ipv6_ub(self):
+ expected_cfgs = {self.eni_path(): V1_NET_CFG_IPV6_OUTPUT}
+ self._apply_and_verify_eni(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+
+class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroUbuntuNetplan, self).setUp()
+ self.distro = self._get_distro("ubuntu", renderers=["netplan"])
+ self.devlist = ["eth0", "lo"]
+
+ def _apply_and_verify_netplan(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.netplan.available", return_value=True):
+ with mock.patch(
+ "cloudinit.net.netplan.get_devicelist",
+ return_value=self.devlist,
+ ):
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def netplan_path(self):
+ return "/etc/netplan/50-cloud-init.yaml"
+
+ def test_apply_network_config_v1_to_netplan_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V1_TO_V2_NET_CFG_OUTPUT,
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_apply_network_config_v1_ipv6_to_netplan_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V1_TO_V2_NET_CFG_IPV6_OUTPUT,
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG_IPV6, False)
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_apply_network_config_v2_passthrough_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V2_TO_V2_NET_CFG_OUTPUT,
+ }
+ # ub_distro.apply_network_config(V2_NET_CFG, False)
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V2_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+
+class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroRedhat, self).setUp()
+ self.distro = self._get_distro("rhel", renderers=["sysconfig"])
+
+ def ifcfg_path(self, ifname):
+ return "/etc/sysconfig/network-scripts/ifcfg-%s" % ifname
+
+ def control_path(self):
+ return "/etc/sysconfig/network"
+
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.sysconfig.available") as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ self.assertCfgEquals(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_rh(self):
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=none
+ DEFROUTE=yes
+ DEVICE=eth0
+ GATEWAY=192.168.1.254
+ IPADDR=192.168.1.5
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
+ BOOTPROTO=dhcp
+ DEVICE=eth1
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.control_path(): dedent(
+ """\
+ NETWORKING=yes
+ """
+ ),
+ }
+ # rh_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_apply_network_config_ipv6_rh(self):
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=none
+ DEFROUTE=yes
+ DEVICE=eth0
+ IPV6ADDR=2607:f0d0:1002:0011::2/64
+ IPV6INIT=yes
+ IPV6_AUTOCONF=no
+ IPV6_DEFAULTGW=2607:f0d0:1002:0011::1
+ IPV6_FORCE_ACCEPT_RA=no
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
+ BOOTPROTO=dhcp
+ DEVICE=eth1
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.control_path(): dedent(
+ """\
+ NETWORKING=yes
+ NETWORKING_IPV6=yes
+ IPV6_AUTOCONF=no
+ """
+ ),
+ }
+ # rh_distro.apply_network_config(V1_NET_CFG_IPV6, False)
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_vlan_render_unsupported(self):
+ """Render officially unsupported vlan names."""
+ cfg = {
+ "version": 2,
+ "ethernets": {
+ "eth0": {
+ "addresses": ["192.10.1.2/24"],
+ "match": {"macaddress": "00:16:3e:60:7c:df"},
+ }
+ },
+ "vlans": {
+ "infra0": {
+ "addresses": ["10.0.1.2/16"],
+ "id": 1001,
+ "link": "eth0",
+ }
+ },
+ }
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=none
+ DEVICE=eth0
+ HWADDR=00:16:3e:60:7c:df
+ IPADDR=192.10.1.2
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.ifcfg_path("infra0"): dedent(
+ """\
+ BOOTPROTO=none
+ DEVICE=infra0
+ IPADDR=10.0.1.2
+ NETMASK=255.255.0.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ PHYSDEV=eth0
+ USERCTL=no
+ VLAN=yes
+ """
+ ),
+ self.control_path(): dedent(
+ """\
+ NETWORKING=yes
+ """
+ ),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs
+ )
+
+ def test_vlan_render(self):
+ cfg = {
+ "version": 2,
+ "ethernets": {"eth0": {"addresses": ["192.10.1.2/24"]}},
+ "vlans": {
+ "eth0.1001": {
+ "addresses": ["10.0.1.2/16"],
+ "id": 1001,
+ "link": "eth0",
+ }
+ },
+ }
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=none
+ DEVICE=eth0
+ IPADDR=192.10.1.2
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """
+ ),
+ self.ifcfg_path("eth0.1001"): dedent(
+ """\
+ BOOTPROTO=none
+ DEVICE=eth0.1001
+ IPADDR=10.0.1.2
+ NETMASK=255.255.0.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ PHYSDEV=eth0
+ USERCTL=no
+ VLAN=yes
+ """
+ ),
+ self.control_path(): dedent(
+ """\
+ NETWORKING=yes
+ """
+ ),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs
+ )
+
+
+class TestNetCfgDistroOpensuse(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroOpensuse, self).setUp()
+ self.distro = self._get_distro("opensuse", renderers=["sysconfig"])
+
+ def ifcfg_path(self, ifname):
+ return "/etc/sysconfig/network/ifcfg-%s" % ifname
+
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.sysconfig.available") as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ self.assertCfgEquals(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_opensuse(self):
+ """Opensuse uses apply_network_config and renders sysconfig"""
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=static
+ IPADDR=192.168.1.5
+ NETMASK=255.255.255.0
+ STARTMODE=auto
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
+ BOOTPROTO=dhcp4
+ STARTMODE=auto
+ """
+ ),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+ def test_apply_network_config_ipv6_opensuse(self):
+ """Opensuse uses apply_network_config and renders sysconfig w/ipv6"""
+ expected_cfgs = {
+ self.ifcfg_path("eth0"): dedent(
+ """\
+ BOOTPROTO=static
+ IPADDR6=2607:f0d0:1002:0011::2/64
+ STARTMODE=auto
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
+ BOOTPROTO=dhcp4
+ STARTMODE=auto
+ """
+ ),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
+
+
+class TestNetCfgDistroArch(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroArch, self).setUp()
+ self.distro = self._get_distro("arch", renderers=["netplan"])
+
+ def _apply_and_verify(
+ self,
+ apply_fn,
+ config,
+ expected_cfgs=None,
+ bringup=False,
+ with_netplan=False,
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch(
+ "cloudinit.net.netplan.available", return_value=with_netplan
+ ):
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def netctl_path(self, iface):
+ return "/etc/netctl/%s" % iface
+
+ def netplan_path(self):
+ return "/etc/netplan/50-cloud-init.yaml"
+
+ def test_apply_network_config_v1_without_netplan(self):
+ # Note that this is in fact an invalid netctl config:
+ # "Address=None/None"
+ # But this is what the renderer has been writing out for a long time,
+ # and the test's purpose is to assert that the netctl renderer is
+ # still being used in absence of netplan, not the correctness of the
+ # rendered netctl config.
+ expected_cfgs = {
+ self.netctl_path("eth0"): dedent(
+ """\
+ Address=192.168.1.5/255.255.255.0
+ Connection=ethernet
+ DNS=()
+ Gateway=192.168.1.254
+ IP=static
+ Interface=eth0
+ """
+ ),
+ self.netctl_path("eth1"): dedent(
+ """\
+ Address=None/None
+ Connection=ethernet
+ DNS=()
+ Gateway=
+ IP=dhcp
+ Interface=eth1
+ """
+ ),
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=False,
+ )
+
+ def test_apply_network_config_v1_with_netplan(self):
+ expected_cfgs = {
+ self.netplan_path(): dedent(
+ """\
+ # generated by cloud-init
+ network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth1:
+ dhcp4: true
+ """
+ ),
+ }
+
+ with mock.patch(
+ "cloudinit.net.netplan.get_devicelist", return_value=[]
+ ):
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=True,
+ )
+
+
+class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroPhoton, self).setUp()
+ self.distro = self._get_distro("photon", renderers=["networkd"])
+
+ def create_conf_dict(self, contents):
+ content_dict = {}
+ for line in contents:
+ if line:
+ line = line.strip()
+ if line and re.search(r"^\[(.+)\]$", line):
+ content_dict[line] = []
+ key = line
+ elif line:
+ assert key
+ content_dict[key].append(line)
+
+ return content_dict
+
+ def compare_dicts(self, actual, expected):
+ for k, v in actual.items():
+ self.assertEqual(sorted(expected[k]), sorted(v))
+
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
+ if not expected_cfgs:
+ raise ValueError("expected_cfg must not be None")
+
+ tmpd = None
+ with mock.patch("cloudinit.net.networkd.available") as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ actual = self.create_conf_dict(results[cfgpath].splitlines())
+ self.compare_dicts(actual, expected)
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def nwk_file_path(self, ifname):
+ return "/etc/systemd/network/10-cloud-init-%s.network" % ifname
+
+ def net_cfg_1(self, ifname):
+ ret = (
+ """\
+ [Match]
+ Name=%s
+ [Network]
+ DHCP=no
+ [Address]
+ Address=192.168.1.5/24
+ [Route]
+ Gateway=192.168.1.254"""
+ % ifname
+ )
+ return ret
+
+ def net_cfg_2(self, ifname):
+ ret = (
+ """\
+ [Match]
+ Name=%s
+ [Network]
+ DHCP=ipv4"""
+ % ifname
+ )
+ return ret
+
+ def test_photon_network_config_v1(self):
+ tmp = self.net_cfg_1("eth0").splitlines()
+ expected_eth0 = self.create_conf_dict(tmp)
+
+ tmp = self.net_cfg_2("eth1").splitlines()
+ expected_eth1 = self.create_conf_dict(tmp)
+
+ expected_cfgs = {
+ self.nwk_file_path("eth0"): expected_eth0,
+ self.nwk_file_path("eth1"): expected_eth1,
+ }
+
+ self._apply_and_verify(
+ self.distro.apply_network_config, V1_NET_CFG, expected_cfgs.copy()
+ )
+
+ def test_photon_network_config_v2(self):
+ tmp = self.net_cfg_1("eth7").splitlines()
+ expected_eth7 = self.create_conf_dict(tmp)
+
+ tmp = self.net_cfg_2("eth9").splitlines()
+ expected_eth9 = self.create_conf_dict(tmp)
+
+ expected_cfgs = {
+ self.nwk_file_path("eth7"): expected_eth7,
+ self.nwk_file_path("eth9"): expected_eth9,
+ }
+
+ self._apply_and_verify(
+ self.distro.apply_network_config, V2_NET_CFG, expected_cfgs.copy()
+ )
+
+ def test_photon_network_config_v1_with_duplicates(self):
+ expected = """\
+ [Match]
+ Name=eth0
+ [Network]
+ DHCP=no
+ DNS=1.2.3.4
+ Domains=test.com
+ [Address]
+ Address=192.168.0.102/24"""
+
+ net_cfg = safeyaml.load(V1_NET_CFG_WITH_DUPS)
+
+ expected = self.create_conf_dict(expected.splitlines())
+ expected_cfgs = {
+ self.nwk_file_path("eth0"): expected,
+ }
+
+ self._apply_and_verify(
+ self.distro.apply_network_config, net_cfg, expected_cfgs.copy()
+ )
+
+
+def get_mode(path, target=None):
+ return os.stat(subp.target_path(target, path)).st_mode & 0o777
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_networking.py b/tests/unittests/distros/test_networking.py
new file mode 100644
index 00000000..274647cb
--- /dev/null
+++ b/tests/unittests/distros/test_networking.py
@@ -0,0 +1,231 @@
+# See https://docs.pytest.org/en/stable/example
+# /parametrize.html#parametrizing-conditional-raising
+from contextlib import ExitStack as does_not_raise
+from unittest import mock
+
+import pytest
+
+from cloudinit import net
+from cloudinit.distros.networking import (
+ BSDNetworking,
+ LinuxNetworking,
+ Networking,
+)
+
+
+@pytest.fixture
+def generic_networking_cls():
+ """Returns a direct Networking subclass which errors on /sys usage.
+
+ This enables the direct testing of functionality only present on the
+ ``Networking`` super-class, and provides a check on accidentally using /sys
+ in that context.
+ """
+
+ class TestNetworking(Networking):
+ def is_physical(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def settle(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def try_set_link_up(self, *args, **kwargs):
+ raise NotImplementedError
+
+ error = AssertionError("Unexpectedly used /sys in generic networking code")
+ with mock.patch(
+ "cloudinit.net.get_sys_class_path",
+ side_effect=error,
+ ):
+ yield TestNetworking
+
+
+@pytest.fixture
+def sys_class_net(tmpdir):
+ sys_class_net_path = tmpdir.join("sys/class/net")
+ sys_class_net_path.ensure_dir()
+ with mock.patch(
+ "cloudinit.net.get_sys_class_path",
+ return_value=sys_class_net_path.strpath + "/",
+ ):
+ yield sys_class_net_path
+
+
+class TestBSDNetworkingIsPhysical:
+ def test_raises_notimplementederror(self):
+ with pytest.raises(NotImplementedError):
+ BSDNetworking().is_physical("eth0")
+
+
+class TestLinuxNetworkingIsPhysical:
+ def test_returns_false_by_default(self, sys_class_net):
+ assert not LinuxNetworking().is_physical("eth0")
+
+ def test_returns_false_if_devname_exists_but_not_physical(
+ self, sys_class_net
+ ):
+ devname = "eth0"
+ sys_class_net.join(devname).mkdir()
+ assert not LinuxNetworking().is_physical(devname)
+
+ def test_returns_true_if_device_is_physical(self, sys_class_net):
+ devname = "eth0"
+ device_dir = sys_class_net.join(devname)
+ device_dir.mkdir()
+ device_dir.join("device").write("")
+
+ assert LinuxNetworking().is_physical(devname)
+
+
+class TestBSDNetworkingTrySetLinkUp:
+ def test_raises_notimplementederror(self):
+ with pytest.raises(NotImplementedError):
+ BSDNetworking().try_set_link_up("eth0")
+
+
+@mock.patch("cloudinit.net.is_up")
+@mock.patch("cloudinit.distros.networking.subp.subp")
+class TestLinuxNetworkingTrySetLinkUp:
+ def test_calls_subp_return_true(self, m_subp, m_is_up):
+ devname = "eth0"
+ m_is_up.return_value = True
+ is_success = LinuxNetworking().try_set_link_up(devname)
+
+ assert (
+ mock.call(["ip", "link", "set", devname, "up"])
+ == m_subp.call_args_list[-1]
+ )
+ assert is_success
+
+ def test_calls_subp_return_false(self, m_subp, m_is_up):
+ devname = "eth0"
+ m_is_up.return_value = False
+ is_success = LinuxNetworking().try_set_link_up(devname)
+
+ assert (
+ mock.call(["ip", "link", "set", devname, "up"])
+ == m_subp.call_args_list[-1]
+ )
+ assert not is_success
+
+
+class TestBSDNetworkingSettle:
+ def test_settle_doesnt_error(self):
+ # This also implicitly tests that it doesn't use subp.subp
+ BSDNetworking().settle()
+
+
+@pytest.mark.usefixtures("sys_class_net")
+@mock.patch("cloudinit.distros.networking.util.udevadm_settle", autospec=True)
+class TestLinuxNetworkingSettle:
+ def test_no_arguments(self, m_udevadm_settle):
+ LinuxNetworking().settle()
+
+ assert [mock.call(exists=None)] == m_udevadm_settle.call_args_list
+
+ def test_exists_argument(self, m_udevadm_settle):
+ LinuxNetworking().settle(exists="ens3")
+
+ expected_path = net.sys_dev_path("ens3")
+ assert [
+ mock.call(exists=expected_path)
+ ] == m_udevadm_settle.call_args_list
+
+
+class TestNetworkingWaitForPhysDevs:
+ @pytest.fixture
+ def wait_for_physdevs_netcfg(self):
+ """This config is shared across all the tests in this class."""
+
+ def ethernet(mac, name, driver=None, device_id=None):
+ v2_cfg = {"set-name": name, "match": {"macaddress": mac}}
+ if driver:
+ v2_cfg["match"].update({"driver": driver})
+ if device_id:
+ v2_cfg["match"].update({"device_id": device_id})
+
+ return v2_cfg
+
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", "0x1000"],
+ ["00:11:22:33:44:55", "ens3", "e1000", "0x1643"],
+ ]
+ netcfg = {
+ "version": 2,
+ "ethernets": {args[1]: ethernet(*args) for args in physdevs},
+ }
+ return netcfg
+
+ def test_skips_settle_if_all_present(
+ self,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.side_effect = iter(
+ [{"aa:bb:cc:dd:ee:ff": "eth0", "00:11:22:33:44:55": "ens3"}]
+ )
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ networking.wait_for_physdevs(wait_for_physdevs_netcfg)
+ assert 0 == m_settle.call_count
+
+ def test_calls_udev_settle_on_missing(
+ self,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.side_effect = iter(
+ [
+ {
+ "aa:bb:cc:dd:ee:ff": "eth0"
+ }, # first call ens3 is missing
+ {
+ "aa:bb:cc:dd:ee:ff": "eth0",
+ "00:11:22:33:44:55": "ens3",
+ }, # second call has both
+ ]
+ )
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ networking.wait_for_physdevs(wait_for_physdevs_netcfg)
+ m_settle.assert_called_with(exists="ens3")
+
+ @pytest.mark.parametrize(
+ "strict,expectation",
+ [(True, pytest.raises(RuntimeError)), (False, does_not_raise())],
+ )
+ def test_retrying_and_strict_behaviour(
+ self,
+ strict,
+ expectation,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.return_value = {}
+
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ with expectation:
+ networking.wait_for_physdevs(
+ wait_for_physdevs_netcfg, strict=strict
+ )
+
+ assert (
+ 5 * len(wait_for_physdevs_netcfg["ethernets"])
+ == m_settle.call_count
+ )
diff --git a/tests/unittests/distros/test_opensuse.py b/tests/unittests/distros/test_opensuse.py
new file mode 100644
index 00000000..4a4b266f
--- /dev/null
+++ b/tests/unittests/distros/test_opensuse.py
@@ -0,0 +1,11 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestopenSUSE(CiTestCase):
+ def test_get_distro(self):
+ distro = _get_distro("opensuse")
+ self.assertEqual(distro.osfamily, "suse")
diff --git a/tests/unittests/distros/test_photon.py b/tests/unittests/distros/test_photon.py
new file mode 100644
index 00000000..fed30c2b
--- /dev/null
+++ b/tests/unittests/distros/test_photon.py
@@ -0,0 +1,68 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import util
+from tests.unittests.helpers import CiTestCase, mock
+
+from . import _get_distro
+
+SYSTEM_INFO = {
+ "paths": {
+ "cloud_dir": "/var/lib/cloud/",
+ "templates_dir": "/etc/cloud/templates/",
+ },
+ "network": {"renderers": "networkd"},
+}
+
+
+class TestPhoton(CiTestCase):
+ with_logs = True
+ distro = _get_distro("photon", SYSTEM_INFO)
+ expected_log_line = "Rely on PhotonOS default network config"
+
+ def test_network_renderer(self):
+ self.assertEqual(self.distro._cfg["network"]["renderers"], "networkd")
+
+ def test_get_distro(self):
+ self.assertEqual(self.distro.osfamily, "photon")
+
+ @mock.patch("cloudinit.distros.photon.subp.subp")
+ def test_write_hostname(self, m_subp):
+ hostname = "myhostname"
+ hostfile = self.tmp_path("previous-hostname")
+ self.distro._write_hostname(hostname, hostfile)
+ self.assertEqual(hostname, util.load_file(hostfile))
+
+ ret = self.distro._read_hostname(hostfile)
+ self.assertEqual(ret, hostname)
+
+ m_subp.return_value = (None, None)
+ hostfile += "hostfile"
+ self.distro._write_hostname(hostname, hostfile)
+
+ m_subp.return_value = (hostname, None)
+ ret = self.distro._read_hostname(hostfile)
+ self.assertEqual(ret, hostname)
+
+ self.logs.truncate(0)
+ m_subp.return_value = (None, "bla")
+ self.distro._write_hostname(hostname, None)
+ self.assertIn("Error while setting hostname", self.logs.getvalue())
+
+ @mock.patch("cloudinit.net.generate_fallback_config")
+ def test_fallback_netcfg(self, m_fallback_cfg):
+
+ key = "disable_fallback_netcfg"
+ # Don't use fallback if no setting given
+ self.logs.truncate(0)
+ assert self.distro.generate_fallback_config() is None
+ self.assertIn(self.expected_log_line, self.logs.getvalue())
+
+ self.logs.truncate(0)
+ self.distro._cfg[key] = True
+ assert self.distro.generate_fallback_config() is None
+ self.assertIn(self.expected_log_line, self.logs.getvalue())
+
+ self.logs.truncate(0)
+ self.distro._cfg[key] = False
+ assert self.distro.generate_fallback_config() is not None
+ self.assertNotIn(self.expected_log_line, self.logs.getvalue())
diff --git a/tests/unittests/distros/test_resolv.py b/tests/unittests/distros/test_resolv.py
new file mode 100644
index 00000000..65e78101
--- /dev/null
+++ b/tests/unittests/distros/test_resolv.py
@@ -0,0 +1,64 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit.distros.parsers import resolv_conf
+from tests.unittests.helpers import TestCase
+
+BASE_RESOLVE = """
+; generated by /sbin/dhclient-script
+search blah.yahoo.com yahoo.com
+nameserver 10.15.44.14
+nameserver 10.15.30.92
+"""
+BASE_RESOLVE = BASE_RESOLVE.strip()
+
+
+class TestResolvHelper(TestCase):
+ def test_parse_same(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ rp_r = str(rp).strip()
+ self.assertEqual(BASE_RESOLVE, rp_r)
+
+ def test_local_domain(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIsNone(rp.local_domain)
+
+ rp.local_domain = "bob"
+ self.assertEqual("bob", rp.local_domain)
+ self.assertIn("domain bob", str(rp))
+
+ def test_nameservers(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn("10.15.44.14", rp.nameservers)
+ self.assertIn("10.15.30.92", rp.nameservers)
+ rp.add_nameserver("10.2")
+ self.assertIn("10.2", rp.nameservers)
+ self.assertIn("nameserver 10.2", str(rp))
+ self.assertNotIn("10.3", rp.nameservers)
+ self.assertEqual(len(rp.nameservers), 3)
+ rp.add_nameserver("10.2")
+ rp.add_nameserver("10.3")
+ self.assertNotIn("10.3", rp.nameservers)
+
+ def test_search_domains(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn("yahoo.com", rp.search_domains)
+ self.assertIn("blah.yahoo.com", rp.search_domains)
+ rp.add_search_domain("bbb.y.com")
+ self.assertIn("bbb.y.com", rp.search_domains)
+ self.assertTrue(re.search(r"search(.*)bbb.y.com(.*)", str(rp)))
+ self.assertIn("bbb.y.com", rp.search_domains)
+ rp.add_search_domain("bbb.y.com")
+ self.assertEqual(len(rp.search_domains), 3)
+ rp.add_search_domain("bbb2.y.com")
+ self.assertEqual(len(rp.search_domains), 4)
+ rp.add_search_domain("bbb3.y.com")
+ self.assertEqual(len(rp.search_domains), 5)
+ rp.add_search_domain("bbb4.y.com")
+ self.assertEqual(len(rp.search_domains), 6)
+ self.assertRaises(ValueError, rp.add_search_domain, "bbb5.y.com")
+ self.assertEqual(len(rp.search_domains), 6)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_sles.py b/tests/unittests/distros/test_sles.py
new file mode 100644
index 00000000..66b8b13d
--- /dev/null
+++ b/tests/unittests/distros/test_sles.py
@@ -0,0 +1,11 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestSLES(CiTestCase):
+ def test_get_distro(self):
+ distro = _get_distro("sles")
+ self.assertEqual(distro.osfamily, "suse")
diff --git a/tests/unittests/distros/test_sysconfig.py b/tests/unittests/distros/test_sysconfig.py
new file mode 100644
index 00000000..d0979e17
--- /dev/null
+++ b/tests/unittests/distros/test_sysconfig.py
@@ -0,0 +1,92 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit.distros.parsers.sys_conf import SysConf
+from tests.unittests.helpers import TestCase
+
+# Lots of good examples @
+# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
+
+
+class TestSysConfHelper(TestCase):
+ # This function was added in 2.7, make it work for 2.6
+ def assertRegMatches(self, text, regexp):
+ regexp = re.compile(regexp)
+ self.assertTrue(
+ regexp.search(text),
+ msg="%s must match %s!" % (text, regexp.pattern),
+ )
+
+ def test_parse_no_change(self):
+ contents = """# A comment
+USESMBAUTH=no
+KEYTABLE=/usr/lib/kbd/keytables/us.map
+SHORTDATE=$(date +%y:%m:%d:%H:%M)
+HOSTNAME=blahblah
+NETMASK0=255.255.255.0
+# Inline comment
+LIST=$LOGROOT/incremental-list
+IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64'
+ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256"
+USEMD5=no"""
+ conf = SysConf(contents.splitlines())
+ self.assertEqual(conf["HOSTNAME"], "blahblah")
+ self.assertEqual(conf["SHORTDATE"], "$(date +%y:%m:%d:%H:%M)")
+ # Should be unquoted
+ self.assertEqual(
+ conf["ETHTOOL_OPTS"],
+ "-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256",
+ )
+ self.assertEqual(contents, str(conf))
+
+ def test_parse_shell_vars(self):
+ contents = "USESMBAUTH=$XYZ"
+ conf = SysConf(contents.splitlines())
+ self.assertEqual(contents, str(conf))
+ conf = SysConf("")
+ conf["B"] = "${ZZ}d apples"
+ # Should be quoted
+ self.assertEqual('B="${ZZ}d apples"', str(conf))
+ conf = SysConf("")
+ conf["B"] = "$? d apples"
+ self.assertEqual('B="$? d apples"', str(conf))
+ contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"'
+ conf = SysConf(contents.splitlines())
+ self.assertEqual("IPMI_WATCHDOG_OPTIONS=timeout=60", str(conf))
+
+ def test_parse_adjust(self):
+ contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"'
+ conf = SysConf(contents.splitlines())
+ # Should be unquoted
+ self.assertEqual(
+ "eth0-:0004::1/64 eth1-:0005::1/64", conf["IPV6TO4_ROUTING"]
+ )
+ conf["IPV6TO4_ROUTING"] = "blah \tblah"
+ contents2 = str(conf).strip()
+ # Should be requoted due to whitespace
+ self.assertRegMatches(
+ contents2, r"IPV6TO4_ROUTING=[\']blah\s+blah[\']"
+ )
+
+ def test_parse_no_adjust_shell(self):
+ conf = SysConf("".splitlines())
+ conf["B"] = " $(time)"
+ contents = str(conf)
+ self.assertEqual("B= $(time)", contents)
+
+ def test_parse_empty(self):
+ contents = ""
+ conf = SysConf(contents.splitlines())
+ self.assertEqual("", str(conf).strip())
+
+ def test_parse_add_new(self):
+ contents = "BLAH=b"
+ conf = SysConf(contents.splitlines())
+ conf["Z"] = "d"
+ contents = str(conf)
+ self.assertIn("Z=d", contents)
+ self.assertIn("BLAH=b", contents)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_user_data_normalize.py b/tests/unittests/distros/test_user_data_normalize.py
new file mode 100644
index 00000000..67ea024b
--- /dev/null
+++ b/tests/unittests/distros/test_user_data_normalize.py
@@ -0,0 +1,365 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from unittest import mock
+
+from cloudinit import distros, helpers, settings
+from cloudinit.distros import ug_util
+from tests.unittests.helpers import TestCase
+
+bcfg = {
+ "name": "bob",
+ "plain_text_passwd": "ubuntu",
+ "home": "/home/ubuntu",
+ "shell": "/bin/bash",
+ "lock_passwd": True,
+ "gecos": "Ubuntu",
+ "groups": ["foo"],
+}
+
+
+class TestUGNormalize(TestCase):
+ def setUp(self):
+ super(TestUGNormalize, self).setUp()
+ self.add_patch("cloudinit.util.system_is_snappy", "m_snappy")
+
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg["system_info"]["distro"] = dtype
+ paths = helpers.Paths(cfg["system_info"]["paths"])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg["system_info"]["default_user"] = def_user.copy()
+ distro = distro_cls(dtype, cfg["system_info"], paths)
+ return distro
+
+ def _norm(self, cfg, distro):
+ return ug_util.normalize_users_groups(cfg, distro)
+
+ def test_group_dict(self):
+ distro = self._make_distro("ubuntu")
+ g = {
+ "groups": [
+ {"ubuntu": ["foo", "bar"], "bob": "users"},
+ "cloud-users",
+ {"bob": "users2"},
+ ]
+ }
+ (_users, groups) = self._norm(g, distro)
+ self.assertIn("ubuntu", groups)
+ ub_members = groups["ubuntu"]
+ self.assertEqual(sorted(["foo", "bar"]), sorted(ub_members))
+ self.assertIn("bob", groups)
+ b_members = groups["bob"]
+ self.assertEqual(sorted(["users", "users2"]), sorted(b_members))
+
+ def test_basic_groups(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "groups": ["bob"],
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", groups)
+ self.assertEqual({}, users)
+
+ def test_csv_groups(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "groups": "bob,joe,steve",
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", groups)
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
+ self.assertEqual({}, users)
+
+ def test_more_groups(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {"groups": ["bob", "joe", "steve"]}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", groups)
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
+ self.assertEqual({}, users)
+
+ def test_member_groups(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "groups": {
+ "bob": ["s"],
+ "joe": [],
+ "steve": [],
+ }
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", groups)
+ self.assertEqual(["s"], groups["bob"])
+ self.assertEqual([], groups["joe"])
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
+ self.assertEqual({}, users)
+
+ def test_users_simple_dict(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {
+ "users": {
+ "default": True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ ug_cfg = {
+ "users": {
+ "default": "yes",
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ ug_cfg = {
+ "users": {
+ "default": "1",
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+
+ def test_users_simple_dict_no(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {
+ "users": {
+ "default": False,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+ ug_cfg = {
+ "users": {
+ "default": "no",
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+
+ def test_users_simple_csv(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": "joe,bob",
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
+
+ def test_users_simple(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": ["joe", "bob"],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
+
+ def test_users_old_user(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {"user": "zetta", "users": "default"}
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn("bob", users) # Bob is not the default now, zetta is
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ self.assertNotIn("default", users)
+ ug_cfg = {"user": "zetta", "users": "default, joe"}
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn("bob", users) # Bob is not the default now, zetta is
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ self.assertNotIn("default", users)
+ ug_cfg = {"user": "zetta", "users": ["bob", "joe"]}
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ ug_cfg = {
+ "user": "zetta",
+ "users": {
+ "bob": True,
+ "joe": True,
+ },
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ ug_cfg = {
+ "user": "zetta",
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("zetta", users)
+ ug_cfg = {}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+ self.assertEqual({}, groups)
+
+ def test_users_dict_default_additional(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {
+ "users": [{"name": "default", "blah": True}],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ self.assertEqual(
+ ",".join(distro.get_default_user()["groups"]),
+ users["bob"]["groups"],
+ )
+ self.assertEqual(True, users["bob"]["blah"])
+ self.assertEqual(True, users["bob"]["default"])
+
+ def test_users_dict_extract(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {
+ "users": [
+ "default",
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ (name, config) = ug_util.extract_default(users)
+ self.assertEqual(name, "bob")
+ expected_config = {}
+ def_config = None
+ try:
+ def_config = distro.get_default_user()
+ except NotImplementedError:
+ pass
+ if not def_config:
+ def_config = {}
+ expected_config.update(def_config)
+
+ # Ignore these for now
+ expected_config.pop("name", None)
+ expected_config.pop("groups", None)
+ config.pop("groups", None)
+ self.assertEqual(config, expected_config)
+
+ def test_users_dict_default(self):
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {
+ "users": [
+ "default",
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("bob", users)
+ self.assertEqual(
+ ",".join(distro.get_default_user()["groups"]),
+ users["bob"]["groups"],
+ )
+ self.assertEqual(True, users["bob"]["default"])
+
+ def test_users_dict_trans(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": [
+ {"name": "joe", "tr-me": True},
+ {"name": "bob"},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"tr_me": True, "default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
+
+ def test_users_dict(self):
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": [
+ {"name": "joe"},
+ {"name": "bob"},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
+
+ @mock.patch("cloudinit.subp.subp")
+ def test_create_snap_user(self, mock_subp):
+ mock_subp.side_effect = [
+ ('{"username": "joe", "ssh-key-count": 1}\n', "")
+ ]
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": [
+ {"name": "joe", "snapuser": "joe@joe.com"},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print("user=%s config=%s" % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = ["snap", "create-user", "--sudoer", "--json", "joe@joe.com"]
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, "joe")
+
+ @mock.patch("cloudinit.subp.subp")
+ def test_create_snap_user_known(self, mock_subp):
+ mock_subp.side_effect = [
+ ('{"username": "joe", "ssh-key-count": 1}\n', "")
+ ]
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": [
+ {"name": "joe", "snapuser": "joe@joe.com", "known": True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print("user=%s config=%s" % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = [
+ "snap",
+ "create-user",
+ "--sudoer",
+ "--json",
+ "--known",
+ "joe@joe.com",
+ ]
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, "joe")
+
+ @mock.patch("cloudinit.util.system_is_snappy")
+ @mock.patch("cloudinit.util.is_group")
+ @mock.patch("cloudinit.subp.subp")
+ def test_add_user_on_snappy_system(
+ self, mock_subp, mock_isgrp, mock_snappy
+ ):
+ mock_isgrp.return_value = False
+ mock_subp.return_value = True
+ mock_snappy.return_value = True
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {
+ "users": [
+ {"name": "joe", "groups": "users", "create_groups": True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print("user=%s config=%s" % (user, config))
+ distro.add_user(user, **config)
+
+ groupcmd = ["groupadd", "users", "--extrausers"]
+ addcmd = ["useradd", "joe", "--extrausers", "--groups", "users", "-m"]
+
+ mock_subp.assert_any_call(groupcmd)
+ mock_subp.assert_any_call(addcmd, logstring=addcmd)
+
+
+# vi: ts=4 expandtab