summaryrefslogtreecommitdiff
path: root/tests/unittests/distros
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/distros')
-rw-r--r--tests/unittests/distros/__init__.py21
-rw-r--r--tests/unittests/distros/test_arch.py45
-rw-r--r--tests/unittests/distros/test_bsd_utils.py67
-rw-r--r--tests/unittests/distros/test_create_users.py236
-rw-r--r--tests/unittests/distros/test_debian.py174
-rw-r--r--tests/unittests/distros/test_dragonflybsd.py25
-rw-r--r--tests/unittests/distros/test_freebsd.py45
-rw-r--r--tests/unittests/distros/test_generic.py315
-rw-r--r--tests/unittests/distros/test_gentoo.py26
-rw-r--r--tests/unittests/distros/test_hostname.py42
-rw-r--r--tests/unittests/distros/test_hosts.py45
-rw-r--r--tests/unittests/distros/test_init.py161
-rw-r--r--tests/unittests/distros/test_manage_service.py38
-rw-r--r--tests/unittests/distros/test_netbsd.py17
-rw-r--r--tests/unittests/distros/test_netconfig.py916
-rw-r--r--tests/unittests/distros/test_networking.py223
-rw-r--r--tests/unittests/distros/test_opensuse.py12
-rw-r--r--tests/unittests/distros/test_photon.py68
-rw-r--r--tests/unittests/distros/test_resolv.py65
-rw-r--r--tests/unittests/distros/test_sles.py12
-rw-r--r--tests/unittests/distros/test_sysconfig.py86
-rw-r--r--tests/unittests/distros/test_user_data_normalize.py372
22 files changed, 3011 insertions, 0 deletions
diff --git a/tests/unittests/distros/__init__.py b/tests/unittests/distros/__init__.py
new file mode 100644
index 00000000..5394aa56
--- /dev/null
+++ b/tests/unittests/distros/__init__.py
@@ -0,0 +1,21 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+import copy
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+
+
+def _get_distro(dtype, system_info=None):
+ """Return a Distro class of distro 'dtype'.
+
+ cfg is format of CFG_BUILTIN['system_info'].
+
+ example: _get_distro("debian")
+ """
+ if system_info is None:
+ system_info = copy.deepcopy(settings.CFG_BUILTIN['system_info'])
+ system_info['distro'] = dtype
+ paths = helpers.Paths(system_info['paths'])
+ distro_cls = distros.fetch(dtype)
+ return distro_cls(dtype, system_info, paths)
diff --git a/tests/unittests/distros/test_arch.py b/tests/unittests/distros/test_arch.py
new file mode 100644
index 00000000..590ba00e
--- /dev/null
+++ b/tests/unittests/distros/test_arch.py
@@ -0,0 +1,45 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.distros.arch import _render_network
+from cloudinit import util
+
+from tests.unittests.helpers import (CiTestCase, dir2dict)
+
+from . import _get_distro
+
+
+class TestArch(CiTestCase):
+
+ def test_get_distro(self):
+ distro = _get_distro("arch")
+ hostname = "myhostname"
+ hostfile = self.tmp_path("hostfile")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual(hostname + "\n", util.load_file(hostfile))
+
+
+class TestRenderNetwork(CiTestCase):
+ def test_basic_static(self):
+ """Just the most basic static config.
+
+ note 'lo' should not be rendered as an interface."""
+ entries = {'eth0': {'auto': True,
+ 'dns-nameservers': ['8.8.8.8'],
+ 'bootproto': 'static',
+ 'address': '10.0.0.2',
+ 'gateway': '10.0.0.1',
+ 'netmask': '255.255.255.0'},
+ 'lo': {'auto': True}}
+ target = self.tmp_dir()
+ devs = _render_network(entries, target=target)
+ files = dir2dict(target, prefix=target)
+ self.assertEqual(['eth0'], devs)
+ self.assertEqual(
+ {'/etc/netctl/eth0': '\n'.join([
+ "Address=10.0.0.2/255.255.255.0",
+ "Connection=ethernet",
+ "DNS=('8.8.8.8')",
+ "Gateway=10.0.0.1",
+ "IP=static",
+ "Interface=eth0", ""]),
+ '/etc/resolv.conf': 'nameserver 8.8.8.8\n'}, files)
diff --git a/tests/unittests/distros/test_bsd_utils.py b/tests/unittests/distros/test_bsd_utils.py
new file mode 100644
index 00000000..55686dc9
--- /dev/null
+++ b/tests/unittests/distros/test_bsd_utils.py
@@ -0,0 +1,67 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import cloudinit.distros.bsd_utils as bsd_utils
+
+from tests.unittests.helpers import (CiTestCase, ExitStack, mock)
+
+RC_FILE = """
+if something; then
+ do something here
+fi
+hostname={hostname}
+"""
+
+
+class TestBsdUtils(CiTestCase):
+
+ def setUp(self):
+ super().setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.load_file = patches.enter_context(
+ mock.patch.object(bsd_utils.util, 'load_file'))
+
+ self.write_file = patches.enter_context(
+ mock.patch.object(bsd_utils.util, 'write_file'))
+
+ def test_get_rc_config_value(self):
+ self.load_file.return_value = 'hostname=foo\n'
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+ self.load_file.assert_called_with('/etc/rc.conf')
+
+ self.load_file.return_value = 'hostname=foo'
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+
+ self.load_file.return_value = 'hostname="foo"'
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+
+ self.load_file.return_value = "hostname='foo'"
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+
+ self.load_file.return_value = 'hostname=\'foo"'
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "'foo\"")
+
+ self.load_file.return_value = ''
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), None)
+
+ self.load_file.return_value = RC_FILE.format(hostname='foo')
+ self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "foo")
+
+ def test_set_rc_config_value_unchanged(self):
+ # bsd_utils.set_rc_config_value('hostname', 'foo')
+ # self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n')
+
+ self.load_file.return_value = RC_FILE.format(hostname='foo')
+ self.write_file.assert_not_called()
+
+ def test_set_rc_config_value(self):
+ bsd_utils.set_rc_config_value('hostname', 'foo')
+ self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n')
+
+ self.load_file.return_value = RC_FILE.format(hostname='foo')
+ bsd_utils.set_rc_config_value('hostname', 'bar')
+ self.write_file.assert_called_with(
+ '/etc/rc.conf',
+ RC_FILE.format(hostname='bar')
+ )
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py
new file mode 100644
index 00000000..5baa8a4b
--- /dev/null
+++ b/tests/unittests/distros/test_create_users.py
@@ -0,0 +1,236 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit import distros
+from cloudinit import ssh_util
+from tests.unittests.helpers import (CiTestCase, mock)
+from tests.unittests.util import abstract_to_concrete
+
+
+@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
+@mock.patch("cloudinit.distros.subp.subp")
+class TestCreateUser(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestCreateUser, self).setUp()
+ self.dist = abstract_to_concrete(distros.Distro)(
+ name='test', cfg=None, paths=None
+ )
+
+ def _useradd2call(self, args):
+ # return a mock call for the useradd command in args
+ # with expected 'logstring'.
+ args = ['useradd'] + args
+ logcmd = [a for a in args]
+ for i in range(len(args)):
+ if args[i] in ('--password',):
+ logcmd[i + 1] = 'REDACTED'
+ return mock.call(args, logstring=logcmd)
+
+ def test_basic(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ self.dist.create_user(user)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m']),
+ mock.call(['passwd', '-l', user])])
+
+ def test_no_home(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ self.dist.create_user(user, no_create_home=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-M']),
+ mock.call(['passwd', '-l', user])])
+
+ def test_system_user(self, m_subp, m_is_snappy):
+ # system user should have no home and get --system
+ user = 'foouser'
+ self.dist.create_user(user, system=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '--system', '-M']),
+ mock.call(['passwd', '-l', user])])
+
+ def test_explicit_no_home_false(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ self.dist.create_user(user, no_create_home=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m']),
+ mock.call(['passwd', '-l', user])])
+
+ def test_unlocked(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ self.dist.create_user(user, lock_passwd=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m'])])
+
+ def test_set_password(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ password = 'passfoo'
+ self.dist.create_user(user, passwd=password)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '--password', password, '-m']),
+ mock.call(['passwd', '-l', user])])
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_group_added(self, m_is_group, m_subp, m_is_snappy):
+ m_is_group.return_value = False
+ user = 'foouser'
+ self.dist.create_user(user, groups=['group1'])
+ expected = [
+ mock.call(['groupadd', 'group1']),
+ self._useradd2call([user, '--groups', 'group1', '-m']),
+ mock.call(['passwd', '-l', user])]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
+ ex_groups = ['existing_group']
+ groups = ['group1', ex_groups[0]]
+ m_is_group.side_effect = lambda m: m in ex_groups
+ user = 'foouser'
+ self.dist.create_user(user, groups=groups)
+ expected = [
+ mock.call(['groupadd', 'group1']),
+ self._useradd2call([user, '--groups', ','.join(groups), '-m']),
+ mock.call(['passwd', '-l', user])]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_create_groups_with_whitespace_string(
+ self, m_is_group, m_subp, m_is_snappy):
+ # groups supported as a comma delimeted string even with white space
+ m_is_group.return_value = False
+ user = 'foouser'
+ self.dist.create_user(user, groups='group1, group2')
+ expected = [
+ mock.call(['groupadd', 'group1']),
+ mock.call(['groupadd', 'group2']),
+ self._useradd2call([user, '--groups', 'group1,group2', '-m']),
+ mock.call(['passwd', '-l', user])]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ def test_explicit_sudo_false(self, m_subp, m_is_snappy):
+ user = 'foouser'
+ self.dist.create_user(user, sudo=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m']),
+ mock.call(['passwd', '-l', user])])
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_setup_ssh_authorized_keys_with_string(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """ssh_authorized_keys allows string and calls setup_user_keys."""
+ user = 'foouser'
+ self.dist.create_user(user, ssh_authorized_keys='mykey')
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m']),
+ mock.call(['passwd', '-l', user])])
+ m_setup_user_keys.assert_called_once_with(set(['mykey']), user)
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_setup_ssh_authorized_keys_with_list(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """ssh_authorized_keys allows lists and calls setup_user_keys."""
+ user = 'foouser'
+ self.dist.create_user(user, ssh_authorized_keys=['key1', 'key2'])
+ self.assertEqual(
+ m_subp.call_args_list,
+ [self._useradd2call([user, '-m']),
+ mock.call(['passwd', '-l', user])])
+ m_setup_user_keys.assert_called_once_with(set(['key1', 'key2']), user)
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_setup_ssh_authorized_keys_with_integer(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """ssh_authorized_keys warns on non-iterable/string type."""
+ user = 'foouser'
+ self.dist.create_user(user, ssh_authorized_keys=-1)
+ m_setup_user_keys.assert_called_once_with(set([]), user)
+ match = re.match(
+ r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for'
+ ' \'ssh_authorized_keys\'.*',
+ self.logs.getvalue(),
+ re.DOTALL)
+ self.assertIsNotNone(
+ match, 'Missing ssh_authorized_keys invalid type warning')
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_create_user_with_ssh_redirect_user_no_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """Log a warning when trying to redirect a user no cloud ssh keys."""
+ user = 'foouser'
+ self.dist.create_user(user, ssh_redirect_user='someuser')
+ self.assertIn(
+ 'WARNING: Unable to disable SSH logins for foouser given '
+ 'ssh_redirect_user: someuser. No cloud public-keys present.\n',
+ self.logs.getvalue())
+ m_setup_user_keys.assert_not_called()
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_create_user_with_ssh_redirect_user_with_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
+ user = 'foouser'
+ self.dist.create_user(
+ user, ssh_redirect_user='someuser', cloud_public_ssh_keys=['key1'])
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace('$USER', 'someuser')
+ disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
+ m_setup_user_keys.assert_called_once_with(
+ set(['key1']), 'foouser', options=disable_prefix)
+
+ @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy):
+ """Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
+ user = 'foouser'
+ self.dist.create_user(
+ user, ssh_authorized_keys='auth1', ssh_redirect_user='someuser',
+ cloud_public_ssh_keys=['key1'])
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace('$USER', 'someuser')
+ disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
+ self.assertEqual(
+ m_setup_user_keys.call_args_list,
+ [mock.call(set(['auth1']), user), # not disabled
+ mock.call(set(['key1']), 'foouser', options=disable_prefix)])
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp,
+ m_is_snappy):
+ """Lock uses usermod --lock if no 'passwd' cmd available."""
+ m_which.side_effect = lambda m: m in ('usermod',)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(['usermod', '--lock', 'bob'])],
+ m_subp.call_args_list)
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_passwd_if_available(self, m_which, m_subp,
+ m_is_snappy):
+ """Lock with only passwd will use passwd."""
+ m_which.side_effect = lambda m: m in ('passwd',)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(['passwd', '-l', 'bob'])],
+ m_subp.call_args_list)
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp,
+ m_is_snappy):
+ """Lock with no commands available raises RuntimeError."""
+ m_which.return_value = None
+ with self.assertRaises(RuntimeError):
+ self.dist.lock_passwd("bob")
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_debian.py b/tests/unittests/distros/test_debian.py
new file mode 100644
index 00000000..3d0db145
--- /dev/null
+++ b/tests/unittests/distros/test_debian.py
@@ -0,0 +1,174 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+from itertools import count, cycle
+from unittest import mock
+
+import pytest
+
+from cloudinit import distros, util
+from cloudinit.distros.debian import (
+ APT_GET_COMMAND,
+ APT_GET_WRAPPER,
+)
+from tests.unittests.helpers import FilesystemMockingTestCase
+from cloudinit import subp
+
+
+@mock.patch("cloudinit.distros.debian.subp.subp")
+class TestDebianApplyLocale(FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestDebianApplyLocale, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.patchOS(self.new_root)
+ self.patchUtils(self.new_root)
+ self.spath = self.tmp_path('etc/default/locale', self.new_root)
+ cls = distros.fetch("debian")
+ self.distro = cls("debian", {}, None)
+
+ def test_no_rerun(self, m_subp):
+ """If system has defined locale, no re-run is expected."""
+ m_subp.return_value = (None, None)
+ locale = 'en_US.UTF-8'
+ util.write_file(self.spath, 'LANG=%s\n' % locale, omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ m_subp.assert_not_called()
+
+ def test_no_regen_on_c_utf8(self, m_subp):
+ """If locale is set to C.UTF8, do not attempt to call locale-gen"""
+ m_subp.return_value = (None, None)
+ locale = 'C.UTF-8'
+ util.write_file(self.spath, 'LANG=%s\n' % 'en_US.UTF-8', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
+ def test_rerun_if_different(self, m_subp):
+ """If system has different locale, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = 'en_US.UTF-8'
+ util.write_file(self.spath, 'LANG=fr_FR.UTF-8', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [['locale-gen', locale],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
+ def test_rerun_if_no_file(self, m_subp):
+ """If system has no locale file, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = 'en_US.UTF-8'
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [['locale-gen', locale],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
+ def test_rerun_on_unset_system_locale(self, m_subp):
+ """If system has unset locale, locale-gen should be called."""
+ m_subp.return_value = (None, None)
+ locale = 'en_US.UTF-8'
+ util.write_file(self.spath, 'LANG=', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [['locale-gen', locale],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
+ def test_rerun_on_mismatched_keys(self, m_subp):
+ """If key is LC_ALL and system has only LANG, rerun is expected."""
+ m_subp.return_value = (None, None)
+ locale = 'en_US.UTF-8'
+ util.write_file(self.spath, 'LANG=', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath, keyname='LC_ALL')
+ self.assertEqual(
+ [['locale-gen', locale],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LC_ALL=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
+ def test_falseish_locale_raises_valueerror(self, m_subp):
+ """locale as None or "" is invalid and should raise ValueError."""
+
+ with self.assertRaises(ValueError) as ctext_m:
+ self.distro.apply_locale(None)
+ m_subp.assert_not_called()
+
+ self.assertEqual(
+ 'Failed to provide locale value.', str(ctext_m.exception))
+
+ with self.assertRaises(ValueError) as ctext_m:
+ self.distro.apply_locale("")
+ m_subp.assert_not_called()
+ self.assertEqual(
+ 'Failed to provide locale value.', str(ctext_m.exception))
+
+
+@mock.patch.dict('os.environ', {}, clear=True)
+@mock.patch("cloudinit.distros.debian.subp.which", return_value=True)
+@mock.patch("cloudinit.distros.debian.subp.subp")
+class TestPackageCommand:
+ distro = distros.fetch("debian")("debian", {}, None)
+
+ @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=True)
+ def test_simple_command(self, m_apt_avail, m_subp, m_which):
+ self.distro.package_command('update')
+ apt_args = [APT_GET_WRAPPER['command']]
+ apt_args.extend(APT_GET_COMMAND)
+ apt_args.append('update')
+ expected_call = {
+ 'args': apt_args,
+ 'capture': False,
+ 'env': {'DEBIAN_FRONTEND': 'noninteractive'},
+ }
+ assert m_subp.call_args == mock.call(**expected_call)
+
+ @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=[False, False, True])
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which):
+ self.distro._wait_for_apt_command("stub", {"args": "stub2"})
+ assert m_sleep.call_args_list == [mock.call(1), mock.call(1)]
+ assert m_subp.call_args_list == [mock.call(args='stub2')]
+
+ @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=False)
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
+ def test_lock_wait_timeout(
+ self, m_time, m_sleep, m_apt_avail, m_subp, m_which
+ ):
+ with pytest.raises(TimeoutError):
+ self.distro._wait_for_apt_command("stub", "stub2", timeout=5)
+ assert m_subp.call_args_list == []
+
+ @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]))
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which):
+ exception = subp.ProcessExecutionError(
+ exit_code=100, stderr="Could not get apt lock"
+ )
+ m_subp.side_effect = [exception, exception, "return_thing"]
+ ret = self.distro._wait_for_apt_command("stub", {"args": "stub2"})
+ assert ret == "return_thing"
+
+ @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]))
+ @mock.patch("cloudinit.distros.debian.time.sleep")
+ @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
+ def test_lock_exception_timeout(
+ self, m_time, m_sleep, m_apt_avail, m_subp, m_which
+ ):
+ m_subp.side_effect = subp.ProcessExecutionError(
+ exit_code=100, stderr="Could not get apt lock"
+ )
+ with pytest.raises(TimeoutError):
+ self.distro._wait_for_apt_command(
+ "stub", {"args": "stub2"}, timeout=5
+ )
diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py
new file mode 100644
index 00000000..f0cd1b24
--- /dev/null
+++ b/tests/unittests/distros/test_dragonflybsd.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+
+
+import cloudinit.util
+from tests.unittests.helpers import mock
+
+
+def test_find_dragonflybsd_part():
+ assert cloudinit.util.find_dragonflybsd_part("/dev/vbd0s3") == "vbd0s3"
+
+
+@mock.patch("cloudinit.util.is_DragonFlyBSD")
+@mock.patch("cloudinit.subp.subp")
+def test_parse_mount(mock_subp, m_is_DragonFlyBSD):
+ mount_out = """
+vbd0s3 on / (hammer2, local)
+devfs on /dev (devfs, nosymfollow, local)
+/dev/vbd0s0a on /boot (ufs, local)
+procfs on /proc (procfs, local)
+tmpfs on /var/run/shm (tmpfs, local)
+"""
+
+ mock_subp.return_value = (mount_out, "")
+ m_is_DragonFlyBSD.return_value = True
+ assert cloudinit.util.parse_mount("/") == ("vbd0s3", "hammer2", "/")
diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py
new file mode 100644
index 00000000..0279e86f
--- /dev/null
+++ b/tests/unittests/distros/test_freebsd.py
@@ -0,0 +1,45 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.util import (find_freebsd_part, get_path_dev_freebsd)
+from tests.unittests.helpers import (CiTestCase, mock)
+
+import os
+
+
+class TestDeviceLookUp(CiTestCase):
+
+ @mock.patch('cloudinit.subp.subp')
+ def test_find_freebsd_part_label(self, mock_subp):
+ glabel_out = '''
+gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1
+ label/rootfs N/A da0p2
+ label/swap N/A da0p3
+'''
+ mock_subp.return_value = (glabel_out, "")
+ res = find_freebsd_part("/dev/label/rootfs")
+ self.assertEqual("da0p2", res)
+
+ @mock.patch('cloudinit.subp.subp')
+ def test_find_freebsd_part_gpt(self, mock_subp):
+ glabel_out = '''
+ gpt/bootfs N/A vtbd0p1
+gptid/3f4cbe26-75da-11e8-a8f2-002590ec6166 N/A vtbd0p1
+ gpt/swapfs N/A vtbd0p2
+ gpt/rootfs N/A vtbd0p3
+ iso9660/cidata N/A vtbd2
+'''
+ mock_subp.return_value = (glabel_out, "")
+ res = find_freebsd_part("/dev/gpt/rootfs")
+ self.assertEqual("vtbd0p3", res)
+
+ def test_get_path_dev_freebsd_label(self):
+ mnt_list = '''
+/dev/label/rootfs / ufs rw 1 1
+devfs /dev devfs rw,multilabel 0 0
+fdescfs /dev/fd fdescfs rw 0 0
+/dev/da1s1 /mnt/resource ufs rw 2 2
+'''
+ with mock.patch.object(os.path, 'exists',
+ return_value=True):
+ res = get_path_dev_freebsd('/etc', mnt_list)
+ self.assertIsNotNone(res)
diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py
new file mode 100644
index 00000000..e542c26f
--- /dev/null
+++ b/tests/unittests/distros/test_generic.py
@@ -0,0 +1,315 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import distros
+from cloudinit import util
+
+from tests.unittests import helpers
+
+import os
+import pytest
+import shutil
+import tempfile
+from unittest import mock
+
+unknown_arch_info = {
+ 'arches': ['default'],
+ 'failsafe': {'primary': 'http://fs-primary-default',
+ 'security': 'http://fs-security-default'}
+}
+
+package_mirrors = [
+ {'arches': ['i386', 'amd64'],
+ 'failsafe': {'primary': 'http://fs-primary-intel',
+ 'security': 'http://fs-security-intel'},
+ 'search': {
+ 'primary': ['http://%(ec2_region)s.ec2/',
+ 'http://%(availability_zone)s.clouds/'],
+ 'security': ['http://security-mirror1-intel',
+ 'http://security-mirror2-intel']}},
+ {'arches': ['armhf', 'armel'],
+ 'failsafe': {'primary': 'http://fs-primary-arm',
+ 'security': 'http://fs-security-arm'}},
+ unknown_arch_info
+]
+
+gpmi = distros._get_package_mirror_info
+gapmi = distros._get_arch_package_mirror_info
+
+
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestGenericDistro, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def _write_load_sudoers(self, _user, rules):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ os.makedirs(os.path.join(self.tmp, "etc"))
+ os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d'))
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.write_sudo_rules("harlowja", rules)
+ contents = util.load_file(d.ci_sudoers_fn)
+ return contents
+
+ def _count_in(self, lines_look_for, text_content):
+ found_amount = 0
+ for e in lines_look_for:
+ for line in text_content.splitlines():
+ line = line.strip()
+ if line == e:
+ found_amount += 1
+ return found_amount
+
+ def test_sudoers_ensure_rules(self):
+ rules = 'ALL=(ALL:ALL) ALL'
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = ['harlowja ALL=(ALL:ALL) ALL']
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_rules_list(self):
+ rules = [
+ 'ALL=(ALL:ALL) ALL',
+ 'B-ALL=(ALL:ALL) ALL',
+ 'C-ALL=(ALL:ALL) ALL',
+ ]
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = [
+ 'harlowja ALL=(ALL:ALL) ALL',
+ 'harlowja B-ALL=(ALL:ALL) ALL',
+ 'harlowja C-ALL=(ALL:ALL) ALL',
+ ]
+ self.assertEqual(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEqual(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEqual(2, contents.count("josh"))
+
+ def test_sudoers_ensure_only_one_includedir(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ for char in ['#', '@']:
+ util.write_file("/etc/sudoers", "{}includedir /b".format(char))
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertEqual(1, contents.count("includedir /b"))
+
+ def test_arch_package_mirror_info_unknown(self):
+ """for an unknown arch, we should get back that with arch 'default'."""
+ arch_mirrors = gapmi(package_mirrors, arch="unknown")
+ self.assertEqual(unknown_arch_info, arch_mirrors)
+
+ def test_arch_package_mirror_info_known(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ self.assertEqual(package_mirrors[0], arch_mirrors)
+
+ def test_systemd_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs('/run/systemd/system')
+ self.assertTrue(d.uses_systemd())
+
+ def test_systemd_not_in_use(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ self.assertFalse(d.uses_systemd())
+
+ def test_systemd_symlink(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ os.makedirs('/run/systemd')
+ os.symlink('/', '/run/systemd/system')
+ self.assertFalse(d.uses_systemd())
+
+ @mock.patch('cloudinit.distros.debian.read_system_locale')
+ def test_get_locale_ubuntu(self, m_locale):
+ """Test ubuntu distro returns locale set to C.UTF-8"""
+ m_locale.return_value = 'C.UTF-8'
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ locale = d.get_locale()
+ self.assertEqual('C.UTF-8', locale)
+
+ def test_get_locale_rhel(self):
+ """Test rhel distro returns NotImplementedError exception"""
+ cls = distros.fetch("rhel")
+ d = cls("rhel", {}, None)
+ with self.assertRaises(NotImplementedError):
+ d.get_locale()
+
+ def test_expire_passwd_uses_chpasswd(self):
+ """Test ubuntu.expire_passwd uses the passwd command."""
+ for d_name in ("ubuntu", "rhel"):
+ cls = distros.fetch(d_name)
+ d = cls(d_name, {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(["passwd", "--expire", "myuser"])
+
+ def test_expire_passwd_freebsd_uses_pw_command(self):
+ """Test FreeBSD.expire_passwd uses the pw command."""
+ cls = distros.fetch("freebsd")
+ d = cls("freebsd", {}, None)
+ with mock.patch("cloudinit.subp.subp") as m_subp:
+ d.expire_passwd("myuser")
+ m_subp.assert_called_once_with(
+ ["pw", "usermod", "myuser", "-p", "01-Jan-1970"])
+
+
+class TestGetPackageMirrors:
+
+ def return_first(self, mlist):
+ if not mlist:
+ return None
+ return mlist[0]
+
+ def return_second(self, mlist):
+ if not mlist:
+ return None
+
+ return mlist[1] if len(mlist) > 1 else None
+
+ def return_none(self, _mlist):
+ return None
+
+ def return_last(self, mlist):
+ if not mlist:
+ return None
+ return(mlist[-1])
+
+ @pytest.mark.parametrize(
+ "allow_ec2_mirror, platform_type, mirrors",
+ [
+ (True, "ec2", [
+ {'primary': 'http://us-east-1.ec2/',
+ 'security': 'http://security-mirror1-intel'},
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror2-intel'}
+ ]),
+ (True, "other", [
+ {'primary': 'http://us-east-1.ec2/',
+ 'security': 'http://security-mirror1-intel'},
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror2-intel'}
+ ]),
+ (False, "ec2", [
+ {'primary': 'http://us-east-1.ec2/',
+ 'security': 'http://security-mirror1-intel'},
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror2-intel'}
+ ]),
+ (False, "other", [
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror1-intel'},
+ {'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror2-intel'}
+ ])
+ ])
+ def test_get_package_mirror_info_az_ec2(self,
+ allow_ec2_mirror,
+ platform_type,
+ mirrors):
+ flag_path = "cloudinit.distros." \
+ "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ with mock.patch(flag_path, allow_ec2_mirror):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(
+ availability_zone="us-east-1a",
+ platform_type=platform_type)
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_first)
+ assert(results == mirrors[0])
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_second)
+ assert(results == mirrors[1])
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_none)
+ assert(results == package_mirrors[0]['failsafe'])
+
+ def test_get_package_mirror_info_az_non_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone="nova.cloudvendor")
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_first)
+ assert(results == {
+ 'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror1-intel'}
+ )
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_last)
+ assert(results == {
+ 'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror2-intel'}
+ )
+
+ def test_get_package_mirror_info_none(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ data_source_mock = mock.Mock(availability_zone=None)
+
+ # because both search entries here replacement based on
+ # availability-zone, the filter will be called with an empty list and
+ # failsafe should be taken.
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_first)
+ assert(results == {
+ 'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror1-intel'}
+ )
+
+ results = gpmi(arch_mirrors, data_source=data_source_mock,
+ mirror_filter=self.return_last)
+ assert(results == {
+ 'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror2-intel'}
+ )
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_gentoo.py b/tests/unittests/distros/test_gentoo.py
new file mode 100644
index 00000000..4e4680b8
--- /dev/null
+++ b/tests/unittests/distros/test_gentoo.py
@@ -0,0 +1,26 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import util
+from cloudinit import atomic_helper
+from tests.unittests.helpers import CiTestCase
+from . import _get_distro
+
+
+class TestGentoo(CiTestCase):
+
+ def test_write_hostname(self):
+ distro = _get_distro("gentoo")
+ hostname = "myhostname"
+ hostfile = self.tmp_path("hostfile")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual('hostname="myhostname"\n', util.load_file(hostfile))
+
+ def test_write_existing_hostname_with_comments(self):
+ distro = _get_distro("gentoo")
+ hostname = "myhostname"
+ contents = '#This is the hostname\nhostname="localhost"'
+ hostfile = self.tmp_path("hostfile")
+ atomic_helper.write_file(hostfile, contents, omode="w")
+ distro._write_hostname(hostname, hostfile)
+ self.assertEqual('#This is the hostname\nhostname="myhostname"\n',
+ util.load_file(hostfile))
diff --git a/tests/unittests/distros/test_hostname.py b/tests/unittests/distros/test_hostname.py
new file mode 100644
index 00000000..f6d4dbe5
--- /dev/null
+++ b/tests/unittests/distros/test_hostname.py
@@ -0,0 +1,42 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import unittest
+
+from cloudinit.distros.parsers import hostname
+
+
+BASE_HOSTNAME = '''
+# My super-duper-hostname
+
+blahblah
+
+'''
+BASE_HOSTNAME = BASE_HOSTNAME.strip()
+
+
+class TestHostnameHelper(unittest.TestCase):
+ def test_parse_same(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ self.assertEqual(str(hn).strip(), BASE_HOSTNAME)
+ self.assertEqual(hn.hostname, 'blahblah')
+
+ def test_no_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ hn.set_hostname("")
+ self.assertEqual(hn.hostname, prev_name)
+
+ def test_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ self.assertEqual(prev_name, 'blahblah')
+ hn.set_hostname("bbbbd")
+ self.assertEqual(hn.hostname, 'bbbbd')
+ expected_out = '''
+# My super-duper-hostname
+
+bbbbd
+'''
+ self.assertEqual(str(hn).strip(), expected_out.strip())
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_hosts.py b/tests/unittests/distros/test_hosts.py
new file mode 100644
index 00000000..8aaa6e48
--- /dev/null
+++ b/tests/unittests/distros/test_hosts.py
@@ -0,0 +1,45 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import unittest
+
+from cloudinit.distros.parsers import hosts
+
+
+BASE_ETC = '''
+# Example
+127.0.0.1 localhost
+192.168.1.10 foo.mydomain.org foo
+192.168.1.10 bar.mydomain.org bar
+146.82.138.7 master.debian.org master
+209.237.226.90 www.opensource.org
+'''
+BASE_ETC = BASE_ETC.strip()
+
+
+class TestHostsHelper(unittest.TestCase):
+ def test_parse(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ self.assertEqual(eh.get_entry('127.0.0.1'), [['localhost']])
+ self.assertEqual(eh.get_entry('192.168.1.10'),
+ [['foo.mydomain.org', 'foo'],
+ ['bar.mydomain.org', 'bar']])
+ eh = str(eh)
+ self.assertTrue(eh.startswith('# Example'))
+
+ def test_add(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']])
+ eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3')
+ self.assertEqual(eh.get_entry('127.0.0.3'),
+ [['blah', 'blah2', 'blah3']])
+
+ def test_del(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']])
+
+ eh.del_entries('127.0.0.0')
+ self.assertEqual(eh.get_entry('127.0.0.0'), [])
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_init.py b/tests/unittests/distros/test_init.py
new file mode 100644
index 00000000..fd64a322
--- /dev/null
+++ b/tests/unittests/distros/test_init.py
@@ -0,0 +1,161 @@
+# Copyright (C) 2020 Canonical Ltd.
+#
+# Author: Daniel Watkins <oddbloke@ubuntu.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Tests for cloudinit/distros/__init__.py"""
+
+from unittest import mock
+
+import pytest
+
+from cloudinit.distros import _get_package_mirror_info, LDH_ASCII_CHARS
+
+# In newer versions of Python, these characters will be omitted instead
+# of substituted because of security concerns.
+# See https://bugs.python.org/issue43882
+SECURITY_URL_CHARS = '\n\r\t'
+
+# Define a set of characters we would expect to be replaced
+INVALID_URL_CHARS = [
+ chr(x) for x in range(127)
+ if chr(x) not in LDH_ASCII_CHARS + SECURITY_URL_CHARS
+]
+for separator in [":", ".", "/", "#", "?", "@", "[", "]"]:
+ # Remove from the set characters that either separate hostname parts (":",
+ # "."), terminate hostnames ("/", "#", "?", "@"), or cause Python to be
+ # unable to parse URLs ("[", "]").
+ INVALID_URL_CHARS.remove(separator)
+
+
+class TestGetPackageMirrorInfo:
+ """
+ Tests for cloudinit.distros._get_package_mirror_info.
+
+ These supplement the tests in tests/unittests/test_distros/test_generic.py
+ which are more focused on testing a single production-like configuration.
+ These tests are more focused on specific aspects of the unit under test.
+ """
+
+ @pytest.mark.parametrize('mirror_info,expected', [
+ # Empty info gives empty return
+ ({}, {}),
+ # failsafe values used if present
+ ({'failsafe': {'primary': 'http://value', 'security': 'http://other'}},
+ {'primary': 'http://value', 'security': 'http://other'}),
+ # search values used if present
+ ({'search': {'primary': ['http://value'],
+ 'security': ['http://other']}},
+ {'primary': ['http://value'], 'security': ['http://other']}),
+ # failsafe values used if search value not present
+ ({'search': {'primary': ['http://value']},
+ 'failsafe': {'security': 'http://other'}},
+ {'primary': ['http://value'], 'security': 'http://other'})
+ ])
+ def test_get_package_mirror_info_failsafe(self, mirror_info, expected):
+ """
+ Test the interaction between search and failsafe inputs
+
+ (This doesn't test the case where the mirror_filter removes all search
+ options; test_failsafe_used_if_all_search_results_filtered_out covers
+ that.)
+ """
+ assert expected == _get_package_mirror_info(mirror_info,
+ mirror_filter=lambda x: x)
+
+ def test_failsafe_used_if_all_search_results_filtered_out(self):
+ """Test the failsafe option used if all search options eliminated."""
+ mirror_info = {
+ 'search': {'primary': ['http://value']},
+ 'failsafe': {'primary': 'http://other'}
+ }
+ assert {'primary': 'http://other'} == _get_package_mirror_info(
+ mirror_info, mirror_filter=lambda x: False)
+
+ @pytest.mark.parametrize('allow_ec2_mirror, platform_type', [
+ (True, 'ec2')
+ ])
+ @pytest.mark.parametrize('availability_zone,region,patterns,expected', (
+ # Test ec2_region alone
+ ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'],
+ ['http://ec2-fk-fake-1/ubuntu']),
+ # Test availability_zone alone
+ ('fk-fake-1f', None, ['http://AZ-%(availability_zone)s/ubuntu'],
+ ['http://az-fk-fake-1f/ubuntu']),
+ # Test region alone
+ (None, 'fk-fake-1', ['http://RG-%(region)s/ubuntu'],
+ ['http://rg-fk-fake-1/ubuntu']),
+ # Test that ec2_region is not available for non-matching AZs
+ ('fake-fake-1f', None,
+ ['http://EC2-%(ec2_region)s/ubuntu',
+ 'http://AZ-%(availability_zone)s/ubuntu'],
+ ['http://az-fake-fake-1f/ubuntu']),
+ # Test that template order maintained
+ (None, 'fake-region',
+ ['http://RG-%(region)s-2/ubuntu', 'http://RG-%(region)s-1/ubuntu'],
+ ['http://rg-fake-region-2/ubuntu', 'http://rg-fake-region-1/ubuntu']),
+ # Test that non-ASCII hostnames are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com/ubuntu'],
+ ['http://www.xn--idna--4kd53hh6aba3q.com/ubuntu']),
+ # Test that non-ASCII hostnames with a port are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com:8080/ubuntu'],
+ ['http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu']),
+ # Test that non-ASCII non-hostname parts of URLs are unchanged
+ (None, 'ТεЅТ̣', ['http://www.example.com/%(region)s/ubuntu'],
+ ['http://www.example.com/ТεЅТ̣/ubuntu']),
+ # Test that IPv4 addresses are unchanged
+ (None, 'fk-fake-1', ['http://192.168.1.1:8080/%(region)s/ubuntu'],
+ ['http://192.168.1.1:8080/fk-fake-1/ubuntu']),
+ # Test that IPv6 addresses are unchanged
+ (None, 'fk-fake-1',
+ ['http://[2001:67c:1360:8001::23]/%(region)s/ubuntu'],
+ ['http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu']),
+ # Test that unparseable URLs are filtered out of the mirror list
+ (None, 'inv[lid',
+ ['http://%(region)s.in.hostname/should/be/filtered',
+ 'http://but.not.in.the.path/%(region)s'],
+ ['http://but.not.in.the.path/inv[lid']),
+ (None, '-some-region-',
+ ['http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu'],
+ ['http://lead-ing.some-region.trail-ing.example.com/ubuntu']),
+ ) + tuple(
+ # Dynamically generate a test case for each non-LDH
+ # (Letters/Digits/Hyphen) ASCII character, testing that it is
+ # substituted with a hyphen
+ (None, 'fk{0}fake{0}1'.format(invalid_char),
+ ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu'])
+ for invalid_char in INVALID_URL_CHARS
+ ))
+ def test_valid_substitution(self,
+ allow_ec2_mirror,
+ platform_type,
+ availability_zone,
+ region,
+ patterns,
+ expected):
+ """Test substitution works as expected."""
+ flag_path = "cloudinit.distros." \
+ "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+
+ m_data_source = mock.Mock(
+ availability_zone=availability_zone,
+ region=region,
+ platform_type=platform_type
+ )
+ mirror_info = {'search': {'primary': patterns}}
+
+ with mock.patch(flag_path, allow_ec2_mirror):
+ ret = _get_package_mirror_info(
+ mirror_info,
+ data_source=m_data_source,
+ mirror_filter=lambda x: x
+ )
+ print(allow_ec2_mirror)
+ print(platform_type)
+ print(availability_zone)
+ print(region)
+ print(patterns)
+ print(expected)
+ assert {'primary': expected} == ret
diff --git a/tests/unittests/distros/test_manage_service.py b/tests/unittests/distros/test_manage_service.py
new file mode 100644
index 00000000..6f1bd0b1
--- /dev/null
+++ b/tests/unittests/distros/test_manage_service.py
@@ -0,0 +1,38 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import (CiTestCase, mock)
+from tests.unittests.util import MockDistro
+
+
+class TestManageService(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestManageService, self).setUp()
+ self.dist = MockDistro()
+
+ @mock.patch.object(MockDistro, 'uses_systemd', return_value=False)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_systemctl_initcmd(self, m_subp, m_sysd):
+ self.dist.init_cmd = ['systemctl']
+ self.dist.manage_service('start', 'myssh')
+ m_subp.assert_called_with(['systemctl', 'start', 'myssh'],
+ capture=True)
+
+ @mock.patch.object(MockDistro, 'uses_systemd', return_value=False)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_service_initcmd(self, m_subp, m_sysd):
+ self.dist.init_cmd = ['service']
+ self.dist.manage_service('start', 'myssh')
+ m_subp.assert_called_with(['service', 'myssh', 'start'], capture=True)
+
+ @mock.patch.object(MockDistro, 'uses_systemd', return_value=True)
+ @mock.patch("cloudinit.distros.subp.subp")
+ def test_manage_service_systemctl(self, m_subp, m_sysd):
+ self.dist.init_cmd = ['ignore']
+ self.dist.manage_service('start', 'myssh')
+ m_subp.assert_called_with(['systemctl', 'start', 'myssh'],
+ capture=True)
+
+# vi: ts=4 sw=4 expandtab
diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py
new file mode 100644
index 00000000..11a68d2a
--- /dev/null
+++ b/tests/unittests/distros/test_netbsd.py
@@ -0,0 +1,17 @@
+import cloudinit.distros.netbsd
+
+import pytest
+import unittest.mock as mock
+
+
+@pytest.mark.parametrize('with_pkgin', (True, False))
+@mock.patch("cloudinit.distros.netbsd.os")
+def test_init(m_os, with_pkgin):
+ print(with_pkgin)
+ m_os.path.exists.return_value = with_pkgin
+ cfg = {}
+
+ distro = cloudinit.distros.netbsd.NetBSD("netbsd", cfg, None)
+ expectation = ['pkgin', '-y', 'full-upgrade'] if with_pkgin else None
+ assert distro.pkg_cmd_upgrade_prefix == expectation
+ assert [mock.call('/usr/pkg/bin/pkgin')] == m_os.path.exists.call_args_list
diff --git a/tests/unittests/distros/test_netconfig.py b/tests/unittests/distros/test_netconfig.py
new file mode 100644
index 00000000..90ac5578
--- /dev/null
+++ b/tests/unittests/distros/test_netconfig.py
@@ -0,0 +1,916 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import os
+import re
+from io import StringIO
+from textwrap import dedent
+from unittest import mock
+
+from cloudinit import distros
+from cloudinit.distros.parsers.sys_conf import SysConf
+from cloudinit import helpers
+from cloudinit import settings
+from tests.unittests.helpers import (
+ FilesystemMockingTestCase, dir2dict)
+from cloudinit import subp
+from cloudinit import util
+from cloudinit import safeyaml
+
+BASE_NET_CFG = '''
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+ netmask 255.255.255.0
+ network 192.168.0.0
+
+auto eth1
+iface eth1 inet dhcp
+'''
+
+BASE_NET_CFG_FROM_V2 = '''
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5/24
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+'''
+
+BASE_NET_CFG_IPV6 = '''
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+iface eth0 inet6 static
+ address 2607:f0d0:1002:0011::2
+ netmask 64
+ gateway 2607:f0d0:1002:0011::1
+
+iface eth1 inet static
+ address 192.168.1.6
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+iface eth1 inet6 static
+ address 2607:f0d0:1002:0011::3
+ netmask 64
+ gateway 2607:f0d0:1002:0011::1
+'''
+
+V1_NET_CFG = {'config': [{'name': 'eth0',
+
+ 'subnets': [{'address': '192.168.1.5',
+ 'broadcast': '192.168.1.0',
+ 'gateway': '192.168.1.254',
+ 'netmask': '255.255.255.0',
+ 'type': 'static'}],
+ 'type': 'physical'},
+ {'name': 'eth1',
+ 'subnets': [{'control': 'auto', 'type': 'dhcp4'}],
+ 'type': 'physical'}],
+ 'version': 1}
+
+V1_NET_CFG_WITH_DUPS = """\
+# same value in interface specific dns and global dns
+# should produce single entry in network file
+version: 1
+config:
+ - type: physical
+ name: eth0
+ subnets:
+ - type: static
+ address: 192.168.0.102/24
+ dns_nameservers: [1.2.3.4]
+ dns_search: [test.com]
+ interface: eth0
+ - type: nameserver
+ address: [1.2.3.4]
+ search: [test.com]
+"""
+
+V1_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5/24
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+V1_NET_CFG_IPV6_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet6 static
+ address 2607:f0d0:1002:0011::2/64
+ gateway 2607:f0d0:1002:0011::1
+
+auto eth1
+iface eth1 inet dhcp
+"""
+
+V1_NET_CFG_IPV6 = {'config': [{'name': 'eth0',
+ 'subnets': [{'address':
+ '2607:f0d0:1002:0011::2',
+ 'gateway':
+ '2607:f0d0:1002:0011::1',
+ 'netmask': '64',
+ 'type': 'static6'}],
+ 'type': 'physical'},
+ {'name': 'eth1',
+ 'subnets': [{'control': 'auto',
+ 'type': 'dhcp4'}],
+ 'type': 'physical'}],
+ 'version': 1}
+
+
+V1_TO_V2_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth1:
+ dhcp4: true
+"""
+
+V1_TO_V2_NET_CFG_IPV6_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 2607:f0d0:1002:0011::2/64
+ gateway6: 2607:f0d0:1002:0011::1
+ eth1:
+ dhcp4: true
+"""
+
+V2_NET_CFG = {
+ 'ethernets': {
+ 'eth7': {
+ 'addresses': ['192.168.1.5/24'],
+ 'gateway4': '192.168.1.254'},
+ 'eth9': {
+ 'dhcp4': True}
+ },
+ 'version': 2
+}
+
+
+V2_TO_V2_NET_CFG_OUTPUT = """\
+# This file is generated from information provided by the datasource. Changes
+# to it will not persist across an instance reboot. To disable cloud-init's
+# network configuration capabilities, write a file
+# /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following:
+# network: {config: disabled}
+network:
+ ethernets:
+ eth7:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth9:
+ dhcp4: true
+ version: 2
+"""
+
+
+class WriteBuffer(object):
+ def __init__(self):
+ self.buffer = StringIO()
+ self.mode = None
+ self.omode = None
+
+ def write(self, text):
+ self.buffer.write(text)
+
+ def __str__(self):
+ return self.buffer.getvalue()
+
+
+class TestNetCfgDistroBase(FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestNetCfgDistroBase, self).setUp()
+ self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
+
+ def _get_distro(self, dname, renderers=None):
+ cls = distros.fetch(dname)
+ cfg = settings.CFG_BUILTIN
+ cfg['system_info']['distro'] = dname
+ if renderers:
+ cfg['system_info']['network'] = {'renderers': renderers}
+ paths = helpers.Paths({})
+ return cls(dname, cfg.get('system_info'), paths)
+
+ def assertCfgEquals(self, blob1, blob2):
+ b1 = dict(SysConf(blob1.strip().splitlines()))
+ b2 = dict(SysConf(blob2.strip().splitlines()))
+ self.assertEqual(b1, b2)
+ for (k, v) in b1.items():
+ self.assertIn(k, b2)
+ for (k, v) in b2.items():
+ self.assertIn(k, b1)
+ for (k, v) in b1.items():
+ self.assertEqual(v, b2[k])
+
+
+class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase):
+
+ def setUp(self):
+ super(TestNetCfgDistroFreeBSD, self).setUp()
+ self.distro = self._get_distro('freebsd', renderers=['freebsd'])
+
+ def _apply_and_verify_freebsd(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.freebsd.available') as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ util.ensure_dir('/etc')
+ util.ensure_file('/etc/rc.conf')
+ util.ensure_file('/etc/resolv.conf')
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(
+ set(expected.split('\n')),
+ set(results[cfgpath].split('\n')))
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ def test_apply_network_config_freebsd_standard(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ '00:15:5d:4c:73:00': 'eth0',
+ }
+ rc_conf_expected = """\
+defaultrouter=192.168.1.254
+ifconfig_eth0='192.168.1.5 netmask 255.255.255.0'
+ifconfig_eth1=DHCP
+"""
+
+ expected_cfgs = {
+ '/etc/rc.conf': rc_conf_expected,
+ '/etc/resolv.conf': ''
+ }
+ self._apply_and_verify_freebsd(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+ @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ def test_apply_network_config_freebsd_ifrename(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ '00:15:5d:4c:73:00': 'vtnet0',
+ }
+ rc_conf_expected = """\
+ifconfig_vtnet0_name=eth0
+defaultrouter=192.168.1.254
+ifconfig_eth0='192.168.1.5 netmask 255.255.255.0'
+ifconfig_eth1=DHCP
+"""
+
+ V1_NET_CFG_RENAME = copy.deepcopy(V1_NET_CFG)
+ V1_NET_CFG_RENAME['config'][0]['mac_address'] = '00:15:5d:4c:73:00'
+
+ expected_cfgs = {
+ '/etc/rc.conf': rc_conf_expected,
+ '/etc/resolv.conf': ''
+ }
+ self._apply_and_verify_freebsd(self.distro.apply_network_config,
+ V1_NET_CFG_RENAME,
+ expected_cfgs=expected_cfgs.copy())
+
+ @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ def test_apply_network_config_freebsd_nameserver(self, ifaces_mac):
+ ifaces_mac.return_value = {
+ '00:15:5d:4c:73:00': 'eth0',
+ }
+
+ V1_NET_CFG_DNS = copy.deepcopy(V1_NET_CFG)
+ ns = ['1.2.3.4']
+ V1_NET_CFG_DNS['config'][0]['subnets'][0]['dns_nameservers'] = ns
+ expected_cfgs = {
+ '/etc/resolv.conf': 'nameserver 1.2.3.4\n'
+ }
+ self._apply_and_verify_freebsd(self.distro.apply_network_config,
+ V1_NET_CFG_DNS,
+ expected_cfgs=expected_cfgs.copy())
+
+
+class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase):
+
+ def setUp(self):
+ super(TestNetCfgDistroUbuntuEni, self).setUp()
+ self.distro = self._get_distro('ubuntu', renderers=['eni'])
+
+ def eni_path(self):
+ return '/etc/network/interfaces.d/50-cloud-init.cfg'
+
+ def _apply_and_verify_eni(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.eni.available') as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_eni_ub(self):
+ expected_cfgs = {
+ self.eni_path(): V1_NET_CFG_OUTPUT,
+ }
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify_eni(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_apply_network_config_ipv6_ub(self):
+ expected_cfgs = {
+ self.eni_path(): V1_NET_CFG_IPV6_OUTPUT
+ }
+ self._apply_and_verify_eni(self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy())
+
+
+class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroUbuntuNetplan, self).setUp()
+ self.distro = self._get_distro('ubuntu', renderers=['netplan'])
+ self.devlist = ['eth0', 'lo']
+
+ def _apply_and_verify_netplan(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.netplan.available',
+ return_value=True):
+ with mock.patch("cloudinit.net.netplan.get_devicelist",
+ return_value=self.devlist):
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def netplan_path(self):
+ return '/etc/netplan/50-cloud-init.yaml'
+
+ def test_apply_network_config_v1_to_netplan_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V1_TO_V2_NET_CFG_OUTPUT,
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify_netplan(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_apply_network_config_v1_ipv6_to_netplan_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V1_TO_V2_NET_CFG_IPV6_OUTPUT,
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG_IPV6, False)
+ self._apply_and_verify_netplan(self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_apply_network_config_v2_passthrough_ub(self):
+ expected_cfgs = {
+ self.netplan_path(): V2_TO_V2_NET_CFG_OUTPUT,
+ }
+ # ub_distro.apply_network_config(V2_NET_CFG, False)
+ self._apply_and_verify_netplan(self.distro.apply_network_config,
+ V2_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+
+class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
+
+ def setUp(self):
+ super(TestNetCfgDistroRedhat, self).setUp()
+ self.distro = self._get_distro('rhel', renderers=['sysconfig'])
+
+ def ifcfg_path(self, ifname):
+ return '/etc/sysconfig/network-scripts/ifcfg-%s' % ifname
+
+ def control_path(self):
+ return '/etc/sysconfig/network'
+
+ def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.sysconfig.available') as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ self.assertCfgEquals(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_rh(self):
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=none
+ DEFROUTE=yes
+ DEVICE=eth0
+ GATEWAY=192.168.1.254
+ IPADDR=192.168.1.5
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.ifcfg_path('eth1'): dedent("""\
+ BOOTPROTO=dhcp
+ DEVICE=eth1
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.control_path(): dedent("""\
+ NETWORKING=yes
+ """),
+ }
+ # rh_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_apply_network_config_ipv6_rh(self):
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=none
+ DEFROUTE=yes
+ DEVICE=eth0
+ IPV6ADDR=2607:f0d0:1002:0011::2/64
+ IPV6INIT=yes
+ IPV6_AUTOCONF=no
+ IPV6_DEFAULTGW=2607:f0d0:1002:0011::1
+ IPV6_FORCE_ACCEPT_RA=no
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.ifcfg_path('eth1'): dedent("""\
+ BOOTPROTO=dhcp
+ DEVICE=eth1
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.control_path(): dedent("""\
+ NETWORKING=yes
+ NETWORKING_IPV6=yes
+ IPV6_AUTOCONF=no
+ """),
+ }
+ # rh_distro.apply_network_config(V1_NET_CFG_IPV6, False)
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_vlan_render_unsupported(self):
+ """Render officially unsupported vlan names."""
+ cfg = {
+ 'version': 2,
+ 'ethernets': {
+ 'eth0': {'addresses': ["192.10.1.2/24"],
+ 'match': {'macaddress': "00:16:3e:60:7c:df"}}},
+ 'vlans': {
+ 'infra0': {'addresses': ["10.0.1.2/16"],
+ 'id': 1001, 'link': 'eth0'}},
+ }
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=none
+ DEVICE=eth0
+ HWADDR=00:16:3e:60:7c:df
+ IPADDR=192.10.1.2
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.ifcfg_path('infra0'): dedent("""\
+ BOOTPROTO=none
+ DEVICE=infra0
+ IPADDR=10.0.1.2
+ NETMASK=255.255.0.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ PHYSDEV=eth0
+ USERCTL=no
+ VLAN=yes
+ """),
+ self.control_path(): dedent("""\
+ NETWORKING=yes
+ """),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config, cfg,
+ expected_cfgs=expected_cfgs)
+
+ def test_vlan_render(self):
+ cfg = {
+ 'version': 2,
+ 'ethernets': {
+ 'eth0': {'addresses': ["192.10.1.2/24"]}},
+ 'vlans': {
+ 'eth0.1001': {'addresses': ["10.0.1.2/16"],
+ 'id': 1001, 'link': 'eth0'}},
+ }
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=none
+ DEVICE=eth0
+ IPADDR=192.10.1.2
+ NETMASK=255.255.255.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ TYPE=Ethernet
+ USERCTL=no
+ """),
+ self.ifcfg_path('eth0.1001'): dedent("""\
+ BOOTPROTO=none
+ DEVICE=eth0.1001
+ IPADDR=10.0.1.2
+ NETMASK=255.255.0.0
+ NM_CONTROLLED=no
+ ONBOOT=yes
+ PHYSDEV=eth0
+ USERCTL=no
+ VLAN=yes
+ """),
+ self.control_path(): dedent("""\
+ NETWORKING=yes
+ """),
+ }
+ self._apply_and_verify(
+ self.distro.apply_network_config, cfg,
+ expected_cfgs=expected_cfgs)
+
+
+class TestNetCfgDistroOpensuse(TestNetCfgDistroBase):
+
+ def setUp(self):
+ super(TestNetCfgDistroOpensuse, self).setUp()
+ self.distro = self._get_distro('opensuse', renderers=['sysconfig'])
+
+ def ifcfg_path(self, ifname):
+ return '/etc/sysconfig/network/ifcfg-%s' % ifname
+
+ def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.sysconfig.available') as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ self.assertCfgEquals(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def test_apply_network_config_opensuse(self):
+ """Opensuse uses apply_network_config and renders sysconfig"""
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=static
+ IPADDR=192.168.1.5
+ NETMASK=255.255.255.0
+ STARTMODE=auto
+ """),
+ self.ifcfg_path('eth1'): dedent("""\
+ BOOTPROTO=dhcp4
+ STARTMODE=auto
+ """),
+ }
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy())
+
+ def test_apply_network_config_ipv6_opensuse(self):
+ """Opensuse uses apply_network_config and renders sysconfig w/ipv6"""
+ expected_cfgs = {
+ self.ifcfg_path('eth0'): dedent("""\
+ BOOTPROTO=static
+ IPADDR6=2607:f0d0:1002:0011::2/64
+ STARTMODE=auto
+ """),
+ self.ifcfg_path('eth1'): dedent("""\
+ BOOTPROTO=dhcp4
+ STARTMODE=auto
+ """),
+ }
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy())
+
+
+class TestNetCfgDistroArch(TestNetCfgDistroBase):
+ def setUp(self):
+ super(TestNetCfgDistroArch, self).setUp()
+ self.distro = self._get_distro('arch', renderers=['netplan'])
+
+ def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
+ bringup=False, with_netplan=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.netplan.available',
+ return_value=with_netplan):
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ print("----------")
+ print(expected)
+ print("^^^^ expected | rendered VVVVVVV")
+ print(results[cfgpath])
+ print("----------")
+ self.assertEqual(expected, results[cfgpath])
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def netctl_path(self, iface):
+ return '/etc/netctl/%s' % iface
+
+ def netplan_path(self):
+ return '/etc/netplan/50-cloud-init.yaml'
+
+ def test_apply_network_config_v1_without_netplan(self):
+ # Note that this is in fact an invalid netctl config:
+ # "Address=None/None"
+ # But this is what the renderer has been writing out for a long time,
+ # and the test's purpose is to assert that the netctl renderer is
+ # still being used in absence of netplan, not the correctness of the
+ # rendered netctl config.
+ expected_cfgs = {
+ self.netctl_path('eth0'): dedent("""\
+ Address=192.168.1.5/255.255.255.0
+ Connection=ethernet
+ DNS=()
+ Gateway=192.168.1.254
+ IP=static
+ Interface=eth0
+ """),
+ self.netctl_path('eth1'): dedent("""\
+ Address=None/None
+ Connection=ethernet
+ DNS=()
+ Gateway=
+ IP=dhcp
+ Interface=eth1
+ """),
+ }
+
+ # ub_distro.apply_network_config(V1_NET_CFG, False)
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=False)
+
+ def test_apply_network_config_v1_with_netplan(self):
+ expected_cfgs = {
+ self.netplan_path(): dedent("""\
+ # generated by cloud-init
+ network:
+ version: 2
+ ethernets:
+ eth0:
+ addresses:
+ - 192.168.1.5/24
+ gateway4: 192.168.1.254
+ eth1:
+ dhcp4: true
+ """),
+ }
+
+ with mock.patch(
+ 'cloudinit.net.netplan.get_devicelist',
+ return_value=[]
+ ):
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=True)
+
+
+class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
+
+ def setUp(self):
+ super(TestNetCfgDistroPhoton, self).setUp()
+ self.distro = self._get_distro('photon', renderers=['networkd'])
+
+ def create_conf_dict(self, contents):
+ content_dict = {}
+ for line in contents:
+ if line:
+ line = line.strip()
+ if line and re.search(r'^\[(.+)\]$', line):
+ content_dict[line] = []
+ key = line
+ elif line:
+ assert key
+ content_dict[key].append(line)
+
+ return content_dict
+
+ def compare_dicts(self, actual, expected):
+ for k, v in actual.items():
+ self.assertEqual(sorted(expected[k]), sorted(v))
+
+ def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
+ bringup=False):
+ if not expected_cfgs:
+ raise ValueError('expected_cfg must not be None')
+
+ tmpd = None
+ with mock.patch('cloudinit.net.networkd.available') as m_avail:
+ m_avail.return_value = True
+ with self.reRooted(tmpd) as tmpd:
+ apply_fn(config, bringup)
+
+ results = dir2dict(tmpd)
+ for cfgpath, expected in expected_cfgs.items():
+ actual = self.create_conf_dict(results[cfgpath].splitlines())
+ self.compare_dicts(actual, expected)
+ self.assertEqual(0o644, get_mode(cfgpath, tmpd))
+
+ def nwk_file_path(self, ifname):
+ return '/etc/systemd/network/10-cloud-init-%s.network' % ifname
+
+ def net_cfg_1(self, ifname):
+ ret = """\
+ [Match]
+ Name=%s
+ [Network]
+ DHCP=no
+ [Address]
+ Address=192.168.1.5/24
+ [Route]
+ Gateway=192.168.1.254""" % ifname
+ return ret
+
+ def net_cfg_2(self, ifname):
+ ret = """\
+ [Match]
+ Name=%s
+ [Network]
+ DHCP=ipv4""" % ifname
+ return ret
+
+ def test_photon_network_config_v1(self):
+ tmp = self.net_cfg_1('eth0').splitlines()
+ expected_eth0 = self.create_conf_dict(tmp)
+
+ tmp = self.net_cfg_2('eth1').splitlines()
+ expected_eth1 = self.create_conf_dict(tmp)
+
+ expected_cfgs = {
+ self.nwk_file_path('eth0'): expected_eth0,
+ self.nwk_file_path('eth1'): expected_eth1,
+ }
+
+ self._apply_and_verify(self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs.copy())
+
+ def test_photon_network_config_v2(self):
+ tmp = self.net_cfg_1('eth7').splitlines()
+ expected_eth7 = self.create_conf_dict(tmp)
+
+ tmp = self.net_cfg_2('eth9').splitlines()
+ expected_eth9 = self.create_conf_dict(tmp)
+
+ expected_cfgs = {
+ self.nwk_file_path('eth7'): expected_eth7,
+ self.nwk_file_path('eth9'): expected_eth9,
+ }
+
+ self._apply_and_verify(self.distro.apply_network_config,
+ V2_NET_CFG,
+ expected_cfgs.copy())
+
+ def test_photon_network_config_v1_with_duplicates(self):
+ expected = """\
+ [Match]
+ Name=eth0
+ [Network]
+ DHCP=no
+ DNS=1.2.3.4
+ Domains=test.com
+ [Address]
+ Address=192.168.0.102/24"""
+
+ net_cfg = safeyaml.load(V1_NET_CFG_WITH_DUPS)
+
+ expected = self.create_conf_dict(expected.splitlines())
+ expected_cfgs = {
+ self.nwk_file_path('eth0'): expected,
+ }
+
+ self._apply_and_verify(self.distro.apply_network_config,
+ net_cfg,
+ expected_cfgs.copy())
+
+
+def get_mode(path, target=None):
+ return os.stat(subp.target_path(target, path)).st_mode & 0o777
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_networking.py b/tests/unittests/distros/test_networking.py
new file mode 100644
index 00000000..ec508f4d
--- /dev/null
+++ b/tests/unittests/distros/test_networking.py
@@ -0,0 +1,223 @@
+from unittest import mock
+
+import pytest
+
+from cloudinit import net
+from cloudinit.distros.networking import (
+ BSDNetworking,
+ LinuxNetworking,
+ Networking,
+)
+
+# See https://docs.pytest.org/en/stable/example
+# /parametrize.html#parametrizing-conditional-raising
+from contextlib import ExitStack as does_not_raise
+
+
+@pytest.yield_fixture
+def generic_networking_cls():
+ """Returns a direct Networking subclass which errors on /sys usage.
+
+ This enables the direct testing of functionality only present on the
+ ``Networking`` super-class, and provides a check on accidentally using /sys
+ in that context.
+ """
+
+ class TestNetworking(Networking):
+ def is_physical(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def settle(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def try_set_link_up(self, *args, **kwargs):
+ raise NotImplementedError
+
+ error = AssertionError("Unexpectedly used /sys in generic networking code")
+ with mock.patch(
+ "cloudinit.net.get_sys_class_path", side_effect=error,
+ ):
+ yield TestNetworking
+
+
+@pytest.yield_fixture
+def sys_class_net(tmpdir):
+ sys_class_net_path = tmpdir.join("sys/class/net")
+ sys_class_net_path.ensure_dir()
+ with mock.patch(
+ "cloudinit.net.get_sys_class_path",
+ return_value=sys_class_net_path.strpath + "/",
+ ):
+ yield sys_class_net_path
+
+
+class TestBSDNetworkingIsPhysical:
+ def test_raises_notimplementederror(self):
+ with pytest.raises(NotImplementedError):
+ BSDNetworking().is_physical("eth0")
+
+
+class TestLinuxNetworkingIsPhysical:
+ def test_returns_false_by_default(self, sys_class_net):
+ assert not LinuxNetworking().is_physical("eth0")
+
+ def test_returns_false_if_devname_exists_but_not_physical(
+ self, sys_class_net
+ ):
+ devname = "eth0"
+ sys_class_net.join(devname).mkdir()
+ assert not LinuxNetworking().is_physical(devname)
+
+ def test_returns_true_if_device_is_physical(self, sys_class_net):
+ devname = "eth0"
+ device_dir = sys_class_net.join(devname)
+ device_dir.mkdir()
+ device_dir.join("device").write("")
+
+ assert LinuxNetworking().is_physical(devname)
+
+
+class TestBSDNetworkingTrySetLinkUp:
+ def test_raises_notimplementederror(self):
+ with pytest.raises(NotImplementedError):
+ BSDNetworking().try_set_link_up("eth0")
+
+
+@mock.patch("cloudinit.net.is_up")
+@mock.patch("cloudinit.distros.networking.subp.subp")
+class TestLinuxNetworkingTrySetLinkUp:
+ def test_calls_subp_return_true(self, m_subp, m_is_up):
+ devname = "eth0"
+ m_is_up.return_value = True
+ is_success = LinuxNetworking().try_set_link_up(devname)
+
+ assert (mock.call(['ip', 'link', 'set', devname, 'up']) ==
+ m_subp.call_args_list[-1])
+ assert is_success
+
+ def test_calls_subp_return_false(self, m_subp, m_is_up):
+ devname = "eth0"
+ m_is_up.return_value = False
+ is_success = LinuxNetworking().try_set_link_up(devname)
+
+ assert (mock.call(['ip', 'link', 'set', devname, 'up']) ==
+ m_subp.call_args_list[-1])
+ assert not is_success
+
+
+class TestBSDNetworkingSettle:
+ def test_settle_doesnt_error(self):
+ # This also implicitly tests that it doesn't use subp.subp
+ BSDNetworking().settle()
+
+
+@pytest.mark.usefixtures("sys_class_net")
+@mock.patch("cloudinit.distros.networking.util.udevadm_settle", autospec=True)
+class TestLinuxNetworkingSettle:
+ def test_no_arguments(self, m_udevadm_settle):
+ LinuxNetworking().settle()
+
+ assert [mock.call(exists=None)] == m_udevadm_settle.call_args_list
+
+ def test_exists_argument(self, m_udevadm_settle):
+ LinuxNetworking().settle(exists="ens3")
+
+ expected_path = net.sys_dev_path("ens3")
+ assert [
+ mock.call(exists=expected_path)
+ ] == m_udevadm_settle.call_args_list
+
+
+class TestNetworkingWaitForPhysDevs:
+ @pytest.fixture
+ def wait_for_physdevs_netcfg(self):
+ """This config is shared across all the tests in this class."""
+
+ def ethernet(mac, name, driver=None, device_id=None):
+ v2_cfg = {"set-name": name, "match": {"macaddress": mac}}
+ if driver:
+ v2_cfg["match"].update({"driver": driver})
+ if device_id:
+ v2_cfg["match"].update({"device_id": device_id})
+
+ return v2_cfg
+
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", "0x1000"],
+ ["00:11:22:33:44:55", "ens3", "e1000", "0x1643"],
+ ]
+ netcfg = {
+ "version": 2,
+ "ethernets": {args[1]: ethernet(*args) for args in physdevs},
+ }
+ return netcfg
+
+ def test_skips_settle_if_all_present(
+ self, generic_networking_cls, wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.side_effect = iter(
+ [{"aa:bb:cc:dd:ee:ff": "eth0", "00:11:22:33:44:55": "ens3"}]
+ )
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ networking.wait_for_physdevs(wait_for_physdevs_netcfg)
+ assert 0 == m_settle.call_count
+
+ def test_calls_udev_settle_on_missing(
+ self, generic_networking_cls, wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.side_effect = iter(
+ [
+ {
+ "aa:bb:cc:dd:ee:ff": "eth0"
+ }, # first call ens3 is missing
+ {
+ "aa:bb:cc:dd:ee:ff": "eth0",
+ "00:11:22:33:44:55": "ens3",
+ }, # second call has both
+ ]
+ )
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ networking.wait_for_physdevs(wait_for_physdevs_netcfg)
+ m_settle.assert_called_with(exists="ens3")
+
+ @pytest.mark.parametrize(
+ "strict,expectation",
+ [(True, pytest.raises(RuntimeError)), (False, does_not_raise())],
+ )
+ def test_retrying_and_strict_behaviour(
+ self,
+ strict,
+ expectation,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
+ ):
+ networking = generic_networking_cls()
+ with mock.patch.object(
+ networking, "get_interfaces_by_mac"
+ ) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.return_value = {}
+
+ with mock.patch.object(
+ networking, "settle", autospec=True
+ ) as m_settle:
+ with expectation:
+ networking.wait_for_physdevs(
+ wait_for_physdevs_netcfg, strict=strict
+ )
+
+ assert (
+ 5 * len(wait_for_physdevs_netcfg["ethernets"])
+ == m_settle.call_count
+ )
diff --git a/tests/unittests/distros/test_opensuse.py b/tests/unittests/distros/test_opensuse.py
new file mode 100644
index 00000000..4ff26102
--- /dev/null
+++ b/tests/unittests/distros/test_opensuse.py
@@ -0,0 +1,12 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestopenSUSE(CiTestCase):
+
+ def test_get_distro(self):
+ distro = _get_distro("opensuse")
+ self.assertEqual(distro.osfamily, 'suse')
diff --git a/tests/unittests/distros/test_photon.py b/tests/unittests/distros/test_photon.py
new file mode 100644
index 00000000..3858f723
--- /dev/null
+++ b/tests/unittests/distros/test_photon.py
@@ -0,0 +1,68 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from . import _get_distro
+from cloudinit import util
+from tests.unittests.helpers import mock
+from tests.unittests.helpers import CiTestCase
+
+SYSTEM_INFO = {
+ 'paths': {
+ 'cloud_dir': '/var/lib/cloud/',
+ 'templates_dir': '/etc/cloud/templates/',
+ },
+ 'network': {'renderers': 'networkd'},
+}
+
+
+class TestPhoton(CiTestCase):
+ with_logs = True
+ distro = _get_distro('photon', SYSTEM_INFO)
+ expected_log_line = 'Rely on PhotonOS default network config'
+
+ def test_network_renderer(self):
+ self.assertEqual(self.distro._cfg['network']['renderers'], 'networkd')
+
+ def test_get_distro(self):
+ self.assertEqual(self.distro.osfamily, 'photon')
+
+ @mock.patch("cloudinit.distros.photon.subp.subp")
+ def test_write_hostname(self, m_subp):
+ hostname = 'myhostname'
+ hostfile = self.tmp_path('previous-hostname')
+ self.distro._write_hostname(hostname, hostfile)
+ self.assertEqual(hostname, util.load_file(hostfile))
+
+ ret = self.distro._read_hostname(hostfile)
+ self.assertEqual(ret, hostname)
+
+ m_subp.return_value = (None, None)
+ hostfile += 'hostfile'
+ self.distro._write_hostname(hostname, hostfile)
+
+ m_subp.return_value = (hostname, None)
+ ret = self.distro._read_hostname(hostfile)
+ self.assertEqual(ret, hostname)
+
+ self.logs.truncate(0)
+ m_subp.return_value = (None, 'bla')
+ self.distro._write_hostname(hostname, None)
+ self.assertIn('Error while setting hostname', self.logs.getvalue())
+
+ @mock.patch('cloudinit.net.generate_fallback_config')
+ def test_fallback_netcfg(self, m_fallback_cfg):
+
+ key = 'disable_fallback_netcfg'
+ # Don't use fallback if no setting given
+ self.logs.truncate(0)
+ assert(self.distro.generate_fallback_config() is None)
+ self.assertIn(self.expected_log_line, self.logs.getvalue())
+
+ self.logs.truncate(0)
+ self.distro._cfg[key] = True
+ assert(self.distro.generate_fallback_config() is None)
+ self.assertIn(self.expected_log_line, self.logs.getvalue())
+
+ self.logs.truncate(0)
+ self.distro._cfg[key] = False
+ assert(self.distro.generate_fallback_config() is not None)
+ self.assertNotIn(self.expected_log_line, self.logs.getvalue())
diff --git a/tests/unittests/distros/test_resolv.py b/tests/unittests/distros/test_resolv.py
new file mode 100644
index 00000000..e7971627
--- /dev/null
+++ b/tests/unittests/distros/test_resolv.py
@@ -0,0 +1,65 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.distros.parsers import resolv_conf
+
+from tests.unittests.helpers import TestCase
+
+import re
+
+
+BASE_RESOLVE = '''
+; generated by /sbin/dhclient-script
+search blah.yahoo.com yahoo.com
+nameserver 10.15.44.14
+nameserver 10.15.30.92
+'''
+BASE_RESOLVE = BASE_RESOLVE.strip()
+
+
+class TestResolvHelper(TestCase):
+ def test_parse_same(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ rp_r = str(rp).strip()
+ self.assertEqual(BASE_RESOLVE, rp_r)
+
+ def test_local_domain(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIsNone(rp.local_domain)
+
+ rp.local_domain = "bob"
+ self.assertEqual('bob', rp.local_domain)
+ self.assertIn('domain bob', str(rp))
+
+ def test_nameservers(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('10.15.44.14', rp.nameservers)
+ self.assertIn('10.15.30.92', rp.nameservers)
+ rp.add_nameserver('10.2')
+ self.assertIn('10.2', rp.nameservers)
+ self.assertIn('nameserver 10.2', str(rp))
+ self.assertNotIn('10.3', rp.nameservers)
+ self.assertEqual(len(rp.nameservers), 3)
+ rp.add_nameserver('10.2')
+ rp.add_nameserver('10.3')
+ self.assertNotIn('10.3', rp.nameservers)
+
+ def test_search_domains(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('yahoo.com', rp.search_domains)
+ self.assertIn('blah.yahoo.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertIn('bbb.y.com', rp.search_domains)
+ self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp)))
+ self.assertIn('bbb.y.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertEqual(len(rp.search_domains), 3)
+ rp.add_search_domain('bbb2.y.com')
+ self.assertEqual(len(rp.search_domains), 4)
+ rp.add_search_domain('bbb3.y.com')
+ self.assertEqual(len(rp.search_domains), 5)
+ rp.add_search_domain('bbb4.y.com')
+ self.assertEqual(len(rp.search_domains), 6)
+ self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com')
+ self.assertEqual(len(rp.search_domains), 6)
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_sles.py b/tests/unittests/distros/test_sles.py
new file mode 100644
index 00000000..04514a19
--- /dev/null
+++ b/tests/unittests/distros/test_sles.py
@@ -0,0 +1,12 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from tests.unittests.helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestSLES(CiTestCase):
+
+ def test_get_distro(self):
+ distro = _get_distro("sles")
+ self.assertEqual(distro.osfamily, 'suse')
diff --git a/tests/unittests/distros/test_sysconfig.py b/tests/unittests/distros/test_sysconfig.py
new file mode 100644
index 00000000..4368496d
--- /dev/null
+++ b/tests/unittests/distros/test_sysconfig.py
@@ -0,0 +1,86 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit.distros.parsers.sys_conf import SysConf
+
+from tests.unittests.helpers import TestCase
+
+
+# Lots of good examples @
+# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
+
+class TestSysConfHelper(TestCase):
+ # This function was added in 2.7, make it work for 2.6
+ def assertRegMatches(self, text, regexp):
+ regexp = re.compile(regexp)
+ self.assertTrue(regexp.search(text),
+ msg="%s must match %s!" % (text, regexp.pattern))
+
+ def test_parse_no_change(self):
+ contents = '''# A comment
+USESMBAUTH=no
+KEYTABLE=/usr/lib/kbd/keytables/us.map
+SHORTDATE=$(date +%y:%m:%d:%H:%M)
+HOSTNAME=blahblah
+NETMASK0=255.255.255.0
+# Inline comment
+LIST=$LOGROOT/incremental-list
+IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64'
+ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256"
+USEMD5=no'''
+ conf = SysConf(contents.splitlines())
+ self.assertEqual(conf['HOSTNAME'], 'blahblah')
+ self.assertEqual(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)')
+ # Should be unquoted
+ self.assertEqual(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; '
+ '-G ${DEVICE} rx 256 tx 256'))
+ self.assertEqual(contents, str(conf))
+
+ def test_parse_shell_vars(self):
+ contents = 'USESMBAUTH=$XYZ'
+ conf = SysConf(contents.splitlines())
+ self.assertEqual(contents, str(conf))
+ conf = SysConf('')
+ conf['B'] = '${ZZ}d apples'
+ # Should be quoted
+ self.assertEqual('B="${ZZ}d apples"', str(conf))
+ conf = SysConf('')
+ conf['B'] = '$? d apples'
+ self.assertEqual('B="$? d apples"', str(conf))
+ contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"'
+ conf = SysConf(contents.splitlines())
+ self.assertEqual('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf))
+
+ def test_parse_adjust(self):
+ contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"'
+ conf = SysConf(contents.splitlines())
+ # Should be unquoted
+ self.assertEqual('eth0-:0004::1/64 eth1-:0005::1/64',
+ conf['IPV6TO4_ROUTING'])
+ conf['IPV6TO4_ROUTING'] = "blah \tblah"
+ contents2 = str(conf).strip()
+ # Should be requoted due to whitespace
+ self.assertRegMatches(contents2,
+ r'IPV6TO4_ROUTING=[\']blah\s+blah[\']')
+
+ def test_parse_no_adjust_shell(self):
+ conf = SysConf(''.splitlines())
+ conf['B'] = ' $(time)'
+ contents = str(conf)
+ self.assertEqual('B= $(time)', contents)
+
+ def test_parse_empty(self):
+ contents = ''
+ conf = SysConf(contents.splitlines())
+ self.assertEqual('', str(conf).strip())
+
+ def test_parse_add_new(self):
+ contents = 'BLAH=b'
+ conf = SysConf(contents.splitlines())
+ conf['Z'] = 'd'
+ contents = str(conf)
+ self.assertIn("Z=d", contents)
+ self.assertIn("BLAH=b", contents)
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_user_data_normalize.py b/tests/unittests/distros/test_user_data_normalize.py
new file mode 100644
index 00000000..bd8f2adb
--- /dev/null
+++ b/tests/unittests/distros/test_user_data_normalize.py
@@ -0,0 +1,372 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from unittest import mock
+
+from cloudinit import distros
+from cloudinit.distros import ug_util
+from cloudinit import helpers
+from cloudinit import settings
+
+from tests.unittests.helpers import TestCase
+
+
+bcfg = {
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
+}
+
+
+class TestUGNormalize(TestCase):
+
+ def setUp(self):
+ super(TestUGNormalize, self).setUp()
+ self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
+
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
+ def _norm(self, cfg, distro):
+ return ug_util.normalize_users_groups(cfg, distro)
+
+ def test_group_dict(self):
+ distro = self._make_distro('ubuntu')
+ g = {'groups':
+ [{'ubuntu': ['foo', 'bar'],
+ 'bob': 'users'},
+ 'cloud-users',
+ {'bob': 'users2'}]}
+ (_users, groups) = self._norm(g, distro)
+ self.assertIn('ubuntu', groups)
+ ub_members = groups['ubuntu']
+ self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members))
+ self.assertIn('bob', groups)
+ b_members = groups['bob']
+ self.assertEqual(sorted(['users', 'users2']),
+ sorted(b_members))
+
+ def test_basic_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob'],
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEqual({}, users)
+
+ def test_csv_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': 'bob,joe,steve',
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEqual({}, users)
+
+ def test_more_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob', 'joe', 'steve']
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEqual({}, users)
+
+ def test_member_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': {
+ 'bob': ['s'],
+ 'joe': [],
+ 'steve': [],
+ }
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEqual(['s'], groups['bob'])
+ self.assertEqual([], groups['joe'])
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEqual({}, users)
+
+ def test_users_simple_dict(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'yes',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': '1',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+
+ def test_users_simple_dict_no(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': False,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'no',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+
+ def test_users_simple_csv(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': 'joe,bob',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEqual({'default': False}, users['joe'])
+ self.assertEqual({'default': False}, users['bob'])
+
+ def test_users_simple(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ 'joe',
+ 'bob'
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEqual({'default': False}, users['joe'])
+ self.assertEqual({'default': False}, users['bob'])
+
+ def test_users_old_user(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
+ self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default, joe'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': ['bob', 'joe']
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': {
+ 'bob': True,
+ 'joe': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
+ ug_cfg = {
+ 'user': 'zetta',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('zetta', users)
+ ug_cfg = {}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertEqual({}, users)
+ self.assertEqual({}, groups)
+
+ def test_users_dict_default_additional(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ {'name': 'default', 'blah': True}
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEqual(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEqual(True, users['bob']['blah'])
+ self.assertEqual(True, users['bob']['default'])
+
+ def test_users_dict_extract(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ (name, config) = ug_util.extract_default(users)
+ self.assertEqual(name, 'bob')
+ expected_config = {}
+ def_config = None
+ try:
+ def_config = distro.get_default_user()
+ except NotImplementedError:
+ pass
+ if not def_config:
+ def_config = {}
+ expected_config.update(def_config)
+
+ # Ignore these for now
+ expected_config.pop('name', None)
+ expected_config.pop('groups', None)
+ config.pop('groups', None)
+ self.assertEqual(config, expected_config)
+
+ def test_users_dict_default(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEqual(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEqual(True, users['bob']['default'])
+
+ def test_users_dict_trans(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe',
+ 'tr-me': True},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEqual({'tr_me': True, 'default': False}, users['joe'])
+ self.assertEqual({'default': False}, users['bob'])
+
+ def test_users_dict(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe'},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEqual({'default': False}, users['joe'])
+ self.assertEqual({'default': False}, users['bob'])
+
+ @mock.patch('cloudinit.subp.subp')
+ def test_create_snap_user(self, mock_subp):
+ mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
+ '')]
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'snapuser': 'joe@joe.com'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com']
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, 'joe')
+
+ @mock.patch('cloudinit.subp.subp')
+ def test_create_snap_user_known(self, mock_subp):
+ mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
+ '')]
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known',
+ 'joe@joe.com']
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, 'joe')
+
+ @mock.patch('cloudinit.util.system_is_snappy')
+ @mock.patch('cloudinit.util.is_group')
+ @mock.patch('cloudinit.subp.subp')
+ def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp,
+ mock_snappy):
+ mock_isgrp.return_value = False
+ mock_subp.return_value = True
+ mock_snappy.return_value = True
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'groups': 'users', 'create_groups': True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ distro.add_user(user, **config)
+
+ groupcmd = ['groupadd', 'users', '--extrausers']
+ addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m']
+
+ mock_subp.assert_any_call(groupcmd)
+ mock_subp.assert_any_call(addcmd, logstring=addcmd)
+
+# vi: ts=4 expandtab