summaryrefslogtreecommitdiff
path: root/tests/unittests/distros
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/distros')
-rw-r--r--tests/unittests/distros/__init__.py10
-rw-r--r--tests/unittests/distros/test_arch.py50
-rw-r--r--tests/unittests/distros/test_bsd_utils.py49
-rw-r--r--tests/unittests/distros/test_create_users.py252
-rw-r--r--tests/unittests/distros/test_debian.py155
-rw-r--r--tests/unittests/distros/test_freebsd.py28
-rw-r--r--tests/unittests/distros/test_generic.py300
-rw-r--r--tests/unittests/distros/test_gentoo.py11
-rw-r--r--tests/unittests/distros/test_hostname.py16
-rw-r--r--tests/unittests/distros/test_hosts.py36
-rw-r--r--tests/unittests/distros/test_init.py273
-rw-r--r--tests/unittests/distros/test_manage_service.py33
-rw-r--r--tests/unittests/distros/test_netbsd.py11
-rw-r--r--tests/unittests/distros/test_netconfig.py605
-rw-r--r--tests/unittests/distros/test_networking.py30
-rw-r--r--tests/unittests/distros/test_opensuse.py3
-rw-r--r--tests/unittests/distros/test_photon.py42
-rw-r--r--tests/unittests/distros/test_resolv.py55
-rw-r--r--tests/unittests/distros/test_sles.py3
-rw-r--r--tests/unittests/distros/test_sysconfig.py62
-rw-r--r--tests/unittests/distros/test_user_data_normalize.py383
21 files changed, 1379 insertions, 1028 deletions
diff --git a/tests/unittests/distros/__init__.py b/tests/unittests/distros/__init__.py
index 5394aa56..e66b9446 100644
--- a/tests/unittests/distros/__init__.py
+++ b/tests/unittests/distros/__init__.py
@@ -1,9 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
import copy
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import settings
+from cloudinit import distros, helpers, settings
def _get_distro(dtype, system_info=None):
@@ -14,8 +12,8 @@ def _get_distro(dtype, system_info=None):
example: _get_distro("debian")
"""
if system_info is None:
- system_info = copy.deepcopy(settings.CFG_BUILTIN['system_info'])
- system_info['distro'] = dtype
- paths = helpers.Paths(system_info['paths'])
+ system_info = copy.deepcopy(settings.CFG_BUILTIN["system_info"])
+ system_info["distro"] = dtype
+ paths = helpers.Paths(system_info["paths"])
distro_cls = distros.fetch(dtype)
return distro_cls(dtype, system_info, paths)
diff --git a/tests/unittests/distros/test_arch.py b/tests/unittests/distros/test_arch.py
index 590ba00e..5446295e 100644
--- a/tests/unittests/distros/test_arch.py
+++ b/tests/unittests/distros/test_arch.py
@@ -1,15 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.distros.arch import _render_network
from cloudinit import util
-
-from tests.unittests.helpers import (CiTestCase, dir2dict)
+from cloudinit.distros.arch import _render_network
+from tests.unittests.helpers import CiTestCase, dir2dict
from . import _get_distro
class TestArch(CiTestCase):
-
def test_get_distro(self):
distro = _get_distro("arch")
hostname = "myhostname"
@@ -23,23 +21,35 @@ class TestRenderNetwork(CiTestCase):
"""Just the most basic static config.
note 'lo' should not be rendered as an interface."""
- entries = {'eth0': {'auto': True,
- 'dns-nameservers': ['8.8.8.8'],
- 'bootproto': 'static',
- 'address': '10.0.0.2',
- 'gateway': '10.0.0.1',
- 'netmask': '255.255.255.0'},
- 'lo': {'auto': True}}
+ entries = {
+ "eth0": {
+ "auto": True,
+ "dns-nameservers": ["8.8.8.8"],
+ "bootproto": "static",
+ "address": "10.0.0.2",
+ "gateway": "10.0.0.1",
+ "netmask": "255.255.255.0",
+ },
+ "lo": {"auto": True},
+ }
target = self.tmp_dir()
devs = _render_network(entries, target=target)
files = dir2dict(target, prefix=target)
- self.assertEqual(['eth0'], devs)
+ self.assertEqual(["eth0"], devs)
self.assertEqual(
- {'/etc/netctl/eth0': '\n'.join([
- "Address=10.0.0.2/255.255.255.0",
- "Connection=ethernet",
- "DNS=('8.8.8.8')",
- "Gateway=10.0.0.1",
- "IP=static",
- "Interface=eth0", ""]),
- '/etc/resolv.conf': 'nameserver 8.8.8.8\n'}, files)
+ {
+ "/etc/netctl/eth0": "\n".join(
+ [
+ "Address=10.0.0.2/255.255.255.0",
+ "Connection=ethernet",
+ "DNS=('8.8.8.8')",
+ "Gateway=10.0.0.1",
+ "IP=static",
+ "Interface=eth0",
+ "",
+ ]
+ ),
+ "/etc/resolv.conf": "nameserver 8.8.8.8\n",
+ },
+ files,
+ )
diff --git a/tests/unittests/distros/test_bsd_utils.py b/tests/unittests/distros/test_bsd_utils.py
index 55686dc9..d6f0aeed 100644
--- a/tests/unittests/distros/test_bsd_utils.py
+++ b/tests/unittests/distros/test_bsd_utils.py
@@ -1,8 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
import cloudinit.distros.bsd_utils as bsd_utils
-
-from tests.unittests.helpers import (CiTestCase, ExitStack, mock)
+from tests.unittests.helpers import CiTestCase, ExitStack, mock
RC_FILE = """
if something; then
@@ -13,55 +12,55 @@ hostname={hostname}
class TestBsdUtils(CiTestCase):
-
def setUp(self):
super().setUp()
patches = ExitStack()
self.addCleanup(patches.close)
self.load_file = patches.enter_context(
- mock.patch.object(bsd_utils.util, 'load_file'))
+ mock.patch.object(bsd_utils.util, "load_file")
+ )
self.write_file = patches.enter_context(
- mock.patch.object(bsd_utils.util, 'write_file'))
+ mock.patch.object(bsd_utils.util, "write_file")
+ )
def test_get_rc_config_value(self):
- self.load_file.return_value = 'hostname=foo\n'
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
- self.load_file.assert_called_with('/etc/rc.conf')
+ self.load_file.return_value = "hostname=foo\n"
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
+ self.load_file.assert_called_with("/etc/rc.conf")
- self.load_file.return_value = 'hostname=foo'
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+ self.load_file.return_value = "hostname=foo"
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
self.load_file.return_value = 'hostname="foo"'
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
self.load_file.return_value = "hostname='foo'"
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), 'foo')
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
- self.load_file.return_value = 'hostname=\'foo"'
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "'foo\"")
+ self.load_file.return_value = "hostname='foo\""
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "'foo\"")
- self.load_file.return_value = ''
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), None)
+ self.load_file.return_value = ""
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), None)
- self.load_file.return_value = RC_FILE.format(hostname='foo')
- self.assertEqual(bsd_utils.get_rc_config_value('hostname'), "foo")
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
+ self.assertEqual(bsd_utils.get_rc_config_value("hostname"), "foo")
def test_set_rc_config_value_unchanged(self):
# bsd_utils.set_rc_config_value('hostname', 'foo')
# self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n')
- self.load_file.return_value = RC_FILE.format(hostname='foo')
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
self.write_file.assert_not_called()
def test_set_rc_config_value(self):
- bsd_utils.set_rc_config_value('hostname', 'foo')
- self.write_file.assert_called_with('/etc/rc.conf', 'hostname=foo\n')
+ bsd_utils.set_rc_config_value("hostname", "foo")
+ self.write_file.assert_called_with("/etc/rc.conf", "hostname=foo\n")
- self.load_file.return_value = RC_FILE.format(hostname='foo')
- bsd_utils.set_rc_config_value('hostname', 'bar')
+ self.load_file.return_value = RC_FILE.format(hostname="foo")
+ bsd_utils.set_rc_config_value("hostname", "bar")
self.write_file.assert_called_with(
- '/etc/rc.conf',
- RC_FILE.format(hostname='bar')
+ "/etc/rc.conf", RC_FILE.format(hostname="bar")
)
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py
index 5baa8a4b..ddb039bd 100644
--- a/tests/unittests/distros/test_create_users.py
+++ b/tests/unittests/distros/test_create_users.py
@@ -2,9 +2,8 @@
import re
-from cloudinit import distros
-from cloudinit import ssh_util
-from tests.unittests.helpers import (CiTestCase, mock)
+from cloudinit import distros, ssh_util
+from tests.unittests.helpers import CiTestCase, mock
from tests.unittests.util import abstract_to_concrete
@@ -17,220 +16,267 @@ class TestCreateUser(CiTestCase):
def setUp(self):
super(TestCreateUser, self).setUp()
self.dist = abstract_to_concrete(distros.Distro)(
- name='test', cfg=None, paths=None
+ name="test", cfg=None, paths=None
)
def _useradd2call(self, args):
# return a mock call for the useradd command in args
# with expected 'logstring'.
- args = ['useradd'] + args
+ args = ["useradd"] + args
logcmd = [a for a in args]
for i in range(len(args)):
- if args[i] in ('--password',):
- logcmd[i + 1] = 'REDACTED'
+ if args[i] in ("--password",):
+ logcmd[i + 1] = "REDACTED"
return mock.call(args, logstring=logcmd)
def test_basic(self, m_subp, m_is_snappy):
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-m']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
def test_no_home(self, m_subp, m_is_snappy):
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, no_create_home=True)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-M']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
def test_system_user(self, m_subp, m_is_snappy):
# system user should have no home and get --system
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, system=True)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '--system', '-M']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "--system", "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
def test_explicit_no_home_false(self, m_subp, m_is_snappy):
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, no_create_home=False)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-m']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
def test_unlocked(self, m_subp, m_is_snappy):
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, lock_passwd=False)
self.assertEqual(
- m_subp.call_args_list,
- [self._useradd2call([user, '-m'])])
+ m_subp.call_args_list, [self._useradd2call([user, "-m"])]
+ )
def test_set_password(self, m_subp, m_is_snappy):
- user = 'foouser'
- password = 'passfoo'
+ user = "foouser"
+ password = "passfoo"
self.dist.create_user(user, passwd=password)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '--password', password, '-m']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "--password", password, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
@mock.patch("cloudinit.distros.util.is_group")
def test_group_added(self, m_is_group, m_subp, m_is_snappy):
m_is_group.return_value = False
- user = 'foouser'
- self.dist.create_user(user, groups=['group1'])
+ user = "foouser"
+ self.dist.create_user(user, groups=["group1"])
expected = [
- mock.call(['groupadd', 'group1']),
- self._useradd2call([user, '--groups', 'group1', '-m']),
- mock.call(['passwd', '-l', user])]
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", "group1", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
self.assertEqual(m_subp.call_args_list, expected)
@mock.patch("cloudinit.distros.util.is_group")
def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
- ex_groups = ['existing_group']
- groups = ['group1', ex_groups[0]]
+ ex_groups = ["existing_group"]
+ groups = ["group1", ex_groups[0]]
m_is_group.side_effect = lambda m: m in ex_groups
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, groups=groups)
expected = [
- mock.call(['groupadd', 'group1']),
- self._useradd2call([user, '--groups', ','.join(groups), '-m']),
- mock.call(['passwd', '-l', user])]
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", ",".join(groups), "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
self.assertEqual(m_subp.call_args_list, expected)
@mock.patch("cloudinit.distros.util.is_group")
def test_create_groups_with_whitespace_string(
- self, m_is_group, m_subp, m_is_snappy):
+ self, m_is_group, m_subp, m_is_snappy
+ ):
# groups supported as a comma delimeted string even with white space
m_is_group.return_value = False
- user = 'foouser'
- self.dist.create_user(user, groups='group1, group2')
+ user = "foouser"
+ self.dist.create_user(user, groups="group1, group2")
expected = [
- mock.call(['groupadd', 'group1']),
- mock.call(['groupadd', 'group2']),
- self._useradd2call([user, '--groups', 'group1,group2', '-m']),
- mock.call(['passwd', '-l', user])]
+ mock.call(["groupadd", "group1"]),
+ mock.call(["groupadd", "group2"]),
+ self._useradd2call([user, "--groups", "group1,group2", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
self.assertEqual(m_subp.call_args_list, expected)
def test_explicit_sudo_false(self, m_subp, m_is_snappy):
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, sudo=False)
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-m']),
- mock.call(['passwd', '-l', user])])
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_string(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""ssh_authorized_keys allows string and calls setup_user_keys."""
- user = 'foouser'
- self.dist.create_user(user, ssh_authorized_keys='mykey')
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys="mykey")
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-m']),
- mock.call(['passwd', '-l', user])])
- m_setup_user_keys.assert_called_once_with(set(['mykey']), user)
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["mykey"]), user)
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_list(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""ssh_authorized_keys allows lists and calls setup_user_keys."""
- user = 'foouser'
- self.dist.create_user(user, ssh_authorized_keys=['key1', 'key2'])
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"])
self.assertEqual(
m_subp.call_args_list,
- [self._useradd2call([user, '-m']),
- mock.call(['passwd', '-l', user])])
- m_setup_user_keys.assert_called_once_with(set(['key1', 'key2']), user)
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user)
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_integer(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""ssh_authorized_keys warns on non-iterable/string type."""
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(user, ssh_authorized_keys=-1)
m_setup_user_keys.assert_called_once_with(set([]), user)
match = re.match(
- r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for'
- ' \'ssh_authorized_keys\'.*',
+ r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for"
+ " 'ssh_authorized_keys'.*",
self.logs.getvalue(),
- re.DOTALL)
+ re.DOTALL,
+ )
self.assertIsNotNone(
- match, 'Missing ssh_authorized_keys invalid type warning')
+ match, "Missing ssh_authorized_keys invalid type warning"
+ )
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_no_cloud_keys(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""Log a warning when trying to redirect a user no cloud ssh keys."""
- user = 'foouser'
- self.dist.create_user(user, ssh_redirect_user='someuser')
+ user = "foouser"
+ self.dist.create_user(user, ssh_redirect_user="someuser")
self.assertIn(
- 'WARNING: Unable to disable SSH logins for foouser given '
- 'ssh_redirect_user: someuser. No cloud public-keys present.\n',
- self.logs.getvalue())
+ "WARNING: Unable to disable SSH logins for foouser given "
+ "ssh_redirect_user: someuser. No cloud public-keys present.\n",
+ self.logs.getvalue(),
+ )
m_setup_user_keys.assert_not_called()
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_with_cloud_keys(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(
- user, ssh_redirect_user='someuser', cloud_public_ssh_keys=['key1'])
+ user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"]
+ )
disable_prefix = ssh_util.DISABLE_USER_OPTS
- disable_prefix = disable_prefix.replace('$USER', 'someuser')
- disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
m_setup_user_keys.assert_called_once_with(
- set(['key1']), 'foouser', options=disable_prefix)
+ set(["key1"]), "foouser", options=disable_prefix
+ )
- @mock.patch('cloudinit.ssh_util.setup_user_keys')
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
- self, m_setup_user_keys, m_subp, m_is_snappy):
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
"""Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
- user = 'foouser'
+ user = "foouser"
self.dist.create_user(
- user, ssh_authorized_keys='auth1', ssh_redirect_user='someuser',
- cloud_public_ssh_keys=['key1'])
+ user,
+ ssh_authorized_keys="auth1",
+ ssh_redirect_user="someuser",
+ cloud_public_ssh_keys=["key1"],
+ )
disable_prefix = ssh_util.DISABLE_USER_OPTS
- disable_prefix = disable_prefix.replace('$USER', 'someuser')
- disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
self.assertEqual(
m_setup_user_keys.call_args_list,
- [mock.call(set(['auth1']), user), # not disabled
- mock.call(set(['key1']), 'foouser', options=disable_prefix)])
+ [
+ mock.call(set(["auth1"]), user), # not disabled
+ mock.call(set(["key1"]), "foouser", options=disable_prefix),
+ ],
+ )
@mock.patch("cloudinit.distros.subp.which")
- def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp,
- m_is_snappy):
+ def test_lock_with_usermod_if_no_passwd(
+ self, m_which, m_subp, m_is_snappy
+ ):
"""Lock uses usermod --lock if no 'passwd' cmd available."""
- m_which.side_effect = lambda m: m in ('usermod',)
+ m_which.side_effect = lambda m: m in ("usermod",)
self.dist.lock_passwd("bob")
self.assertEqual(
- [mock.call(['usermod', '--lock', 'bob'])],
- m_subp.call_args_list)
+ [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list
+ )
@mock.patch("cloudinit.distros.subp.which")
- def test_lock_with_passwd_if_available(self, m_which, m_subp,
- m_is_snappy):
+ def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy):
"""Lock with only passwd will use passwd."""
- m_which.side_effect = lambda m: m in ('passwd',)
+ m_which.side_effect = lambda m: m in ("passwd",)
self.dist.lock_passwd("bob")
self.assertEqual(
- [mock.call(['passwd', '-l', 'bob'])],
- m_subp.call_args_list)
+ [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list
+ )
@mock.patch("cloudinit.distros.subp.which")
- def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp,
- m_is_snappy):
+ def test_lock_raises_runtime_if_no_commands(
+ self, m_which, m_subp, m_is_snappy
+ ):
"""Lock with no commands available raises RuntimeError."""
m_which.return_value = None
with self.assertRaises(RuntimeError):
self.dist.lock_passwd("bob")
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_debian.py b/tests/unittests/distros/test_debian.py
index 3d0db145..c7c5932e 100644
--- a/tests/unittests/distros/test_debian.py
+++ b/tests/unittests/distros/test_debian.py
@@ -4,92 +4,117 @@ from unittest import mock
import pytest
-from cloudinit import distros, util
-from cloudinit.distros.debian import (
- APT_GET_COMMAND,
- APT_GET_WRAPPER,
-)
+from cloudinit import distros, subp, util
+from cloudinit.distros.debian import APT_GET_COMMAND, APT_GET_WRAPPER
from tests.unittests.helpers import FilesystemMockingTestCase
-from cloudinit import subp
@mock.patch("cloudinit.distros.debian.subp.subp")
class TestDebianApplyLocale(FilesystemMockingTestCase):
-
def setUp(self):
super(TestDebianApplyLocale, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
self.patchUtils(self.new_root)
- self.spath = self.tmp_path('etc/default/locale', self.new_root)
+ self.spath = self.tmp_path("etc/default/locale", self.new_root)
cls = distros.fetch("debian")
self.distro = cls("debian", {}, None)
def test_no_rerun(self, m_subp):
"""If system has defined locale, no re-run is expected."""
m_subp.return_value = (None, None)
- locale = 'en_US.UTF-8'
- util.write_file(self.spath, 'LANG=%s\n' % locale, omode="w")
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=%s\n" % locale, omode="w")
self.distro.apply_locale(locale, out_fn=self.spath)
m_subp.assert_not_called()
def test_no_regen_on_c_utf8(self, m_subp):
"""If locale is set to C.UTF8, do not attempt to call locale-gen"""
m_subp.return_value = (None, None)
- locale = 'C.UTF-8'
- util.write_file(self.spath, 'LANG=%s\n' % 'en_US.UTF-8', omode="w")
+ locale = "C.UTF-8"
+ util.write_file(self.spath, "LANG=%s\n" % "en_US.UTF-8", omode="w")
self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
- [['update-locale', '--locale-file=' + self.spath,
- 'LANG=%s' % locale]],
- [p[0][0] for p in m_subp.call_args_list])
+ [
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ]
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
def test_rerun_if_different(self, m_subp):
"""If system has different locale, locale-gen should be called."""
m_subp.return_value = (None, None)
- locale = 'en_US.UTF-8'
- util.write_file(self.spath, 'LANG=fr_FR.UTF-8', omode="w")
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=fr_FR.UTF-8", omode="w")
self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
- [['locale-gen', locale],
- ['update-locale', '--locale-file=' + self.spath,
- 'LANG=%s' % locale]],
- [p[0][0] for p in m_subp.call_args_list])
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
def test_rerun_if_no_file(self, m_subp):
"""If system has no locale file, locale-gen should be called."""
m_subp.return_value = (None, None)
- locale = 'en_US.UTF-8'
+ locale = "en_US.UTF-8"
self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
- [['locale-gen', locale],
- ['update-locale', '--locale-file=' + self.spath,
- 'LANG=%s' % locale]],
- [p[0][0] for p in m_subp.call_args_list])
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
def test_rerun_on_unset_system_locale(self, m_subp):
"""If system has unset locale, locale-gen should be called."""
m_subp.return_value = (None, None)
- locale = 'en_US.UTF-8'
- util.write_file(self.spath, 'LANG=', omode="w")
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=", omode="w")
self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
- [['locale-gen', locale],
- ['update-locale', '--locale-file=' + self.spath,
- 'LANG=%s' % locale]],
- [p[0][0] for p in m_subp.call_args_list])
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LANG=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
def test_rerun_on_mismatched_keys(self, m_subp):
"""If key is LC_ALL and system has only LANG, rerun is expected."""
m_subp.return_value = (None, None)
- locale = 'en_US.UTF-8'
- util.write_file(self.spath, 'LANG=', omode="w")
- self.distro.apply_locale(locale, out_fn=self.spath, keyname='LC_ALL')
+ locale = "en_US.UTF-8"
+ util.write_file(self.spath, "LANG=", omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath, keyname="LC_ALL")
self.assertEqual(
- [['locale-gen', locale],
- ['update-locale', '--locale-file=' + self.spath,
- 'LC_ALL=%s' % locale]],
- [p[0][0] for p in m_subp.call_args_list])
+ [
+ ["locale-gen", locale],
+ [
+ "update-locale",
+ "--locale-file=" + self.spath,
+ "LC_ALL=%s" % locale,
+ ],
+ ],
+ [p[0][0] for p in m_subp.call_args_list],
+ )
def test_falseish_locale_raises_valueerror(self, m_subp):
"""locale as None or "" is invalid and should raise ValueError."""
@@ -99,45 +124,53 @@ class TestDebianApplyLocale(FilesystemMockingTestCase):
m_subp.assert_not_called()
self.assertEqual(
- 'Failed to provide locale value.', str(ctext_m.exception))
+ "Failed to provide locale value.", str(ctext_m.exception)
+ )
with self.assertRaises(ValueError) as ctext_m:
self.distro.apply_locale("")
m_subp.assert_not_called()
self.assertEqual(
- 'Failed to provide locale value.', str(ctext_m.exception))
+ "Failed to provide locale value.", str(ctext_m.exception)
+ )
-@mock.patch.dict('os.environ', {}, clear=True)
+@mock.patch.dict("os.environ", {}, clear=True)
@mock.patch("cloudinit.distros.debian.subp.which", return_value=True)
@mock.patch("cloudinit.distros.debian.subp.subp")
class TestPackageCommand:
distro = distros.fetch("debian")("debian", {}, None)
- @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
- return_value=True)
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=True,
+ )
def test_simple_command(self, m_apt_avail, m_subp, m_which):
- self.distro.package_command('update')
- apt_args = [APT_GET_WRAPPER['command']]
+ self.distro.package_command("update")
+ apt_args = [APT_GET_WRAPPER["command"]]
apt_args.extend(APT_GET_COMMAND)
- apt_args.append('update')
+ apt_args.append("update")
expected_call = {
- 'args': apt_args,
- 'capture': False,
- 'env': {'DEBIAN_FRONTEND': 'noninteractive'},
+ "args": apt_args,
+ "capture": False,
+ "env": {"DEBIAN_FRONTEND": "noninteractive"},
}
assert m_subp.call_args == mock.call(**expected_call)
- @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
- side_effect=[False, False, True])
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=[False, False, True],
+ )
@mock.patch("cloudinit.distros.debian.time.sleep")
def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which):
self.distro._wait_for_apt_command("stub", {"args": "stub2"})
assert m_sleep.call_args_list == [mock.call(1), mock.call(1)]
- assert m_subp.call_args_list == [mock.call(args='stub2')]
+ assert m_subp.call_args_list == [mock.call(args="stub2")]
- @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
- return_value=False)
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ return_value=False,
+ )
@mock.patch("cloudinit.distros.debian.time.sleep")
@mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
def test_lock_wait_timeout(
@@ -147,8 +180,10 @@ class TestPackageCommand:
self.distro._wait_for_apt_command("stub", "stub2", timeout=5)
assert m_subp.call_args_list == []
- @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
- side_effect=cycle([True, False]))
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]),
+ )
@mock.patch("cloudinit.distros.debian.time.sleep")
def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which):
exception = subp.ProcessExecutionError(
@@ -158,8 +193,10 @@ class TestPackageCommand:
ret = self.distro._wait_for_apt_command("stub", {"args": "stub2"})
assert ret == "return_thing"
- @mock.patch("cloudinit.distros.debian.Distro._apt_lock_available",
- side_effect=cycle([True, False]))
+ @mock.patch(
+ "cloudinit.distros.debian.Distro._apt_lock_available",
+ side_effect=cycle([True, False]),
+ )
@mock.patch("cloudinit.distros.debian.time.sleep")
@mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
def test_lock_exception_timeout(
diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py
index 0279e86f..22be5098 100644
--- a/tests/unittests/distros/test_freebsd.py
+++ b/tests/unittests/distros/test_freebsd.py
@@ -1,45 +1,43 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.util import (find_freebsd_part, get_path_dev_freebsd)
-from tests.unittests.helpers import (CiTestCase, mock)
-
import os
+from cloudinit.util import find_freebsd_part, get_path_dev_freebsd
+from tests.unittests.helpers import CiTestCase, mock
-class TestDeviceLookUp(CiTestCase):
- @mock.patch('cloudinit.subp.subp')
+class TestDeviceLookUp(CiTestCase):
+ @mock.patch("cloudinit.subp.subp")
def test_find_freebsd_part_label(self, mock_subp):
- glabel_out = '''
+ glabel_out = """
gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1
label/rootfs N/A da0p2
label/swap N/A da0p3
-'''
+"""
mock_subp.return_value = (glabel_out, "")
res = find_freebsd_part("/dev/label/rootfs")
self.assertEqual("da0p2", res)
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_find_freebsd_part_gpt(self, mock_subp):
- glabel_out = '''
+ glabel_out = """
gpt/bootfs N/A vtbd0p1
gptid/3f4cbe26-75da-11e8-a8f2-002590ec6166 N/A vtbd0p1
gpt/swapfs N/A vtbd0p2
gpt/rootfs N/A vtbd0p3
iso9660/cidata N/A vtbd2
-'''
+"""
mock_subp.return_value = (glabel_out, "")
res = find_freebsd_part("/dev/gpt/rootfs")
self.assertEqual("vtbd0p3", res)
def test_get_path_dev_freebsd_label(self):
- mnt_list = '''
+ mnt_list = """
/dev/label/rootfs / ufs rw 1 1
devfs /dev devfs rw,multilabel 0 0
fdescfs /dev/fd fdescfs rw 0 0
/dev/da1s1 /mnt/resource ufs rw 2 2
-'''
- with mock.patch.object(os.path, 'exists',
- return_value=True):
- res = get_path_dev_freebsd('/etc', mnt_list)
+"""
+ with mock.patch.object(os.path, "exists", return_value=True):
+ res = get_path_dev_freebsd("/etc", mnt_list)
self.assertIsNotNone(res)
diff --git a/tests/unittests/distros/test_generic.py b/tests/unittests/distros/test_generic.py
index e542c26f..93c5395c 100644
--- a/tests/unittests/distros/test_generic.py
+++ b/tests/unittests/distros/test_generic.py
@@ -1,35 +1,49 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import distros
-from cloudinit import util
-
-from tests.unittests import helpers
-
import os
-import pytest
import shutil
import tempfile
from unittest import mock
+import pytest
+
+from cloudinit import distros, util
+from tests.unittests import helpers
+
unknown_arch_info = {
- 'arches': ['default'],
- 'failsafe': {'primary': 'http://fs-primary-default',
- 'security': 'http://fs-security-default'}
+ "arches": ["default"],
+ "failsafe": {
+ "primary": "http://fs-primary-default",
+ "security": "http://fs-security-default",
+ },
}
package_mirrors = [
- {'arches': ['i386', 'amd64'],
- 'failsafe': {'primary': 'http://fs-primary-intel',
- 'security': 'http://fs-security-intel'},
- 'search': {
- 'primary': ['http://%(ec2_region)s.ec2/',
- 'http://%(availability_zone)s.clouds/'],
- 'security': ['http://security-mirror1-intel',
- 'http://security-mirror2-intel']}},
- {'arches': ['armhf', 'armel'],
- 'failsafe': {'primary': 'http://fs-primary-arm',
- 'security': 'http://fs-security-arm'}},
- unknown_arch_info
+ {
+ "arches": ["i386", "amd64"],
+ "failsafe": {
+ "primary": "http://fs-primary-intel",
+ "security": "http://fs-security-intel",
+ },
+ "search": {
+ "primary": [
+ "http://%(ec2_region)s.ec2/",
+ "http://%(availability_zone)s.clouds/",
+ ],
+ "security": [
+ "http://security-mirror1-intel",
+ "http://security-mirror2-intel",
+ ],
+ },
+ },
+ {
+ "arches": ["armhf", "armel"],
+ "failsafe": {
+ "primary": "http://fs-primary-arm",
+ "security": "http://fs-security-arm",
+ },
+ },
+ unknown_arch_info,
]
gpmi = distros._get_package_mirror_info
@@ -37,7 +51,6 @@ gapmi = distros._get_arch_package_mirror_info
class TestGenericDistro(helpers.FilesystemMockingTestCase):
-
def setUp(self):
super(TestGenericDistro, self).setUp()
# Make a temp directoy for tests to use.
@@ -48,7 +61,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
os.makedirs(os.path.join(self.tmp, "etc"))
- os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d'))
+ os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d"))
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
d.write_sudo_rules("harlowja", rules)
@@ -65,34 +78,34 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
return found_amount
def test_sudoers_ensure_rules(self):
- rules = 'ALL=(ALL:ALL) ALL'
- contents = self._write_load_sudoers('harlowja', rules)
- expected = ['harlowja ALL=(ALL:ALL) ALL']
+ rules = "ALL=(ALL:ALL) ALL"
+ contents = self._write_load_sudoers("harlowja", rules)
+ expected = ["harlowja ALL=(ALL:ALL) ALL"]
self.assertEqual(len(expected), self._count_in(expected, contents))
not_expected = [
- 'harlowja A',
- 'harlowja L',
- 'harlowja L',
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
]
self.assertEqual(0, self._count_in(not_expected, contents))
def test_sudoers_ensure_rules_list(self):
rules = [
- 'ALL=(ALL:ALL) ALL',
- 'B-ALL=(ALL:ALL) ALL',
- 'C-ALL=(ALL:ALL) ALL',
+ "ALL=(ALL:ALL) ALL",
+ "B-ALL=(ALL:ALL) ALL",
+ "C-ALL=(ALL:ALL) ALL",
]
- contents = self._write_load_sudoers('harlowja', rules)
+ contents = self._write_load_sudoers("harlowja", rules)
expected = [
- 'harlowja ALL=(ALL:ALL) ALL',
- 'harlowja B-ALL=(ALL:ALL) ALL',
- 'harlowja C-ALL=(ALL:ALL) ALL',
+ "harlowja ALL=(ALL:ALL) ALL",
+ "harlowja B-ALL=(ALL:ALL) ALL",
+ "harlowja C-ALL=(ALL:ALL) ALL",
]
self.assertEqual(len(expected), self._count_in(expected, contents))
not_expected = [
- 'harlowja A',
- 'harlowja L',
- 'harlowja L',
+ "harlowja A",
+ "harlowja L",
+ "harlowja L",
]
self.assertEqual(0, self._count_in(not_expected, contents))
@@ -124,7 +137,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
- for char in ['#', '@']:
+ for char in ["#", "@"]:
util.write_file("/etc/sudoers", "{}includedir /b".format(char))
d.ensure_sudo_dir("/b")
contents = util.load_file("/etc/sudoers")
@@ -146,7 +159,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
- os.makedirs('/run/systemd/system')
+ os.makedirs("/run/systemd/system")
self.assertTrue(d.uses_systemd())
def test_systemd_not_in_use(self):
@@ -161,18 +174,18 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
- os.makedirs('/run/systemd')
- os.symlink('/', '/run/systemd/system')
+ os.makedirs("/run/systemd")
+ os.symlink("/", "/run/systemd/system")
self.assertFalse(d.uses_systemd())
- @mock.patch('cloudinit.distros.debian.read_system_locale')
+ @mock.patch("cloudinit.distros.debian.read_system_locale")
def test_get_locale_ubuntu(self, m_locale):
"""Test ubuntu distro returns locale set to C.UTF-8"""
- m_locale.return_value = 'C.UTF-8'
+ m_locale.return_value = "C.UTF-8"
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
locale = d.get_locale()
- self.assertEqual('C.UTF-8', locale)
+ self.assertEqual("C.UTF-8", locale)
def test_get_locale_rhel(self):
"""Test rhel distro returns NotImplementedError exception"""
@@ -197,11 +210,11 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
with mock.patch("cloudinit.subp.subp") as m_subp:
d.expire_passwd("myuser")
m_subp.assert_called_once_with(
- ["pw", "usermod", "myuser", "-p", "01-Jan-1970"])
+ ["pw", "usermod", "myuser", "-p", "01-Jan-1970"]
+ )
class TestGetPackageMirrors:
-
def return_first(self, mlist):
if not mlist:
return None
@@ -219,77 +232,125 @@ class TestGetPackageMirrors:
def return_last(self, mlist):
if not mlist:
return None
- return(mlist[-1])
+ return mlist[-1]
@pytest.mark.parametrize(
"allow_ec2_mirror, platform_type, mirrors",
[
- (True, "ec2", [
- {'primary': 'http://us-east-1.ec2/',
- 'security': 'http://security-mirror1-intel'},
- {'primary': 'http://us-east-1a.clouds/',
- 'security': 'http://security-mirror2-intel'}
- ]),
- (True, "other", [
- {'primary': 'http://us-east-1.ec2/',
- 'security': 'http://security-mirror1-intel'},
- {'primary': 'http://us-east-1a.clouds/',
- 'security': 'http://security-mirror2-intel'}
- ]),
- (False, "ec2", [
- {'primary': 'http://us-east-1.ec2/',
- 'security': 'http://security-mirror1-intel'},
- {'primary': 'http://us-east-1a.clouds/',
- 'security': 'http://security-mirror2-intel'}
- ]),
- (False, "other", [
- {'primary': 'http://us-east-1a.clouds/',
- 'security': 'http://security-mirror1-intel'},
- {'primary': 'http://fs-primary-intel',
- 'security': 'http://security-mirror2-intel'}
- ])
- ])
- def test_get_package_mirror_info_az_ec2(self,
- allow_ec2_mirror,
- platform_type,
- mirrors):
- flag_path = "cloudinit.distros." \
- "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ (
+ True,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ True,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "ec2",
+ [
+ {
+ "primary": "http://us-east-1.ec2/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ (
+ False,
+ "other",
+ [
+ {
+ "primary": "http://us-east-1a.clouds/",
+ "security": "http://security-mirror1-intel",
+ },
+ {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ },
+ ],
+ ),
+ ],
+ )
+ def test_get_package_mirror_info_az_ec2(
+ self, allow_ec2_mirror, platform_type, mirrors
+ ):
+ flag_path = (
+ "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ )
with mock.patch(flag_path, allow_ec2_mirror):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
data_source_mock = mock.Mock(
- availability_zone="us-east-1a",
- platform_type=platform_type)
-
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_first)
- assert(results == mirrors[0])
-
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_second)
- assert(results == mirrors[1])
-
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_none)
- assert(results == package_mirrors[0]['failsafe'])
+ availability_zone="us-east-1a", platform_type=platform_type
+ )
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
+ )
+ assert results == mirrors[0]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_second,
+ )
+ assert results == mirrors[1]
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_none,
+ )
+ assert results == package_mirrors[0]["failsafe"]
def test_get_package_mirror_info_az_non_ec2(self):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
data_source_mock = mock.Mock(availability_zone="nova.cloudvendor")
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_first)
- assert(results == {
- 'primary': 'http://nova.cloudvendor.clouds/',
- 'security': 'http://security-mirror1-intel'}
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
)
-
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_last)
- assert(results == {
- 'primary': 'http://nova.cloudvendor.clouds/',
- 'security': 'http://security-mirror2-intel'}
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
)
+ assert results == {
+ "primary": "http://nova.cloudvendor.clouds/",
+ "security": "http://security-mirror2-intel",
+ }
def test_get_package_mirror_info_none(self):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
@@ -298,18 +359,25 @@ class TestGetPackageMirrors:
# because both search entries here replacement based on
# availability-zone, the filter will be called with an empty list and
# failsafe should be taken.
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_first)
- assert(results == {
- 'primary': 'http://fs-primary-intel',
- 'security': 'http://security-mirror1-intel'}
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_first,
)
-
- results = gpmi(arch_mirrors, data_source=data_source_mock,
- mirror_filter=self.return_last)
- assert(results == {
- 'primary': 'http://fs-primary-intel',
- 'security': 'http://security-mirror2-intel'}
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror1-intel",
+ }
+
+ results = gpmi(
+ arch_mirrors,
+ data_source=data_source_mock,
+ mirror_filter=self.return_last,
)
+ assert results == {
+ "primary": "http://fs-primary-intel",
+ "security": "http://security-mirror2-intel",
+ }
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_gentoo.py b/tests/unittests/distros/test_gentoo.py
index 4e4680b8..dadf5df5 100644
--- a/tests/unittests/distros/test_gentoo.py
+++ b/tests/unittests/distros/test_gentoo.py
@@ -1,13 +1,12 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import util
-from cloudinit import atomic_helper
+from cloudinit import atomic_helper, util
from tests.unittests.helpers import CiTestCase
+
from . import _get_distro
class TestGentoo(CiTestCase):
-
def test_write_hostname(self):
distro = _get_distro("gentoo")
hostname = "myhostname"
@@ -22,5 +21,7 @@ class TestGentoo(CiTestCase):
hostfile = self.tmp_path("hostfile")
atomic_helper.write_file(hostfile, contents, omode="w")
distro._write_hostname(hostname, hostfile)
- self.assertEqual('#This is the hostname\nhostname="myhostname"\n',
- util.load_file(hostfile))
+ self.assertEqual(
+ '#This is the hostname\nhostname="myhostname"\n',
+ util.load_file(hostfile),
+ )
diff --git a/tests/unittests/distros/test_hostname.py b/tests/unittests/distros/test_hostname.py
index f6d4dbe5..2cbbb3e2 100644
--- a/tests/unittests/distros/test_hostname.py
+++ b/tests/unittests/distros/test_hostname.py
@@ -4,13 +4,12 @@ import unittest
from cloudinit.distros.parsers import hostname
-
-BASE_HOSTNAME = '''
+BASE_HOSTNAME = """
# My super-duper-hostname
blahblah
-'''
+"""
BASE_HOSTNAME = BASE_HOSTNAME.strip()
@@ -18,7 +17,7 @@ class TestHostnameHelper(unittest.TestCase):
def test_parse_same(self):
hn = hostname.HostnameConf(BASE_HOSTNAME)
self.assertEqual(str(hn).strip(), BASE_HOSTNAME)
- self.assertEqual(hn.hostname, 'blahblah')
+ self.assertEqual(hn.hostname, "blahblah")
def test_no_adjust_hostname(self):
hn = hostname.HostnameConf(BASE_HOSTNAME)
@@ -29,14 +28,15 @@ class TestHostnameHelper(unittest.TestCase):
def test_adjust_hostname(self):
hn = hostname.HostnameConf(BASE_HOSTNAME)
prev_name = hn.hostname
- self.assertEqual(prev_name, 'blahblah')
+ self.assertEqual(prev_name, "blahblah")
hn.set_hostname("bbbbd")
- self.assertEqual(hn.hostname, 'bbbbd')
- expected_out = '''
+ self.assertEqual(hn.hostname, "bbbbd")
+ expected_out = """
# My super-duper-hostname
bbbbd
-'''
+"""
self.assertEqual(str(hn).strip(), expected_out.strip())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_hosts.py b/tests/unittests/distros/test_hosts.py
index 8aaa6e48..faffd912 100644
--- a/tests/unittests/distros/test_hosts.py
+++ b/tests/unittests/distros/test_hosts.py
@@ -4,42 +4,44 @@ import unittest
from cloudinit.distros.parsers import hosts
-
-BASE_ETC = '''
+BASE_ETC = """
# Example
127.0.0.1 localhost
192.168.1.10 foo.mydomain.org foo
192.168.1.10 bar.mydomain.org bar
146.82.138.7 master.debian.org master
209.237.226.90 www.opensource.org
-'''
+"""
BASE_ETC = BASE_ETC.strip()
class TestHostsHelper(unittest.TestCase):
def test_parse(self):
eh = hosts.HostsConf(BASE_ETC)
- self.assertEqual(eh.get_entry('127.0.0.1'), [['localhost']])
- self.assertEqual(eh.get_entry('192.168.1.10'),
- [['foo.mydomain.org', 'foo'],
- ['bar.mydomain.org', 'bar']])
+ self.assertEqual(eh.get_entry("127.0.0.1"), [["localhost"]])
+ self.assertEqual(
+ eh.get_entry("192.168.1.10"),
+ [["foo.mydomain.org", "foo"], ["bar.mydomain.org", "bar"]],
+ )
eh = str(eh)
- self.assertTrue(eh.startswith('# Example'))
+ self.assertTrue(eh.startswith("# Example"))
def test_add(self):
eh = hosts.HostsConf(BASE_ETC)
- eh.add_entry('127.0.0.0', 'blah')
- self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']])
- eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3')
- self.assertEqual(eh.get_entry('127.0.0.3'),
- [['blah', 'blah2', 'blah3']])
+ eh.add_entry("127.0.0.0", "blah")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]])
+ eh.add_entry("127.0.0.3", "blah", "blah2", "blah3")
+ self.assertEqual(
+ eh.get_entry("127.0.0.3"), [["blah", "blah2", "blah3"]]
+ )
def test_del(self):
eh = hosts.HostsConf(BASE_ETC)
- eh.add_entry('127.0.0.0', 'blah')
- self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']])
+ eh.add_entry("127.0.0.0", "blah")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [["blah"]])
+
+ eh.del_entries("127.0.0.0")
+ self.assertEqual(eh.get_entry("127.0.0.0"), [])
- eh.del_entries('127.0.0.0')
- self.assertEqual(eh.get_entry('127.0.0.0'), [])
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_init.py b/tests/unittests/distros/test_init.py
index fd64a322..8f3c8978 100644
--- a/tests/unittests/distros/test_init.py
+++ b/tests/unittests/distros/test_init.py
@@ -9,16 +9,17 @@ from unittest import mock
import pytest
-from cloudinit.distros import _get_package_mirror_info, LDH_ASCII_CHARS
+from cloudinit.distros import LDH_ASCII_CHARS, _get_package_mirror_info
# In newer versions of Python, these characters will be omitted instead
# of substituted because of security concerns.
# See https://bugs.python.org/issue43882
-SECURITY_URL_CHARS = '\n\r\t'
+SECURITY_URL_CHARS = "\n\r\t"
# Define a set of characters we would expect to be replaced
INVALID_URL_CHARS = [
- chr(x) for x in range(127)
+ chr(x)
+ for x in range(127)
if chr(x) not in LDH_ASCII_CHARS + SECURITY_URL_CHARS
]
for separator in [":", ".", "/", "#", "?", "@", "[", "]"]:
@@ -37,21 +38,41 @@ class TestGetPackageMirrorInfo:
These tests are more focused on specific aspects of the unit under test.
"""
- @pytest.mark.parametrize('mirror_info,expected', [
- # Empty info gives empty return
- ({}, {}),
- # failsafe values used if present
- ({'failsafe': {'primary': 'http://value', 'security': 'http://other'}},
- {'primary': 'http://value', 'security': 'http://other'}),
- # search values used if present
- ({'search': {'primary': ['http://value'],
- 'security': ['http://other']}},
- {'primary': ['http://value'], 'security': ['http://other']}),
- # failsafe values used if search value not present
- ({'search': {'primary': ['http://value']},
- 'failsafe': {'security': 'http://other'}},
- {'primary': ['http://value'], 'security': 'http://other'})
- ])
+ @pytest.mark.parametrize(
+ "mirror_info,expected",
+ [
+ # Empty info gives empty return
+ ({}, {}),
+ # failsafe values used if present
+ (
+ {
+ "failsafe": {
+ "primary": "http://value",
+ "security": "http://other",
+ }
+ },
+ {"primary": "http://value", "security": "http://other"},
+ ),
+ # search values used if present
+ (
+ {
+ "search": {
+ "primary": ["http://value"],
+ "security": ["http://other"],
+ }
+ },
+ {"primary": ["http://value"], "security": ["http://other"]},
+ ),
+ # failsafe values used if search value not present
+ (
+ {
+ "search": {"primary": ["http://value"]},
+ "failsafe": {"security": "http://other"},
+ },
+ {"primary": ["http://value"], "security": "http://other"},
+ ),
+ ],
+ )
def test_get_package_mirror_info_failsafe(self, mirror_info, expected):
"""
Test the interaction between search and failsafe inputs
@@ -60,97 +81,163 @@ class TestGetPackageMirrorInfo:
options; test_failsafe_used_if_all_search_results_filtered_out covers
that.)
"""
- assert expected == _get_package_mirror_info(mirror_info,
- mirror_filter=lambda x: x)
+ assert expected == _get_package_mirror_info(
+ mirror_info, mirror_filter=lambda x: x
+ )
def test_failsafe_used_if_all_search_results_filtered_out(self):
"""Test the failsafe option used if all search options eliminated."""
mirror_info = {
- 'search': {'primary': ['http://value']},
- 'failsafe': {'primary': 'http://other'}
+ "search": {"primary": ["http://value"]},
+ "failsafe": {"primary": "http://other"},
}
- assert {'primary': 'http://other'} == _get_package_mirror_info(
- mirror_info, mirror_filter=lambda x: False)
+ assert {"primary": "http://other"} == _get_package_mirror_info(
+ mirror_info, mirror_filter=lambda x: False
+ )
- @pytest.mark.parametrize('allow_ec2_mirror, platform_type', [
- (True, 'ec2')
- ])
- @pytest.mark.parametrize('availability_zone,region,patterns,expected', (
- # Test ec2_region alone
- ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'],
- ['http://ec2-fk-fake-1/ubuntu']),
- # Test availability_zone alone
- ('fk-fake-1f', None, ['http://AZ-%(availability_zone)s/ubuntu'],
- ['http://az-fk-fake-1f/ubuntu']),
- # Test region alone
- (None, 'fk-fake-1', ['http://RG-%(region)s/ubuntu'],
- ['http://rg-fk-fake-1/ubuntu']),
- # Test that ec2_region is not available for non-matching AZs
- ('fake-fake-1f', None,
- ['http://EC2-%(ec2_region)s/ubuntu',
- 'http://AZ-%(availability_zone)s/ubuntu'],
- ['http://az-fake-fake-1f/ubuntu']),
- # Test that template order maintained
- (None, 'fake-region',
- ['http://RG-%(region)s-2/ubuntu', 'http://RG-%(region)s-1/ubuntu'],
- ['http://rg-fake-region-2/ubuntu', 'http://rg-fake-region-1/ubuntu']),
- # Test that non-ASCII hostnames are IDNA encoded;
- # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
- (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com/ubuntu'],
- ['http://www.xn--idna--4kd53hh6aba3q.com/ubuntu']),
- # Test that non-ASCII hostnames with a port are IDNA encoded;
- # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
- (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com:8080/ubuntu'],
- ['http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu']),
- # Test that non-ASCII non-hostname parts of URLs are unchanged
- (None, 'ТεЅТ̣', ['http://www.example.com/%(region)s/ubuntu'],
- ['http://www.example.com/ТεЅТ̣/ubuntu']),
- # Test that IPv4 addresses are unchanged
- (None, 'fk-fake-1', ['http://192.168.1.1:8080/%(region)s/ubuntu'],
- ['http://192.168.1.1:8080/fk-fake-1/ubuntu']),
- # Test that IPv6 addresses are unchanged
- (None, 'fk-fake-1',
- ['http://[2001:67c:1360:8001::23]/%(region)s/ubuntu'],
- ['http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu']),
- # Test that unparseable URLs are filtered out of the mirror list
- (None, 'inv[lid',
- ['http://%(region)s.in.hostname/should/be/filtered',
- 'http://but.not.in.the.path/%(region)s'],
- ['http://but.not.in.the.path/inv[lid']),
- (None, '-some-region-',
- ['http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu'],
- ['http://lead-ing.some-region.trail-ing.example.com/ubuntu']),
- ) + tuple(
- # Dynamically generate a test case for each non-LDH
- # (Letters/Digits/Hyphen) ASCII character, testing that it is
- # substituted with a hyphen
- (None, 'fk{0}fake{0}1'.format(invalid_char),
- ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu'])
- for invalid_char in INVALID_URL_CHARS
- ))
- def test_valid_substitution(self,
- allow_ec2_mirror,
- platform_type,
- availability_zone,
- region,
- patterns,
- expected):
+ @pytest.mark.parametrize(
+ "allow_ec2_mirror, platform_type", [(True, "ec2")]
+ )
+ @pytest.mark.parametrize(
+ "availability_zone,region,patterns,expected",
+ (
+ # Test ec2_region alone
+ (
+ "fk-fake-1f",
+ None,
+ ["http://EC2-%(ec2_region)s/ubuntu"],
+ ["http://ec2-fk-fake-1/ubuntu"],
+ ),
+ # Test availability_zone alone
+ (
+ "fk-fake-1f",
+ None,
+ ["http://AZ-%(availability_zone)s/ubuntu"],
+ ["http://az-fk-fake-1f/ubuntu"],
+ ),
+ # Test region alone
+ (
+ None,
+ "fk-fake-1",
+ ["http://RG-%(region)s/ubuntu"],
+ ["http://rg-fk-fake-1/ubuntu"],
+ ),
+ # Test that ec2_region is not available for non-matching AZs
+ (
+ "fake-fake-1f",
+ None,
+ [
+ "http://EC2-%(ec2_region)s/ubuntu",
+ "http://AZ-%(availability_zone)s/ubuntu",
+ ],
+ ["http://az-fake-fake-1f/ubuntu"],
+ ),
+ # Test that template order maintained
+ (
+ None,
+ "fake-region",
+ [
+ "http://RG-%(region)s-2/ubuntu",
+ "http://RG-%(region)s-1/ubuntu",
+ ],
+ [
+ "http://rg-fake-region-2/ubuntu",
+ "http://rg-fake-region-1/ubuntu",
+ ],
+ ),
+ # Test that non-ASCII hostnames are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.IDNA-%(region)s.com/ubuntu"],
+ ["http://www.xn--idna--4kd53hh6aba3q.com/ubuntu"],
+ ),
+ # Test that non-ASCII hostnames with a port are IDNA encoded;
+ # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.IDNA-%(region)s.com:8080/ubuntu"],
+ ["http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu"],
+ ),
+ # Test that non-ASCII non-hostname parts of URLs are unchanged
+ (
+ None,
+ "ТεЅТ̣",
+ ["http://www.example.com/%(region)s/ubuntu"],
+ ["http://www.example.com/ТεЅТ̣/ubuntu"],
+ ),
+ # Test that IPv4 addresses are unchanged
+ (
+ None,
+ "fk-fake-1",
+ ["http://192.168.1.1:8080/%(region)s/ubuntu"],
+ ["http://192.168.1.1:8080/fk-fake-1/ubuntu"],
+ ),
+ # Test that IPv6 addresses are unchanged
+ (
+ None,
+ "fk-fake-1",
+ ["http://[2001:67c:1360:8001::23]/%(region)s/ubuntu"],
+ ["http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu"],
+ ),
+ # Test that unparseable URLs are filtered out of the mirror list
+ (
+ None,
+ "inv[lid",
+ [
+ "http://%(region)s.in.hostname/should/be/filtered",
+ "http://but.not.in.the.path/%(region)s",
+ ],
+ ["http://but.not.in.the.path/inv[lid"],
+ ),
+ (
+ None,
+ "-some-region-",
+ ["http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu"],
+ ["http://lead-ing.some-region.trail-ing.example.com/ubuntu"],
+ ),
+ )
+ + tuple(
+ # Dynamically generate a test case for each non-LDH
+ # (Letters/Digits/Hyphen) ASCII character, testing that it is
+ # substituted with a hyphen
+ (
+ None,
+ "fk{0}fake{0}1".format(invalid_char),
+ ["http://%(region)s/ubuntu"],
+ ["http://fk-fake-1/ubuntu"],
+ )
+ for invalid_char in INVALID_URL_CHARS
+ ),
+ )
+ def test_valid_substitution(
+ self,
+ allow_ec2_mirror,
+ platform_type,
+ availability_zone,
+ region,
+ patterns,
+ expected,
+ ):
"""Test substitution works as expected."""
- flag_path = "cloudinit.distros." \
- "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ flag_path = (
+ "cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
+ )
m_data_source = mock.Mock(
availability_zone=availability_zone,
region=region,
- platform_type=platform_type
+ platform_type=platform_type,
)
- mirror_info = {'search': {'primary': patterns}}
+ mirror_info = {"search": {"primary": patterns}}
with mock.patch(flag_path, allow_ec2_mirror):
ret = _get_package_mirror_info(
mirror_info,
data_source=m_data_source,
- mirror_filter=lambda x: x
+ mirror_filter=lambda x: x,
)
print(allow_ec2_mirror)
print(platform_type)
@@ -158,4 +245,4 @@ class TestGetPackageMirrorInfo:
print(region)
print(patterns)
print(expected)
- assert {'primary': expected} == ret
+ assert {"primary": expected} == ret
diff --git a/tests/unittests/distros/test_manage_service.py b/tests/unittests/distros/test_manage_service.py
index 6f1bd0b1..9e64b35c 100644
--- a/tests/unittests/distros/test_manage_service.py
+++ b/tests/unittests/distros/test_manage_service.py
@@ -1,6 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from tests.unittests.helpers import (CiTestCase, mock)
+from tests.unittests.helpers import CiTestCase, mock
from tests.unittests.util import MockDistro
@@ -12,27 +12,30 @@ class TestManageService(CiTestCase):
super(TestManageService, self).setUp()
self.dist = MockDistro()
- @mock.patch.object(MockDistro, 'uses_systemd', return_value=False)
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
def test_manage_service_systemctl_initcmd(self, m_subp, m_sysd):
- self.dist.init_cmd = ['systemctl']
- self.dist.manage_service('start', 'myssh')
- m_subp.assert_called_with(['systemctl', 'start', 'myssh'],
- capture=True)
+ self.dist.init_cmd = ["systemctl"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(
+ ["systemctl", "start", "myssh"], capture=True
+ )
- @mock.patch.object(MockDistro, 'uses_systemd', return_value=False)
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
def test_manage_service_service_initcmd(self, m_subp, m_sysd):
- self.dist.init_cmd = ['service']
- self.dist.manage_service('start', 'myssh')
- m_subp.assert_called_with(['service', 'myssh', 'start'], capture=True)
+ self.dist.init_cmd = ["service"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(["service", "myssh", "start"], capture=True)
- @mock.patch.object(MockDistro, 'uses_systemd', return_value=True)
+ @mock.patch.object(MockDistro, "uses_systemd", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_manage_service_systemctl(self, m_subp, m_sysd):
- self.dist.init_cmd = ['ignore']
- self.dist.manage_service('start', 'myssh')
- m_subp.assert_called_with(['systemctl', 'start', 'myssh'],
- capture=True)
+ self.dist.init_cmd = ["ignore"]
+ self.dist.manage_service("start", "myssh")
+ m_subp.assert_called_with(
+ ["systemctl", "start", "myssh"], capture=True
+ )
+
# vi: ts=4 sw=4 expandtab
diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py
index 11a68d2a..0bc6dfbd 100644
--- a/tests/unittests/distros/test_netbsd.py
+++ b/tests/unittests/distros/test_netbsd.py
@@ -1,10 +1,11 @@
-import cloudinit.distros.netbsd
+import unittest.mock as mock
import pytest
-import unittest.mock as mock
+
+import cloudinit.distros.netbsd
-@pytest.mark.parametrize('with_pkgin', (True, False))
+@pytest.mark.parametrize("with_pkgin", (True, False))
@mock.patch("cloudinit.distros.netbsd.os")
def test_init(m_os, with_pkgin):
print(with_pkgin)
@@ -12,6 +13,6 @@ def test_init(m_os, with_pkgin):
cfg = {}
distro = cloudinit.distros.netbsd.NetBSD("netbsd", cfg, None)
- expectation = ['pkgin', '-y', 'full-upgrade'] if with_pkgin else None
+ expectation = ["pkgin", "-y", "full-upgrade"] if with_pkgin else None
assert distro.pkg_cmd_upgrade_prefix == expectation
- assert [mock.call('/usr/pkg/bin/pkgin')] == m_os.path.exists.call_args_list
+ assert [mock.call("/usr/pkg/bin/pkgin")] == m_os.path.exists.call_args_list
diff --git a/tests/unittests/distros/test_netconfig.py b/tests/unittests/distros/test_netconfig.py
index 90ac5578..a25be481 100644
--- a/tests/unittests/distros/test_netconfig.py
+++ b/tests/unittests/distros/test_netconfig.py
@@ -7,17 +7,11 @@ from io import StringIO
from textwrap import dedent
from unittest import mock
-from cloudinit import distros
+from cloudinit import distros, helpers, safeyaml, settings, subp, util
from cloudinit.distros.parsers.sys_conf import SysConf
-from cloudinit import helpers
-from cloudinit import settings
-from tests.unittests.helpers import (
- FilesystemMockingTestCase, dir2dict)
-from cloudinit import subp
-from cloudinit import util
-from cloudinit import safeyaml
-
-BASE_NET_CFG = '''
+from tests.unittests.helpers import FilesystemMockingTestCase, dir2dict
+
+BASE_NET_CFG = """
auto lo
iface lo inet loopback
@@ -31,9 +25,9 @@ iface eth0 inet static
auto eth1
iface eth1 inet dhcp
-'''
+"""
-BASE_NET_CFG_FROM_V2 = '''
+BASE_NET_CFG_FROM_V2 = """
auto lo
iface lo inet loopback
@@ -44,9 +38,9 @@ iface eth0 inet static
auto eth1
iface eth1 inet dhcp
-'''
+"""
-BASE_NET_CFG_IPV6 = '''
+BASE_NET_CFG_IPV6 = """
auto lo
iface lo inet loopback
@@ -74,20 +68,31 @@ iface eth1 inet6 static
address 2607:f0d0:1002:0011::3
netmask 64
gateway 2607:f0d0:1002:0011::1
-'''
-
-V1_NET_CFG = {'config': [{'name': 'eth0',
+"""
- 'subnets': [{'address': '192.168.1.5',
- 'broadcast': '192.168.1.0',
- 'gateway': '192.168.1.254',
- 'netmask': '255.255.255.0',
- 'type': 'static'}],
- 'type': 'physical'},
- {'name': 'eth1',
- 'subnets': [{'control': 'auto', 'type': 'dhcp4'}],
- 'type': 'physical'}],
- 'version': 1}
+V1_NET_CFG = {
+ "config": [
+ {
+ "name": "eth0",
+ "subnets": [
+ {
+ "address": "192.168.1.5",
+ "broadcast": "192.168.1.0",
+ "gateway": "192.168.1.254",
+ "netmask": "255.255.255.0",
+ "type": "static",
+ }
+ ],
+ "type": "physical",
+ },
+ {
+ "name": "eth1",
+ "subnets": [{"control": "auto", "type": "dhcp4"}],
+ "type": "physical",
+ },
+ ],
+ "version": 1,
+}
V1_NET_CFG_WITH_DUPS = """\
# same value in interface specific dns and global dns
@@ -144,19 +149,28 @@ auto eth1
iface eth1 inet dhcp
"""
-V1_NET_CFG_IPV6 = {'config': [{'name': 'eth0',
- 'subnets': [{'address':
- '2607:f0d0:1002:0011::2',
- 'gateway':
- '2607:f0d0:1002:0011::1',
- 'netmask': '64',
- 'type': 'static6'}],
- 'type': 'physical'},
- {'name': 'eth1',
- 'subnets': [{'control': 'auto',
- 'type': 'dhcp4'}],
- 'type': 'physical'}],
- 'version': 1}
+V1_NET_CFG_IPV6 = {
+ "config": [
+ {
+ "name": "eth0",
+ "subnets": [
+ {
+ "address": "2607:f0d0:1002:0011::2",
+ "gateway": "2607:f0d0:1002:0011::1",
+ "netmask": "64",
+ "type": "static6",
+ }
+ ],
+ "type": "physical",
+ },
+ {
+ "name": "eth1",
+ "subnets": [{"control": "auto", "type": "dhcp4"}],
+ "type": "physical",
+ },
+ ],
+ "version": 1,
+}
V1_TO_V2_NET_CFG_OUTPUT = """\
@@ -194,14 +208,11 @@ network:
"""
V2_NET_CFG = {
- 'ethernets': {
- 'eth7': {
- 'addresses': ['192.168.1.5/24'],
- 'gateway4': '192.168.1.254'},
- 'eth9': {
- 'dhcp4': True}
+ "ethernets": {
+ "eth7": {"addresses": ["192.168.1.5/24"], "gateway4": "192.168.1.254"},
+ "eth9": {"dhcp4": True},
},
- 'version': 2
+ "version": 2,
}
@@ -237,19 +248,18 @@ class WriteBuffer(object):
class TestNetCfgDistroBase(FilesystemMockingTestCase):
-
def setUp(self):
super(TestNetCfgDistroBase, self).setUp()
- self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
+ self.add_patch("cloudinit.util.system_is_snappy", "m_snappy")
def _get_distro(self, dname, renderers=None):
cls = distros.fetch(dname)
cfg = settings.CFG_BUILTIN
- cfg['system_info']['distro'] = dname
+ cfg["system_info"]["distro"] = dname
if renderers:
- cfg['system_info']['network'] = {'renderers': renderers}
+ cfg["system_info"]["network"] = {"renderers": renderers}
paths = helpers.Paths({})
- return cls(dname, cfg.get('system_info'), paths)
+ return cls(dname, cfg.get("system_info"), paths)
def assertCfgEquals(self, blob1, blob2):
b1 = dict(SysConf(blob1.strip().splitlines()))
@@ -264,23 +274,23 @@ class TestNetCfgDistroBase(FilesystemMockingTestCase):
class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase):
-
def setUp(self):
super(TestNetCfgDistroFreeBSD, self).setUp()
- self.distro = self._get_distro('freebsd', renderers=['freebsd'])
+ self.distro = self._get_distro("freebsd", renderers=["freebsd"])
- def _apply_and_verify_freebsd(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify_freebsd(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.freebsd.available') as m_avail:
+ with mock.patch("cloudinit.net.freebsd.available") as m_avail:
m_avail.return_value = True
with self.reRooted(tmpd) as tmpd:
- util.ensure_dir('/etc')
- util.ensure_file('/etc/rc.conf')
- util.ensure_file('/etc/resolv.conf')
+ util.ensure_dir("/etc")
+ util.ensure_file("/etc/rc.conf")
+ util.ensure_file("/etc/resolv.conf")
apply_fn(config, bringup)
results = dir2dict(tmpd)
@@ -291,14 +301,14 @@ class TestNetCfgDistroFreeBSD(TestNetCfgDistroBase):
print(results[cfgpath])
print("----------")
self.assertEqual(
- set(expected.split('\n')),
- set(results[cfgpath].split('\n')))
+ set(expected.split("\n")), set(results[cfgpath].split("\n"))
+ )
self.assertEqual(0o644, get_mode(cfgpath, tmpd))
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
def test_apply_network_config_freebsd_standard(self, ifaces_mac):
ifaces_mac.return_value = {
- '00:15:5d:4c:73:00': 'eth0',
+ "00:15:5d:4c:73:00": "eth0",
}
rc_conf_expected = """\
defaultrouter=192.168.1.254
@@ -307,17 +317,19 @@ ifconfig_eth1=DHCP
"""
expected_cfgs = {
- '/etc/rc.conf': rc_conf_expected,
- '/etc/resolv.conf': ''
+ "/etc/rc.conf": rc_conf_expected,
+ "/etc/resolv.conf": "",
}
- self._apply_and_verify_freebsd(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
def test_apply_network_config_freebsd_ifrename(self, ifaces_mac):
ifaces_mac.return_value = {
- '00:15:5d:4c:73:00': 'vtnet0',
+ "00:15:5d:4c:73:00": "vtnet0",
}
rc_conf_expected = """\
ifconfig_vtnet0_name=eth0
@@ -327,49 +339,51 @@ ifconfig_eth1=DHCP
"""
V1_NET_CFG_RENAME = copy.deepcopy(V1_NET_CFG)
- V1_NET_CFG_RENAME['config'][0]['mac_address'] = '00:15:5d:4c:73:00'
+ V1_NET_CFG_RENAME["config"][0]["mac_address"] = "00:15:5d:4c:73:00"
expected_cfgs = {
- '/etc/rc.conf': rc_conf_expected,
- '/etc/resolv.conf': ''
+ "/etc/rc.conf": rc_conf_expected,
+ "/etc/resolv.conf": "",
}
- self._apply_and_verify_freebsd(self.distro.apply_network_config,
- V1_NET_CFG_RENAME,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG_RENAME,
+ expected_cfgs=expected_cfgs.copy(),
+ )
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
def test_apply_network_config_freebsd_nameserver(self, ifaces_mac):
ifaces_mac.return_value = {
- '00:15:5d:4c:73:00': 'eth0',
+ "00:15:5d:4c:73:00": "eth0",
}
V1_NET_CFG_DNS = copy.deepcopy(V1_NET_CFG)
- ns = ['1.2.3.4']
- V1_NET_CFG_DNS['config'][0]['subnets'][0]['dns_nameservers'] = ns
- expected_cfgs = {
- '/etc/resolv.conf': 'nameserver 1.2.3.4\n'
- }
- self._apply_and_verify_freebsd(self.distro.apply_network_config,
- V1_NET_CFG_DNS,
- expected_cfgs=expected_cfgs.copy())
+ ns = ["1.2.3.4"]
+ V1_NET_CFG_DNS["config"][0]["subnets"][0]["dns_nameservers"] = ns
+ expected_cfgs = {"/etc/resolv.conf": "nameserver 1.2.3.4\n"}
+ self._apply_and_verify_freebsd(
+ self.distro.apply_network_config,
+ V1_NET_CFG_DNS,
+ expected_cfgs=expected_cfgs.copy(),
+ )
class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase):
-
def setUp(self):
super(TestNetCfgDistroUbuntuEni, self).setUp()
- self.distro = self._get_distro('ubuntu', renderers=['eni'])
+ self.distro = self._get_distro("ubuntu", renderers=["eni"])
def eni_path(self):
- return '/etc/network/interfaces.d/50-cloud-init.cfg'
+ return "/etc/network/interfaces.d/50-cloud-init.cfg"
- def _apply_and_verify_eni(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify_eni(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.eni.available') as m_avail:
+ with mock.patch("cloudinit.net.eni.available") as m_avail:
m_avail.return_value = True
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -389,35 +403,39 @@ class TestNetCfgDistroUbuntuEni(TestNetCfgDistroBase):
self.eni_path(): V1_NET_CFG_OUTPUT,
}
# ub_distro.apply_network_config(V1_NET_CFG, False)
- self._apply_and_verify_eni(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_eni(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_apply_network_config_ipv6_ub(self):
- expected_cfgs = {
- self.eni_path(): V1_NET_CFG_IPV6_OUTPUT
- }
- self._apply_and_verify_eni(self.distro.apply_network_config,
- V1_NET_CFG_IPV6,
- expected_cfgs=expected_cfgs.copy())
+ expected_cfgs = {self.eni_path(): V1_NET_CFG_IPV6_OUTPUT}
+ self._apply_and_verify_eni(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
def setUp(self):
super(TestNetCfgDistroUbuntuNetplan, self).setUp()
- self.distro = self._get_distro('ubuntu', renderers=['netplan'])
- self.devlist = ['eth0', 'lo']
+ self.distro = self._get_distro("ubuntu", renderers=["netplan"])
+ self.devlist = ["eth0", "lo"]
- def _apply_and_verify_netplan(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify_netplan(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.netplan.available',
- return_value=True):
- with mock.patch("cloudinit.net.netplan.get_devicelist",
- return_value=self.devlist):
+ with mock.patch("cloudinit.net.netplan.available", return_value=True):
+ with mock.patch(
+ "cloudinit.net.netplan.get_devicelist",
+ return_value=self.devlist,
+ ):
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -432,7 +450,7 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
self.assertEqual(0o644, get_mode(cfgpath, tmpd))
def netplan_path(self):
- return '/etc/netplan/50-cloud-init.yaml'
+ return "/etc/netplan/50-cloud-init.yaml"
def test_apply_network_config_v1_to_netplan_ub(self):
expected_cfgs = {
@@ -440,9 +458,11 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
}
# ub_distro.apply_network_config(V1_NET_CFG, False)
- self._apply_and_verify_netplan(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_apply_network_config_v1_ipv6_to_netplan_ub(self):
expected_cfgs = {
@@ -450,39 +470,43 @@ class TestNetCfgDistroUbuntuNetplan(TestNetCfgDistroBase):
}
# ub_distro.apply_network_config(V1_NET_CFG_IPV6, False)
- self._apply_and_verify_netplan(self.distro.apply_network_config,
- V1_NET_CFG_IPV6,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_apply_network_config_v2_passthrough_ub(self):
expected_cfgs = {
self.netplan_path(): V2_TO_V2_NET_CFG_OUTPUT,
}
# ub_distro.apply_network_config(V2_NET_CFG, False)
- self._apply_and_verify_netplan(self.distro.apply_network_config,
- V2_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify_netplan(
+ self.distro.apply_network_config,
+ V2_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
-
def setUp(self):
super(TestNetCfgDistroRedhat, self).setUp()
- self.distro = self._get_distro('rhel', renderers=['sysconfig'])
+ self.distro = self._get_distro("rhel", renderers=["sysconfig"])
def ifcfg_path(self, ifname):
- return '/etc/sysconfig/network-scripts/ifcfg-%s' % ifname
+ return "/etc/sysconfig/network-scripts/ifcfg-%s" % ifname
def control_path(self):
- return '/etc/sysconfig/network'
+ return "/etc/sysconfig/network"
- def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.sysconfig.available') as m_avail:
+ with mock.patch("cloudinit.net.sysconfig.available") as m_avail:
m_avail.return_value = True
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -494,7 +518,8 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
def test_apply_network_config_rh(self):
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=none
DEFROUTE=yes
DEVICE=eth0
@@ -505,27 +530,35 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.ifcfg_path('eth1'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
BOOTPROTO=dhcp
DEVICE=eth1
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.control_path(): dedent("""\
+ """
+ ),
+ self.control_path(): dedent(
+ """\
NETWORKING=yes
- """),
+ """
+ ),
}
# rh_distro.apply_network_config(V1_NET_CFG, False)
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_apply_network_config_ipv6_rh(self):
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=none
DEFROUTE=yes
DEVICE=eth0
@@ -538,39 +571,54 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.ifcfg_path('eth1'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
BOOTPROTO=dhcp
DEVICE=eth1
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.control_path(): dedent("""\
+ """
+ ),
+ self.control_path(): dedent(
+ """\
NETWORKING=yes
NETWORKING_IPV6=yes
IPV6_AUTOCONF=no
- """),
+ """
+ ),
}
# rh_distro.apply_network_config(V1_NET_CFG_IPV6, False)
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG_IPV6,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_vlan_render_unsupported(self):
"""Render officially unsupported vlan names."""
cfg = {
- 'version': 2,
- 'ethernets': {
- 'eth0': {'addresses': ["192.10.1.2/24"],
- 'match': {'macaddress': "00:16:3e:60:7c:df"}}},
- 'vlans': {
- 'infra0': {'addresses': ["10.0.1.2/16"],
- 'id': 1001, 'link': 'eth0'}},
+ "version": 2,
+ "ethernets": {
+ "eth0": {
+ "addresses": ["192.10.1.2/24"],
+ "match": {"macaddress": "00:16:3e:60:7c:df"},
+ }
+ },
+ "vlans": {
+ "infra0": {
+ "addresses": ["10.0.1.2/16"],
+ "id": 1001,
+ "link": "eth0",
+ }
+ },
}
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=none
DEVICE=eth0
HWADDR=00:16:3e:60:7c:df
@@ -580,8 +628,10 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.ifcfg_path('infra0'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("infra0"): dedent(
+ """\
BOOTPROTO=none
DEVICE=infra0
IPADDR=10.0.1.2
@@ -591,26 +641,33 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
PHYSDEV=eth0
USERCTL=no
VLAN=yes
- """),
- self.control_path(): dedent("""\
+ """
+ ),
+ self.control_path(): dedent(
+ """\
NETWORKING=yes
- """),
+ """
+ ),
}
self._apply_and_verify(
- self.distro.apply_network_config, cfg,
- expected_cfgs=expected_cfgs)
+ self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs
+ )
def test_vlan_render(self):
cfg = {
- 'version': 2,
- 'ethernets': {
- 'eth0': {'addresses': ["192.10.1.2/24"]}},
- 'vlans': {
- 'eth0.1001': {'addresses': ["10.0.1.2/16"],
- 'id': 1001, 'link': 'eth0'}},
+ "version": 2,
+ "ethernets": {"eth0": {"addresses": ["192.10.1.2/24"]}},
+ "vlans": {
+ "eth0.1001": {
+ "addresses": ["10.0.1.2/16"],
+ "id": 1001,
+ "link": "eth0",
+ }
+ },
}
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=none
DEVICE=eth0
IPADDR=192.10.1.2
@@ -619,8 +676,10 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
ONBOOT=yes
TYPE=Ethernet
USERCTL=no
- """),
- self.ifcfg_path('eth0.1001'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("eth0.1001"): dedent(
+ """\
BOOTPROTO=none
DEVICE=eth0.1001
IPADDR=10.0.1.2
@@ -630,32 +689,35 @@ class TestNetCfgDistroRedhat(TestNetCfgDistroBase):
PHYSDEV=eth0
USERCTL=no
VLAN=yes
- """),
- self.control_path(): dedent("""\
+ """
+ ),
+ self.control_path(): dedent(
+ """\
NETWORKING=yes
- """),
+ """
+ ),
}
self._apply_and_verify(
- self.distro.apply_network_config, cfg,
- expected_cfgs=expected_cfgs)
+ self.distro.apply_network_config, cfg, expected_cfgs=expected_cfgs
+ )
class TestNetCfgDistroOpensuse(TestNetCfgDistroBase):
-
def setUp(self):
super(TestNetCfgDistroOpensuse, self).setUp()
- self.distro = self._get_distro('opensuse', renderers=['sysconfig'])
+ self.distro = self._get_distro("opensuse", renderers=["sysconfig"])
def ifcfg_path(self, ifname):
- return '/etc/sysconfig/network/ifcfg-%s' % ifname
+ return "/etc/sysconfig/network/ifcfg-%s" % ifname
- def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.sysconfig.available') as m_avail:
+ with mock.patch("cloudinit.net.sysconfig.available") as m_avail:
m_avail.return_value = True
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -668,52 +730,71 @@ class TestNetCfgDistroOpensuse(TestNetCfgDistroBase):
def test_apply_network_config_opensuse(self):
"""Opensuse uses apply_network_config and renders sysconfig"""
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=static
IPADDR=192.168.1.5
NETMASK=255.255.255.0
STARTMODE=auto
- """),
- self.ifcfg_path('eth1'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
BOOTPROTO=dhcp4
STARTMODE=auto
- """),
+ """
+ ),
}
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ )
def test_apply_network_config_ipv6_opensuse(self):
"""Opensuse uses apply_network_config and renders sysconfig w/ipv6"""
expected_cfgs = {
- self.ifcfg_path('eth0'): dedent("""\
+ self.ifcfg_path("eth0"): dedent(
+ """\
BOOTPROTO=static
IPADDR6=2607:f0d0:1002:0011::2/64
STARTMODE=auto
- """),
- self.ifcfg_path('eth1'): dedent("""\
+ """
+ ),
+ self.ifcfg_path("eth1"): dedent(
+ """\
BOOTPROTO=dhcp4
STARTMODE=auto
- """),
+ """
+ ),
}
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG_IPV6,
- expected_cfgs=expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG_IPV6,
+ expected_cfgs=expected_cfgs.copy(),
+ )
class TestNetCfgDistroArch(TestNetCfgDistroBase):
def setUp(self):
super(TestNetCfgDistroArch, self).setUp()
- self.distro = self._get_distro('arch', renderers=['netplan'])
-
- def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
- bringup=False, with_netplan=False):
+ self.distro = self._get_distro("arch", renderers=["netplan"])
+
+ def _apply_and_verify(
+ self,
+ apply_fn,
+ config,
+ expected_cfgs=None,
+ bringup=False,
+ with_netplan=False,
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.netplan.available',
- return_value=with_netplan):
+ with mock.patch(
+ "cloudinit.net.netplan.available", return_value=with_netplan
+ ):
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -728,10 +809,10 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase):
self.assertEqual(0o644, get_mode(cfgpath, tmpd))
def netctl_path(self, iface):
- return '/etc/netctl/%s' % iface
+ return "/etc/netctl/%s" % iface
def netplan_path(self):
- return '/etc/netplan/50-cloud-init.yaml'
+ return "/etc/netplan/50-cloud-init.yaml"
def test_apply_network_config_v1_without_netplan(self):
# Note that this is in fact an invalid netctl config:
@@ -741,33 +822,40 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase):
# still being used in absence of netplan, not the correctness of the
# rendered netctl config.
expected_cfgs = {
- self.netctl_path('eth0'): dedent("""\
+ self.netctl_path("eth0"): dedent(
+ """\
Address=192.168.1.5/255.255.255.0
Connection=ethernet
DNS=()
Gateway=192.168.1.254
IP=static
Interface=eth0
- """),
- self.netctl_path('eth1'): dedent("""\
+ """
+ ),
+ self.netctl_path("eth1"): dedent(
+ """\
Address=None/None
Connection=ethernet
DNS=()
Gateway=
IP=dhcp
Interface=eth1
- """),
+ """
+ ),
}
# ub_distro.apply_network_config(V1_NET_CFG, False)
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy(),
- with_netplan=False)
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=False,
+ )
def test_apply_network_config_v1_with_netplan(self):
expected_cfgs = {
- self.netplan_path(): dedent("""\
+ self.netplan_path(): dedent(
+ """\
# generated by cloud-init
network:
version: 2
@@ -778,31 +866,32 @@ class TestNetCfgDistroArch(TestNetCfgDistroBase):
gateway4: 192.168.1.254
eth1:
dhcp4: true
- """),
+ """
+ ),
}
with mock.patch(
- 'cloudinit.net.netplan.get_devicelist',
- return_value=[]
+ "cloudinit.net.netplan.get_devicelist", return_value=[]
):
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs=expected_cfgs.copy(),
- with_netplan=True)
+ self._apply_and_verify(
+ self.distro.apply_network_config,
+ V1_NET_CFG,
+ expected_cfgs=expected_cfgs.copy(),
+ with_netplan=True,
+ )
class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
-
def setUp(self):
super(TestNetCfgDistroPhoton, self).setUp()
- self.distro = self._get_distro('photon', renderers=['networkd'])
+ self.distro = self._get_distro("photon", renderers=["networkd"])
def create_conf_dict(self, contents):
content_dict = {}
for line in contents:
if line:
line = line.strip()
- if line and re.search(r'^\[(.+)\]$', line):
+ if line and re.search(r"^\[(.+)\]$", line):
content_dict[line] = []
key = line
elif line:
@@ -815,13 +904,14 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
for k, v in actual.items():
self.assertEqual(sorted(expected[k]), sorted(v))
- def _apply_and_verify(self, apply_fn, config, expected_cfgs=None,
- bringup=False):
+ def _apply_and_verify(
+ self, apply_fn, config, expected_cfgs=None, bringup=False
+ ):
if not expected_cfgs:
- raise ValueError('expected_cfg must not be None')
+ raise ValueError("expected_cfg must not be None")
tmpd = None
- with mock.patch('cloudinit.net.networkd.available') as m_avail:
+ with mock.patch("cloudinit.net.networkd.available") as m_avail:
m_avail.return_value = True
with self.reRooted(tmpd) as tmpd:
apply_fn(config, bringup)
@@ -833,10 +923,11 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
self.assertEqual(0o644, get_mode(cfgpath, tmpd))
def nwk_file_path(self, ifname):
- return '/etc/systemd/network/10-cloud-init-%s.network' % ifname
+ return "/etc/systemd/network/10-cloud-init-%s.network" % ifname
def net_cfg_1(self, ifname):
- ret = """\
+ ret = (
+ """\
[Match]
Name=%s
[Network]
@@ -844,48 +935,53 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
[Address]
Address=192.168.1.5/24
[Route]
- Gateway=192.168.1.254""" % ifname
+ Gateway=192.168.1.254"""
+ % ifname
+ )
return ret
def net_cfg_2(self, ifname):
- ret = """\
+ ret = (
+ """\
[Match]
Name=%s
[Network]
- DHCP=ipv4""" % ifname
+ DHCP=ipv4"""
+ % ifname
+ )
return ret
def test_photon_network_config_v1(self):
- tmp = self.net_cfg_1('eth0').splitlines()
+ tmp = self.net_cfg_1("eth0").splitlines()
expected_eth0 = self.create_conf_dict(tmp)
- tmp = self.net_cfg_2('eth1').splitlines()
+ tmp = self.net_cfg_2("eth1").splitlines()
expected_eth1 = self.create_conf_dict(tmp)
expected_cfgs = {
- self.nwk_file_path('eth0'): expected_eth0,
- self.nwk_file_path('eth1'): expected_eth1,
+ self.nwk_file_path("eth0"): expected_eth0,
+ self.nwk_file_path("eth1"): expected_eth1,
}
- self._apply_and_verify(self.distro.apply_network_config,
- V1_NET_CFG,
- expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config, V1_NET_CFG, expected_cfgs.copy()
+ )
def test_photon_network_config_v2(self):
- tmp = self.net_cfg_1('eth7').splitlines()
+ tmp = self.net_cfg_1("eth7").splitlines()
expected_eth7 = self.create_conf_dict(tmp)
- tmp = self.net_cfg_2('eth9').splitlines()
+ tmp = self.net_cfg_2("eth9").splitlines()
expected_eth9 = self.create_conf_dict(tmp)
expected_cfgs = {
- self.nwk_file_path('eth7'): expected_eth7,
- self.nwk_file_path('eth9'): expected_eth9,
+ self.nwk_file_path("eth7"): expected_eth7,
+ self.nwk_file_path("eth9"): expected_eth9,
}
- self._apply_and_verify(self.distro.apply_network_config,
- V2_NET_CFG,
- expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config, V2_NET_CFG, expected_cfgs.copy()
+ )
def test_photon_network_config_v1_with_duplicates(self):
expected = """\
@@ -902,15 +998,16 @@ class TestNetCfgDistroPhoton(TestNetCfgDistroBase):
expected = self.create_conf_dict(expected.splitlines())
expected_cfgs = {
- self.nwk_file_path('eth0'): expected,
+ self.nwk_file_path("eth0"): expected,
}
- self._apply_and_verify(self.distro.apply_network_config,
- net_cfg,
- expected_cfgs.copy())
+ self._apply_and_verify(
+ self.distro.apply_network_config, net_cfg, expected_cfgs.copy()
+ )
def get_mode(path, target=None):
return os.stat(subp.target_path(target, path)).st_mode & 0o777
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_networking.py b/tests/unittests/distros/test_networking.py
index ec508f4d..635f6901 100644
--- a/tests/unittests/distros/test_networking.py
+++ b/tests/unittests/distros/test_networking.py
@@ -1,3 +1,6 @@
+# See https://docs.pytest.org/en/stable/example
+# /parametrize.html#parametrizing-conditional-raising
+from contextlib import ExitStack as does_not_raise
from unittest import mock
import pytest
@@ -9,10 +12,6 @@ from cloudinit.distros.networking import (
Networking,
)
-# See https://docs.pytest.org/en/stable/example
-# /parametrize.html#parametrizing-conditional-raising
-from contextlib import ExitStack as does_not_raise
-
@pytest.yield_fixture
def generic_networking_cls():
@@ -35,7 +34,8 @@ def generic_networking_cls():
error = AssertionError("Unexpectedly used /sys in generic networking code")
with mock.patch(
- "cloudinit.net.get_sys_class_path", side_effect=error,
+ "cloudinit.net.get_sys_class_path",
+ side_effect=error,
):
yield TestNetworking
@@ -91,8 +91,10 @@ class TestLinuxNetworkingTrySetLinkUp:
m_is_up.return_value = True
is_success = LinuxNetworking().try_set_link_up(devname)
- assert (mock.call(['ip', 'link', 'set', devname, 'up']) ==
- m_subp.call_args_list[-1])
+ assert (
+ mock.call(["ip", "link", "set", devname, "up"])
+ == m_subp.call_args_list[-1]
+ )
assert is_success
def test_calls_subp_return_false(self, m_subp, m_is_up):
@@ -100,8 +102,10 @@ class TestLinuxNetworkingTrySetLinkUp:
m_is_up.return_value = False
is_success = LinuxNetworking().try_set_link_up(devname)
- assert (mock.call(['ip', 'link', 'set', devname, 'up']) ==
- m_subp.call_args_list[-1])
+ assert (
+ mock.call(["ip", "link", "set", devname, "up"])
+ == m_subp.call_args_list[-1]
+ )
assert not is_success
@@ -153,7 +157,9 @@ class TestNetworkingWaitForPhysDevs:
return netcfg
def test_skips_settle_if_all_present(
- self, generic_networking_cls, wait_for_physdevs_netcfg,
+ self,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
):
networking = generic_networking_cls()
with mock.patch.object(
@@ -169,7 +175,9 @@ class TestNetworkingWaitForPhysDevs:
assert 0 == m_settle.call_count
def test_calls_udev_settle_on_missing(
- self, generic_networking_cls, wait_for_physdevs_netcfg,
+ self,
+ generic_networking_cls,
+ wait_for_physdevs_netcfg,
):
networking = generic_networking_cls()
with mock.patch.object(
diff --git a/tests/unittests/distros/test_opensuse.py b/tests/unittests/distros/test_opensuse.py
index 4ff26102..4a4b266f 100644
--- a/tests/unittests/distros/test_opensuse.py
+++ b/tests/unittests/distros/test_opensuse.py
@@ -6,7 +6,6 @@ from . import _get_distro
class TestopenSUSE(CiTestCase):
-
def test_get_distro(self):
distro = _get_distro("opensuse")
- self.assertEqual(distro.osfamily, 'suse')
+ self.assertEqual(distro.osfamily, "suse")
diff --git a/tests/unittests/distros/test_photon.py b/tests/unittests/distros/test_photon.py
index 3858f723..fed30c2b 100644
--- a/tests/unittests/distros/test_photon.py
+++ b/tests/unittests/distros/test_photon.py
@@ -1,34 +1,34 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from . import _get_distro
from cloudinit import util
-from tests.unittests.helpers import mock
-from tests.unittests.helpers import CiTestCase
+from tests.unittests.helpers import CiTestCase, mock
+
+from . import _get_distro
SYSTEM_INFO = {
- 'paths': {
- 'cloud_dir': '/var/lib/cloud/',
- 'templates_dir': '/etc/cloud/templates/',
+ "paths": {
+ "cloud_dir": "/var/lib/cloud/",
+ "templates_dir": "/etc/cloud/templates/",
},
- 'network': {'renderers': 'networkd'},
+ "network": {"renderers": "networkd"},
}
class TestPhoton(CiTestCase):
with_logs = True
- distro = _get_distro('photon', SYSTEM_INFO)
- expected_log_line = 'Rely on PhotonOS default network config'
+ distro = _get_distro("photon", SYSTEM_INFO)
+ expected_log_line = "Rely on PhotonOS default network config"
def test_network_renderer(self):
- self.assertEqual(self.distro._cfg['network']['renderers'], 'networkd')
+ self.assertEqual(self.distro._cfg["network"]["renderers"], "networkd")
def test_get_distro(self):
- self.assertEqual(self.distro.osfamily, 'photon')
+ self.assertEqual(self.distro.osfamily, "photon")
@mock.patch("cloudinit.distros.photon.subp.subp")
def test_write_hostname(self, m_subp):
- hostname = 'myhostname'
- hostfile = self.tmp_path('previous-hostname')
+ hostname = "myhostname"
+ hostfile = self.tmp_path("previous-hostname")
self.distro._write_hostname(hostname, hostfile)
self.assertEqual(hostname, util.load_file(hostfile))
@@ -36,7 +36,7 @@ class TestPhoton(CiTestCase):
self.assertEqual(ret, hostname)
m_subp.return_value = (None, None)
- hostfile += 'hostfile'
+ hostfile += "hostfile"
self.distro._write_hostname(hostname, hostfile)
m_subp.return_value = (hostname, None)
@@ -44,25 +44,25 @@ class TestPhoton(CiTestCase):
self.assertEqual(ret, hostname)
self.logs.truncate(0)
- m_subp.return_value = (None, 'bla')
+ m_subp.return_value = (None, "bla")
self.distro._write_hostname(hostname, None)
- self.assertIn('Error while setting hostname', self.logs.getvalue())
+ self.assertIn("Error while setting hostname", self.logs.getvalue())
- @mock.patch('cloudinit.net.generate_fallback_config')
+ @mock.patch("cloudinit.net.generate_fallback_config")
def test_fallback_netcfg(self, m_fallback_cfg):
- key = 'disable_fallback_netcfg'
+ key = "disable_fallback_netcfg"
# Don't use fallback if no setting given
self.logs.truncate(0)
- assert(self.distro.generate_fallback_config() is None)
+ assert self.distro.generate_fallback_config() is None
self.assertIn(self.expected_log_line, self.logs.getvalue())
self.logs.truncate(0)
self.distro._cfg[key] = True
- assert(self.distro.generate_fallback_config() is None)
+ assert self.distro.generate_fallback_config() is None
self.assertIn(self.expected_log_line, self.logs.getvalue())
self.logs.truncate(0)
self.distro._cfg[key] = False
- assert(self.distro.generate_fallback_config() is not None)
+ assert self.distro.generate_fallback_config() is not None
self.assertNotIn(self.expected_log_line, self.logs.getvalue())
diff --git a/tests/unittests/distros/test_resolv.py b/tests/unittests/distros/test_resolv.py
index e7971627..65e78101 100644
--- a/tests/unittests/distros/test_resolv.py
+++ b/tests/unittests/distros/test_resolv.py
@@ -1,18 +1,16 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.distros.parsers import resolv_conf
-
-from tests.unittests.helpers import TestCase
-
import re
+from cloudinit.distros.parsers import resolv_conf
+from tests.unittests.helpers import TestCase
-BASE_RESOLVE = '''
+BASE_RESOLVE = """
; generated by /sbin/dhclient-script
search blah.yahoo.com yahoo.com
nameserver 10.15.44.14
nameserver 10.15.30.92
-'''
+"""
BASE_RESOLVE = BASE_RESOLVE.strip()
@@ -27,39 +25,40 @@ class TestResolvHelper(TestCase):
self.assertIsNone(rp.local_domain)
rp.local_domain = "bob"
- self.assertEqual('bob', rp.local_domain)
- self.assertIn('domain bob', str(rp))
+ self.assertEqual("bob", rp.local_domain)
+ self.assertIn("domain bob", str(rp))
def test_nameservers(self):
rp = resolv_conf.ResolvConf(BASE_RESOLVE)
- self.assertIn('10.15.44.14', rp.nameservers)
- self.assertIn('10.15.30.92', rp.nameservers)
- rp.add_nameserver('10.2')
- self.assertIn('10.2', rp.nameservers)
- self.assertIn('nameserver 10.2', str(rp))
- self.assertNotIn('10.3', rp.nameservers)
+ self.assertIn("10.15.44.14", rp.nameservers)
+ self.assertIn("10.15.30.92", rp.nameservers)
+ rp.add_nameserver("10.2")
+ self.assertIn("10.2", rp.nameservers)
+ self.assertIn("nameserver 10.2", str(rp))
+ self.assertNotIn("10.3", rp.nameservers)
self.assertEqual(len(rp.nameservers), 3)
- rp.add_nameserver('10.2')
- rp.add_nameserver('10.3')
- self.assertNotIn('10.3', rp.nameservers)
+ rp.add_nameserver("10.2")
+ rp.add_nameserver("10.3")
+ self.assertNotIn("10.3", rp.nameservers)
def test_search_domains(self):
rp = resolv_conf.ResolvConf(BASE_RESOLVE)
- self.assertIn('yahoo.com', rp.search_domains)
- self.assertIn('blah.yahoo.com', rp.search_domains)
- rp.add_search_domain('bbb.y.com')
- self.assertIn('bbb.y.com', rp.search_domains)
- self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp)))
- self.assertIn('bbb.y.com', rp.search_domains)
- rp.add_search_domain('bbb.y.com')
+ self.assertIn("yahoo.com", rp.search_domains)
+ self.assertIn("blah.yahoo.com", rp.search_domains)
+ rp.add_search_domain("bbb.y.com")
+ self.assertIn("bbb.y.com", rp.search_domains)
+ self.assertTrue(re.search(r"search(.*)bbb.y.com(.*)", str(rp)))
+ self.assertIn("bbb.y.com", rp.search_domains)
+ rp.add_search_domain("bbb.y.com")
self.assertEqual(len(rp.search_domains), 3)
- rp.add_search_domain('bbb2.y.com')
+ rp.add_search_domain("bbb2.y.com")
self.assertEqual(len(rp.search_domains), 4)
- rp.add_search_domain('bbb3.y.com')
+ rp.add_search_domain("bbb3.y.com")
self.assertEqual(len(rp.search_domains), 5)
- rp.add_search_domain('bbb4.y.com')
+ rp.add_search_domain("bbb4.y.com")
self.assertEqual(len(rp.search_domains), 6)
- self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com')
+ self.assertRaises(ValueError, rp.add_search_domain, "bbb5.y.com")
self.assertEqual(len(rp.search_domains), 6)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_sles.py b/tests/unittests/distros/test_sles.py
index 04514a19..66b8b13d 100644
--- a/tests/unittests/distros/test_sles.py
+++ b/tests/unittests/distros/test_sles.py
@@ -6,7 +6,6 @@ from . import _get_distro
class TestSLES(CiTestCase):
-
def test_get_distro(self):
distro = _get_distro("sles")
- self.assertEqual(distro.osfamily, 'suse')
+ self.assertEqual(distro.osfamily, "suse")
diff --git a/tests/unittests/distros/test_sysconfig.py b/tests/unittests/distros/test_sysconfig.py
index 4368496d..d0979e17 100644
--- a/tests/unittests/distros/test_sysconfig.py
+++ b/tests/unittests/distros/test_sysconfig.py
@@ -3,22 +3,23 @@
import re
from cloudinit.distros.parsers.sys_conf import SysConf
-
from tests.unittests.helpers import TestCase
-
# Lots of good examples @
# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
+
class TestSysConfHelper(TestCase):
# This function was added in 2.7, make it work for 2.6
def assertRegMatches(self, text, regexp):
regexp = re.compile(regexp)
- self.assertTrue(regexp.search(text),
- msg="%s must match %s!" % (text, regexp.pattern))
+ self.assertTrue(
+ regexp.search(text),
+ msg="%s must match %s!" % (text, regexp.pattern),
+ )
def test_parse_no_change(self):
- contents = '''# A comment
+ contents = """# A comment
USESMBAUTH=no
KEYTABLE=/usr/lib/kbd/keytables/us.map
SHORTDATE=$(date +%y:%m:%d:%H:%M)
@@ -28,59 +29,64 @@ NETMASK0=255.255.255.0
LIST=$LOGROOT/incremental-list
IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64'
ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256"
-USEMD5=no'''
+USEMD5=no"""
conf = SysConf(contents.splitlines())
- self.assertEqual(conf['HOSTNAME'], 'blahblah')
- self.assertEqual(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)')
+ self.assertEqual(conf["HOSTNAME"], "blahblah")
+ self.assertEqual(conf["SHORTDATE"], "$(date +%y:%m:%d:%H:%M)")
# Should be unquoted
- self.assertEqual(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; '
- '-G ${DEVICE} rx 256 tx 256'))
+ self.assertEqual(
+ conf["ETHTOOL_OPTS"],
+ "-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256",
+ )
self.assertEqual(contents, str(conf))
def test_parse_shell_vars(self):
- contents = 'USESMBAUTH=$XYZ'
+ contents = "USESMBAUTH=$XYZ"
conf = SysConf(contents.splitlines())
self.assertEqual(contents, str(conf))
- conf = SysConf('')
- conf['B'] = '${ZZ}d apples'
+ conf = SysConf("")
+ conf["B"] = "${ZZ}d apples"
# Should be quoted
self.assertEqual('B="${ZZ}d apples"', str(conf))
- conf = SysConf('')
- conf['B'] = '$? d apples'
+ conf = SysConf("")
+ conf["B"] = "$? d apples"
self.assertEqual('B="$? d apples"', str(conf))
contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"'
conf = SysConf(contents.splitlines())
- self.assertEqual('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf))
+ self.assertEqual("IPMI_WATCHDOG_OPTIONS=timeout=60", str(conf))
def test_parse_adjust(self):
contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"'
conf = SysConf(contents.splitlines())
# Should be unquoted
- self.assertEqual('eth0-:0004::1/64 eth1-:0005::1/64',
- conf['IPV6TO4_ROUTING'])
- conf['IPV6TO4_ROUTING'] = "blah \tblah"
+ self.assertEqual(
+ "eth0-:0004::1/64 eth1-:0005::1/64", conf["IPV6TO4_ROUTING"]
+ )
+ conf["IPV6TO4_ROUTING"] = "blah \tblah"
contents2 = str(conf).strip()
# Should be requoted due to whitespace
- self.assertRegMatches(contents2,
- r'IPV6TO4_ROUTING=[\']blah\s+blah[\']')
+ self.assertRegMatches(
+ contents2, r"IPV6TO4_ROUTING=[\']blah\s+blah[\']"
+ )
def test_parse_no_adjust_shell(self):
- conf = SysConf(''.splitlines())
- conf['B'] = ' $(time)'
+ conf = SysConf("".splitlines())
+ conf["B"] = " $(time)"
contents = str(conf)
- self.assertEqual('B= $(time)', contents)
+ self.assertEqual("B= $(time)", contents)
def test_parse_empty(self):
- contents = ''
+ contents = ""
conf = SysConf(contents.splitlines())
- self.assertEqual('', str(conf).strip())
+ self.assertEqual("", str(conf).strip())
def test_parse_add_new(self):
- contents = 'BLAH=b'
+ contents = "BLAH=b"
conf = SysConf(contents.splitlines())
- conf['Z'] = 'd'
+ conf["Z"] = "d"
contents = str(conf)
self.assertIn("Z=d", contents)
self.assertIn("BLAH=b", contents)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/distros/test_user_data_normalize.py b/tests/unittests/distros/test_user_data_normalize.py
index bd8f2adb..67ea024b 100644
--- a/tests/unittests/distros/test_user_data_normalize.py
+++ b/tests/unittests/distros/test_user_data_normalize.py
@@ -2,251 +2,233 @@
from unittest import mock
-from cloudinit import distros
+from cloudinit import distros, helpers, settings
from cloudinit.distros import ug_util
-from cloudinit import helpers
-from cloudinit import settings
-
from tests.unittests.helpers import TestCase
-
bcfg = {
- 'name': 'bob',
- 'plain_text_passwd': 'ubuntu',
- 'home': "/home/ubuntu",
- 'shell': "/bin/bash",
- 'lock_passwd': True,
- 'gecos': "Ubuntu",
- 'groups': ["foo"]
+ "name": "bob",
+ "plain_text_passwd": "ubuntu",
+ "home": "/home/ubuntu",
+ "shell": "/bin/bash",
+ "lock_passwd": True,
+ "gecos": "Ubuntu",
+ "groups": ["foo"],
}
class TestUGNormalize(TestCase):
-
def setUp(self):
super(TestUGNormalize, self).setUp()
- self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
+ self.add_patch("cloudinit.util.system_is_snappy", "m_snappy")
def _make_distro(self, dtype, def_user=None):
cfg = dict(settings.CFG_BUILTIN)
- cfg['system_info']['distro'] = dtype
- paths = helpers.Paths(cfg['system_info']['paths'])
+ cfg["system_info"]["distro"] = dtype
+ paths = helpers.Paths(cfg["system_info"]["paths"])
distro_cls = distros.fetch(dtype)
if def_user:
- cfg['system_info']['default_user'] = def_user.copy()
- distro = distro_cls(dtype, cfg['system_info'], paths)
+ cfg["system_info"]["default_user"] = def_user.copy()
+ distro = distro_cls(dtype, cfg["system_info"], paths)
return distro
def _norm(self, cfg, distro):
return ug_util.normalize_users_groups(cfg, distro)
def test_group_dict(self):
- distro = self._make_distro('ubuntu')
- g = {'groups':
- [{'ubuntu': ['foo', 'bar'],
- 'bob': 'users'},
- 'cloud-users',
- {'bob': 'users2'}]}
+ distro = self._make_distro("ubuntu")
+ g = {
+ "groups": [
+ {"ubuntu": ["foo", "bar"], "bob": "users"},
+ "cloud-users",
+ {"bob": "users2"},
+ ]
+ }
(_users, groups) = self._norm(g, distro)
- self.assertIn('ubuntu', groups)
- ub_members = groups['ubuntu']
- self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members))
- self.assertIn('bob', groups)
- b_members = groups['bob']
- self.assertEqual(sorted(['users', 'users2']),
- sorted(b_members))
+ self.assertIn("ubuntu", groups)
+ ub_members = groups["ubuntu"]
+ self.assertEqual(sorted(["foo", "bar"]), sorted(ub_members))
+ self.assertIn("bob", groups)
+ b_members = groups["bob"]
+ self.assertEqual(sorted(["users", "users2"]), sorted(b_members))
def test_basic_groups(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'groups': ['bob'],
+ "groups": ["bob"],
}
(users, groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', groups)
+ self.assertIn("bob", groups)
self.assertEqual({}, users)
def test_csv_groups(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'groups': 'bob,joe,steve',
+ "groups": "bob,joe,steve",
}
(users, groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', groups)
- self.assertIn('joe', groups)
- self.assertIn('steve', groups)
+ self.assertIn("bob", groups)
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
self.assertEqual({}, users)
def test_more_groups(self):
- distro = self._make_distro('ubuntu')
- ug_cfg = {
- 'groups': ['bob', 'joe', 'steve']
- }
+ distro = self._make_distro("ubuntu")
+ ug_cfg = {"groups": ["bob", "joe", "steve"]}
(users, groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', groups)
- self.assertIn('joe', groups)
- self.assertIn('steve', groups)
+ self.assertIn("bob", groups)
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
self.assertEqual({}, users)
def test_member_groups(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'groups': {
- 'bob': ['s'],
- 'joe': [],
- 'steve': [],
+ "groups": {
+ "bob": ["s"],
+ "joe": [],
+ "steve": [],
}
}
(users, groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', groups)
- self.assertEqual(['s'], groups['bob'])
- self.assertEqual([], groups['joe'])
- self.assertIn('joe', groups)
- self.assertIn('steve', groups)
+ self.assertIn("bob", groups)
+ self.assertEqual(["s"], groups["bob"])
+ self.assertEqual([], groups["joe"])
+ self.assertIn("joe", groups)
+ self.assertIn("steve", groups)
self.assertEqual({}, users)
def test_users_simple_dict(self):
- distro = self._make_distro('ubuntu', bcfg)
+ distro = self._make_distro("ubuntu", bcfg)
ug_cfg = {
- 'users': {
- 'default': True,
+ "users": {
+ "default": True,
}
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertIn("bob", users)
ug_cfg = {
- 'users': {
- 'default': 'yes',
+ "users": {
+ "default": "yes",
}
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertIn("bob", users)
ug_cfg = {
- 'users': {
- 'default': '1',
+ "users": {
+ "default": "1",
}
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertIn("bob", users)
def test_users_simple_dict_no(self):
- distro = self._make_distro('ubuntu', bcfg)
+ distro = self._make_distro("ubuntu", bcfg)
ug_cfg = {
- 'users': {
- 'default': False,
+ "users": {
+ "default": False,
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
ug_cfg = {
- 'users': {
- 'default': 'no',
+ "users": {
+ "default": "no",
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
def test_users_simple_csv(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': 'joe,bob',
+ "users": "joe,bob",
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('joe', users)
- self.assertIn('bob', users)
- self.assertEqual({'default': False}, users['joe'])
- self.assertEqual({'default': False}, users['bob'])
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
def test_users_simple(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- 'joe',
- 'bob'
- ],
+ "users": ["joe", "bob"],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('joe', users)
- self.assertIn('bob', users)
- self.assertEqual({'default': False}, users['joe'])
- self.assertEqual({'default': False}, users['bob'])
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
def test_users_old_user(self):
- distro = self._make_distro('ubuntu', bcfg)
- ug_cfg = {
- 'user': 'zetta',
- 'users': 'default'
- }
+ distro = self._make_distro("ubuntu", bcfg)
+ ug_cfg = {"user": "zetta", "users": "default"}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertNotIn('bob', users) # Bob is not the default now, zetta is
- self.assertIn('zetta', users)
- self.assertTrue(users['zetta']['default'])
- self.assertNotIn('default', users)
- ug_cfg = {
- 'user': 'zetta',
- 'users': 'default, joe'
- }
+ self.assertNotIn("bob", users) # Bob is not the default now, zetta is
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ self.assertNotIn("default", users)
+ ug_cfg = {"user": "zetta", "users": "default, joe"}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertNotIn('bob', users) # Bob is not the default now, zetta is
- self.assertIn('joe', users)
- self.assertIn('zetta', users)
- self.assertTrue(users['zetta']['default'])
- self.assertNotIn('default', users)
- ug_cfg = {
- 'user': 'zetta',
- 'users': ['bob', 'joe']
- }
+ self.assertNotIn("bob", users) # Bob is not the default now, zetta is
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
+ self.assertNotIn("default", users)
+ ug_cfg = {"user": "zetta", "users": ["bob", "joe"]}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
- self.assertIn('joe', users)
- self.assertIn('zetta', users)
- self.assertTrue(users['zetta']['default'])
+ self.assertIn("bob", users)
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
ug_cfg = {
- 'user': 'zetta',
- 'users': {
- 'bob': True,
- 'joe': True,
- }
+ "user": "zetta",
+ "users": {
+ "bob": True,
+ "joe": True,
+ },
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
- self.assertIn('joe', users)
- self.assertIn('zetta', users)
- self.assertTrue(users['zetta']['default'])
+ self.assertIn("bob", users)
+ self.assertIn("joe", users)
+ self.assertIn("zetta", users)
+ self.assertTrue(users["zetta"]["default"])
ug_cfg = {
- 'user': 'zetta',
+ "user": "zetta",
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('zetta', users)
+ self.assertIn("zetta", users)
ug_cfg = {}
(users, groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
self.assertEqual({}, groups)
def test_users_dict_default_additional(self):
- distro = self._make_distro('ubuntu', bcfg)
+ distro = self._make_distro("ubuntu", bcfg)
ug_cfg = {
- 'users': [
- {'name': 'default', 'blah': True}
- ],
+ "users": [{"name": "default", "blah": True}],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
- self.assertEqual(",".join(distro.get_default_user()['groups']),
- users['bob']['groups'])
- self.assertEqual(True, users['bob']['blah'])
- self.assertEqual(True, users['bob']['default'])
+ self.assertIn("bob", users)
+ self.assertEqual(
+ ",".join(distro.get_default_user()["groups"]),
+ users["bob"]["groups"],
+ )
+ self.assertEqual(True, users["bob"]["blah"])
+ self.assertEqual(True, users["bob"]["default"])
def test_users_dict_extract(self):
- distro = self._make_distro('ubuntu', bcfg)
+ distro = self._make_distro("ubuntu", bcfg)
ug_cfg = {
- 'users': [
- 'default',
+ "users": [
+ "default",
],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertIn("bob", users)
(name, config) = ug_util.extract_default(users)
- self.assertEqual(name, 'bob')
+ self.assertEqual(name, "bob")
expected_config = {}
def_config = None
try:
@@ -258,115 +240,126 @@ class TestUGNormalize(TestCase):
expected_config.update(def_config)
# Ignore these for now
- expected_config.pop('name', None)
- expected_config.pop('groups', None)
- config.pop('groups', None)
+ expected_config.pop("name", None)
+ expected_config.pop("groups", None)
+ config.pop("groups", None)
self.assertEqual(config, expected_config)
def test_users_dict_default(self):
- distro = self._make_distro('ubuntu', bcfg)
+ distro = self._make_distro("ubuntu", bcfg)
ug_cfg = {
- 'users': [
- 'default',
+ "users": [
+ "default",
],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
- self.assertEqual(",".join(distro.get_default_user()['groups']),
- users['bob']['groups'])
- self.assertEqual(True, users['bob']['default'])
+ self.assertIn("bob", users)
+ self.assertEqual(
+ ",".join(distro.get_default_user()["groups"]),
+ users["bob"]["groups"],
+ )
+ self.assertEqual(True, users["bob"]["default"])
def test_users_dict_trans(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- {'name': 'joe',
- 'tr-me': True},
- {'name': 'bob'},
+ "users": [
+ {"name": "joe", "tr-me": True},
+ {"name": "bob"},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('joe', users)
- self.assertIn('bob', users)
- self.assertEqual({'tr_me': True, 'default': False}, users['joe'])
- self.assertEqual({'default': False}, users['bob'])
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"tr_me": True, "default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
def test_users_dict(self):
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- {'name': 'joe'},
- {'name': 'bob'},
+ "users": [
+ {"name": "joe"},
+ {"name": "bob"},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('joe', users)
- self.assertIn('bob', users)
- self.assertEqual({'default': False}, users['joe'])
- self.assertEqual({'default': False}, users['bob'])
+ self.assertIn("joe", users)
+ self.assertIn("bob", users)
+ self.assertEqual({"default": False}, users["joe"])
+ self.assertEqual({"default": False}, users["bob"])
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_create_snap_user(self, mock_subp):
- mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
- '')]
- distro = self._make_distro('ubuntu')
+ mock_subp.side_effect = [
+ ('{"username": "joe", "ssh-key-count": 1}\n', "")
+ ]
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- {'name': 'joe', 'snapuser': 'joe@joe.com'},
+ "users": [
+ {"name": "joe", "snapuser": "joe@joe.com"},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
- print('user=%s config=%s' % (user, config))
+ print("user=%s config=%s" % (user, config))
username = distro.create_user(user, **config)
- snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com']
+ snapcmd = ["snap", "create-user", "--sudoer", "--json", "joe@joe.com"]
mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
- self.assertEqual(username, 'joe')
+ self.assertEqual(username, "joe")
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_create_snap_user_known(self, mock_subp):
- mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
- '')]
- distro = self._make_distro('ubuntu')
+ mock_subp.side_effect = [
+ ('{"username": "joe", "ssh-key-count": 1}\n', "")
+ ]
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- {'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True},
+ "users": [
+ {"name": "joe", "snapuser": "joe@joe.com", "known": True},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
- print('user=%s config=%s' % (user, config))
+ print("user=%s config=%s" % (user, config))
username = distro.create_user(user, **config)
- snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known',
- 'joe@joe.com']
+ snapcmd = [
+ "snap",
+ "create-user",
+ "--sudoer",
+ "--json",
+ "--known",
+ "joe@joe.com",
+ ]
mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
- self.assertEqual(username, 'joe')
-
- @mock.patch('cloudinit.util.system_is_snappy')
- @mock.patch('cloudinit.util.is_group')
- @mock.patch('cloudinit.subp.subp')
- def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp,
- mock_snappy):
+ self.assertEqual(username, "joe")
+
+ @mock.patch("cloudinit.util.system_is_snappy")
+ @mock.patch("cloudinit.util.is_group")
+ @mock.patch("cloudinit.subp.subp")
+ def test_add_user_on_snappy_system(
+ self, mock_subp, mock_isgrp, mock_snappy
+ ):
mock_isgrp.return_value = False
mock_subp.return_value = True
mock_snappy.return_value = True
- distro = self._make_distro('ubuntu')
+ distro = self._make_distro("ubuntu")
ug_cfg = {
- 'users': [
- {'name': 'joe', 'groups': 'users', 'create_groups': True},
+ "users": [
+ {"name": "joe", "groups": "users", "create_groups": True},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
- print('user=%s config=%s' % (user, config))
+ print("user=%s config=%s" % (user, config))
distro.add_user(user, **config)
- groupcmd = ['groupadd', 'users', '--extrausers']
- addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m']
+ groupcmd = ["groupadd", "users", "--extrausers"]
+ addcmd = ["useradd", "joe", "--extrausers", "--groups", "users", "-m"]
mock_subp.assert_any_call(groupcmd)
mock_subp.assert_any_call(addcmd, logstring=addcmd)
+
# vi: ts=4 expandtab