# This file is part of cloud-init. See LICENSE file for license information. import re from cloudinit import distros, ssh_util from tests.unittests.helpers import CiTestCase, mock from tests.unittests.util import abstract_to_concrete @mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False) @mock.patch("cloudinit.distros.subp.subp") class TestCreateUser(CiTestCase): with_logs = True def setUp(self): super(TestCreateUser, self).setUp() self.dist = abstract_to_concrete(distros.Distro)( name="test", cfg=None, paths=None ) def _useradd2call(self, args): # return a mock call for the useradd command in args # with expected 'logstring'. args = ["useradd"] + args logcmd = [a for a in args] for i in range(len(args)): if args[i] in ("--password",): logcmd[i + 1] = "REDACTED" return mock.call(args, logstring=logcmd) def test_basic(self, m_subp, m_is_snappy): user = "foouser" self.dist.create_user(user) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-m"]), mock.call(["passwd", "-l", user]), ], ) def test_no_home(self, m_subp, m_is_snappy): user = "foouser" self.dist.create_user(user, no_create_home=True) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-M"]), mock.call(["passwd", "-l", user]), ], ) def test_system_user(self, m_subp, m_is_snappy): # system user should have no home and get --system user = "foouser" self.dist.create_user(user, system=True) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "--system", "-M"]), mock.call(["passwd", "-l", user]), ], ) def test_explicit_no_home_false(self, m_subp, m_is_snappy): user = "foouser" self.dist.create_user(user, no_create_home=False) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-m"]), mock.call(["passwd", "-l", user]), ], ) def test_unlocked(self, m_subp, m_is_snappy): user = "foouser" self.dist.create_user(user, lock_passwd=False) self.assertEqual( m_subp.call_args_list, [self._useradd2call([user, "-m"])] ) def test_set_password(self, m_subp, m_is_snappy): user = "foouser" password = "passfoo" self.dist.create_user(user, passwd=password) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "--password", password, "-m"]), mock.call(["passwd", "-l", user]), ], ) @mock.patch("cloudinit.distros.util.is_group") def test_group_added(self, m_is_group, m_subp, m_is_snappy): m_is_group.return_value = False user = "foouser" self.dist.create_user(user, groups=["group1"]) expected = [ mock.call(["groupadd", "group1"]), self._useradd2call([user, "--groups", "group1", "-m"]), mock.call(["passwd", "-l", user]), ] self.assertEqual(m_subp.call_args_list, expected) @mock.patch("cloudinit.distros.util.is_group") def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy): ex_groups = ["existing_group"] groups = ["group1", ex_groups[0]] m_is_group.side_effect = lambda m: m in ex_groups user = "foouser" self.dist.create_user(user, groups=groups) expected = [ mock.call(["groupadd", "group1"]), self._useradd2call([user, "--groups", ",".join(groups), "-m"]), mock.call(["passwd", "-l", user]), ] self.assertEqual(m_subp.call_args_list, expected) @mock.patch("cloudinit.distros.util.is_group") def test_create_groups_with_whitespace_string( self, m_is_group, m_subp, m_is_snappy ): # groups supported as a comma delimeted string even with white space m_is_group.return_value = False user = "foouser" self.dist.create_user(user, groups="group1, group2") expected = [ mock.call(["groupadd", "group1"]), mock.call(["groupadd", "group2"]), self._useradd2call([user, "--groups", "group1,group2", "-m"]), mock.call(["passwd", "-l", user]), ] self.assertEqual(m_subp.call_args_list, expected) def test_explicit_sudo_false(self, m_subp, m_is_snappy): user = "foouser" self.dist.create_user(user, sudo=False) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-m"]), mock.call(["passwd", "-l", user]), ], ) @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_string( self, m_setup_user_keys, m_subp, m_is_snappy ): """ssh_authorized_keys allows string and calls setup_user_keys.""" user = "foouser" self.dist.create_user(user, ssh_authorized_keys="mykey") self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-m"]), mock.call(["passwd", "-l", user]), ], ) m_setup_user_keys.assert_called_once_with(set(["mykey"]), user) @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_list( self, m_setup_user_keys, m_subp, m_is_snappy ): """ssh_authorized_keys allows lists and calls setup_user_keys.""" user = "foouser" self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"]) self.assertEqual( m_subp.call_args_list, [ self._useradd2call([user, "-m"]), mock.call(["passwd", "-l", user]), ], ) m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user) @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_integer( self, m_setup_user_keys, m_subp, m_is_snappy ): """ssh_authorized_keys warns on non-iterable/string type.""" user = "foouser" self.dist.create_user(user, ssh_authorized_keys=-1) m_setup_user_keys.assert_called_once_with(set([]), user) match = re.match( r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for" " 'ssh_authorized_keys'.*", self.logs.getvalue(), re.DOTALL, ) self.assertIsNotNone( match, "Missing ssh_authorized_keys invalid type warning" ) @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_no_cloud_keys( self, m_setup_user_keys, m_subp, m_is_snappy ): """Log a warning when trying to redirect a user no cloud ssh keys.""" user = "foouser" self.dist.create_user(user, ssh_redirect_user="someuser") self.assertIn( "WARNING: Unable to disable SSH logins for foouser given " "ssh_redirect_user: someuser. No cloud public-keys present.\n", self.logs.getvalue(), ) m_setup_user_keys.assert_not_called() @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_with_cloud_keys( self, m_setup_user_keys, m_subp, m_is_snappy ): """Disable ssh when ssh_redirect_user and cloud ssh keys are set.""" user = "foouser" self.dist.create_user( user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"] ) disable_prefix = ssh_util.DISABLE_USER_OPTS disable_prefix = disable_prefix.replace("$USER", "someuser") disable_prefix = disable_prefix.replace("$DISABLE_USER", user) m_setup_user_keys.assert_called_once_with( set(["key1"]), "foouser", options=disable_prefix ) @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys( self, m_setup_user_keys, m_subp, m_is_snappy ): """Do not disable ssh_authorized_keys when ssh_redirect_user is set.""" user = "foouser" self.dist.create_user( user, ssh_authorized_keys="auth1", ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"], ) disable_prefix = ssh_util.DISABLE_USER_OPTS disable_prefix = disable_prefix.replace("$USER", "someuser") disable_prefix = disable_prefix.replace("$DISABLE_USER", user) self.assertEqual( m_setup_user_keys.call_args_list, [ mock.call(set(["auth1"]), user), # not disabled mock.call(set(["key1"]), "foouser", options=disable_prefix), ], ) @mock.patch("cloudinit.distros.subp.which") def test_lock_with_usermod_if_no_passwd( self, m_which, m_subp, m_is_snappy ): """Lock uses usermod --lock if no 'passwd' cmd available.""" m_which.side_effect = lambda m: m in ("usermod",) self.dist.lock_passwd("bob") self.assertEqual( [mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list ) @mock.patch("cloudinit.distros.subp.which") def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy): """Lock with only passwd will use passwd.""" m_which.side_effect = lambda m: m in ("passwd",) self.dist.lock_passwd("bob") self.assertEqual( [mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list ) @mock.patch("cloudinit.distros.subp.which") def test_lock_raises_runtime_if_no_commands( self, m_which, m_subp, m_is_snappy ): """Lock with no commands available raises RuntimeError.""" m_which.return_value = None with self.assertRaises(RuntimeError): self.dist.lock_passwd("bob") # vi: ts=4 expandtab