# This file is part of cloud-init. See LICENSE file for license information.

import re

from cloudinit import distros
from cloudinit import ssh_util
from cloudinit.tests.helpers import (CiTestCase, mock)


class MyBaseDistro(distros.Distro):
    """Minimal Distro subclass used to test base Distro implementations.

    Every distro-specific hook defined here raises NotImplementedError, so
    the tests below only pass through code inherited from distros.Distro.
    """

    def __init__(self, name="basedistro", cfg=None, paths=None):
        # Falsy cfg/paths (including None) are replaced by empty dicts.
        if not cfg:
            cfg = {}
        if not paths:
            paths = {}
        super(MyBaseDistro, self).__init__(name, cfg, paths)

    def install_packages(self, pkglist):
        raise NotImplementedError()

    def _write_network(self, settings):
        raise NotImplementedError()

    def package_command(self, command, args=None, pkgs=None):
        raise NotImplementedError()

    def update_package_sources(self):
        raise NotImplementedError()

    def apply_locale(self, locale, out_fn=None):
        raise NotImplementedError()

    def set_timezone(self, tz):
        raise NotImplementedError()

    def _read_hostname(self, filename, default=None):
        raise NotImplementedError()

    def _write_hostname(self, hostname, filename):
        raise NotImplementedError()

    def _read_system_hostname(self):
        raise NotImplementedError()


# Class-level patches are applied to every test method; the mocks are passed
# bottom-up, so each test receives (..., m_subp, m_is_snappy) after any
# mocks from its own method-level decorators.
@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
class TestCreateUser(CiTestCase):
    """Check the commands issued by Distro.create_user and lock_passwd."""

    with_logs = True

    def setUp(self):
        super(TestCreateUser, self).setUp()
        self.dist = MyBaseDistro()

    def _useradd2call(self, args):
        """Return the mock.call expected for a useradd invocation.

        args is the list of useradd arguments without the leading
        'useradd'. The returned call carries the full command plus a
        'logstring' copy in which the value following '--password' is
        replaced by 'REDACTED'.
        """
        redacted_opts = ('--password',)
        args = ['useradd'] + args
        logcmd = list(args)
        # Only look at options that are followed by a value; a trailing
        # option has nothing to redact and must not index past the end.
        for idx, arg in enumerate(args[:-1]):
            if arg in redacted_opts:
                logcmd[idx + 1] = 'REDACTED'
        return mock.call(args, logstring=logcmd)

    def test_basic(self, m_subp, m_is_snappy):
        """Default create_user runs useradd -m, then passwd -l."""
        user = 'foouser'
        self.dist.create_user(user)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m']),
             mock.call(['passwd', '-l', user])])

    def test_no_home(self, m_subp, m_is_snappy):
        """no_create_home=True passes -M to useradd."""
        user = 'foouser'
        self.dist.create_user(user, no_create_home=True)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-M']),
             mock.call(['passwd', '-l', user])])

    def test_system_user(self, m_subp, m_is_snappy):
        """A system user should have no home and get --system."""
        user = 'foouser'
        self.dist.create_user(user, system=True)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '--system', '-M']),
             mock.call(['passwd', '-l', user])])

    def test_explicit_no_home_false(self, m_subp, m_is_snappy):
        """no_create_home=False still passes -m to useradd."""
        user = 'foouser'
        self.dist.create_user(user, no_create_home=False)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m']),
             mock.call(['passwd', '-l', user])])

    def test_unlocked(self, m_subp, m_is_snappy):
        """lock_passwd=False issues no passwd -l call."""
        user = 'foouser'
        self.dist.create_user(user, lock_passwd=False)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m'])])

    def test_set_password(self, m_subp, m_is_snappy):
        """passwd is passed via --password and redacted in the logstring."""
        user = 'foouser'
        password = 'passfoo'
        self.dist.create_user(user, passwd=password)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '--password', password, '-m']),
             mock.call(['passwd', '-l', user])])

    @mock.patch("cloudinit.distros.util.is_group")
    def test_group_added(self, m_is_group, m_subp, m_is_snappy):
        """A group that does not exist is created before useradd runs."""
        m_is_group.return_value = False
        user = 'foouser'
        self.dist.create_user(user, groups=['group1'])
        expected = [
            mock.call(['groupadd', 'group1']),
            self._useradd2call([user, '--groups', 'group1', '-m']),
            mock.call(['passwd', '-l', user])]
        self.assertEqual(m_subp.call_args_list, expected)

    @mock.patch("cloudinit.distros.util.is_group")
    def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
        """groupadd is only issued for groups is_group reports missing."""
        ex_groups = ['existing_group']
        groups = ['group1', ex_groups[0]]
        m_is_group.side_effect = lambda m: m in ex_groups
        user = 'foouser'
        self.dist.create_user(user, groups=groups)
        expected = [
            mock.call(['groupadd', 'group1']),
            self._useradd2call([user, '--groups', ','.join(groups), '-m']),
            mock.call(['passwd', '-l', user])]
        self.assertEqual(m_subp.call_args_list, expected)

    @mock.patch("cloudinit.distros.util.is_group")
    def test_create_groups_with_whitespace_string(
            self, m_is_group, m_subp, m_is_snappy):
        """Groups given as a comma-delimited string tolerate white space."""
        m_is_group.return_value = False
        user = 'foouser'
        self.dist.create_user(user, groups='group1, group2')
        expected = [
            mock.call(['groupadd', 'group1']),
            mock.call(['groupadd', 'group2']),
            self._useradd2call([user, '--groups', 'group1,group2', '-m']),
            mock.call(['passwd', '-l', user])]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_explicit_sudo_false(self, m_subp, m_is_snappy):
        """sudo=False issues only the useradd and passwd -l commands."""
        user = 'foouser'
        self.dist.create_user(user, sudo=False)
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m']),
             mock.call(['passwd', '-l', user])])

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_string(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys allows string and calls setup_user_keys."""
        user = 'foouser'
        self.dist.create_user(user, ssh_authorized_keys='mykey')
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m']),
             mock.call(['passwd', '-l', user])])
        m_setup_user_keys.assert_called_once_with({'mykey'}, user)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_list(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys allows lists and calls setup_user_keys."""
        user = 'foouser'
        self.dist.create_user(user, ssh_authorized_keys=['key1', 'key2'])
        self.assertEqual(
            m_subp.call_args_list,
            [self._useradd2call([user, '-m']),
             mock.call(['passwd', '-l', user])])
        m_setup_user_keys.assert_called_once_with({'key1', 'key2'}, user)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_integer(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys warns on non-iterable/string type."""
        user = 'foouser'
        self.dist.create_user(user, ssh_authorized_keys=-1)
        m_setup_user_keys.assert_called_once_with(set(), user)
        # The repr of int is accepted as either <type 'int'> or
        # <class 'int'>.
        match = re.match(
            r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for'
            ' \'ssh_authorized_keys\'.*',
            self.logs.getvalue(),
            re.DOTALL)
        self.assertIsNotNone(
            match, 'Missing ssh_authorized_keys invalid type warning')

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_no_cloud_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Log a warning when redirecting a user without cloud ssh keys."""
        user = 'foouser'
        self.dist.create_user(user, ssh_redirect_user='someuser')
        self.assertIn(
            'WARNING: Unable to disable SSH logins for foouser given '
            'ssh_redirect_user: someuser. No cloud public-keys present.\n',
            self.logs.getvalue())
        m_setup_user_keys.assert_not_called()

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_with_cloud_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
        user = 'foouser'
        self.dist.create_user(
            user, ssh_redirect_user='someuser', cloud_public_ssh_keys=['key1'])
        # Build the expected key options from the shared template.
        disable_prefix = ssh_util.DISABLE_USER_OPTS
        disable_prefix = disable_prefix.replace('$USER', 'someuser')
        disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
        m_setup_user_keys.assert_called_once_with(
            {'key1'}, 'foouser', options=disable_prefix)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
        user = 'foouser'
        self.dist.create_user(
            user, ssh_authorized_keys='auth1', ssh_redirect_user='someuser',
            cloud_public_ssh_keys=['key1'])
        disable_prefix = ssh_util.DISABLE_USER_OPTS
        disable_prefix = disable_prefix.replace('$USER', 'someuser')
        disable_prefix = disable_prefix.replace('$DISABLE_USER', user)
        self.assertEqual(
            m_setup_user_keys.call_args_list,
            [mock.call({'auth1'}, user),  # not disabled
             mock.call({'key1'}, 'foouser', options=disable_prefix)])

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp,
                                            m_is_snappy):
        """Lock uses usermod --lock if no 'passwd' cmd available."""
        m_which.side_effect = lambda m: m in ('usermod',)
        self.dist.lock_passwd("bob")
        self.assertEqual(
            [mock.call(['usermod', '--lock', 'bob'])],
            m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_passwd_if_available(self, m_which, m_subp,
                                           m_is_snappy):
        """Lock with only passwd will use passwd."""
        m_which.side_effect = lambda m: m in ('passwd',)
        self.dist.lock_passwd("bob")
        self.assertEqual(
            [mock.call(['passwd', '-l', 'bob'])],
            m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp,
                                                m_is_snappy):
        """Lock with no commands available raises RuntimeError."""
        m_which.return_value = None
        with self.assertRaises(RuntimeError):
            self.dist.lock_passwd("bob")

# vi: ts=4 expandtab