diff options
Diffstat (limited to 'cloudinit/config/tests')
-rw-r--r-- | cloudinit/config/tests/test_disable_ec2_metadata.py | 50 | ||||
-rw-r--r-- | cloudinit/config/tests/test_set_passwords.py | 71 | ||||
-rw-r--r-- | cloudinit/config/tests/test_snap.py | 36 | ||||
-rw-r--r-- | cloudinit/config/tests/test_ssh.py | 151 | ||||
-rw-r--r-- | cloudinit/config/tests/test_ubuntu_advantage.py | 37 | ||||
-rw-r--r-- | cloudinit/config/tests/test_users_groups.py | 144 |
6 files changed, 481 insertions, 8 deletions
diff --git a/cloudinit/config/tests/test_disable_ec2_metadata.py b/cloudinit/config/tests/test_disable_ec2_metadata.py new file mode 100644 index 00000000..67646b03 --- /dev/null +++ b/cloudinit/config/tests/test_disable_ec2_metadata.py @@ -0,0 +1,50 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests cc_disable_ec2_metadata handler""" + +import cloudinit.config.cc_disable_ec2_metadata as ec2_meta + +from cloudinit.tests.helpers import CiTestCase, mock + +import logging + +LOG = logging.getLogger(__name__) + +DISABLE_CFG = {'disable_ec2_metadata': 'true'} + + +class TestEC2MetadataRoute(CiTestCase): + + with_logs = True + + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.which') + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.subp') + def test_disable_ifconfig(self, m_subp, m_which): + """Set the route if ifconfig command is available""" + m_which.side_effect = lambda x: x if x == 'ifconfig' else None + ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None) + m_subp.assert_called_with( + ['route', 'add', '-host', '169.254.169.254', 'reject'], + capture=False) + + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.which') + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.subp') + def test_disable_ip(self, m_subp, m_which): + """Set the route if ip command is available""" + m_which.side_effect = lambda x: x if x == 'ip' else None + ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None) + m_subp.assert_called_with( + ['ip', 'route', 'add', 'prohibit', '169.254.169.254'], + capture=False) + + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.which') + @mock.patch('cloudinit.config.cc_disable_ec2_metadata.util.subp') + def test_disable_no_tool(self, m_subp, m_which): + """Log error when neither route nor ip commands are available""" + m_which.return_value = None # Find neither ifconfig nor ip + ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None) + self.assertEqual( + [mock.call('ip'), mock.call('ifconfig')], m_which.call_args_list) + m_subp.assert_not_called() + +# vi: ts=4 expandtab diff --git a/cloudinit/config/tests/test_set_passwords.py b/cloudinit/config/tests/test_set_passwords.py new file mode 100644 index 00000000..b051ec82 --- /dev/null +++ b/cloudinit/config/tests/test_set_passwords.py @@ -0,0 +1,71 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import mock + +from cloudinit.config import cc_set_passwords as setpass +from cloudinit.tests.helpers import CiTestCase +from cloudinit import util + +MODPATH = "cloudinit.config.cc_set_passwords." + + +class TestHandleSshPwauth(CiTestCase): + """Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth.""" + + with_logs = True + + @mock.patch(MODPATH + "util.subp") + def test_unknown_value_logs_warning(self, m_subp): + setpass.handle_ssh_pwauth("floo") + self.assertIn("Unrecognized value: ssh_pwauth=floo", + self.logs.getvalue()) + m_subp.assert_not_called() + + @mock.patch(MODPATH + "update_ssh_config", return_value=True) + @mock.patch(MODPATH + "util.subp") + def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config): + """If systemctl in service cmd: systemctl restart name.""" + setpass.handle_ssh_pwauth( + True, service_cmd=["systemctl"], service_name="myssh") + self.assertEqual(mock.call(["systemctl", "restart", "myssh"]), + m_subp.call_args) + + @mock.patch(MODPATH + "update_ssh_config", return_value=True) + @mock.patch(MODPATH + "util.subp") + def test_service_as_service_cmd(self, m_subp, m_update_ssh_config): + """If systemctl in service cmd: systemctl restart name.""" + setpass.handle_ssh_pwauth( + True, service_cmd=["service"], service_name="myssh") + self.assertEqual(mock.call(["service", "myssh", "restart"]), + m_subp.call_args) + + @mock.patch(MODPATH + "update_ssh_config", return_value=False) + @mock.patch(MODPATH + "util.subp") + def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config): + """If config is not updated, then no system restart should be done.""" + setpass.handle_ssh_pwauth(True) + m_subp.assert_not_called() + self.assertIn("No need to restart ssh", self.logs.getvalue()) + + @mock.patch(MODPATH + "update_ssh_config", return_value=True) + @mock.patch(MODPATH + "util.subp") + def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config): + """If 'unchanged', then no updates to config and no restart.""" + setpass.handle_ssh_pwauth( + "unchanged", service_cmd=["systemctl"], service_name="myssh") + m_update_ssh_config.assert_not_called() + m_subp.assert_not_called() + + @mock.patch(MODPATH + "util.subp") + def test_valid_change_values(self, m_subp): + """If value is a valid changen value, then update should be called.""" + upname = MODPATH + "update_ssh_config" + optname = "PasswordAuthentication" + for value in util.FALSE_STRINGS + util.TRUE_STRINGS: + optval = "yes" if value in util.TRUE_STRINGS else "no" + with mock.patch(upname, return_value=False) as m_update: + setpass.handle_ssh_pwauth(value) + m_update.assert_called_with({optname: optval}) + m_subp.assert_not_called() + +# vi: ts=4 expandtab diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py index c5b4a9de..3c472891 100644 --- a/cloudinit/config/tests/test_snap.py +++ b/cloudinit/config/tests/test_snap.py @@ -9,7 +9,7 @@ from cloudinit.config.cc_snap import ( from cloudinit.config.schema import validate_cloudconfig_schema from cloudinit import util from cloudinit.tests.helpers import ( - CiTestCase, mock, wrap_and_call, skipUnlessJsonSchema) + CiTestCase, SchemaTestCaseMixin, mock, wrap_and_call, skipUnlessJsonSchema) SYSTEM_USER_ASSERTION = """\ @@ -162,6 +162,7 @@ class TestAddAssertions(CiTestCase): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -245,9 +246,10 @@ class TestRunCommands(CiTestCase): @skipUnlessJsonSchema() -class TestSchema(CiTestCase): +class TestSchema(CiTestCase, SchemaTestCaseMixin): with_logs = True + schema = schema def test_schema_warns_on_snap_not_as_dict(self): """If the snap configuration is not a dict, emit a warning.""" @@ -340,6 +342,30 @@ class TestSchema(CiTestCase): {'snap': {'assertions': {'01': 'also valid'}}}, schema) self.assertEqual('', self.logs.getvalue()) + def test_duplicates_are_fine_array_array(self): + """Duplicated commands array/array entries are allowed.""" + self.assertSchemaValid( + {'commands': [["echo", "bye"], ["echo" "bye"]]}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_array_string(self): + """Duplicated commands array/string entries are allowed.""" + self.assertSchemaValid( + {'commands': ["echo bye", "echo bye"]}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_dict_array(self): + """Duplicated commands dict/array entries are allowed.""" + self.assertSchemaValid( + {'commands': {'00': ["echo", "bye"], '01': ["echo", "bye"]}}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_dict_string(self): + """Duplicated commands dict/string entries are allowed.""" + self.assertSchemaValid( + {'commands': {'00': "echo bye", '01': "echo bye"}}, + "command entries can be duplicate.") + class TestHandle(CiTestCase): @@ -399,8 +425,10 @@ class TestHandle(CiTestCase): 'snap': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = 'cloudinit.config.cc_snap.sys.stderr' - with mock.patch(mock_path, new_callable=StringIO): - handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + self.assertEqual('HI\nMOM\n', util.load_file(outfile)) @mock.patch('cloudinit.config.cc_snap.util.subp') diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py new file mode 100644 index 00000000..c8a4271f --- /dev/null +++ b/cloudinit/config/tests/test_ssh.py @@ -0,0 +1,151 @@ +# This file is part of cloud-init. See LICENSE file for license information. + + +from cloudinit.config import cc_ssh +from cloudinit import ssh_util +from cloudinit.tests.helpers import CiTestCase, mock + +MODPATH = "cloudinit.config.cc_ssh." + + +@mock.patch(MODPATH + "ssh_util.setup_user_keys") +class TestHandleSsh(CiTestCase): + """Test cc_ssh handling of ssh config.""" + + def test_apply_credentials_with_user(self, m_setup_keys): + """Apply keys for the given user and root.""" + keys = ["key1"] + user = "clouduser" + cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options="")], + m_setup_keys.call_args_list) + + def test_apply_credentials_with_no_user(self, m_setup_keys): + """Apply keys for root only.""" + keys = ["key1"] + user = None + cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) + self.assertEqual([mock.call(set(keys), "root", options="")], + m_setup_keys.call_args_list) + + def test_apply_credentials_with_user_disable_root(self, m_setup_keys): + """Apply keys for the given user and disable root ssh.""" + keys = ["key1"] + user = "clouduser" + options = ssh_util.DISABLE_USER_OPTS + cc_ssh.apply_credentials(keys, user, True, options) + options = options.replace("$USER", user) + options = options.replace("$DISABLE_USER", "root") + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options=options)], + m_setup_keys.call_args_list) + + def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys): + """Apply keys no user and disable root ssh.""" + keys = ["key1"] + user = None + options = ssh_util.DISABLE_USER_OPTS + cc_ssh.apply_credentials(keys, user, True, options) + options = options.replace("$USER", "NONE") + options = options.replace("$DISABLE_USER", "root") + self.assertEqual([mock.call(set(keys), "root", options=options)], + m_setup_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_no_cfg(self, m_path_exists, m_nug, + m_glob, m_setup_keys): + """Test handle with no config ignores generating existing keyfiles.""" + cfg = {} + keys = ["key1"] + m_glob.return_value = [] # Return no matching keys to prevent removal + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ([], {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cc_ssh.handle("name", cfg, cloud, None, None) + options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE") + options = options.replace("$DISABLE_USER", "root") + m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*') + self.assertIn( + [mock.call('/etc/ssh/ssh_host_rsa_key'), + mock.call('/etc/ssh/ssh_host_dsa_key'), + mock.call('/etc/ssh/ssh_host_ecdsa_key'), + mock.call('/etc/ssh/ssh_host_ed25519_key')], + m_path_exists.call_args_list) + self.assertEqual([mock.call(set(keys), "root", options=options)], + m_setup_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug, + m_glob, m_setup_keys): + """Test handle with no config and a default distro user.""" + cfg = {} + keys = ["key1"] + user = "clouduser" + m_glob.return_value = [] # Return no matching keys to prevent removal + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cc_ssh.handle("name", cfg, cloud, None, None) + + options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) + options = options.replace("$DISABLE_USER", "root") + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options=options)], + m_setup_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug, + m_glob, m_setup_keys): + """Test handle with explicit disable_root and a default distro user.""" + # This test is identical to test_handle_no_cfg_and_default_root, + # except this uses an explicit cfg value + cfg = {"disable_root": True} + keys = ["key1"] + user = "clouduser" + m_glob.return_value = [] # Return no matching keys to prevent removal + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cc_ssh.handle("name", cfg, cloud, None, None) + + options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) + options = options.replace("$DISABLE_USER", "root") + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options=options)], + m_setup_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug, + m_glob, m_setup_keys): + """Test handle with disable_root == False.""" + # When disable_root == False, the ssh redirect for root is skipped + cfg = {"disable_root": False} + keys = ["key1"] + user = "clouduser" + m_glob.return_value = [] # Return no matching keys to prevent removal + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, None, None) + + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options="")], + m_setup_keys.call_args_list) diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py index f2a59faf..b7cf9bee 100644 --- a/cloudinit/config/tests/test_ubuntu_advantage.py +++ b/cloudinit/config/tests/test_ubuntu_advantage.py @@ -7,7 +7,8 @@ from cloudinit.config.cc_ubuntu_advantage import ( handle, maybe_install_ua_tools, run_commands, schema) from cloudinit.config.schema import validate_cloudconfig_schema from cloudinit import util -from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJsonSchema +from cloudinit.tests.helpers import ( + CiTestCase, mock, SchemaTestCaseMixin, skipUnlessJsonSchema) # Module path used in mocks @@ -22,6 +23,7 @@ class FakeCloud(object): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -105,9 +107,10 @@ class TestRunCommands(CiTestCase): @skipUnlessJsonSchema() -class TestSchema(CiTestCase): +class TestSchema(CiTestCase, SchemaTestCaseMixin): with_logs = True + schema = schema def test_schema_warns_on_ubuntu_advantage_not_as_dict(self): """If ubuntu-advantage configuration is not a dict, emit a warning.""" @@ -169,6 +172,30 @@ class TestSchema(CiTestCase): {'ubuntu-advantage': {'commands': {'01': 'also valid'}}}, schema) self.assertEqual('', self.logs.getvalue()) + def test_duplicates_are_fine_array_array(self): + """Duplicated commands array/array entries are allowed.""" + self.assertSchemaValid( + {'commands': [["echo", "bye"], ["echo" "bye"]]}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_array_string(self): + """Duplicated commands array/string entries are allowed.""" + self.assertSchemaValid( + {'commands': ["echo bye", "echo bye"]}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_dict_array(self): + """Duplicated commands dict/array entries are allowed.""" + self.assertSchemaValid( + {'commands': {'00': ["echo", "bye"], '01': ["echo", "bye"]}}, + "command entries can be duplicate.") + + def test_duplicates_are_fine_dict_string(self): + """Duplicated commands dict/string entries are allowed.""" + self.assertSchemaValid( + {'commands': {'00': "echo bye", '01': "echo bye"}}, + "command entries can be duplicate.") + class TestHandle(CiTestCase): @@ -208,8 +235,10 @@ class TestHandle(CiTestCase): 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = '%s.sys.stderr' % MPATH - with mock.patch(mock_path, new_callable=StringIO): - handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('nomatter', cfg=cfg, cloud=None, log=self.logger, + args=None) self.assertEqual('HI\nMOM\n', util.load_file(outfile)) diff --git a/cloudinit/config/tests/test_users_groups.py b/cloudinit/config/tests/test_users_groups.py new file mode 100644 index 00000000..ba0afae3 --- /dev/null +++ b/cloudinit/config/tests/test_users_groups.py @@ -0,0 +1,144 @@ +# This file is part of cloud-init. See LICENSE file for license information. + + +from cloudinit.config import cc_users_groups +from cloudinit.tests.helpers import CiTestCase, mock + +MODPATH = "cloudinit.config.cc_users_groups" + + +@mock.patch('cloudinit.distros.ubuntu.Distro.create_group') +@mock.patch('cloudinit.distros.ubuntu.Distro.create_user') +class TestHandleUsersGroups(CiTestCase): + """Test cc_users_groups handling of config.""" + + with_logs = True + + def test_handle_no_cfg_creates_no_users_or_groups(self, m_user, m_group): + """Test handle with no config will not create users or groups.""" + cfg = {} # merged cloud-config + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + m_user.assert_not_called() + m_group.assert_not_called() + + def test_handle_users_in_cfg_calls_create_users(self, m_user, m_group): + """When users in config, create users with distro.create_user.""" + cfg = {'users': ['default', {'name': 'me2'}]} # merged cloud-config + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + self.assertItemsEqual( + m_user.call_args_list, + [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True, + shell='/bin/bash'), + mock.call('me2', default=False)]) + m_group.assert_not_called() + + def test_users_with_ssh_redirect_user_passes_keys(self, m_user, m_group): + """When ssh_redirect_user is True pass default user and cloud keys.""" + cfg = { + 'users': ['default', {'name': 'me2', 'ssh_redirect_user': True}]} + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {'public-keys': ['key1']} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + self.assertItemsEqual( + m_user.call_args_list, + [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True, + shell='/bin/bash'), + mock.call('me2', cloud_public_ssh_keys=['key1'], default=False, + ssh_redirect_user='ubuntu')]) + m_group.assert_not_called() + + def test_users_with_ssh_redirect_user_default_str(self, m_user, m_group): + """When ssh_redirect_user is 'default' pass default username.""" + cfg = { + 'users': ['default', {'name': 'me2', + 'ssh_redirect_user': 'default'}]} + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {'public-keys': ['key1']} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + self.assertItemsEqual( + m_user.call_args_list, + [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True, + shell='/bin/bash'), + mock.call('me2', cloud_public_ssh_keys=['key1'], default=False, + ssh_redirect_user='ubuntu')]) + m_group.assert_not_called() + + def test_users_with_ssh_redirect_user_non_default(self, m_user, m_group): + """Warn when ssh_redirect_user is not 'default'.""" + cfg = { + 'users': ['default', {'name': 'me2', + 'ssh_redirect_user': 'snowflake'}]} + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {'public-keys': ['key1']} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + with self.assertRaises(ValueError) as context_manager: + cc_users_groups.handle('modulename', cfg, cloud, None, None) + m_group.assert_not_called() + self.assertEqual( + 'Not creating user me2. Invalid value of ssh_redirect_user:' + ' snowflake. Expected values: true, default or false.', + str(context_manager.exception)) + + def test_users_with_ssh_redirect_user_default_false(self, m_user, m_group): + """When unspecified ssh_redirect_user is false and not set up.""" + cfg = {'users': ['default', {'name': 'me2'}]} + # System config defines a default user for the distro. + sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True, + 'groups': ['lxd', 'sudo'], + 'shell': '/bin/bash'}} + metadata = {'public-keys': ['key1']} + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + self.assertItemsEqual( + m_user.call_args_list, + [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True, + shell='/bin/bash'), + mock.call('me2', default=False)]) + m_group.assert_not_called() + + def test_users_ssh_redirect_user_and_no_default(self, m_user, m_group): + """Warn when ssh_redirect_user is True and no default user present.""" + cfg = { + 'users': ['default', {'name': 'me2', 'ssh_redirect_user': True}]} + # System config defines *no* default user for the distro. + sys_cfg = {} + metadata = {} # no public-keys defined + cloud = self.tmp_cloud( + distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata) + cc_users_groups.handle('modulename', cfg, cloud, None, None) + m_user.assert_called_once_with('me2', default=False) + m_group.assert_not_called() + self.assertEqual( + 'WARNING: Ignoring ssh_redirect_user: True for me2. No' + ' default_user defined. Perhaps missing' + ' cloud configuration users: [default, ..].\n', + self.logs.getvalue()) |