diff options
Diffstat (limited to 'tests/unittests/distros/test_create_users.py')
-rw-r--r-- | tests/unittests/distros/test_create_users.py | 282 |
1 files changed, 282 insertions, 0 deletions
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py new file mode 100644 index 00000000..ddb039bd --- /dev/null +++ b/tests/unittests/distros/test_create_users.py @@ -0,0 +1,282 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re + +from cloudinit import distros, ssh_util +from tests.unittests.helpers import CiTestCase, mock +from tests.unittests.util import abstract_to_concrete + + +@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +@mock.patch("cloudinit.distros.subp.subp") +class TestCreateUser(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestCreateUser, self).setUp() + self.dist = abstract_to_concrete(distros.Distro)( + name="test", cfg=None, paths=None + ) + + def _useradd2call(self, args): + # return a mock call for the useradd command in args + # with expected 'logstring'. + args = ["useradd"] + args + logcmd = [a for a in args] + for i in range(len(args)): + if args[i] in ("--password",): + logcmd[i + 1] = "REDACTED" + return mock.call(args, logstring=logcmd) + + def test_basic(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_no_home(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, no_create_home=True) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_system_user(self, m_subp, m_is_snappy): + # system user should have no home and get --system + user = "foouser" + self.dist.create_user(user, system=True) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "--system", "-M"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_explicit_no_home_false(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, no_create_home=False) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + def test_unlocked(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, lock_passwd=False) + self.assertEqual( + m_subp.call_args_list, [self._useradd2call([user, "-m"])] + ) + + def test_set_password(self, m_subp, m_is_snappy): + user = "foouser" + password = "passfoo" + self.dist.create_user(user, passwd=password) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "--password", password, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + @mock.patch("cloudinit.distros.util.is_group") + def test_group_added(self, m_is_group, m_subp, m_is_snappy): + m_is_group.return_value = False + user = "foouser" + self.dist.create_user(user, groups=["group1"]) + expected = [ + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", "group1", "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy): + ex_groups = ["existing_group"] + groups = ["group1", ex_groups[0]] + m_is_group.side_effect = lambda m: m in ex_groups + user = "foouser" + self.dist.create_user(user, groups=groups) + expected = [ + mock.call(["groupadd", "group1"]), + self._useradd2call([user, "--groups", ",".join(groups), "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_create_groups_with_whitespace_string( + self, m_is_group, m_subp, m_is_snappy + ): + # groups supported as a comma delimeted string even with white space + m_is_group.return_value = False + user = "foouser" + self.dist.create_user(user, groups="group1, group2") + expected = [ + mock.call(["groupadd", "group1"]), + mock.call(["groupadd", "group2"]), + self._useradd2call([user, "--groups", "group1,group2", "-m"]), + mock.call(["passwd", "-l", user]), + ] + self.assertEqual(m_subp.call_args_list, expected) + + def test_explicit_sudo_false(self, m_subp, m_is_snappy): + user = "foouser" + self.dist.create_user(user, sudo=False) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_string( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys allows string and calls setup_user_keys.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys="mykey") + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["mykey"]), user) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_list( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys allows lists and calls setup_user_keys.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"]) + self.assertEqual( + m_subp.call_args_list, + [ + self._useradd2call([user, "-m"]), + mock.call(["passwd", "-l", user]), + ], + ) + m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_setup_ssh_authorized_keys_with_integer( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """ssh_authorized_keys warns on non-iterable/string type.""" + user = "foouser" + self.dist.create_user(user, ssh_authorized_keys=-1) + m_setup_user_keys.assert_called_once_with(set([]), user) + match = re.match( + r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for" + " 'ssh_authorized_keys'.*", + self.logs.getvalue(), + re.DOTALL, + ) + self.assertIsNotNone( + match, "Missing ssh_authorized_keys invalid type warning" + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_no_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Log a warning when trying to redirect a user no cloud ssh keys.""" + user = "foouser" + self.dist.create_user(user, ssh_redirect_user="someuser") + self.assertIn( + "WARNING: Unable to disable SSH logins for foouser given " + "ssh_redirect_user: someuser. No cloud public-keys present.\n", + self.logs.getvalue(), + ) + m_setup_user_keys.assert_not_called() + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_with_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Disable ssh when ssh_redirect_user and cloud ssh keys are set.""" + user = "foouser" + self.dist.create_user( + user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"] + ) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) + m_setup_user_keys.assert_called_once_with( + set(["key1"]), "foouser", options=disable_prefix + ) + + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys( + self, m_setup_user_keys, m_subp, m_is_snappy + ): + """Do not disable ssh_authorized_keys when ssh_redirect_user is set.""" + user = "foouser" + self.dist.create_user( + user, + ssh_authorized_keys="auth1", + ssh_redirect_user="someuser", + cloud_public_ssh_keys=["key1"], + ) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace("$USER", "someuser") + disable_prefix = disable_prefix.replace("$DISABLE_USER", user) + self.assertEqual( + m_setup_user_keys.call_args_list, + [ + mock.call(set(["auth1"]), user), # not disabled + mock.call(set(["key1"]), "foouser", options=disable_prefix), + ], + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_usermod_if_no_passwd( + self, m_which, m_subp, m_is_snappy + ): + """Lock uses usermod --lock if no 'passwd' cmd available.""" + m_which.side_effect = lambda m: m in ("usermod",) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy): + """Lock with only passwd will use passwd.""" + m_which.side_effect = lambda m: m in ("passwd",) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list + ) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_raises_runtime_if_no_commands( + self, m_which, m_subp, m_is_snappy + ): + """Lock with no commands available raises RuntimeError.""" + m_which.return_value = None + with self.assertRaises(RuntimeError): + self.dist.lock_passwd("bob") + + +# vi: ts=4 expandtab |