summaryrefslogtreecommitdiff
path: root/tests/unittests/distros/test_create_users.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/distros/test_create_users.py')
-rw-r--r--tests/unittests/distros/test_create_users.py282
1 files changed, 282 insertions, 0 deletions
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py
new file mode 100644
index 00000000..ddb039bd
--- /dev/null
+++ b/tests/unittests/distros/test_create_users.py
@@ -0,0 +1,282 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit import distros, ssh_util
+from tests.unittests.helpers import CiTestCase, mock
+from tests.unittests.util import abstract_to_concrete
+
+
+@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
+@mock.patch("cloudinit.distros.subp.subp")
+class TestCreateUser(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestCreateUser, self).setUp()
+ self.dist = abstract_to_concrete(distros.Distro)(
+ name="test", cfg=None, paths=None
+ )
+
+ def _useradd2call(self, args):
+ # return a mock call for the useradd command in args
+ # with expected 'logstring'.
+ args = ["useradd"] + args
+ logcmd = [a for a in args]
+ for i in range(len(args)):
+ if args[i] in ("--password",):
+ logcmd[i + 1] = "REDACTED"
+ return mock.call(args, logstring=logcmd)
+
+ def test_basic(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_no_home(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, no_create_home=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_system_user(self, m_subp, m_is_snappy):
+ # system user should have no home and get --system
+ user = "foouser"
+ self.dist.create_user(user, system=True)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "--system", "-M"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_explicit_no_home_false(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, no_create_home=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ def test_unlocked(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, lock_passwd=False)
+ self.assertEqual(
+ m_subp.call_args_list, [self._useradd2call([user, "-m"])]
+ )
+
+ def test_set_password(self, m_subp, m_is_snappy):
+ user = "foouser"
+ password = "passfoo"
+ self.dist.create_user(user, passwd=password)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "--password", password, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_group_added(self, m_is_group, m_subp, m_is_snappy):
+ m_is_group.return_value = False
+ user = "foouser"
+ self.dist.create_user(user, groups=["group1"])
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", "group1", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
+ ex_groups = ["existing_group"]
+ groups = ["group1", ex_groups[0]]
+ m_is_group.side_effect = lambda m: m in ex_groups
+ user = "foouser"
+ self.dist.create_user(user, groups=groups)
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ self._useradd2call([user, "--groups", ",".join(groups), "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ @mock.patch("cloudinit.distros.util.is_group")
+ def test_create_groups_with_whitespace_string(
+ self, m_is_group, m_subp, m_is_snappy
+ ):
+ # groups supported as a comma delimeted string even with white space
+ m_is_group.return_value = False
+ user = "foouser"
+ self.dist.create_user(user, groups="group1, group2")
+ expected = [
+ mock.call(["groupadd", "group1"]),
+ mock.call(["groupadd", "group2"]),
+ self._useradd2call([user, "--groups", "group1,group2", "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ]
+ self.assertEqual(m_subp.call_args_list, expected)
+
+ def test_explicit_sudo_false(self, m_subp, m_is_snappy):
+ user = "foouser"
+ self.dist.create_user(user, sudo=False)
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_string(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys allows string and calls setup_user_keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys="mykey")
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["mykey"]), user)
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_list(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys allows lists and calls setup_user_keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"])
+ self.assertEqual(
+ m_subp.call_args_list,
+ [
+ self._useradd2call([user, "-m"]),
+ mock.call(["passwd", "-l", user]),
+ ],
+ )
+ m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user)
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_setup_ssh_authorized_keys_with_integer(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """ssh_authorized_keys warns on non-iterable/string type."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_authorized_keys=-1)
+ m_setup_user_keys.assert_called_once_with(set([]), user)
+ match = re.match(
+ r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for"
+ " 'ssh_authorized_keys'.*",
+ self.logs.getvalue(),
+ re.DOTALL,
+ )
+ self.assertIsNotNone(
+ match, "Missing ssh_authorized_keys invalid type warning"
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_no_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Log a warning when trying to redirect a user no cloud ssh keys."""
+ user = "foouser"
+ self.dist.create_user(user, ssh_redirect_user="someuser")
+ self.assertIn(
+ "WARNING: Unable to disable SSH logins for foouser given "
+ "ssh_redirect_user: someuser. No cloud public-keys present.\n",
+ self.logs.getvalue(),
+ )
+ m_setup_user_keys.assert_not_called()
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_with_cloud_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
+ user = "foouser"
+ self.dist.create_user(
+ user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"]
+ )
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
+ m_setup_user_keys.assert_called_once_with(
+ set(["key1"]), "foouser", options=disable_prefix
+ )
+
+ @mock.patch("cloudinit.ssh_util.setup_user_keys")
+ def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
+ self, m_setup_user_keys, m_subp, m_is_snappy
+ ):
+ """Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
+ user = "foouser"
+ self.dist.create_user(
+ user,
+ ssh_authorized_keys="auth1",
+ ssh_redirect_user="someuser",
+ cloud_public_ssh_keys=["key1"],
+ )
+ disable_prefix = ssh_util.DISABLE_USER_OPTS
+ disable_prefix = disable_prefix.replace("$USER", "someuser")
+ disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
+ self.assertEqual(
+ m_setup_user_keys.call_args_list,
+ [
+ mock.call(set(["auth1"]), user), # not disabled
+ mock.call(set(["key1"]), "foouser", options=disable_prefix),
+ ],
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_usermod_if_no_passwd(
+ self, m_which, m_subp, m_is_snappy
+ ):
+ """Lock uses usermod --lock if no 'passwd' cmd available."""
+ m_which.side_effect = lambda m: m in ("usermod",)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy):
+ """Lock with only passwd will use passwd."""
+ m_which.side_effect = lambda m: m in ("passwd",)
+ self.dist.lock_passwd("bob")
+ self.assertEqual(
+ [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list
+ )
+
+ @mock.patch("cloudinit.distros.subp.which")
+ def test_lock_raises_runtime_if_no_commands(
+ self, m_which, m_subp, m_is_snappy
+ ):
+ """Lock with no commands available raises RuntimeError."""
+ m_which.return_value = None
+ with self.assertRaises(RuntimeError):
+ self.dist.lock_passwd("bob")
+
+
+# vi: ts=4 expandtab