diff options
Diffstat (limited to 'tests/unittests/distros/test_create_users.py')
-rw-r--r-- | tests/unittests/distros/test_create_users.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py new file mode 100644 index 00000000..5baa8a4b --- /dev/null +++ b/tests/unittests/distros/test_create_users.py @@ -0,0 +1,236 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re + +from cloudinit import distros +from cloudinit import ssh_util +from tests.unittests.helpers import (CiTestCase, mock) +from tests.unittests.util import abstract_to_concrete + + +@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +@mock.patch("cloudinit.distros.subp.subp") +class TestCreateUser(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestCreateUser, self).setUp() + self.dist = abstract_to_concrete(distros.Distro)( + name='test', cfg=None, paths=None + ) + + def _useradd2call(self, args): + # return a mock call for the useradd command in args + # with expected 'logstring'. + args = ['useradd'] + args + logcmd = [a for a in args] + for i in range(len(args)): + if args[i] in ('--password',): + logcmd[i + 1] = 'REDACTED' + return mock.call(args, logstring=logcmd) + + def test_basic(self, m_subp, m_is_snappy): + user = 'foouser' + self.dist.create_user(user) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m']), + mock.call(['passwd', '-l', user])]) + + def test_no_home(self, m_subp, m_is_snappy): + user = 'foouser' + self.dist.create_user(user, no_create_home=True) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-M']), + mock.call(['passwd', '-l', user])]) + + def test_system_user(self, m_subp, m_is_snappy): + # system user should have no home and get --system + user = 'foouser' + self.dist.create_user(user, system=True) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '--system', '-M']), + mock.call(['passwd', '-l', user])]) + + def test_explicit_no_home_false(self, m_subp, m_is_snappy): + user = 'foouser' + self.dist.create_user(user, no_create_home=False) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m']), + mock.call(['passwd', '-l', user])]) + + def test_unlocked(self, m_subp, m_is_snappy): + user = 'foouser' + self.dist.create_user(user, lock_passwd=False) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m'])]) + + def test_set_password(self, m_subp, m_is_snappy): + user = 'foouser' + password = 'passfoo' + self.dist.create_user(user, passwd=password) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '--password', password, '-m']), + mock.call(['passwd', '-l', user])]) + + @mock.patch("cloudinit.distros.util.is_group") + def test_group_added(self, m_is_group, m_subp, m_is_snappy): + m_is_group.return_value = False + user = 'foouser' + self.dist.create_user(user, groups=['group1']) + expected = [ + mock.call(['groupadd', 'group1']), + self._useradd2call([user, '--groups', 'group1', '-m']), + mock.call(['passwd', '-l', user])] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy): + ex_groups = ['existing_group'] + groups = ['group1', ex_groups[0]] + m_is_group.side_effect = lambda m: m in ex_groups + user = 'foouser' + self.dist.create_user(user, groups=groups) + expected = [ + mock.call(['groupadd', 'group1']), + self._useradd2call([user, '--groups', ','.join(groups), '-m']), + mock.call(['passwd', '-l', user])] + self.assertEqual(m_subp.call_args_list, expected) + + @mock.patch("cloudinit.distros.util.is_group") + def test_create_groups_with_whitespace_string( + self, m_is_group, m_subp, m_is_snappy): + # groups supported as a comma delimeted string even with white space + m_is_group.return_value = False + user = 'foouser' + self.dist.create_user(user, groups='group1, group2') + expected = [ + mock.call(['groupadd', 'group1']), + mock.call(['groupadd', 'group2']), + self._useradd2call([user, '--groups', 'group1,group2', '-m']), + mock.call(['passwd', '-l', user])] + self.assertEqual(m_subp.call_args_list, expected) + + def test_explicit_sudo_false(self, m_subp, m_is_snappy): + user = 'foouser' + self.dist.create_user(user, sudo=False) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m']), + mock.call(['passwd', '-l', user])]) + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_setup_ssh_authorized_keys_with_string( + self, m_setup_user_keys, m_subp, m_is_snappy): + """ssh_authorized_keys allows string and calls setup_user_keys.""" + user = 'foouser' + self.dist.create_user(user, ssh_authorized_keys='mykey') + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m']), + mock.call(['passwd', '-l', user])]) + m_setup_user_keys.assert_called_once_with(set(['mykey']), user) + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_setup_ssh_authorized_keys_with_list( + self, m_setup_user_keys, m_subp, m_is_snappy): + """ssh_authorized_keys allows lists and calls setup_user_keys.""" + user = 'foouser' + self.dist.create_user(user, ssh_authorized_keys=['key1', 'key2']) + self.assertEqual( + m_subp.call_args_list, + [self._useradd2call([user, '-m']), + mock.call(['passwd', '-l', user])]) + m_setup_user_keys.assert_called_once_with(set(['key1', 'key2']), user) + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_setup_ssh_authorized_keys_with_integer( + self, m_setup_user_keys, m_subp, m_is_snappy): + """ssh_authorized_keys warns on non-iterable/string type.""" + user = 'foouser' + self.dist.create_user(user, ssh_authorized_keys=-1) + m_setup_user_keys.assert_called_once_with(set([]), user) + match = re.match( + r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for' + ' \'ssh_authorized_keys\'.*', + self.logs.getvalue(), + re.DOTALL) + self.assertIsNotNone( + match, 'Missing ssh_authorized_keys invalid type warning') + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_create_user_with_ssh_redirect_user_no_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy): + """Log a warning when trying to redirect a user no cloud ssh keys.""" + user = 'foouser' + self.dist.create_user(user, ssh_redirect_user='someuser') + self.assertIn( + 'WARNING: Unable to disable SSH logins for foouser given ' + 'ssh_redirect_user: someuser. No cloud public-keys present.\n', + self.logs.getvalue()) + m_setup_user_keys.assert_not_called() + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_create_user_with_ssh_redirect_user_with_cloud_keys( + self, m_setup_user_keys, m_subp, m_is_snappy): + """Disable ssh when ssh_redirect_user and cloud ssh keys are set.""" + user = 'foouser' + self.dist.create_user( + user, ssh_redirect_user='someuser', cloud_public_ssh_keys=['key1']) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace('$USER', 'someuser') + disable_prefix = disable_prefix.replace('$DISABLE_USER', user) + m_setup_user_keys.assert_called_once_with( + set(['key1']), 'foouser', options=disable_prefix) + + @mock.patch('cloudinit.ssh_util.setup_user_keys') + def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys( + self, m_setup_user_keys, m_subp, m_is_snappy): + """Do not disable ssh_authorized_keys when ssh_redirect_user is set.""" + user = 'foouser' + self.dist.create_user( + user, ssh_authorized_keys='auth1', ssh_redirect_user='someuser', + cloud_public_ssh_keys=['key1']) + disable_prefix = ssh_util.DISABLE_USER_OPTS + disable_prefix = disable_prefix.replace('$USER', 'someuser') + disable_prefix = disable_prefix.replace('$DISABLE_USER', user) + self.assertEqual( + m_setup_user_keys.call_args_list, + [mock.call(set(['auth1']), user), # not disabled + mock.call(set(['key1']), 'foouser', options=disable_prefix)]) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp, + m_is_snappy): + """Lock uses usermod --lock if no 'passwd' cmd available.""" + m_which.side_effect = lambda m: m in ('usermod',) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(['usermod', '--lock', 'bob'])], + m_subp.call_args_list) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_with_passwd_if_available(self, m_which, m_subp, + m_is_snappy): + """Lock with only passwd will use passwd.""" + m_which.side_effect = lambda m: m in ('passwd',) + self.dist.lock_passwd("bob") + self.assertEqual( + [mock.call(['passwd', '-l', 'bob'])], + m_subp.call_args_list) + + @mock.patch("cloudinit.distros.subp.which") + def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp, + m_is_snappy): + """Lock with no commands available raises RuntimeError.""" + m_which.return_value = None + with self.assertRaises(RuntimeError): + self.dist.lock_passwd("bob") + +# vi: ts=4 expandtab |