From 039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 Mon Sep 17 00:00:00 2001 From: Brett Holman Date: Fri, 3 Dec 2021 13:11:46 -0700 Subject: Reorganize unit test locations under tests/unittests (#1126) This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs --- cloudinit/config/tests/test_ssh.py | 405 ------------------------------------- 1 file changed, 405 deletions(-) delete mode 100644 cloudinit/config/tests/test_ssh.py (limited to 'cloudinit/config/tests/test_ssh.py') diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py deleted file mode 100644 index 87ccdb60..00000000 --- a/cloudinit/config/tests/test_ssh.py +++ /dev/null @@ -1,405 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import os.path - -from cloudinit.config import cc_ssh -from cloudinit import ssh_util -from cloudinit.tests.helpers import CiTestCase, mock -import logging - -LOG = logging.getLogger(__name__) - -MODPATH = "cloudinit.config.cc_ssh." -KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES - if name not in 'dsa'] - - -@mock.patch(MODPATH + "ssh_util.setup_user_keys") -class TestHandleSsh(CiTestCase): - """Test cc_ssh handling of ssh config.""" - - def _publish_hostkey_test_setup(self): - self.test_hostkeys = { - 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'), - 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'), - 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'), - 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'), - } - self.test_hostkey_files = [] - hostkey_tmpdir = self.tmp_dir() - for key_type in cc_ssh.GENERATE_KEY_NAMES: - key_data = self.test_hostkeys[key_type] - filename = 'ssh_host_%s_key.pub' % key_type - filepath = os.path.join(hostkey_tmpdir, filename) - self.test_hostkey_files.append(filepath) - with open(filepath, 'w') as f: - f.write(' '.join(key_data)) - - cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key') - - def test_apply_credentials_with_user(self, m_setup_keys): - """Apply keys for the given user and root.""" - keys = ["key1"] - user = "clouduser" - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_no_user(self, m_setup_keys): - """Apply keys for root only.""" - keys = ["key1"] - user = None - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual([mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_user_disable_root(self, m_setup_keys): - """Apply keys for the given user and disable root ssh.""" - keys = ["key1"] - user = "clouduser" - options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys): - """Apply keys no user and disable root ssh.""" - keys = ["key1"] - user = None - options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", "NONE") - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_no_cfg(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with no config ignores generating existing keyfiles.""" - cfg = {} - keys = ["key1"] - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ([], {}) - cc_ssh.PUBLISH_HOST_KEYS = False - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE") - options = options.replace("$DISABLE_USER", "root") - m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*') - self.assertIn( - [mock.call('/etc/ssh/ssh_host_rsa_key'), - mock.call('/etc/ssh/ssh_host_dsa_key'), - mock.call('/etc/ssh/ssh_host_ecdsa_key'), - mock.call('/etc/ssh/ssh_host_ed25519_key')], - m_path_exists.call_args_list) - self.assertEqual([mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_dont_allow_public_ssh_keys(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test allow_public_ssh_keys=False ignores ssh public keys from - platform. - """ - cfg = {"allow_public_ssh_keys": False} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(), user), - mock.call(set(), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with no config and a default distro user.""" - cfg = {} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with explicit disable_root and a default distro user.""" - # This test is identical to test_handle_no_cfg_and_default_root, - # except this uses an explicit cfg value - cfg = {"disable_root": True} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with disable_root == False.""" - # When disable_root == False, the ssh redirect for root is skipped - cfg = {"disable_root": False} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.get_public_ssh_keys = mock.Mock(return_value=keys) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_default( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {} - expected_call = [self.test_hostkeys[key_type] for key_type - in KEY_NAMES_NO_DSA] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_enable( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = False - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': True}} - expected_call = [self.test_hostkeys[key_type] for key_type - in KEY_NAMES_NO_DSA] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_disable( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': False}} - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertFalse(cloud.datasource.publish_host_keys.call_args_list) - cloud.datasource.publish_host_keys.assert_not_called() - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': True, - 'blacklist': ['dsa', 'rsa']}} - expected_call = [self.test_hostkeys[key_type] for key_type - in ['ecdsa', 'ed25519']] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_empty_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': True, - 'blacklist': []}} - expected_call = [self.test_hostkeys[key_type] for key_type - in cc_ssh.GENERATE_KEY_NAMES] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "util.write_file") - def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys): - """Test handle with ssh keys and certificate.""" - # Populate a config dictionary to pass to handle() as well - # as the expected file-writing calls. - cfg = {"ssh_keys": {}} - - expected_calls = [] - for key_type in cc_ssh.GENERATE_KEY_NAMES: - private_name = "{}_private".format(key_type) - public_name = "{}_public".format(key_type) - cert_name = "{}_certificate".format(key_type) - - # Actual key contents don"t have to be realistic - private_value = "{}_PRIVATE_KEY".format(key_type) - public_value = "{}_PUBLIC_KEY".format(key_type) - cert_value = "{}_CERT_KEY".format(key_type) - - cfg["ssh_keys"][private_name] = private_value - cfg["ssh_keys"][public_name] = public_value - cfg["ssh_keys"][cert_name] = cert_value - - expected_calls.extend([ - mock.call( - '/etc/ssh/ssh_host_{}_key'.format(key_type), - private_value, - 384 - ), - mock.call( - '/etc/ssh/ssh_host_{}_key.pub'.format(key_type), - public_value, - 384 - ), - mock.call( - '/etc/ssh/ssh_host_{}_key-cert.pub'.format(key_type), - cert_value, - 384 - ), - mock.call( - '/etc/ssh/sshd_config', - ('HostCertificate /etc/ssh/ssh_host_{}_key-cert.pub' - '\n'.format(key_type)), - preserve_mode=True - ) - ]) - - # Run the handler. - m_nug.return_value = ([], {}) - with mock.patch(MODPATH + 'ssh_util.parse_ssh_config', - return_value=[]): - cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'), - LOG, None) - - # Check that all expected output has been done. - for call_ in expected_calls: - self.assertIn(call_, m_write_file.call_args_list) -- cgit v1.2.3