summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests/test_ssh.py
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /cloudinit/config/tests/test_ssh.py
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'cloudinit/config/tests/test_ssh.py')
-rw-r--r--cloudinit/config/tests/test_ssh.py405
1 files changed, 0 insertions, 405 deletions
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
deleted file mode 100644
index 87ccdb60..00000000
--- a/cloudinit/config/tests/test_ssh.py
+++ /dev/null
@@ -1,405 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import os.path
-
-from cloudinit.config import cc_ssh
-from cloudinit import ssh_util
-from cloudinit.tests.helpers import CiTestCase, mock
-import logging
-
-LOG = logging.getLogger(__name__)
-
-MODPATH = "cloudinit.config.cc_ssh."
-KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES
- if name not in 'dsa']
-
-
-@mock.patch(MODPATH + "ssh_util.setup_user_keys")
-class TestHandleSsh(CiTestCase):
- """Test cc_ssh handling of ssh config."""
-
- def _publish_hostkey_test_setup(self):
- self.test_hostkeys = {
- 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'),
- 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'),
- 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'),
- 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'),
- }
- self.test_hostkey_files = []
- hostkey_tmpdir = self.tmp_dir()
- for key_type in cc_ssh.GENERATE_KEY_NAMES:
- key_data = self.test_hostkeys[key_type]
- filename = 'ssh_host_%s_key.pub' % key_type
- filepath = os.path.join(hostkey_tmpdir, filename)
- self.test_hostkey_files.append(filepath)
- with open(filepath, 'w') as f:
- f.write(' '.join(key_data))
-
- cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key')
-
- def test_apply_credentials_with_user(self, m_setup_keys):
- """Apply keys for the given user and root."""
- keys = ["key1"]
- user = "clouduser"
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user(self, m_setup_keys):
- """Apply keys for root only."""
- keys = ["key1"]
- user = None
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_user_disable_root(self, m_setup_keys):
- """Apply keys for the given user and disable root ssh."""
- keys = ["key1"]
- user = "clouduser"
- options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys):
- """Apply keys no user and disable root ssh."""
- keys = ["key1"]
- user = None
- options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", "NONE")
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_no_cfg(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with no config ignores generating existing keyfiles."""
- cfg = {}
- keys = ["key1"]
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ([], {})
- cc_ssh.PUBLISH_HOST_KEYS = False
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE")
- options = options.replace("$DISABLE_USER", "root")
- m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*')
- self.assertIn(
- [mock.call('/etc/ssh/ssh_host_rsa_key'),
- mock.call('/etc/ssh/ssh_host_dsa_key'),
- mock.call('/etc/ssh/ssh_host_ecdsa_key'),
- mock.call('/etc/ssh/ssh_host_ed25519_key')],
- m_path_exists.call_args_list)
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_dont_allow_public_ssh_keys(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test allow_public_ssh_keys=False ignores ssh public keys from
- platform.
- """
- cfg = {"allow_public_ssh_keys": False}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(), user),
- mock.call(set(), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with no config and a default distro user."""
- cfg = {}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with explicit disable_root and a default distro user."""
- # This test is identical to test_handle_no_cfg_and_default_root,
- # except this uses an explicit cfg value
- cfg = {"disable_root": True}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with disable_root == False."""
- # When disable_root == False, the ssh redirect for root is skipped
- cfg = {"disable_root": False}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_default(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_enable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = False
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_disable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': False}}
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertFalse(cloud.datasource.publish_host_keys.call_args_list)
- cloud.datasource.publish_host_keys.assert_not_called()
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': ['dsa', 'rsa']}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in ['ecdsa', 'ed25519']]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_empty_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': []}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in cc_ssh.GENERATE_KEY_NAMES]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "util.write_file")
- def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys):
- """Test handle with ssh keys and certificate."""
- # Populate a config dictionary to pass to handle() as well
- # as the expected file-writing calls.
- cfg = {"ssh_keys": {}}
-
- expected_calls = []
- for key_type in cc_ssh.GENERATE_KEY_NAMES:
- private_name = "{}_private".format(key_type)
- public_name = "{}_public".format(key_type)
- cert_name = "{}_certificate".format(key_type)
-
- # Actual key contents don"t have to be realistic
- private_value = "{}_PRIVATE_KEY".format(key_type)
- public_value = "{}_PUBLIC_KEY".format(key_type)
- cert_value = "{}_CERT_KEY".format(key_type)
-
- cfg["ssh_keys"][private_name] = private_value
- cfg["ssh_keys"][public_name] = public_value
- cfg["ssh_keys"][cert_name] = cert_value
-
- expected_calls.extend([
- mock.call(
- '/etc/ssh/ssh_host_{}_key'.format(key_type),
- private_value,
- 384
- ),
- mock.call(
- '/etc/ssh/ssh_host_{}_key.pub'.format(key_type),
- public_value,
- 384
- ),
- mock.call(
- '/etc/ssh/ssh_host_{}_key-cert.pub'.format(key_type),
- cert_value,
- 384
- ),
- mock.call(
- '/etc/ssh/sshd_config',
- ('HostCertificate /etc/ssh/ssh_host_{}_key-cert.pub'
- '\n'.format(key_type)),
- preserve_mode=True
- )
- ])
-
- # Run the handler.
- m_nug.return_value = ([], {})
- with mock.patch(MODPATH + 'ssh_util.parse_ssh_config',
- return_value=[]):
- cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'),
- LOG, None)
-
- # Check that all expected output has been done.
- for call_ in expected_calls:
- self.assertIn(call_, m_write_file.call_args_list)