diff options
Diffstat (limited to 'tests/unittests/distros')
21 files changed, 1379 insertions, 1028 deletions
diff --git a/tests/unittests/distros/__init__.py b/tests/unittests/distros/__init__.py index 5394aa56..e66b9446 100644 --- a/tests/unittests/distros/__init__.py +++ b/tests/unittests/distros/__init__.py @@ -1,9 +1,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import copy -from cloudinit import distros -from cloudinit import helpers -from cloudinit import settings +from cloudinit import distros, helpers, settings def _get_distro(dtype, system_info=None): @@ -14,8 +12,8 @@ def _get_distro(dtype, system_info=None): example: _get_distro("debian") """ if system_info is None: - system_info = copy.deepcopy(settings.CFG_BUILTIN['system_info']) - system_info['distro'] = dtype - paths = helpers.Paths(system_info['paths']) + system_info = copy.deepcopy(settings.CFG_BUILTIN["system_info"]) + system_info["distro"] = dtype + paths = helpers.Paths(system_info["paths"]) distro_cls = distros.fetch(dtype) return distro_cls(dtype, system_info, paths) diff --git a/tests/unittests/distros/test_arch.py b/tests/unittests/distros/test_arch.py index 590ba00e..5446295e 100644 --- a/tests/unittests/distros/test_arch.py +++ b/tests/unittests/distros/test_arch.py @@ -1,15 +1,13 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.distros.arch import _render_network from cloudinit import util - -from tests.unittests.helpers import (CiTestCase, dir2dict) +from cloudinit.distros.arch import _render_network +from tests.unittests.helpers import CiTestCase, dir2dict from . import _get_distro class TestArch(CiTestCase): - def test_get_distro(self): distro = _get_distro("arch") hostname = "myhostname" @@ -23,23 +21,35 @@ class TestRenderNetwork(CiTestCase): """Just the most basic static config. note 'lo' should not be rendered as an interface.""" - entries = {'eth0': {'auto': True, - 'dns-nameservers': ['8.8.8.8'], - 'bootproto': 'static', - 'address': '10.0.0.2', - 'gateway': '10.0.0.1', - 'netmask': '255.255.255.0'}, - 'lo': {'auto': True}} + entries = { + "eth0": { + "auto": True, + "dns-nameservers": ["8.8.8.8"], + "bootproto": "static", + "address": "10.0.0.2", + "gateway": "10.0.0.1", + "netmask": "255.255.255.0", + }, + "lo": {"auto": True}, + } target = self.tmp_dir() devs = _render_network(entries, target=target) files = dir2dict(target, prefix=target) - self.assertEqual(['eth0'], devs) + self.assertEqual(["eth0"], devs) self.assertEqual( - {'/etc/netctl/eth0': '\n'.join([ - "Address=10.0.0.2/255.255.255.0", - "Connection=ethernet", - "DNS=('8.8.8.8')", - "Gateway=10.0.0.1", - "IP=static", - "Interface=eth0", ""]), - '/etc/resolv.conf': 'nameserver 8.8.8.8\n'}, files) + { + "/etc/netctl/eth0": "\n".join( + [ + "Address=10.0.0.2/255.255.255.0", + "Connection=ethernet", + "DNS=('8.8.8.8')", + "Gateway=10.0.0.1", + "IP=static", + "Interface=eth0", + "", + ] + ), + "/etc/resolv.conf": "nameserver 8.8.8.8\n", + }, + files, + ) diff --git a/tests/unittests/distros/test_bsd_utils.py b/tests/unittests/distros/test_bsd_utils.py index 55686dc9..d6f0aeed 100644 --- a/tests/unittests/distros/test_bsd_utils.py +++ b/tests/unittests/distros/test_bsd_utils.py @@ -1,8 +1,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import cloudinit.distros.bsd_utils as bsd_utils - -from tests.unittests.helpers import (CiTestCase, ExitStack, mock) +from tests.unittests.helpers import CiTestCase, ExitStack, mock RC_FILE = """ if something; then @@ -13,55 +12,55 @@ hostname={hostname} class TestBsdUtils(CiTestCase): - def setUp(self): super().setUp() patches = ExitStack() self.addCleanup(patches.close) self.load_file = patches.enter_context( - mock.patch.object(bsd_utils.util, 'load_file')) + mock.patch.object(bsd_utils.util, "load_file") + ) self.write_file = patches.enter_context( - mock.patch.object(bsd_utils.util, 'write_file')) + mock.patch.object(bsd_utils.util, "write_file") + ) def test_get_rc_config_value(self): - self.load_file.return_value = 'hostname=foo\n' - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo') - self.load_file.assert_called_with('/etc/rc.conf') + self.load_file.return_value = "hostname=foo\n" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") + self.load_file.assert_called_with("/etc/rc.conf") - self.load_file.return_value = 'hostname=foo' - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo') + self.load_file.return_value = "hostname=foo" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") self.load_file.return_value = 'hostname="foo"' - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo') + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") self.load_file.return_value = "hostname='foo'" - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo') + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") - self.load_file.return_value = 'hostname=\'foo"' - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "'foo\"") + self.load_file.return_value = "hostname='foo\"" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "'foo\"") - self.load_file.return_value = '' - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), None) + self.load_file.return_value = "" + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), None) - self.load_file.return_value = RC_FILE.format(hostname='foo') - self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "foo") + self.load_file.return_value = RC_FILE.format(hostname="foo") + self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo") def test_set_rc_config_value_unchanged(self): # bsd_utils.set_rc_config_value('hostname', 'foo') # self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n') - self.load_file.return_value = RC_FILE.format(hostname='foo') + self.load_file.return_value = RC_FILE.format(hostname="foo") self.write_file.assert_not_called() def test_set_rc_config_value(self): - bsd_utils.set_rc_config_value('hostname', 'foo') - self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n') + bsd_utils.set_rc_config_value("hostname", "foo") + self.write_file.assert_called_with("/etc/rc.conf", "hostname=foo\n") - self.load_file.return_value = RC_FILE.format(hostname='foo') - bsd_utils.set_rc_config_value('hostname', 'bar') + self.load_file.return_value = RC_FILE.format(hostname="foo") + bsd_utils.set_rc_config_value("hostname", "bar") self.write_file.assert_called_with( - '/etc/rc.conf', - RC_FILE.format(hostname='bar') + "/etc/rc.conf", RC_FILE.format(hostname="bar") ) diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py index 5baa8a4b..ddb039bd 100644 --- a/tests/unittests/distros/test_create_users.py +++ b/tests/unittests/distros/test_create_users.py @@ -2,9 +2,8 @@ import re -from cloudinit import distros -from cloudinit import ssh_util -from tests.unittests.helpers import (CiTestCase, mock) +from cloudinit import distros, ssh_util +from tests.unittests.helpers import CiTestCase, mock from tests.unittests.util import abstract_to_concrete @@ -17,220 +16,267 @@ class TestCreateUser(CiTestCase): def setUp(self): super(TestCreateUser, self).setUp() self.dist = abstract_to_concrete(distros.Distro)( - name='test', cfg=None, paths=None + name="test", cfg=None, paths=None ) def _useradd2call(self, args): # return a mock call for the useradd command in args # with expected 'logstring'. - args = ['useradd'] + args + args = ["useradd"] + args logcmd = [a for a in args] for i in range(len(args)): - if args[i] in ('--password',): - logcmd[i + 1] = 'REDACTED' + if args[i] in ("--password",): + logcmd[i + 1] = "REDACTED" return mock.call(args, logstring=logcmd) def test_basic(self, m_subp, m_is_snappy): - user = 'foouser' + user = "foouser" self.dist.create_user(user) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-m']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) def test_no_home(self, m_subp, m_is_snappy): - user = 'foouser' + user = "foouser" self.dist.create_user(user, no_create_home=True) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-M']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) def test_system_user(self, m_subp, m_is_snappy): # system user should have no home and get --system - user = 'foouser' + user = "foouser" self.dist.create_user(user, system=True) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '--system', '-M']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "--system", "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) def test_explicit_no_home_false(self, m_subp, m_is_snappy): - user = 'foouser' + user = "foouser" self.dist.create_user(user, no_create_home=False) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-m']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) def test_unlocked(self, m_subp, m_is_snappy): - user = 'foouser' + user = "foouser" self.dist.create_user(user, lock_passwd=False) self.assertEqual( - m_subp.call_args_list, - [self._useradd2call([user, '-m'])]) + m_subp.call_args_list, [self._useradd2call([user, "-m"])] + ) def test_set_password(self, m_subp, m_is_snappy): - user = 'foouser' - password = 'passfoo' + user = "foouser" + password = "passfoo" self.dist.create_user(user, passwd=password) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '--password', password, '-m']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "--password", password, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) @mock.patch("cloudinit.distros.util.is_group") def test_group_added(self, m_is_group, m_subp, m_is_snappy): m_is_group.return_value = False - user = 'foouser' - self.dist.create_user(user, groups=['group1']) + user = "foouser" + self.dist.create_user(user, groups=["group1"]) expected = [ - mock.call(['groupadd', 'group1']), - self._useradd2call([user, '--groups', 'group1', '-m']), - mock.call(['passwd', '-l', user])] + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", "group1", "-m"]), + mock.call(["passwd", "-l", user]), + ] self.assertEqual(m_subp.call_args_list, expected) @mock.patch("cloudinit.distros.util.is_group") def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy): - ex_groups = ['existing_group'] - groups = ['group1', ex_groups[0]] + ex_groups = ["existing_group"] + groups = ["group1", ex_groups[0]] m_is_group.side_effect = lambda m: m in ex_groups - user = 'foouser' + user = "foouser" self.dist.create_user(user, groups=groups) expected = [ - mock.call(['groupadd', 'group1']), - self._useradd2call([user, '--groups', ','.join(groups), '-m']), - mock.call(['passwd', '-l', user])] + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", ",".join(groups), "-m"]), + mock.call(["passwd", "-l", user]), + ] self.assertEqual(m_subp.call_args_list, expected) @mock.patch("cloudinit.distros.util.is_group") def test_create_groups_with_whitespace_string( - self, m_is_group, m_subp, m_is_snappy): + self, m_is_group, m_subp, m_is_snappy + ): # groups supported as a comma delimeted string even with white space m_is_group.return_value = False - user = 'foouser' - self.dist.create_user(user, groups='group1, group2') + user = "foouser" + self.dist.create_user(user, groups="group1, group2") expected = [ - mock.call(['groupadd', 'group1']), - mock.call(['groupadd', 'group2']), - self._useradd2call([user, '--groups', 'group1,group2', '-m']), - mock.call(['passwd', '-l', user])] + mock.call(["groupadd", "group1"]), + mock.call(["groupadd", "group2"]), + self._useradd2call([user, "--groups", "group1,group2", "-m"]), + mock.call(["passwd", "-l", user]), + ] self.assertEqual(m_subp.call_args_list, expected) def test_explicit_sudo_false(self, m_subp, m_is_snappy): - user = 'foouser' + user = "foouser" self.dist.create_user(user, sudo=False) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-m']), - mock.call(['passwd', '-l', user])]) + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_string( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """ssh_authorized_keys allows string and calls setup_user_keys.""" - user = 'foouser' - self.dist.create_user(user, ssh_authorized_keys='mykey') + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys="mykey") self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-m']), - mock.call(['passwd', '-l', user])]) - m_setup_user_keys.assert_called_once_with(set(['mykey']), user) + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["mykey"]), user) - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_list( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """ssh_authorized_keys allows lists and calls setup_user_keys.""" - user = 'foouser' - self.dist.create_user(user, ssh_authorized_keys=['key1', 'key2']) + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"]) self.assertEqual( m_subp.call_args_list, - [self._useradd2call([user, '-m']), - mock.call(['passwd', '-l', user])]) - m_setup_user_keys.assert_called_once_with(set(['key1', 'key2']), user) + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user) - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_integer( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """ssh_authorized_keys warns on non-iterable/string type.""" - user = 'foouser' + user = "foouser" self.dist.create_user(user, ssh_authorized_keys=-1) m_setup_user_keys.assert_called_once_with(set([]), user) match = re.match( - r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for' - ' \'ssh_authorized_keys\'.*', + r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for" + " 'ssh_authorized_keys'.*", self.logs.getvalue(), - re.DOTALL) + re.DOTALL, + ) self.assertIsNotNone( - match, 'Missing ssh_authorized_keys invalid type warning') + match, "Missing ssh_authorized_keys invalid type warning" + ) - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_no_cloud_keys( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """Log a warning when trying to redirect a user no cloud ssh keys.""" - user = 'foouser' - self.dist.create_user(user, ssh_redirect_user='someuser') + user = "foouser" + self.dist.create_user(user, ssh_redirect_user="someuser") self.assertIn( - 'WARNING: Unable to disable SSH logins for foouser given ' - 'ssh_redirect_user: someuser. No cloud public-keys present.\n', - self.logs.getvalue()) + "WARNING: Unable to disable SSH logins for foouser given " + "ssh_redirect_user: someuser. No cloud public-keys present.\n", + self.logs.getvalue(), + ) m_setup_user_keys.assert_not_called() - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_with_cloud_keys( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """Disable ssh when ssh_redirect_user and cloud ssh keys are set.""" - user = 'foouser' + user = "foouser" self.dist.create_user( - user, ssh_redirect_user='someuser', cloud_public_ssh_keys=['key1']) + user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"] + ) disable_prefix = ssh_util.DISABLE_USER_OPTS - disable_prefix = disable_prefix.replace('$USER', 'someuser') - disable_prefix = disable_prefix.replace('$DISABLE_USER', user) + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) m_setup_user_keys.assert_called_once_with( - set(['key1']), 'foouser', options=disable_prefix) + set(["key1"]), "foouser", options=disable_prefix + ) - @mock.patch('cloudinit.ssh_util.setup_user_keys') + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys( - self, m_setup_user_keys, m_subp, m_is_snappy): + self, m_setup_user_keys, m_subp, m_is_snappy + ): """Do not disable ssh_authorized_keys when ssh_redirect_user is set.""" - user = 'foouser' + user = "foouser" self.dist.create_user( - user, ssh_authorized_keys='auth1', ssh_redirect_user='someuser', - cloud_public_ssh_keys=['key1']) + user, + ssh_authorized_keys="auth1", + ssh_redirect_user="someuser", + cloud_public_ssh_keys=["key1"], + ) disable_prefix = ssh_util.DISABLE_USER_OPTS - disable_prefix = disable_prefix.replace('$USER', 'someuser') - disable_prefix = disable_prefix.replace('$DISABLE_USER', user) + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) self.assertEqual( m_setup_user_keys.call_args_list, - [mock.call(set(['auth1']), user), # not disabled - mock.call(set(['key1']), 'foouser', options=disable_prefix)]) + [ + mock.call(set(["auth1"]), user), # not disabled + mock.call(set(["key1"]), "foouser", options=disable_prefix), + ], + ) @mock.patch("cloudinit.distros.subp.which") - def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp, - m_is_snappy): + def test_lock_with_usermod_if_no_passwd( + self, m_which, m_subp, m_is_snappy + ): """Lock uses usermod --lock if no 'passwd' cmd available.""" - m_which.side_effect = lambda m: m in ('usermod',) + m_which.side_effect = lambda m: m in ("usermod",) self.dist.lock_passwd("bob") self.assertEqual( - [mock.call(['usermod', '--lock', 'bob'])], - m_subp.call_args_list) + [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list + ) @mock.patch("cloudinit.distros.subp.which") - def test_lock_with_passwd_if_available(self, m_which, m_subp, - m_is_snappy): + def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy): """Lock with only passwd will use passwd.""" - m_which.side_effect = lambda m: m in ('passwd',) + m_which.side_effect = lambda m: m in ("passwd",) self.dist.lock_passwd("bob") self.assertEqual( - [mock.call(['passwd', '-l', 'bob'])], - m_subp.call_args_list) + [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list + ) @mock.patch("cloudinit.distros.subp.which") - def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp, - m_is_snappy): + def test_lock_raises_runtime_if_no_commands( + self, m_which, m_subp, m_is_snappy + ): """Lock with no commands available raises RuntimeError.""" m_which.return_value = None with self.assertRaises(RuntimeError): self.dist.lock_passwd("bob") + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_debian.py b/tests/unittests/distros/test_debian.py index 3d0db145..c7c5932e 100644 --- a/tests/unittests/distros/test_debian.py +++ b/tests/unittests/distros/test_debian.py @@ -4,92 +4,117 @@ from unittest import mock import pytest -from cloudinit import distros, util -from cloudinit.distros.debian import ( - APT_GET_COMMAND, - APT_GET_WRAPPER, -) +from cloudinit import distros, subp, util +from cloudinit.distros.debian import APT_GET_COMMAND, APT_GET_WRAPPER from tests.unittests.helpers import FilesystemMockingTestCase -from cloudinit import subp @mock.patch("cloudinit.distros.debian.subp.subp") class TestDebianApplyLocale(FilesystemMockingTestCase): - def setUp(self): super(TestDebianApplyLocale, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) self.patchUtils(self.new_root) - self.spath = self.tmp_path('etc/default/locale', self.new_root) + self.spath = self.tmp_path("etc/default/locale", self.new_root) cls = distros.fetch("debian") self.distro = cls("debian", {}, None) def test_no_rerun(self, m_subp): """If system has defined locale, no re-run is expected.""" m_subp.return_value = (None, None) - locale = 'en_US.UTF-8' - util.write_file(self.spath, 'LANG=%s\n' % locale, omode="w") + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=%s\n" % locale, omode="w") self.distro.apply_locale(locale, out_fn=self.spath) m_subp.assert_not_called() def test_no_regen_on_c_utf8(self, m_subp): """If locale is set to C.UTF8, do not attempt to call locale-gen""" m_subp.return_value = (None, None) - locale = 'C.UTF-8' - util.write_file(self.spath, 'LANG=%s\n' % 'en_US.UTF-8', omode="w") + locale = "C.UTF-8" + util.write_file(self.spath, "LANG=%s\n" % "en_US.UTF-8", omode="w") self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( - [['update-locale', '--locale-file=' + self.spath, - 'LANG=%s' % locale]], - [p[0][0] for p in m_subp.call_args_list]) + [ + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ] + ], + [p[0][0] for p in m_subp.call_args_list], + ) def test_rerun_if_different(self, m_subp): """If system has different locale, locale-gen should be called.""" m_subp.return_value = (None, None) - locale = 'en_US.UTF-8' - util.write_file(self.spath, 'LANG=fr_FR.UTF-8', omode="w") + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=fr_FR.UTF-8", omode="w") self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( - [['locale-gen', locale], - ['update-locale', '--locale-file=' + self.spath, - 'LANG=%s' % locale]], - [p[0][0] for p in m_subp.call_args_list]) + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) def test_rerun_if_no_file(self, m_subp): """If system has no locale file, locale-gen should be called.""" m_subp.return_value = (None, None) - locale = 'en_US.UTF-8' + locale = "en_US.UTF-8" self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( - [['locale-gen', locale], - ['update-locale', '--locale-file=' + self.spath, - 'LANG=%s' % locale]], - [p[0][0] for p in m_subp.call_args_list]) + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) def test_rerun_on_unset_system_locale(self, m_subp): """If system has unset locale, locale-gen should be called.""" m_subp.return_value = (None, None) - locale = 'en_US.UTF-8' - util.write_file(self.spath, 'LANG=', omode="w") + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=", omode="w") self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( - [['locale-gen', locale], - ['update-locale', '--locale-file=' + self.spath, - 'LANG=%s' % locale]], - [p[0][0] for p in m_subp.call_args_list]) + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LANG=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) def test_rerun_on_mismatched_keys(self, m_subp): """If key is LC_ALL and system has only LANG, rerun is expected.""" m_subp.return_value = (None, None) - locale = 'en_US.UTF-8' - util.write_file(self.spath, 'LANG=', omode="w") - self.distro.apply_locale(locale, out_fn=self.spath, keyname='LC_ALL') + locale = "en_US.UTF-8" + util.write_file(self.spath, "LANG=", omode="w") + self.distro.apply_locale(locale, out_fn=self.spath, keyname="LC_ALL") self.assertEqual( - [['locale-gen', locale], - ['update-locale', '--locale-file=' + self.spath, - 'LC_ALL=%s' % locale]], - [p[0][0] for p in m_subp.call_args_list]) + [ + ["locale-gen", locale], + [ + "update-locale", + "--locale-file=" + self.spath, + "LC_ALL=%s" % locale, + ], + ], + [p[0][0] for p in m_subp.call_args_list], + ) def test_falseish_locale_raises_valueerror(self, m_subp): """locale as None or "" is invalid and should raise ValueError.""" @@ -99,45 +124,53 @@ class TestDebianApplyLocale(FilesystemMockingTestCase): m_subp.assert_not_called() self.assertEqual( - 'Failed to provide locale value.', str(ctext_m.exception)) + "Failed to provide locale value.", str(ctext_m.exception) + ) with self.assertRaises(ValueError) as ctext_m: self.distro.apply_locale("") m_subp.assert_not_called() self.assertEqual( - 'Failed to provide locale value.', str(ctext_m.exception)) + "Failed to provide locale value.", str(ctext_m.exception) + ) -@mock.patch.dict('os.environ', {}, clear=True) +@mock.patch.dict("os.environ", {}, clear=True) @mock.patch("cloudinit.distros.debian.subp.which", return_value=True) @mock.patch("cloudinit.distros.debian.subp.subp") class TestPackageCommand: distro = distros.fetch("debian")("debian", {}, None) - @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available", - return_value=True) + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + return_value=True, + ) def test_simple_command(self, m_apt_avail, m_subp, m_which): - self.distro.package_command('update') - apt_args = [APT_GET_WRAPPER['command']] + self.distro.package_command("update") + apt_args = [APT_GET_WRAPPER["command"]] apt_args.extend(APT_GET_COMMAND) - apt_args.append('update') + apt_args.append("update") expected_call = { - 'args': apt_args, - 'capture': False, - 'env': {'DEBIAN_FRONTEND': 'noninteractive'}, + "args": apt_args, + "capture": False, + "env": {"DEBIAN_FRONTEND": "noninteractive"}, } assert m_subp.call_args == mock.call(**expected_call) - @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available", - side_effect=[False, False, True]) + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=[False, False, True], + ) @mock.patch("cloudinit.distros.debian.time.sleep") def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which): self.distro._wait_for_apt_command("stub", {"args": "stub2"}) assert m_sleep.call_args_list == [mock.call(1), mock.call(1)] - assert m_subp.call_args_list == [mock.call(args='stub2')] + assert m_subp.call_args_list == [mock.call(args="stub2")] - @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available", - return_value=False) + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + return_value=False, + ) @mock.patch("cloudinit.distros.debian.time.sleep") @mock.patch("cloudinit.distros.debian.time.time", side_effect=count()) def test_lock_wait_timeout( @@ -147,8 +180,10 @@ class TestPackageCommand: self.distro._wait_for_apt_command("stub", "stub2", timeout=5) assert m_subp.call_args_list == [] - @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available", - side_effect=cycle([True, False])) + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=cycle([True, False]), + ) @mock.patch("cloudinit.distros.debian.time.sleep") def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which): exception = subp.ProcessExecutionError( @@ -158,8 +193,10 @@ class TestPackageCommand: ret = self.distro._wait_for_apt_command("stub", {"args": "stub2"}) assert ret == "return_thing" - @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available", - side_effect=cycle([True, False])) + @mock.patch( + "cloudinit.distros.debian.Distro._apt_lock_available", + side_effect=cycle([True, False]), + ) @mock.patch("cloudinit.distros.debian.time.sleep") @mock.patch("cloudinit.distros.debian.time.time", side_effect=count()) def test_lock_exception_timeout( diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py index 0279e86f..22be5098 100644 --- a/tests/unittests/distros/test_freebsd.py +++ b/tests/unittests/distros/test_freebsd.py @@ -1,45 +1,43 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.util import (find_freebsd_part, get_path_dev_freebsd) -from tests.unittests.helpers import (CiTestCase, mock) - import os +from cloudinit.util import find_freebsd_part, get_path_dev_freebsd +from tests.unittests.helpers import CiTestCase, mock -class TestDeviceLookUp(CiTestCase): - @mock.patch('cloudinit.subp.subp') +class TestDeviceLookUp(CiTestCase): + @mock.patch("cloudinit.subp.subp") def test_find_freebsd_part_label(self, mock_subp): - glabel_out = ''' + glabel_out = """ gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1 label/rootfs N/A da0p2 label/swap N/A da0p3 -''' +""" mock_subp.return_value = (glabel_out, "") res = find_freebsd_part("/dev/label/rootfs") self.assertEqual("da0p2", res) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_freebsd_part_gpt(self, mock_subp): - glabel_out = ''' + glabel_out = """ gpt/bootfs N/A vtbd0p1 gptid/3f4cbe26-75da-11e8-a8f2-002590ec6166 N/A vtbd0p1 gpt/swapfs N/A vtbd0p2 gpt/rootfs N/A vtbd0p3 iso9660/cidata N/A vtbd2 -''' +""" mock_subp.return_value = (glabel_out, "") res = find_freebsd_part("/dev/gpt/rootfs") self.assertEqual("vtbd0p3", res) def test_get_path_dev_freebsd_label(self): - mnt_list = ''' + mnt_list = """ /dev/label/rootfs / ufs rw 1 1 devfs /dev devfs rw,multilabel 0 0 fdescfs /dev/fd fdescfs rw 0 0 /dev/da1s1 /mnt/resource ufs rw 2 2 -''' - with mock.patch.object(os.path, 'exists', - return_value=True): - res = get_path_dev_freebsd('/etc', mnt_list) +""" + with mock.patch.object(os.path, "exists", return_value=True): + res = get_path_dev_freebsd("/etc", mnt_list) self.assertIsNotNone(res) diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py index e542c26f..93c5395c 100644 --- a/tests/unittests/distros/test_generic.py +++ b/tests/unittests/distros/test_generic.py @@ -1,35 +1,49 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit import distros -from cloudinit import util - -from tests.unittests import helpers - import os -import pytest import shutil import tempfile from unittest import mock +import pytest + +from cloudinit import distros, util +from tests.unittests import helpers + unknown_arch_info = { - 'arches': ['default'], - 'failsafe': {'primary': 'http://fs-primary-default', - 'security': 'http://fs-security-default'} + "arches": ["default"], + "failsafe": { + "primary": "http://fs-primary-default", + "security": "http://fs-security-default", + }, } package_mirrors = [ - {'arches': ['i386', 'amd64'], - 'failsafe': {'primary': 'http://fs-primary-intel', - 'security': 'http://fs-security-intel'}, - 'search': { - 'primary': ['http://%(ec2_region)s.ec2/', - 'http://%(availability_zone)s.clouds/'], - 'security': ['http://security-mirror1-intel', - 'http://security-mirror2-intel']}}, - {'arches': ['armhf', 'armel'], - 'failsafe': {'primary': 'http://fs-primary-arm', - 'security': 'http://fs-security-arm'}}, - unknown_arch_info + { + "arches": ["i386", "amd64"], + "failsafe": { + "primary": "http://fs-primary-intel", + "security": "http://fs-security-intel", + }, + "search": { + "primary": [ + "http://%(ec2_region)s.ec2/", + "http://%(availability_zone)s.clouds/", + ], + "security": [ + "http://security-mirror1-intel", + "http://security-mirror2-intel", + ], + }, + }, + { + "arches": ["armhf", "armel"], + "failsafe": { + "primary": "http://fs-primary-arm", + "security": "http://fs-security-arm", + }, + }, + unknown_arch_info, ] gpmi = distros._get_package_mirror_info @@ -37,7 +51,6 @@ gapmi = distros._get_arch_package_mirror_info class TestGenericDistro(helpers.FilesystemMockingTestCase): - def setUp(self): super(TestGenericDistro, self).setUp() # Make a temp directoy for tests to use. @@ -48,7 +61,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): cls = distros.fetch("ubuntu") d = cls("ubuntu", {}, None) os.makedirs(os.path.join(self.tmp, "etc")) - os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d')) + os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d")) self.patchOS(self.tmp) self.patchUtils(self.tmp) d.write_sudo_rules("harlowja", rules) @@ -65,34 +78,34 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): return found_amount def test_sudoers_ensure_rules(self): - rules = 'ALL=(ALL:ALL) ALL' - contents = self._write_load_sudoers('harlowja', rules) - expected = ['harlowja ALL=(ALL:ALL) ALL'] + rules = "ALL=(ALL:ALL) ALL" + contents = self._write_load_sudoers("harlowja", rules) + expected = ["harlowja ALL=(ALL:ALL) ALL"] self.assertEqual(len(expected), self._count_in(expected, contents)) not_expected = [ - 'harlowja A', - 'harlowja L', - 'harlowja L', + "harlowja A", + "harlowja L", + "harlowja L", ] self.assertEqual(0, self._count_in(not_expected, contents)) def test_sudoers_ensure_rules_list(self): rules = [ - 'ALL=(ALL:ALL) ALL', - 'B-ALL=(ALL:ALL) ALL', - 'C-ALL=(ALL:ALL) ALL', + "ALL=(ALL:ALL) ALL", + "B-ALL=(ALL:ALL) ALL", + "C-ALL=(ALL:ALL) ALL", ] - contents = self._write_load_sudoers('harlowja', rules) + contents = self._write_load_sudoers("harlowja", rules) expected = [ - 'harlowja ALL=(ALL:ALL) ALL', - 'harlowja B-ALL=(ALL:ALL) ALL', - 'harlowja C-ALL=(ALL:ALL) ALL', + "harlowja ALL=(ALL:ALL) ALL", + "harlowja B-ALL=(ALL:ALL) ALL", + "harlowja C-ALL=(ALL:ALL) ALL", ] self.assertEqual(len(expected), self._count_in(expected, contents)) not_expected = [ - 'harlowja A', - 'harlowja L', - 'harlowja L', + "harlowja A", + "harlowja L", + "harlowja L", ] self.assertEqual(0, self._count_in(not_expected, contents)) @@ -124,7 +137,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): d = cls("ubuntu", {}, None) self.patchOS(self.tmp) self.patchUtils(self.tmp) - for char in ['#', '@']: + for char in ["#", "@"]: util.write_file("/etc/sudoers", "{}includedir /b".format(char)) d.ensure_sudo_dir("/b") contents = util.load_file("/etc/sudoers") @@ -146,7 +159,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): d = cls("ubuntu", {}, None) self.patchOS(self.tmp) self.patchUtils(self.tmp) - os.makedirs('/run/systemd/system') + os.makedirs("/run/systemd/system") self.assertTrue(d.uses_systemd()) def test_systemd_not_in_use(self): @@ -161,18 +174,18 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): d = cls("ubuntu", {}, None) self.patchOS(self.tmp) self.patchUtils(self.tmp) - os.makedirs('/run/systemd') - os.symlink('/', '/run/systemd/system') + os.makedirs("/run/systemd") + os.symlink("/", "/run/systemd/system") self.assertFalse(d.uses_systemd()) - @mock.patch('cloudinit.distros.debian.read_system_locale') + @mock.patch("cloudinit.distros.debian.read_system_locale") def test_get_locale_ubuntu(self, m_locale): """Test ubuntu distro returns locale set to C.UTF-8""" - m_locale.return_value = 'C.UTF-8' + m_locale.return_value = "C.UTF-8" cls = distros.fetch("ubuntu") d = cls("ubuntu", {}, None) locale = d.get_locale() - self.assertEqual('C.UTF-8', locale) + self.assertEqual("C.UTF-8", locale) def test_get_locale_rhel(self): """Test rhel distro returns NotImplementedError exception""" @@ -197,11 +210,11 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): with mock.patch("cloudinit.subp.subp") as m_subp: d.expire_passwd("myuser") m_subp.assert_called_once_with( - ["pw", "usermod", "myuser", "-p", "01-Jan-1970"]) + ["pw", "usermod", "myuser", "-p", "01-Jan-1970"] + ) class TestGetPackageMirrors: - def return_first(self, mlist): if not mlist: return None @@ -219,77 +232,125 @@ class TestGetPackageMirrors: def return_last(self, mlist): if not mlist: return None - return(mlist[-1]) + return mlist[-1] @pytest.mark.parametrize( "allow_ec2_mirror, platform_type, mirrors", [ - (True, "ec2", [ - {'primary': 'http://us-east-1.ec2/', - 'security': 'http://security-mirror1-intel'}, - {'primary': 'http://us-east-1a.clouds/', - 'security': 'http://security-mirror2-intel'} - ]), - (True, "other", [ - {'primary': 'http://us-east-1.ec2/', - 'security': 'http://security-mirror1-intel'}, - {'primary': 'http://us-east-1a.clouds/', - 'security': 'http://security-mirror2-intel'} - ]), - (False, "ec2", [ - {'primary': 'http://us-east-1.ec2/', - 'security': 'http://security-mirror1-intel'}, - {'primary': 'http://us-east-1a.clouds/', - 'security': 'http://security-mirror2-intel'} - ]), - (False, "other", [ - {'primary': 'http://us-east-1a.clouds/', - 'security': 'http://security-mirror1-intel'}, - {'primary': 'http://fs-primary-intel', - 'security': 'http://security-mirror2-intel'} - ]) - ]) - def test_get_package_mirror_info_az_ec2(self, - allow_ec2_mirror, - platform_type, - mirrors): - flag_path = "cloudinit.distros." \ - "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + ( + True, + "ec2", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + True, + "other", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + False, + "ec2", + [ + { + "primary": "http://us-east-1.ec2/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror2-intel", + }, + ], + ), + ( + False, + "other", + [ + { + "primary": "http://us-east-1a.clouds/", + "security": "http://security-mirror1-intel", + }, + { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror2-intel", + }, + ], + ), + ], + ) + def test_get_package_mirror_info_az_ec2( + self, allow_ec2_mirror, platform_type, mirrors + ): + flag_path = ( + "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + ) with mock.patch(flag_path, allow_ec2_mirror): arch_mirrors = gapmi(package_mirrors, arch="amd64") data_source_mock = mock.Mock( - availability_zone="us-east-1a", - platform_type=platform_type) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - assert(results == mirrors[0]) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_second) - assert(results == mirrors[1]) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_none) - assert(results == package_mirrors[0]['failsafe']) + availability_zone="us-east-1a", platform_type=platform_type + ) + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, + ) + assert results == mirrors[0] + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_second, + ) + assert results == mirrors[1] + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_none, + ) + assert results == package_mirrors[0]["failsafe"] def test_get_package_mirror_info_az_non_ec2(self): arch_mirrors = gapmi(package_mirrors, arch="amd64") data_source_mock = mock.Mock(availability_zone="nova.cloudvendor") - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - assert(results == { - 'primary': 'http://nova.cloudvendor.clouds/', - 'security': 'http://security-mirror1-intel'} + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, ) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_last) - assert(results == { - 'primary': 'http://nova.cloudvendor.clouds/', - 'security': 'http://security-mirror2-intel'} + assert results == { + "primary": "http://nova.cloudvendor.clouds/", + "security": "http://security-mirror1-intel", + } + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_last, ) + assert results == { + "primary": "http://nova.cloudvendor.clouds/", + "security": "http://security-mirror2-intel", + } def test_get_package_mirror_info_none(self): arch_mirrors = gapmi(package_mirrors, arch="amd64") @@ -298,18 +359,25 @@ class TestGetPackageMirrors: # because both search entries here replacement based on # availability-zone, the filter will be called with an empty list and # failsafe should be taken. - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - assert(results == { - 'primary': 'http://fs-primary-intel', - 'security': 'http://security-mirror1-intel'} + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_first, ) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_last) - assert(results == { - 'primary': 'http://fs-primary-intel', - 'security': 'http://security-mirror2-intel'} + assert results == { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror1-intel", + } + + results = gpmi( + arch_mirrors, + data_source=data_source_mock, + mirror_filter=self.return_last, ) + assert results == { + "primary": "http://fs-primary-intel", + "security": "http://security-mirror2-intel", + } + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_gentoo.py b/tests/unittests/distros/test_gentoo.py index 4e4680b8..dadf5df5 100644 --- a/tests/unittests/distros/test_gentoo.py +++ b/tests/unittests/distros/test_gentoo.py @@ -1,13 +1,12 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit import util -from cloudinit import atomic_helper +from cloudinit import atomic_helper, util from tests.unittests.helpers import CiTestCase + from . import _get_distro class TestGentoo(CiTestCase): - def test_write_hostname(self): distro = _get_distro("gentoo") hostname = "myhostname" @@ -22,5 +21,7 @@ class TestGentoo(CiTestCase): hostfile = self.tmp_path("hostfile") atomic_helper.write_file(hostfile, contents, omode="w") distro._write_hostname(hostname, hostfile) - self.assertEqual('#This is the hostname\nhostname="myhostname"\n', - util.load_file(hostfile)) + self.assertEqual( + '#This is the hostname\nhostname="myhostname"\n', + util.load_file(hostfile), + ) diff --git a/tests/unittests/distros/test_hostname.py b/tests/unittests/distros/test_hostname.py index f6d4dbe5..2cbbb3e2 100644 --- a/tests/unittests/distros/test_hostname.py +++ b/tests/unittests/distros/test_hostname.py @@ -4,13 +4,12 @@ import unittest from cloudinit.distros.parsers import hostname - -BASE_HOSTNAME = ''' +BASE_HOSTNAME = """ # My super-duper-hostname blahblah -''' +""" BASE_HOSTNAME = BASE_HOSTNAME.strip() @@ -18,7 +17,7 @@ class TestHostnameHelper(unittest.TestCase): def test_parse_same(self): hn = hostname.HostnameConf(BASE_HOSTNAME) self.assertEqual(str(hn).strip(), BASE_HOSTNAME) - self.assertEqual(hn.hostname, 'blahblah') + self.assertEqual(hn.hostname, "blahblah") def test_no_adjust_hostname(self): hn = hostname.HostnameConf(BASE_HOSTNAME) @@ -29,14 +28,15 @@ class TestHostnameHelper(unittest.TestCase): def test_adjust_hostname(self): hn = hostname.HostnameConf(BASE_HOSTNAME) prev_name = hn.hostname - self.assertEqual(prev_name, 'blahblah') + self.assertEqual(prev_name, "blahblah") hn.set_hostname("bbbbd") - self.assertEqual(hn.hostname, 'bbbbd') - expected_out = ''' + self.assertEqual(hn.hostname, "bbbbd") + expected_out = """ # My super-duper-hostname bbbbd -''' +""" self.assertEqual(str(hn).strip(), expected_out.strip()) + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_hosts.py b/tests/unittests/distros/test_hosts.py index 8aaa6e48..faffd912 100644 --- a/tests/unittests/distros/test_hosts.py +++ b/tests/unittests/distros/test_hosts.py @@ -4,42 +4,44 @@ import unittest from cloudinit.distros.parsers import hosts - -BASE_ETC = ''' +BASE_ETC = """ # Example 127.0.0.1 localhost 192.168.1.10 foo.mydomain.org foo 192.168.1.10 bar.mydomain.org bar 146.82.138.7 master.debian.org master 209.237.226.90 www.opensource.org -''' +""" BASE_ETC = BASE_ETC.strip() class TestHostsHelper(unittest.TestCase): def test_parse(self): eh = hosts.HostsConf(BASE_ETC) - self.assertEqual(eh.get_entry('127.0.0.1'), [['localhost']]) - self.assertEqual(eh.get_entry('192.168.1.10'), - [['foo.mydomain.org', 'foo'], - ['bar.mydomain.org', 'bar']]) + self.assertEqual(eh.get_entry("127.0.0.1"), [["localhost"]]) + self.assertEqual( + eh.get_entry("192.168.1.10"), + [["foo.mydomain.org", "foo"], ["bar.mydomain.org", "bar"]], + ) eh = str(eh) - self.assertTrue(eh.startswith('# Example')) + self.assertTrue(eh.startswith("# Example")) def test_add(self): eh = hosts.HostsConf(BASE_ETC) - eh.add_entry('127.0.0.0', 'blah') - self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']]) - eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3') - self.assertEqual(eh.get_entry('127.0.0.3'), - [['blah', 'blah2', 'blah3']]) + eh.add_entry("127.0.0.0", "blah") + self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]]) + eh.add_entry("127.0.0.3", "blah", "blah2", "blah3") + self.assertEqual( + eh.get_entry("127.0.0.3"), [["blah", "blah2", "blah3"]] + ) def test_del(self): eh = hosts.HostsConf(BASE_ETC) - eh.add_entry('127.0.0.0', 'blah') - self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']]) + eh.add_entry("127.0.0.0", "blah") + self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]]) + + eh.del_entries("127.0.0.0") + self.assertEqual(eh.get_entry("127.0.0.0"), []) - eh.del_entries('127.0.0.0') - self.assertEqual(eh.get_entry('127.0.0.0'), []) # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_init.py b/tests/unittests/distros/test_init.py index fd64a322..8f3c8978 100644 --- a/tests/unittests/distros/test_init.py +++ b/tests/unittests/distros/test_init.py @@ -9,16 +9,17 @@ from unittest import mock import pytest -from cloudinit.distros import _get_package_mirror_info, LDH_ASCII_CHARS +from cloudinit.distros import LDH_ASCII_CHARS, _get_package_mirror_info # In newer versions of Python, these characters will be omitted instead # of substituted because of security concerns. # See https://bugs.python.org/issue43882 -SECURITY_URL_CHARS = '\n\r\t' +SECURITY_URL_CHARS = "\n\r\t" # Define a set of characters we would expect to be replaced INVALID_URL_CHARS = [ - chr(x) for x in range(127) + chr(x) + for x in range(127) if chr(x) not in LDH_ASCII_CHARS + SECURITY_URL_CHARS ] for separator in [":", ".", "/", "#", "?", "@", "[", "]"]: @@ -37,21 +38,41 @@ class TestGetPackageMirrorInfo: These tests are more focused on specific aspects of the unit under test. """ - @pytest.mark.parametrize('mirror_info,expected', [ - # Empty info gives empty return - ({}, {}), - # failsafe values used if present - ({'failsafe': {'primary': 'http://value', 'security': 'http://other'}}, - {'primary': 'http://value', 'security': 'http://other'}), - # search values used if present - ({'search': {'primary': ['http://value'], - 'security': ['http://other']}}, - {'primary': ['http://value'], 'security': ['http://other']}), - # failsafe values used if search value not present - ({'search': {'primary': ['http://value']}, - 'failsafe': {'security': 'http://other'}}, - {'primary': ['http://value'], 'security': 'http://other'}) - ]) + @pytest.mark.parametrize( + "mirror_info,expected", + [ + # Empty info gives empty return + ({}, {}), + # failsafe values used if present + ( + { + "failsafe": { + "primary": "http://value", + "security": "http://other", + } + }, + {"primary": "http://value", "security": "http://other"}, + ), + # search values used if present + ( + { + "search": { + "primary": ["http://value"], + "security": ["http://other"], + } + }, + {"primary": ["http://value"], "security": ["http://other"]}, + ), + # failsafe values used if search value not present + ( + { + "search": {"primary": ["http://value"]}, + "failsafe": {"security": "http://other"}, + }, + {"primary": ["http://value"], "security": "http://other"}, + ), + ], + ) def test_get_package_mirror_info_failsafe(self, mirror_info, expected): """ Test the interaction between search and failsafe inputs @@ -60,97 +81,163 @@ class TestGetPackageMirrorInfo: options; test_failsafe_used_if_all_search_results_filtered_out covers that.) """ - assert expected == _get_package_mirror_info(mirror_info, - mirror_filter=lambda x: x) + assert expected == _get_package_mirror_info( + mirror_info, mirror_filter=lambda x: x + ) def test_failsafe_used_if_all_search_results_filtered_out(self): """Test the failsafe option used if all search options eliminated.""" mirror_info = { - 'search': {'primary': ['http://value']}, - 'failsafe': {'primary': 'http://other'} + "search": {"primary": ["http://value"]}, + "failsafe": {"primary": "http://other"}, } - assert {'primary': 'http://other'} == _get_package_mirror_info( - mirror_info, mirror_filter=lambda x: False) + assert {"primary": "http://other"} == _get_package_mirror_info( + mirror_info, mirror_filter=lambda x: False + ) - @pytest.mark.parametrize('allow_ec2_mirror, platform_type', [ - (True, 'ec2') - ]) - @pytest.mark.parametrize('availability_zone,region,patterns,expected', ( - # Test ec2_region alone - ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'], - ['http://ec2-fk-fake-1/ubuntu']), - # Test availability_zone alone - ('fk-fake-1f', None, ['http://AZ-%(availability_zone)s/ubuntu'], - ['http://az-fk-fake-1f/ubuntu']), - # Test region alone - (None, 'fk-fake-1', ['http://RG-%(region)s/ubuntu'], - ['http://rg-fk-fake-1/ubuntu']), - # Test that ec2_region is not available for non-matching AZs - ('fake-fake-1f', None, - ['http://EC2-%(ec2_region)s/ubuntu', - 'http://AZ-%(availability_zone)s/ubuntu'], - ['http://az-fake-fake-1f/ubuntu']), - # Test that template order maintained - (None, 'fake-region', - ['http://RG-%(region)s-2/ubuntu', 'http://RG-%(region)s-1/ubuntu'], - ['http://rg-fake-region-2/ubuntu', 'http://rg-fake-region-1/ubuntu']), - # Test that non-ASCII hostnames are IDNA encoded; - # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" - (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com/ubuntu'], - ['http://www.xn--idna--4kd53hh6aba3q.com/ubuntu']), - # Test that non-ASCII hostnames with a port are IDNA encoded; - # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" - (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com:8080/ubuntu'], - ['http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu']), - # Test that non-ASCII non-hostname parts of URLs are unchanged - (None, 'ТεЅТ̣', ['http://www.example.com/%(region)s/ubuntu'], - ['http://www.example.com/ТεЅТ̣/ubuntu']), - # Test that IPv4 addresses are unchanged - (None, 'fk-fake-1', ['http://192.168.1.1:8080/%(region)s/ubuntu'], - ['http://192.168.1.1:8080/fk-fake-1/ubuntu']), - # Test that IPv6 addresses are unchanged - (None, 'fk-fake-1', - ['http://[2001:67c:1360:8001::23]/%(region)s/ubuntu'], - ['http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu']), - # Test that unparseable URLs are filtered out of the mirror list - (None, 'inv[lid', - ['http://%(region)s.in.hostname/should/be/filtered', - 'http://but.not.in.the.path/%(region)s'], - ['http://but.not.in.the.path/inv[lid']), - (None, '-some-region-', - ['http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu'], - ['http://lead-ing.some-region.trail-ing.example.com/ubuntu']), - ) + tuple( - # Dynamically generate a test case for each non-LDH - # (Letters/Digits/Hyphen) ASCII character, testing that it is - # substituted with a hyphen - (None, 'fk{0}fake{0}1'.format(invalid_char), - ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu']) - for invalid_char in INVALID_URL_CHARS - )) - def test_valid_substitution(self, - allow_ec2_mirror, - platform_type, - availability_zone, - region, - patterns, - expected): + @pytest.mark.parametrize( + "allow_ec2_mirror, platform_type", [(True, "ec2")] + ) + @pytest.mark.parametrize( + "availability_zone,region,patterns,expected", + ( + # Test ec2_region alone + ( + "fk-fake-1f", + None, + ["http://EC2-%(ec2_region)s/ubuntu"], + ["http://ec2-fk-fake-1/ubuntu"], + ), + # Test availability_zone alone + ( + "fk-fake-1f", + None, + ["http://AZ-%(availability_zone)s/ubuntu"], + ["http://az-fk-fake-1f/ubuntu"], + ), + # Test region alone + ( + None, + "fk-fake-1", + ["http://RG-%(region)s/ubuntu"], + ["http://rg-fk-fake-1/ubuntu"], + ), + # Test that ec2_region is not available for non-matching AZs + ( + "fake-fake-1f", + None, + [ + "http://EC2-%(ec2_region)s/ubuntu", + "http://AZ-%(availability_zone)s/ubuntu", + ], + ["http://az-fake-fake-1f/ubuntu"], + ), + # Test that template order maintained + ( + None, + "fake-region", + [ + "http://RG-%(region)s-2/ubuntu", + "http://RG-%(region)s-1/ubuntu", + ], + [ + "http://rg-fake-region-2/ubuntu", + "http://rg-fake-region-1/ubuntu", + ], + ), + # Test that non-ASCII hostnames are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + ( + None, + "ТεЅТ̣", + ["http://www.IDNA-%(region)s.com/ubuntu"], + ["http://www.xn--idna--4kd53hh6aba3q.com/ubuntu"], + ), + # Test that non-ASCII hostnames with a port are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + ( + None, + "ТεЅТ̣", + ["http://www.IDNA-%(region)s.com:8080/ubuntu"], + ["http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu"], + ), + # Test that non-ASCII non-hostname parts of URLs are unchanged + ( + None, + "ТεЅТ̣", + ["http://www.example.com/%(region)s/ubuntu"], + ["http://www.example.com/ТεЅТ̣/ubuntu"], + ), + # Test that IPv4 addresses are unchanged + ( + None, + "fk-fake-1", + ["http://192.168.1.1:8080/%(region)s/ubuntu"], + ["http://192.168.1.1:8080/fk-fake-1/ubuntu"], + ), + # Test that IPv6 addresses are unchanged + ( + None, + "fk-fake-1", + ["http://[2001:67c:1360:8001::23]/%(region)s/ubuntu"], + ["http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu"], + ), + # Test that unparseable URLs are filtered out of the mirror list + ( + None, + "inv[lid", + [ + "http://%(region)s.in.hostname/should/be/filtered", + "http://but.not.in.the.path/%(region)s", + ], + ["http://but.not.in.the.path/inv[lid"], + ), + ( + None, + "-some-region-", + ["http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu"], + ["http://lead-ing.some-region.trail-ing.example.com/ubuntu"], + ), + ) + + tuple( + # Dynamically generate a test case for each non-LDH + # (Letters/Digits/Hyphen) ASCII character, testing that it is + # substituted with a hyphen + ( + None, + "fk{0}fake{0}1".format(invalid_char), + ["http://%(region)s/ubuntu"], + ["http://fk-fake-1/ubuntu"], + ) + for invalid_char in INVALID_URL_CHARS + ), + ) + def test_valid_substitution( + self, + allow_ec2_mirror, + platform_type, + availability_zone, + region, + patterns, + expected, + ): """Test substitution works as expected.""" - flag_path = "cloudinit.distros." \ - "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + flag_path = ( + "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + ) m_data_source = mock.Mock( availability_zone=availability_zone, region=region, - platform_type=platform_type + platform_type=platform_type, ) - mirror_info = {'search': {'primary': patterns}} + mirror_info = {"search": {"primary": patterns}} with mock.patch(flag_path, allow_ec2_mirror): ret = _get_package_mirror_info( mirror_info, data_source=m_data_source, - mirror_filter=lambda x: x + mirror_filter=lambda x: x, ) print(allow_ec2_mirror) print(platform_type) @@ -158,4 +245,4 @@ class TestGetPackageMirrorInfo: print(region) print(patterns) print(expected) - assert {'primary': expected} == ret + assert {"primary": expected} == ret diff --git a/tests/unittests/distros/test_manage_service.py b/tests/unittests/distros/test_manage_service.py index 6f1bd0b1..9e64b35c 100644 --- a/tests/unittests/distros/test_manage_service.py +++ b/tests/unittests/distros/test_manage_service.py @@ -1,6 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. -from tests.unittests.helpers import (CiTestCase, mock) +from tests.unittests.helpers import CiTestCase, mock from tests.unittests.util import MockDistro @@ -12,27 +12,30 @@ class TestManageService(CiTestCase): super(TestManageService, self).setUp() self.dist = MockDistro() - @mock.patch.object(MockDistro, 'uses_systemd', return_value=False) + @mock.patch.object(MockDistro, "uses_systemd", return_value=False) @mock.patch("cloudinit.distros.subp.subp") def test_manage_service_systemctl_initcmd(self, m_subp, m_sysd): - self.dist.init_cmd = ['systemctl'] - self.dist.manage_service('start', 'myssh') - m_subp.assert_called_with(['systemctl', 'start', 'myssh'], - capture=True) + self.dist.init_cmd = ["systemctl"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with( + ["systemctl", "start", "myssh"], capture=True + ) - @mock.patch.object(MockDistro, 'uses_systemd', return_value=False) + @mock.patch.object(MockDistro, "uses_systemd", return_value=False) @mock.patch("cloudinit.distros.subp.subp") def test_manage_service_service_initcmd(self, m_subp, m_sysd): - self.dist.init_cmd = ['service'] - self.dist.manage_service('start', 'myssh') - m_subp.assert_called_with(['service', 'myssh', 'start'], capture=True) + self.dist.init_cmd = ["service"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with(["service", "myssh", "start"], capture=True) - @mock.patch.object(MockDistro, 'uses_systemd', return_value=True) + @mock.patch.object(MockDistro, "uses_systemd", return_value=True) @mock.patch("cloudinit.distros.subp.subp") def test_manage_service_systemctl(self, m_subp, m_sysd): - self.dist.init_cmd = ['ignore'] - self.dist.manage_service('start', 'myssh') - m_subp.assert_called_with(['systemctl', 'start', 'myssh'], - capture=True) + self.dist.init_cmd = ["ignore"] + self.dist.manage_service("start", "myssh") + m_subp.assert_called_with( + ["systemctl", "start", "myssh"], capture=True + ) + # vi: ts=4 sw=4 expandtab diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py index 11a68d2a..0bc6dfbd 100644 --- a/tests/unittests/distros/test_netbsd.py +++ b/tests/unittests/distros/test_netbsd.py @@ -1,10 +1,11 @@ -import cloudinit.distros.netbsd +import unittest.mock as mock import pytest -import unittest.mock as mock + +import cloudinit.distros.netbsd -@pytest.mark.parametrize('with_pkgin', (True, False)) +@pytest.mark.parametrize("with_pkgin", (True, False)) @mock.patch("cloudinit.distros.netbsd.os") def test_init(m_os, with_pkgin): print(with_pkgin) @@ -12,6 +13,6 @@ def test_init(m_os, with_pkgin): cfg = {} distro = cloudinit.distros.netbsd.NetBSD("netbsd", cfg, None) - expectation = ['pkgin', '-y', 'full-upgrade'] if with_pkgin else None + expectation = ["pkgin", "-y", "full-upgrade"] if with_pkgin else None assert distro.pkg_cmd_upgrade_prefix == expectation - assert [mock.call('/usr/pkg/bin/pkgin')] == m_os.path.exists.call_args_list + assert [mock.call("/usr/pkg/bin/pkgin")] == m_os.path.exists.call_args_list diff --git a/tests/unittests/distros/test_netconfig.py b/tests/unittests/distros/test_netconfig.py index 90ac5578..a25be481 100644 --- a/tests/unittests/distros/test_netconfig.py +++ b/tests/unittests/distros/test_netconfig.py @@ -7,17 +7,11 @@ from io import StringIO from textwrap import dedent from unittest import mock -from cloudinit import distros +from cloudinit import distros, helpers, safeyaml, settings, subp, util from cloudinit.distros.parsers.sys_conf import SysConf -from cloudinit import helpers -from cloudinit import settings -from tests.unittests.helpers import ( - FilesystemMockingTestCase, dir2dict) -from cloudinit import subp -from cloudinit import util -from cloudinit import safeyaml - -BASE_NET_CFG = ''' +from tests.unittests.helpers import FilesystemMockingTestCase, dir2dict + +BASE_NET_CFG = """ auto lo iface lo inet loopback @@ -31,9 +25,9 @@ iface eth0 inet static auto eth1 iface eth1 inet dhcp -''' +""" -BASE_NET_CFG_FROM_V2 = ''' +BASE_NET_CFG_FROM_V2 = """ auto lo iface lo inet loopback @@ -44,9 +38,9 @@ iface eth0 inet static auto eth1 iface eth1 inet dhcp -''' +""" -BASE_NET_CFG_IPV6 = ''' +BASE_NET_CFG_IPV6 = """ auto lo iface lo inet loopback @@ -74,20 +68,31 @@ iface eth1 inet6 static address 2607:f0d0:1002:0011::3 netmask 64 gateway 2607:f0d0:1002:0011::1 -''' - -V1_NET_CFG = {'config': [{'name': 'eth0', +""" - 'subnets': [{'address': '192.168.1.5', - 'broadcast': '192.168.1.0', - 'gateway': '192.168.1.254', - 'netmask': '255.255.255.0', - 'type': 'static'}], - 'type': 'physical'}, - {'name': 'eth1', - 'subnets': [{'control': 'auto', 'type': 'dhcp4'}], - 'type': 'physical'}], - 'version': 1} +V1_NET_CFG = { + "config": [ + { + "name": "eth0", + "subnets": [ + { + "address": "192.168.1.5", + "broadcast": "192.168.1.0", + "gateway": "192.168.1.254", + "netmask": "255.255.255.0", + "type": "static", + } + ], + "type": "physical", + }, + { + "name": "eth1", + "subnets": [{"control": "auto", "type": "dhcp4"}], + "type": "physical", + }, + ], + "version": 1, +} V1_NET_CFG_WITH_DUPS = """\ # same value in interface specific dns and global dns @@ -144,19 +149,28 @@ auto eth1 iface eth1 inet dhcp """ -V1_NET_CFG_IPV6 = {'config': [{'name': 'eth0', - 'subnets': [{'address': - '2607:f0d0:1002:0011::2', - 'gateway': - '2607:f0d0:1002:0011::1', - 'netmask': '64', - 'type': 'static6'}], - 'type': 'physical'}, - {'name': 'eth1', - 'subnets': [{'control': 'auto', - 'type': 'dhcp4'}], - 'type': 'physical'}], - 'version': 1} +V1_NET_CFG_IPV6 = { + "config": [ + { + "name": "eth0", + "subnets": [ + { + "address": "2607:f0d0:1002:0011::2", + "gateway": "2607:f0d0:1002:0011::1", + "netmask": "64", + "type": "static6", + } + ], + "type": "physical", + }, + { + "name": "eth1", + "subnets": [{"control": "auto", "type": "dhcp4"}], + "type": "physical", + }, + ], + "version": 1, +} V1_TO_V2_NET_CFG_OUTPUT = """\ @@ -194,14 +208,11 @@ network: """ V2_NET_CFG = { - 'ethernets': { - 'eth7': { - 'addresses': ['192.168.1.5/24'], - 'gateway4': '192.168.1.254'}, - 'eth9': { - 'dhcp4': True} + "ethernets": { + "eth7": {"addresses": ["192.168.1.5/24"], "gateway4": "192.168.1.254"}, + "eth9": {"dhcp4": True}, }, - 'version': 2 + "version": 2, } @@ -237,19 +248,18 @@ class WriteBuffer(object): class TestNetCfgDistroBase(FilesystemMockingTestCase): - def setUp(self): super(TestNetCfgDistroBase, self).setUp() - self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy') + self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") def _get_distro(self, dname, renderers=None): cls = distros.fetch(dname) cfg = settings.CFG_BUILTIN - cfg['system_info']['distro'] = dname + cfg["system_info"]["distro"] = dname if renderers: - cfg['system_info']['network'] = {'renderers': renderers} + cfg["system_info"]["network"] = {"renderers": renderers} paths = helpers.Paths({}) - return cls(dname, cfg.get('system_info'), paths) + return cls(dname, cfg.get("system_info"), paths) def assertCfgEquals(self, blob1, blob2): b1 = dict(SysConf(blob1.strip().splitlines())) @@ -264,23 +274,23 @@ class TestNetCfgDistroBase(FilesystemMockingTestCase): class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase): - def setUp(self): super(TestNetCfgDistroFreeBSD, self).setUp() - self.distro = self._get_distro('freebsd', renderers=['freebsd']) + self.distro = self._get_distro("freebsd", renderers=["freebsd"]) - def _apply_and_verify_freebsd(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify_freebsd( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.freebsd.available') as m_avail: + with mock.patch("cloudinit.net.freebsd.available") as m_avail: m_avail.return_value = True with self.reRooted(tmpd) as tmpd: - util.ensure_dir('/etc') - util.ensure_file('/etc/rc.conf') - util.ensure_file('/etc/resolv.conf') + util.ensure_dir("/etc") + util.ensure_file("/etc/rc.conf") + util.ensure_file("/etc/resolv.conf") apply_fn(config, bringup) results = dir2dict(tmpd) @@ -291,14 +301,14 @@ class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase): print(results[cfgpath]) print("----------") self.assertEqual( - set(expected.split('\n')), - set(results[cfgpath].split('\n'))) + set(expected.split("\n")), set(results[cfgpath].split("\n")) + ) self.assertEqual(0o644, get_mode(cfgpath, tmpd)) - @mock.patch('cloudinit.net.get_interfaces_by_mac') + @mock.patch("cloudinit.net.get_interfaces_by_mac") def test_apply_network_config_freebsd_standard(self, ifaces_mac): ifaces_mac.return_value = { - '00:15:5d:4c:73:00': 'eth0', + "00:15:5d:4c:73:00": "eth0", } rc_conf_expected = """\ defaultrouter=192.168.1.254 @@ -307,17 +317,19 @@ ifconfig_eth1=DHCP """ expected_cfgs = { - '/etc/rc.conf': rc_conf_expected, - '/etc/resolv.conf': '' + "/etc/rc.conf": rc_conf_expected, + "/etc/resolv.conf": "", } - self._apply_and_verify_freebsd(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) - @mock.patch('cloudinit.net.get_interfaces_by_mac') + @mock.patch("cloudinit.net.get_interfaces_by_mac") def test_apply_network_config_freebsd_ifrename(self, ifaces_mac): ifaces_mac.return_value = { - '00:15:5d:4c:73:00': 'vtnet0', + "00:15:5d:4c:73:00": "vtnet0", } rc_conf_expected = """\ ifconfig_vtnet0_name=eth0 @@ -327,49 +339,51 @@ ifconfig_eth1=DHCP """ V1_NET_CFG_RENAME = copy.deepcopy(V1_NET_CFG) - V1_NET_CFG_RENAME['config'][0]['mac_address'] = '00:15:5d:4c:73:00' + V1_NET_CFG_RENAME["config"][0]["mac_address"] = "00:15:5d:4c:73:00" expected_cfgs = { - '/etc/rc.conf': rc_conf_expected, - '/etc/resolv.conf': '' + "/etc/rc.conf": rc_conf_expected, + "/etc/resolv.conf": "", } - self._apply_and_verify_freebsd(self.distro.apply_network_config, - V1_NET_CFG_RENAME, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG_RENAME, + expected_cfgs=expected_cfgs.copy(), + ) - @mock.patch('cloudinit.net.get_interfaces_by_mac') + @mock.patch("cloudinit.net.get_interfaces_by_mac") def test_apply_network_config_freebsd_nameserver(self, ifaces_mac): ifaces_mac.return_value = { - '00:15:5d:4c:73:00': 'eth0', + "00:15:5d:4c:73:00": "eth0", } V1_NET_CFG_DNS = copy.deepcopy(V1_NET_CFG) - ns = ['1.2.3.4'] - V1_NET_CFG_DNS['config'][0]['subnets'][0]['dns_nameservers'] = ns - expected_cfgs = { - '/etc/resolv.conf': 'nameserver 1.2.3.4\n' - } - self._apply_and_verify_freebsd(self.distro.apply_network_config, - V1_NET_CFG_DNS, - expected_cfgs=expected_cfgs.copy()) + ns = ["1.2.3.4"] + V1_NET_CFG_DNS["config"][0]["subnets"][0]["dns_nameservers"] = ns + expected_cfgs = {"/etc/resolv.conf": "nameserver 1.2.3.4\n"} + self._apply_and_verify_freebsd( + self.distro.apply_network_config, + V1_NET_CFG_DNS, + expected_cfgs=expected_cfgs.copy(), + ) class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase): - def setUp(self): super(TestNetCfgDistroUbuntuEni, self).setUp() - self.distro = self._get_distro('ubuntu', renderers=['eni']) + self.distro = self._get_distro("ubuntu", renderers=["eni"]) def eni_path(self): - return '/etc/network/interfaces.d/50-cloud-init.cfg' + return "/etc/network/interfaces.d/50-cloud-init.cfg" - def _apply_and_verify_eni(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify_eni( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.eni.available') as m_avail: + with mock.patch("cloudinit.net.eni.available") as m_avail: m_avail.return_value = True with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -389,35 +403,39 @@ class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase): self.eni_path(): V1_NET_CFG_OUTPUT, } # ub_distro.apply_network_config(V1_NET_CFG, False) - self._apply_and_verify_eni(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_eni( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) def test_apply_network_config_ipv6_ub(self): - expected_cfgs = { - self.eni_path(): V1_NET_CFG_IPV6_OUTPUT - } - self._apply_and_verify_eni(self.distro.apply_network_config, - V1_NET_CFG_IPV6, - expected_cfgs=expected_cfgs.copy()) + expected_cfgs = {self.eni_path(): V1_NET_CFG_IPV6_OUTPUT} + self._apply_and_verify_eni( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase): def setUp(self): super(TestNetCfgDistroUbuntuNetplan, self).setUp() - self.distro = self._get_distro('ubuntu', renderers=['netplan']) - self.devlist = ['eth0', 'lo'] + self.distro = self._get_distro("ubuntu", renderers=["netplan"]) + self.devlist = ["eth0", "lo"] - def _apply_and_verify_netplan(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify_netplan( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.netplan.available', - return_value=True): - with mock.patch("cloudinit.net.netplan.get_devicelist", - return_value=self.devlist): + with mock.patch("cloudinit.net.netplan.available", return_value=True): + with mock.patch( + "cloudinit.net.netplan.get_devicelist", + return_value=self.devlist, + ): with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -432,7 +450,7 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase): self.assertEqual(0o644, get_mode(cfgpath, tmpd)) def netplan_path(self): - return '/etc/netplan/50-cloud-init.yaml' + return "/etc/netplan/50-cloud-init.yaml" def test_apply_network_config_v1_to_netplan_ub(self): expected_cfgs = { @@ -440,9 +458,11 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase): } # ub_distro.apply_network_config(V1_NET_CFG, False) - self._apply_and_verify_netplan(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) def test_apply_network_config_v1_ipv6_to_netplan_ub(self): expected_cfgs = { @@ -450,39 +470,43 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase): } # ub_distro.apply_network_config(V1_NET_CFG_IPV6, False) - self._apply_and_verify_netplan(self.distro.apply_network_config, - V1_NET_CFG_IPV6, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) def test_apply_network_config_v2_passthrough_ub(self): expected_cfgs = { self.netplan_path(): V2_TO_V2_NET_CFG_OUTPUT, } # ub_distro.apply_network_config(V2_NET_CFG, False) - self._apply_and_verify_netplan(self.distro.apply_network_config, - V2_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify_netplan( + self.distro.apply_network_config, + V2_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) class TestNetCfgDistroRedhat(TestNetCfgDistroBase): - def setUp(self): super(TestNetCfgDistroRedhat, self).setUp() - self.distro = self._get_distro('rhel', renderers=['sysconfig']) + self.distro = self._get_distro("rhel", renderers=["sysconfig"]) def ifcfg_path(self, ifname): - return '/etc/sysconfig/network-scripts/ifcfg-%s' % ifname + return "/etc/sysconfig/network-scripts/ifcfg-%s" % ifname def control_path(self): - return '/etc/sysconfig/network' + return "/etc/sysconfig/network" - def _apply_and_verify(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.sysconfig.available') as m_avail: + with mock.patch("cloudinit.net.sysconfig.available") as m_avail: m_avail.return_value = True with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -494,7 +518,8 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): def test_apply_network_config_rh(self): expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=none DEFROUTE=yes DEVICE=eth0 @@ -505,27 +530,35 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.ifcfg_path('eth1'): dedent("""\ + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ BOOTPROTO=dhcp DEVICE=eth1 NM_CONTROLLED=no ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.control_path(): dedent("""\ + """ + ), + self.control_path(): dedent( + """\ NETWORKING=yes - """), + """ + ), } # rh_distro.apply_network_config(V1_NET_CFG, False) - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) def test_apply_network_config_ipv6_rh(self): expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=none DEFROUTE=yes DEVICE=eth0 @@ -538,39 +571,54 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.ifcfg_path('eth1'): dedent("""\ + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ BOOTPROTO=dhcp DEVICE=eth1 NM_CONTROLLED=no ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.control_path(): dedent("""\ + """ + ), + self.control_path(): dedent( + """\ NETWORKING=yes NETWORKING_IPV6=yes IPV6_AUTOCONF=no - """), + """ + ), } # rh_distro.apply_network_config(V1_NET_CFG_IPV6, False) - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG_IPV6, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) def test_vlan_render_unsupported(self): """Render officially unsupported vlan names.""" cfg = { - 'version': 2, - 'ethernets': { - 'eth0': {'addresses': ["192.10.1.2/24"], - 'match': {'macaddress': "00:16:3e:60:7c:df"}}}, - 'vlans': { - 'infra0': {'addresses': ["10.0.1.2/16"], - 'id': 1001, 'link': 'eth0'}}, + "version": 2, + "ethernets": { + "eth0": { + "addresses": ["192.10.1.2/24"], + "match": {"macaddress": "00:16:3e:60:7c:df"}, + } + }, + "vlans": { + "infra0": { + "addresses": ["10.0.1.2/16"], + "id": 1001, + "link": "eth0", + } + }, } expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=none DEVICE=eth0 HWADDR=00:16:3e:60:7c:df @@ -580,8 +628,10 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.ifcfg_path('infra0'): dedent("""\ + """ + ), + self.ifcfg_path("infra0"): dedent( + """\ BOOTPROTO=none DEVICE=infra0 IPADDR=10.0.1.2 @@ -591,26 +641,33 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): PHYSDEV=eth0 USERCTL=no VLAN=yes - """), - self.control_path(): dedent("""\ + """ + ), + self.control_path(): dedent( + """\ NETWORKING=yes - """), + """ + ), } self._apply_and_verify( - self.distro.apply_network_config, cfg, - expected_cfgs=expected_cfgs) + self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs + ) def test_vlan_render(self): cfg = { - 'version': 2, - 'ethernets': { - 'eth0': {'addresses': ["192.10.1.2/24"]}}, - 'vlans': { - 'eth0.1001': {'addresses': ["10.0.1.2/16"], - 'id': 1001, 'link': 'eth0'}}, + "version": 2, + "ethernets": {"eth0": {"addresses": ["192.10.1.2/24"]}}, + "vlans": { + "eth0.1001": { + "addresses": ["10.0.1.2/16"], + "id": 1001, + "link": "eth0", + } + }, } expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=none DEVICE=eth0 IPADDR=192.10.1.2 @@ -619,8 +676,10 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): ONBOOT=yes TYPE=Ethernet USERCTL=no - """), - self.ifcfg_path('eth0.1001'): dedent("""\ + """ + ), + self.ifcfg_path("eth0.1001"): dedent( + """\ BOOTPROTO=none DEVICE=eth0.1001 IPADDR=10.0.1.2 @@ -630,32 +689,35 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase): PHYSDEV=eth0 USERCTL=no VLAN=yes - """), - self.control_path(): dedent("""\ + """ + ), + self.control_path(): dedent( + """\ NETWORKING=yes - """), + """ + ), } self._apply_and_verify( - self.distro.apply_network_config, cfg, - expected_cfgs=expected_cfgs) + self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs + ) class TestNetCfgDistroOpensuse(TestNetCfgDistroBase): - def setUp(self): super(TestNetCfgDistroOpensuse, self).setUp() - self.distro = self._get_distro('opensuse', renderers=['sysconfig']) + self.distro = self._get_distro("opensuse", renderers=["sysconfig"]) def ifcfg_path(self, ifname): - return '/etc/sysconfig/network/ifcfg-%s' % ifname + return "/etc/sysconfig/network/ifcfg-%s" % ifname - def _apply_and_verify(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.sysconfig.available') as m_avail: + with mock.patch("cloudinit.net.sysconfig.available") as m_avail: m_avail.return_value = True with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -668,52 +730,71 @@ class TestNetCfgDistroOpensuse(TestNetCfgDistroBase): def test_apply_network_config_opensuse(self): """Opensuse uses apply_network_config and renders sysconfig""" expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=static IPADDR=192.168.1.5 NETMASK=255.255.255.0 STARTMODE=auto - """), - self.ifcfg_path('eth1'): dedent("""\ + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ BOOTPROTO=dhcp4 STARTMODE=auto - """), + """ + ), } - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + ) def test_apply_network_config_ipv6_opensuse(self): """Opensuse uses apply_network_config and renders sysconfig w/ipv6""" expected_cfgs = { - self.ifcfg_path('eth0'): dedent("""\ + self.ifcfg_path("eth0"): dedent( + """\ BOOTPROTO=static IPADDR6=2607:f0d0:1002:0011::2/64 STARTMODE=auto - """), - self.ifcfg_path('eth1'): dedent("""\ + """ + ), + self.ifcfg_path("eth1"): dedent( + """\ BOOTPROTO=dhcp4 STARTMODE=auto - """), + """ + ), } - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG_IPV6, - expected_cfgs=expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG_IPV6, + expected_cfgs=expected_cfgs.copy(), + ) class TestNetCfgDistroArch(TestNetCfgDistroBase): def setUp(self): super(TestNetCfgDistroArch, self).setUp() - self.distro = self._get_distro('arch', renderers=['netplan']) - - def _apply_and_verify(self, apply_fn, config, expected_cfgs=None, - bringup=False, with_netplan=False): + self.distro = self._get_distro("arch", renderers=["netplan"]) + + def _apply_and_verify( + self, + apply_fn, + config, + expected_cfgs=None, + bringup=False, + with_netplan=False, + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.netplan.available', - return_value=with_netplan): + with mock.patch( + "cloudinit.net.netplan.available", return_value=with_netplan + ): with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -728,10 +809,10 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase): self.assertEqual(0o644, get_mode(cfgpath, tmpd)) def netctl_path(self, iface): - return '/etc/netctl/%s' % iface + return "/etc/netctl/%s" % iface def netplan_path(self): - return '/etc/netplan/50-cloud-init.yaml' + return "/etc/netplan/50-cloud-init.yaml" def test_apply_network_config_v1_without_netplan(self): # Note that this is in fact an invalid netctl config: @@ -741,33 +822,40 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase): # still being used in absence of netplan, not the correctness of the # rendered netctl config. expected_cfgs = { - self.netctl_path('eth0'): dedent("""\ + self.netctl_path("eth0"): dedent( + """\ Address=192.168.1.5/255.255.255.0 Connection=ethernet DNS=() Gateway=192.168.1.254 IP=static Interface=eth0 - """), - self.netctl_path('eth1'): dedent("""\ + """ + ), + self.netctl_path("eth1"): dedent( + """\ Address=None/None Connection=ethernet DNS=() Gateway= IP=dhcp Interface=eth1 - """), + """ + ), } # ub_distro.apply_network_config(V1_NET_CFG, False) - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy(), - with_netplan=False) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + with_netplan=False, + ) def test_apply_network_config_v1_with_netplan(self): expected_cfgs = { - self.netplan_path(): dedent("""\ + self.netplan_path(): dedent( + """\ # generated by cloud-init network: version: 2 @@ -778,31 +866,32 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase): gateway4: 192.168.1.254 eth1: dhcp4: true - """), + """ + ), } with mock.patch( - 'cloudinit.net.netplan.get_devicelist', - return_value=[] + "cloudinit.net.netplan.get_devicelist", return_value=[] ): - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs=expected_cfgs.copy(), - with_netplan=True) + self._apply_and_verify( + self.distro.apply_network_config, + V1_NET_CFG, + expected_cfgs=expected_cfgs.copy(), + with_netplan=True, + ) class TestNetCfgDistroPhoton(TestNetCfgDistroBase): - def setUp(self): super(TestNetCfgDistroPhoton, self).setUp() - self.distro = self._get_distro('photon', renderers=['networkd']) + self.distro = self._get_distro("photon", renderers=["networkd"]) def create_conf_dict(self, contents): content_dict = {} for line in contents: if line: line = line.strip() - if line and re.search(r'^\[(.+)\]$', line): + if line and re.search(r"^\[(.+)\]$", line): content_dict[line] = [] key = line elif line: @@ -815,13 +904,14 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase): for k, v in actual.items(): self.assertEqual(sorted(expected[k]), sorted(v)) - def _apply_and_verify(self, apply_fn, config, expected_cfgs=None, - bringup=False): + def _apply_and_verify( + self, apply_fn, config, expected_cfgs=None, bringup=False + ): if not expected_cfgs: - raise ValueError('expected_cfg must not be None') + raise ValueError("expected_cfg must not be None") tmpd = None - with mock.patch('cloudinit.net.networkd.available') as m_avail: + with mock.patch("cloudinit.net.networkd.available") as m_avail: m_avail.return_value = True with self.reRooted(tmpd) as tmpd: apply_fn(config, bringup) @@ -833,10 +923,11 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase): self.assertEqual(0o644, get_mode(cfgpath, tmpd)) def nwk_file_path(self, ifname): - return '/etc/systemd/network/10-cloud-init-%s.network' % ifname + return "/etc/systemd/network/10-cloud-init-%s.network" % ifname def net_cfg_1(self, ifname): - ret = """\ + ret = ( + """\ [Match] Name=%s [Network] @@ -844,48 +935,53 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase): [Address] Address=192.168.1.5/24 [Route] - Gateway=192.168.1.254""" % ifname + Gateway=192.168.1.254""" + % ifname + ) return ret def net_cfg_2(self, ifname): - ret = """\ + ret = ( + """\ [Match] Name=%s [Network] - DHCP=ipv4""" % ifname + DHCP=ipv4""" + % ifname + ) return ret def test_photon_network_config_v1(self): - tmp = self.net_cfg_1('eth0').splitlines() + tmp = self.net_cfg_1("eth0").splitlines() expected_eth0 = self.create_conf_dict(tmp) - tmp = self.net_cfg_2('eth1').splitlines() + tmp = self.net_cfg_2("eth1").splitlines() expected_eth1 = self.create_conf_dict(tmp) expected_cfgs = { - self.nwk_file_path('eth0'): expected_eth0, - self.nwk_file_path('eth1'): expected_eth1, + self.nwk_file_path("eth0"): expected_eth0, + self.nwk_file_path("eth1"): expected_eth1, } - self._apply_and_verify(self.distro.apply_network_config, - V1_NET_CFG, - expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, V1_NET_CFG, expected_cfgs.copy() + ) def test_photon_network_config_v2(self): - tmp = self.net_cfg_1('eth7').splitlines() + tmp = self.net_cfg_1("eth7").splitlines() expected_eth7 = self.create_conf_dict(tmp) - tmp = self.net_cfg_2('eth9').splitlines() + tmp = self.net_cfg_2("eth9").splitlines() expected_eth9 = self.create_conf_dict(tmp) expected_cfgs = { - self.nwk_file_path('eth7'): expected_eth7, - self.nwk_file_path('eth9'): expected_eth9, + self.nwk_file_path("eth7"): expected_eth7, + self.nwk_file_path("eth9"): expected_eth9, } - self._apply_and_verify(self.distro.apply_network_config, - V2_NET_CFG, - expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, V2_NET_CFG, expected_cfgs.copy() + ) def test_photon_network_config_v1_with_duplicates(self): expected = """\ @@ -902,15 +998,16 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase): expected = self.create_conf_dict(expected.splitlines()) expected_cfgs = { - self.nwk_file_path('eth0'): expected, + self.nwk_file_path("eth0"): expected, } - self._apply_and_verify(self.distro.apply_network_config, - net_cfg, - expected_cfgs.copy()) + self._apply_and_verify( + self.distro.apply_network_config, net_cfg, expected_cfgs.copy() + ) def get_mode(path, target=None): return os.stat(subp.target_path(target, path)).st_mode & 0o777 + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_networking.py b/tests/unittests/distros/test_networking.py index ec508f4d..635f6901 100644 --- a/tests/unittests/distros/test_networking.py +++ b/tests/unittests/distros/test_networking.py @@ -1,3 +1,6 @@ +# See https://docs.pytest.org/en/stable/example +# /parametrize.html#parametrizing-conditional-raising +from contextlib import ExitStack as does_not_raise from unittest import mock import pytest @@ -9,10 +12,6 @@ from cloudinit.distros.networking import ( Networking, ) -# See https://docs.pytest.org/en/stable/example -# /parametrize.html#parametrizing-conditional-raising -from contextlib import ExitStack as does_not_raise - @pytest.yield_fixture def generic_networking_cls(): @@ -35,7 +34,8 @@ def generic_networking_cls(): error = AssertionError("Unexpectedly used /sys in generic networking code") with mock.patch( - "cloudinit.net.get_sys_class_path", side_effect=error, + "cloudinit.net.get_sys_class_path", + side_effect=error, ): yield TestNetworking @@ -91,8 +91,10 @@ class TestLinuxNetworkingTrySetLinkUp: m_is_up.return_value = True is_success = LinuxNetworking().try_set_link_up(devname) - assert (mock.call(['ip', 'link', 'set', devname, 'up']) == - m_subp.call_args_list[-1]) + assert ( + mock.call(["ip", "link", "set", devname, "up"]) + == m_subp.call_args_list[-1] + ) assert is_success def test_calls_subp_return_false(self, m_subp, m_is_up): @@ -100,8 +102,10 @@ class TestLinuxNetworkingTrySetLinkUp: m_is_up.return_value = False is_success = LinuxNetworking().try_set_link_up(devname) - assert (mock.call(['ip', 'link', 'set', devname, 'up']) == - m_subp.call_args_list[-1]) + assert ( + mock.call(["ip", "link", "set", devname, "up"]) + == m_subp.call_args_list[-1] + ) assert not is_success @@ -153,7 +157,9 @@ class TestNetworkingWaitForPhysDevs: return netcfg def test_skips_settle_if_all_present( - self, generic_networking_cls, wait_for_physdevs_netcfg, + self, + generic_networking_cls, + wait_for_physdevs_netcfg, ): networking = generic_networking_cls() with mock.patch.object( @@ -169,7 +175,9 @@ class TestNetworkingWaitForPhysDevs: assert 0 == m_settle.call_count def test_calls_udev_settle_on_missing( - self, generic_networking_cls, wait_for_physdevs_netcfg, + self, + generic_networking_cls, + wait_for_physdevs_netcfg, ): networking = generic_networking_cls() with mock.patch.object( diff --git a/tests/unittests/distros/test_opensuse.py b/tests/unittests/distros/test_opensuse.py index 4ff26102..4a4b266f 100644 --- a/tests/unittests/distros/test_opensuse.py +++ b/tests/unittests/distros/test_opensuse.py @@ -6,7 +6,6 @@ from . import _get_distro class TestopenSUSE(CiTestCase): - def test_get_distro(self): distro = _get_distro("opensuse") - self.assertEqual(distro.osfamily, 'suse') + self.assertEqual(distro.osfamily, "suse") diff --git a/tests/unittests/distros/test_photon.py b/tests/unittests/distros/test_photon.py index 3858f723..fed30c2b 100644 --- a/tests/unittests/distros/test_photon.py +++ b/tests/unittests/distros/test_photon.py @@ -1,34 +1,34 @@ # This file is part of cloud-init. See LICENSE file for license information. -from . import _get_distro from cloudinit import util -from tests.unittests.helpers import mock -from tests.unittests.helpers import CiTestCase +from tests.unittests.helpers import CiTestCase, mock + +from . import _get_distro SYSTEM_INFO = { - 'paths': { - 'cloud_dir': '/var/lib/cloud/', - 'templates_dir': '/etc/cloud/templates/', + "paths": { + "cloud_dir": "/var/lib/cloud/", + "templates_dir": "/etc/cloud/templates/", }, - 'network': {'renderers': 'networkd'}, + "network": {"renderers": "networkd"}, } class TestPhoton(CiTestCase): with_logs = True - distro = _get_distro('photon', SYSTEM_INFO) - expected_log_line = 'Rely on PhotonOS default network config' + distro = _get_distro("photon", SYSTEM_INFO) + expected_log_line = "Rely on PhotonOS default network config" def test_network_renderer(self): - self.assertEqual(self.distro._cfg['network']['renderers'], 'networkd') + self.assertEqual(self.distro._cfg["network"]["renderers"], "networkd") def test_get_distro(self): - self.assertEqual(self.distro.osfamily, 'photon') + self.assertEqual(self.distro.osfamily, "photon") @mock.patch("cloudinit.distros.photon.subp.subp") def test_write_hostname(self, m_subp): - hostname = 'myhostname' - hostfile = self.tmp_path('previous-hostname') + hostname = "myhostname" + hostfile = self.tmp_path("previous-hostname") self.distro._write_hostname(hostname, hostfile) self.assertEqual(hostname, util.load_file(hostfile)) @@ -36,7 +36,7 @@ class TestPhoton(CiTestCase): self.assertEqual(ret, hostname) m_subp.return_value = (None, None) - hostfile += 'hostfile' + hostfile += "hostfile" self.distro._write_hostname(hostname, hostfile) m_subp.return_value = (hostname, None) @@ -44,25 +44,25 @@ class TestPhoton(CiTestCase): self.assertEqual(ret, hostname) self.logs.truncate(0) - m_subp.return_value = (None, 'bla') + m_subp.return_value = (None, "bla") self.distro._write_hostname(hostname, None) - self.assertIn('Error while setting hostname', self.logs.getvalue()) + self.assertIn("Error while setting hostname", self.logs.getvalue()) - @mock.patch('cloudinit.net.generate_fallback_config') + @mock.patch("cloudinit.net.generate_fallback_config") def test_fallback_netcfg(self, m_fallback_cfg): - key = 'disable_fallback_netcfg' + key = "disable_fallback_netcfg" # Don't use fallback if no setting given self.logs.truncate(0) - assert(self.distro.generate_fallback_config() is None) + assert self.distro.generate_fallback_config() is None self.assertIn(self.expected_log_line, self.logs.getvalue()) self.logs.truncate(0) self.distro._cfg[key] = True - assert(self.distro.generate_fallback_config() is None) + assert self.distro.generate_fallback_config() is None self.assertIn(self.expected_log_line, self.logs.getvalue()) self.logs.truncate(0) self.distro._cfg[key] = False - assert(self.distro.generate_fallback_config() is not None) + assert self.distro.generate_fallback_config() is not None self.assertNotIn(self.expected_log_line, self.logs.getvalue()) diff --git a/tests/unittests/distros/test_resolv.py b/tests/unittests/distros/test_resolv.py index e7971627..65e78101 100644 --- a/tests/unittests/distros/test_resolv.py +++ b/tests/unittests/distros/test_resolv.py @@ -1,18 +1,16 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.distros.parsers import resolv_conf - -from tests.unittests.helpers import TestCase - import re +from cloudinit.distros.parsers import resolv_conf +from tests.unittests.helpers import TestCase -BASE_RESOLVE = ''' +BASE_RESOLVE = """ ; generated by /sbin/dhclient-script search blah.yahoo.com yahoo.com nameserver 10.15.44.14 nameserver 10.15.30.92 -''' +""" BASE_RESOLVE = BASE_RESOLVE.strip() @@ -27,39 +25,40 @@ class TestResolvHelper(TestCase): self.assertIsNone(rp.local_domain) rp.local_domain = "bob" - self.assertEqual('bob', rp.local_domain) - self.assertIn('domain bob', str(rp)) + self.assertEqual("bob", rp.local_domain) + self.assertIn("domain bob", str(rp)) def test_nameservers(self): rp = resolv_conf.ResolvConf(BASE_RESOLVE) - self.assertIn('10.15.44.14', rp.nameservers) - self.assertIn('10.15.30.92', rp.nameservers) - rp.add_nameserver('10.2') - self.assertIn('10.2', rp.nameservers) - self.assertIn('nameserver 10.2', str(rp)) - self.assertNotIn('10.3', rp.nameservers) + self.assertIn("10.15.44.14", rp.nameservers) + self.assertIn("10.15.30.92", rp.nameservers) + rp.add_nameserver("10.2") + self.assertIn("10.2", rp.nameservers) + self.assertIn("nameserver 10.2", str(rp)) + self.assertNotIn("10.3", rp.nameservers) self.assertEqual(len(rp.nameservers), 3) - rp.add_nameserver('10.2') - rp.add_nameserver('10.3') - self.assertNotIn('10.3', rp.nameservers) + rp.add_nameserver("10.2") + rp.add_nameserver("10.3") + self.assertNotIn("10.3", rp.nameservers) def test_search_domains(self): rp = resolv_conf.ResolvConf(BASE_RESOLVE) - self.assertIn('yahoo.com', rp.search_domains) - self.assertIn('blah.yahoo.com', rp.search_domains) - rp.add_search_domain('bbb.y.com') - self.assertIn('bbb.y.com', rp.search_domains) - self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp))) - self.assertIn('bbb.y.com', rp.search_domains) - rp.add_search_domain('bbb.y.com') + self.assertIn("yahoo.com", rp.search_domains) + self.assertIn("blah.yahoo.com", rp.search_domains) + rp.add_search_domain("bbb.y.com") + self.assertIn("bbb.y.com", rp.search_domains) + self.assertTrue(re.search(r"search(.*)bbb.y.com(.*)", str(rp))) + self.assertIn("bbb.y.com", rp.search_domains) + rp.add_search_domain("bbb.y.com") self.assertEqual(len(rp.search_domains), 3) - rp.add_search_domain('bbb2.y.com') + rp.add_search_domain("bbb2.y.com") self.assertEqual(len(rp.search_domains), 4) - rp.add_search_domain('bbb3.y.com') + rp.add_search_domain("bbb3.y.com") self.assertEqual(len(rp.search_domains), 5) - rp.add_search_domain('bbb4.y.com') + rp.add_search_domain("bbb4.y.com") self.assertEqual(len(rp.search_domains), 6) - self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com') + self.assertRaises(ValueError, rp.add_search_domain, "bbb5.y.com") self.assertEqual(len(rp.search_domains), 6) + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_sles.py b/tests/unittests/distros/test_sles.py index 04514a19..66b8b13d 100644 --- a/tests/unittests/distros/test_sles.py +++ b/tests/unittests/distros/test_sles.py @@ -6,7 +6,6 @@ from . import _get_distro class TestSLES(CiTestCase): - def test_get_distro(self): distro = _get_distro("sles") - self.assertEqual(distro.osfamily, 'suse') + self.assertEqual(distro.osfamily, "suse") diff --git a/tests/unittests/distros/test_sysconfig.py b/tests/unittests/distros/test_sysconfig.py index 4368496d..d0979e17 100644 --- a/tests/unittests/distros/test_sysconfig.py +++ b/tests/unittests/distros/test_sysconfig.py @@ -3,22 +3,23 @@ import re from cloudinit.distros.parsers.sys_conf import SysConf - from tests.unittests.helpers import TestCase - # Lots of good examples @ # http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt + class TestSysConfHelper(TestCase): # This function was added in 2.7, make it work for 2.6 def assertRegMatches(self, text, regexp): regexp = re.compile(regexp) - self.assertTrue(regexp.search(text), - msg="%s must match %s!" % (text, regexp.pattern)) + self.assertTrue( + regexp.search(text), + msg="%s must match %s!" % (text, regexp.pattern), + ) def test_parse_no_change(self): - contents = '''# A comment + contents = """# A comment USESMBAUTH=no KEYTABLE=/usr/lib/kbd/keytables/us.map SHORTDATE=$(date +%y:%m:%d:%H:%M) @@ -28,59 +29,64 @@ NETMASK0=255.255.255.0 LIST=$LOGROOT/incremental-list IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64' ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256" -USEMD5=no''' +USEMD5=no""" conf = SysConf(contents.splitlines()) - self.assertEqual(conf['HOSTNAME'], 'blahblah') - self.assertEqual(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)') + self.assertEqual(conf["HOSTNAME"], "blahblah") + self.assertEqual(conf["SHORTDATE"], "$(date +%y:%m:%d:%H:%M)") # Should be unquoted - self.assertEqual(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; ' - '-G ${DEVICE} rx 256 tx 256')) + self.assertEqual( + conf["ETHTOOL_OPTS"], + "-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256", + ) self.assertEqual(contents, str(conf)) def test_parse_shell_vars(self): - contents = 'USESMBAUTH=$XYZ' + contents = "USESMBAUTH=$XYZ" conf = SysConf(contents.splitlines()) self.assertEqual(contents, str(conf)) - conf = SysConf('') - conf['B'] = '${ZZ}d apples' + conf = SysConf("") + conf["B"] = "${ZZ}d apples" # Should be quoted self.assertEqual('B="${ZZ}d apples"', str(conf)) - conf = SysConf('') - conf['B'] = '$? d apples' + conf = SysConf("") + conf["B"] = "$? d apples" self.assertEqual('B="$? d apples"', str(conf)) contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"' conf = SysConf(contents.splitlines()) - self.assertEqual('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf)) + self.assertEqual("IPMI_WATCHDOG_OPTIONS=timeout=60", str(conf)) def test_parse_adjust(self): contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"' conf = SysConf(contents.splitlines()) # Should be unquoted - self.assertEqual('eth0-:0004::1/64 eth1-:0005::1/64', - conf['IPV6TO4_ROUTING']) - conf['IPV6TO4_ROUTING'] = "blah \tblah" + self.assertEqual( + "eth0-:0004::1/64 eth1-:0005::1/64", conf["IPV6TO4_ROUTING"] + ) + conf["IPV6TO4_ROUTING"] = "blah \tblah" contents2 = str(conf).strip() # Should be requoted due to whitespace - self.assertRegMatches(contents2, - r'IPV6TO4_ROUTING=[\']blah\s+blah[\']') + self.assertRegMatches( + contents2, r"IPV6TO4_ROUTING=[\']blah\s+blah[\']" + ) def test_parse_no_adjust_shell(self): - conf = SysConf(''.splitlines()) - conf['B'] = ' $(time)' + conf = SysConf("".splitlines()) + conf["B"] = " $(time)" contents = str(conf) - self.assertEqual('B= $(time)', contents) + self.assertEqual("B= $(time)", contents) def test_parse_empty(self): - contents = '' + contents = "" conf = SysConf(contents.splitlines()) - self.assertEqual('', str(conf).strip()) + self.assertEqual("", str(conf).strip()) def test_parse_add_new(self): - contents = 'BLAH=b' + contents = "BLAH=b" conf = SysConf(contents.splitlines()) - conf['Z'] = 'd' + conf["Z"] = "d" contents = str(conf) self.assertIn("Z=d", contents) self.assertIn("BLAH=b", contents) + # vi: ts=4 expandtab diff --git a/tests/unittests/distros/test_user_data_normalize.py b/tests/unittests/distros/test_user_data_normalize.py index bd8f2adb..67ea024b 100644 --- a/tests/unittests/distros/test_user_data_normalize.py +++ b/tests/unittests/distros/test_user_data_normalize.py @@ -2,251 +2,233 @@ from unittest import mock -from cloudinit import distros +from cloudinit import distros, helpers, settings from cloudinit.distros import ug_util -from cloudinit import helpers -from cloudinit import settings - from tests.unittests.helpers import TestCase - bcfg = { - 'name': 'bob', - 'plain_text_passwd': 'ubuntu', - 'home': "/home/ubuntu", - 'shell': "/bin/bash", - 'lock_passwd': True, - 'gecos': "Ubuntu", - 'groups': ["foo"] + "name": "bob", + "plain_text_passwd": "ubuntu", + "home": "/home/ubuntu", + "shell": "/bin/bash", + "lock_passwd": True, + "gecos": "Ubuntu", + "groups": ["foo"], } class TestUGNormalize(TestCase): - def setUp(self): super(TestUGNormalize, self).setUp() - self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy') + self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") def _make_distro(self, dtype, def_user=None): cfg = dict(settings.CFG_BUILTIN) - cfg['system_info']['distro'] = dtype - paths = helpers.Paths(cfg['system_info']['paths']) + cfg["system_info"]["distro"] = dtype + paths = helpers.Paths(cfg["system_info"]["paths"]) distro_cls = distros.fetch(dtype) if def_user: - cfg['system_info']['default_user'] = def_user.copy() - distro = distro_cls(dtype, cfg['system_info'], paths) + cfg["system_info"]["default_user"] = def_user.copy() + distro = distro_cls(dtype, cfg["system_info"], paths) return distro def _norm(self, cfg, distro): return ug_util.normalize_users_groups(cfg, distro) def test_group_dict(self): - distro = self._make_distro('ubuntu') - g = {'groups': - [{'ubuntu': ['foo', 'bar'], - 'bob': 'users'}, - 'cloud-users', - {'bob': 'users2'}]} + distro = self._make_distro("ubuntu") + g = { + "groups": [ + {"ubuntu": ["foo", "bar"], "bob": "users"}, + "cloud-users", + {"bob": "users2"}, + ] + } (_users, groups) = self._norm(g, distro) - self.assertIn('ubuntu', groups) - ub_members = groups['ubuntu'] - self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members)) - self.assertIn('bob', groups) - b_members = groups['bob'] - self.assertEqual(sorted(['users', 'users2']), - sorted(b_members)) + self.assertIn("ubuntu", groups) + ub_members = groups["ubuntu"] + self.assertEqual(sorted(["foo", "bar"]), sorted(ub_members)) + self.assertIn("bob", groups) + b_members = groups["bob"] + self.assertEqual(sorted(["users", "users2"]), sorted(b_members)) def test_basic_groups(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'groups': ['bob'], + "groups": ["bob"], } (users, groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', groups) + self.assertIn("bob", groups) self.assertEqual({}, users) def test_csv_groups(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'groups': 'bob,joe,steve', + "groups": "bob,joe,steve", } (users, groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', groups) - self.assertIn('joe', groups) - self.assertIn('steve', groups) + self.assertIn("bob", groups) + self.assertIn("joe", groups) + self.assertIn("steve", groups) self.assertEqual({}, users) def test_more_groups(self): - distro = self._make_distro('ubuntu') - ug_cfg = { - 'groups': ['bob', 'joe', 'steve'] - } + distro = self._make_distro("ubuntu") + ug_cfg = {"groups": ["bob", "joe", "steve"]} (users, groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', groups) - self.assertIn('joe', groups) - self.assertIn('steve', groups) + self.assertIn("bob", groups) + self.assertIn("joe", groups) + self.assertIn("steve", groups) self.assertEqual({}, users) def test_member_groups(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'groups': { - 'bob': ['s'], - 'joe': [], - 'steve': [], + "groups": { + "bob": ["s"], + "joe": [], + "steve": [], } } (users, groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', groups) - self.assertEqual(['s'], groups['bob']) - self.assertEqual([], groups['joe']) - self.assertIn('joe', groups) - self.assertIn('steve', groups) + self.assertIn("bob", groups) + self.assertEqual(["s"], groups["bob"]) + self.assertEqual([], groups["joe"]) + self.assertIn("joe", groups) + self.assertIn("steve", groups) self.assertEqual({}, users) def test_users_simple_dict(self): - distro = self._make_distro('ubuntu', bcfg) + distro = self._make_distro("ubuntu", bcfg) ug_cfg = { - 'users': { - 'default': True, + "users": { + "default": True, } } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertIn("bob", users) ug_cfg = { - 'users': { - 'default': 'yes', + "users": { + "default": "yes", } } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertIn("bob", users) ug_cfg = { - 'users': { - 'default': '1', + "users": { + "default": "1", } } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertIn("bob", users) def test_users_simple_dict_no(self): - distro = self._make_distro('ubuntu', bcfg) + distro = self._make_distro("ubuntu", bcfg) ug_cfg = { - 'users': { - 'default': False, + "users": { + "default": False, } } (users, _groups) = self._norm(ug_cfg, distro) self.assertEqual({}, users) ug_cfg = { - 'users': { - 'default': 'no', + "users": { + "default": "no", } } (users, _groups) = self._norm(ug_cfg, distro) self.assertEqual({}, users) def test_users_simple_csv(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': 'joe,bob', + "users": "joe,bob", } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('joe', users) - self.assertIn('bob', users) - self.assertEqual({'default': False}, users['joe']) - self.assertEqual({'default': False}, users['bob']) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) def test_users_simple(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - 'joe', - 'bob' - ], + "users": ["joe", "bob"], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('joe', users) - self.assertIn('bob', users) - self.assertEqual({'default': False}, users['joe']) - self.assertEqual({'default': False}, users['bob']) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) def test_users_old_user(self): - distro = self._make_distro('ubuntu', bcfg) - ug_cfg = { - 'user': 'zetta', - 'users': 'default' - } + distro = self._make_distro("ubuntu", bcfg) + ug_cfg = {"user": "zetta", "users": "default"} (users, _groups) = self._norm(ug_cfg, distro) - self.assertNotIn('bob', users) # Bob is not the default now, zetta is - self.assertIn('zetta', users) - self.assertTrue(users['zetta']['default']) - self.assertNotIn('default', users) - ug_cfg = { - 'user': 'zetta', - 'users': 'default, joe' - } + self.assertNotIn("bob", users) # Bob is not the default now, zetta is + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + self.assertNotIn("default", users) + ug_cfg = {"user": "zetta", "users": "default, joe"} (users, _groups) = self._norm(ug_cfg, distro) - self.assertNotIn('bob', users) # Bob is not the default now, zetta is - self.assertIn('joe', users) - self.assertIn('zetta', users) - self.assertTrue(users['zetta']['default']) - self.assertNotIn('default', users) - ug_cfg = { - 'user': 'zetta', - 'users': ['bob', 'joe'] - } + self.assertNotIn("bob", users) # Bob is not the default now, zetta is + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) + self.assertNotIn("default", users) + ug_cfg = {"user": "zetta", "users": ["bob", "joe"]} (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) - self.assertIn('joe', users) - self.assertIn('zetta', users) - self.assertTrue(users['zetta']['default']) + self.assertIn("bob", users) + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) ug_cfg = { - 'user': 'zetta', - 'users': { - 'bob': True, - 'joe': True, - } + "user": "zetta", + "users": { + "bob": True, + "joe": True, + }, } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) - self.assertIn('joe', users) - self.assertIn('zetta', users) - self.assertTrue(users['zetta']['default']) + self.assertIn("bob", users) + self.assertIn("joe", users) + self.assertIn("zetta", users) + self.assertTrue(users["zetta"]["default"]) ug_cfg = { - 'user': 'zetta', + "user": "zetta", } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('zetta', users) + self.assertIn("zetta", users) ug_cfg = {} (users, groups) = self._norm(ug_cfg, distro) self.assertEqual({}, users) self.assertEqual({}, groups) def test_users_dict_default_additional(self): - distro = self._make_distro('ubuntu', bcfg) + distro = self._make_distro("ubuntu", bcfg) ug_cfg = { - 'users': [ - {'name': 'default', 'blah': True} - ], + "users": [{"name": "default", "blah": True}], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) - self.assertEqual(",".join(distro.get_default_user()['groups']), - users['bob']['groups']) - self.assertEqual(True, users['bob']['blah']) - self.assertEqual(True, users['bob']['default']) + self.assertIn("bob", users) + self.assertEqual( + ",".join(distro.get_default_user()["groups"]), + users["bob"]["groups"], + ) + self.assertEqual(True, users["bob"]["blah"]) + self.assertEqual(True, users["bob"]["default"]) def test_users_dict_extract(self): - distro = self._make_distro('ubuntu', bcfg) + distro = self._make_distro("ubuntu", bcfg) ug_cfg = { - 'users': [ - 'default', + "users": [ + "default", ], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertIn("bob", users) (name, config) = ug_util.extract_default(users) - self.assertEqual(name, 'bob') + self.assertEqual(name, "bob") expected_config = {} def_config = None try: @@ -258,115 +240,126 @@ class TestUGNormalize(TestCase): expected_config.update(def_config) # Ignore these for now - expected_config.pop('name', None) - expected_config.pop('groups', None) - config.pop('groups', None) + expected_config.pop("name", None) + expected_config.pop("groups", None) + config.pop("groups", None) self.assertEqual(config, expected_config) def test_users_dict_default(self): - distro = self._make_distro('ubuntu', bcfg) + distro = self._make_distro("ubuntu", bcfg) ug_cfg = { - 'users': [ - 'default', + "users": [ + "default", ], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) - self.assertEqual(",".join(distro.get_default_user()['groups']), - users['bob']['groups']) - self.assertEqual(True, users['bob']['default']) + self.assertIn("bob", users) + self.assertEqual( + ",".join(distro.get_default_user()["groups"]), + users["bob"]["groups"], + ) + self.assertEqual(True, users["bob"]["default"]) def test_users_dict_trans(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - {'name': 'joe', - 'tr-me': True}, - {'name': 'bob'}, + "users": [ + {"name": "joe", "tr-me": True}, + {"name": "bob"}, ], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('joe', users) - self.assertIn('bob', users) - self.assertEqual({'tr_me': True, 'default': False}, users['joe']) - self.assertEqual({'default': False}, users['bob']) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"tr_me": True, "default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) def test_users_dict(self): - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - {'name': 'joe'}, - {'name': 'bob'}, + "users": [ + {"name": "joe"}, + {"name": "bob"}, ], } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('joe', users) - self.assertIn('bob', users) - self.assertEqual({'default': False}, users['joe']) - self.assertEqual({'default': False}, users['bob']) + self.assertIn("joe", users) + self.assertIn("bob", users) + self.assertEqual({"default": False}, users["joe"]) + self.assertEqual({"default": False}, users["bob"]) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_create_snap_user(self, mock_subp): - mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n', - '')] - distro = self._make_distro('ubuntu') + mock_subp.side_effect = [ + ('{"username": "joe", "ssh-key-count": 1}\n', "") + ] + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - {'name': 'joe', 'snapuser': 'joe@joe.com'}, + "users": [ + {"name": "joe", "snapuser": "joe@joe.com"}, ], } (users, _groups) = self._norm(ug_cfg, distro) for (user, config) in users.items(): - print('user=%s config=%s' % (user, config)) + print("user=%s config=%s" % (user, config)) username = distro.create_user(user, **config) - snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com'] + snapcmd = ["snap", "create-user", "--sudoer", "--json", "joe@joe.com"] mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd) - self.assertEqual(username, 'joe') + self.assertEqual(username, "joe") - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_create_snap_user_known(self, mock_subp): - mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n', - '')] - distro = self._make_distro('ubuntu') + mock_subp.side_effect = [ + ('{"username": "joe", "ssh-key-count": 1}\n', "") + ] + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - {'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True}, + "users": [ + {"name": "joe", "snapuser": "joe@joe.com", "known": True}, ], } (users, _groups) = self._norm(ug_cfg, distro) for (user, config) in users.items(): - print('user=%s config=%s' % (user, config)) + print("user=%s config=%s" % (user, config)) username = distro.create_user(user, **config) - snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known', - 'joe@joe.com'] + snapcmd = [ + "snap", + "create-user", + "--sudoer", + "--json", + "--known", + "joe@joe.com", + ] mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd) - self.assertEqual(username, 'joe') - - @mock.patch('cloudinit.util.system_is_snappy') - @mock.patch('cloudinit.util.is_group') - @mock.patch('cloudinit.subp.subp') - def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp, - mock_snappy): + self.assertEqual(username, "joe") + + @mock.patch("cloudinit.util.system_is_snappy") + @mock.patch("cloudinit.util.is_group") + @mock.patch("cloudinit.subp.subp") + def test_add_user_on_snappy_system( + self, mock_subp, mock_isgrp, mock_snappy + ): mock_isgrp.return_value = False mock_subp.return_value = True mock_snappy.return_value = True - distro = self._make_distro('ubuntu') + distro = self._make_distro("ubuntu") ug_cfg = { - 'users': [ - {'name': 'joe', 'groups': 'users', 'create_groups': True}, + "users": [ + {"name": "joe", "groups": "users", "create_groups": True}, ], } (users, _groups) = self._norm(ug_cfg, distro) for (user, config) in users.items(): - print('user=%s config=%s' % (user, config)) + print("user=%s config=%s" % (user, config)) distro.add_user(user, **config) - groupcmd = ['groupadd', 'users', '--extrausers'] - addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m'] + groupcmd = ["groupadd", "users", "--extrausers"] + addcmd = ["useradd", "joe", "--extrausers", "--groups", "users", "-m"] mock_subp.assert_any_call(groupcmd) mock_subp.assert_any_call(addcmd, logstring=addcmd) + # vi: ts=4 expandtab |