summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests/test_ssh.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/tests/test_ssh.py')
-rw-r--r--cloudinit/config/tests/test_ssh.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
index c8a4271f..e7789842 100644
--- a/cloudinit/config/tests/test_ssh.py
+++ b/cloudinit/config/tests/test_ssh.py
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import os.path
from cloudinit.config import cc_ssh
from cloudinit import ssh_util
@@ -12,6 +13,25 @@ MODPATH = "cloudinit.config.cc_ssh."
class TestHandleSsh(CiTestCase):
"""Test cc_ssh handling of ssh config."""
+ def _publish_hostkey_test_setup(self):
+ self.test_hostkeys = {
+ 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'),
+ 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'),
+ 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'),
+ 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'),
+ }
+ self.test_hostkey_files = []
+ hostkey_tmpdir = self.tmp_dir()
+ for key_type in ['dsa', 'ecdsa', 'ed25519', 'rsa']:
+ key_data = self.test_hostkeys[key_type]
+ filename = 'ssh_host_%s_key.pub' % key_type
+ filepath = os.path.join(hostkey_tmpdir, filename)
+ self.test_hostkey_files.append(filepath)
+ with open(filepath, 'w') as f:
+ f.write(' '.join(key_data))
+
+ cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key')
+
def test_apply_credentials_with_user(self, m_setup_keys):
"""Apply keys for the given user and root."""
keys = ["key1"]
@@ -64,6 +84,7 @@ class TestHandleSsh(CiTestCase):
# Mock os.path.exits to True to short-circuit the key writing logic
m_path_exists.return_value = True
m_nug.return_value = ([], {})
+ cc_ssh.PUBLISH_HOST_KEYS = False
cloud = self.tmp_cloud(
distro='ubuntu', metadata={'public-keys': keys})
cc_ssh.handle("name", cfg, cloud, None, None)
@@ -149,3 +170,148 @@ class TestHandleSsh(CiTestCase):
self.assertEqual([mock.call(set(keys), user),
mock.call(set(keys), "root", options="")],
m_setup_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_default(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, None, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_enable(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = False
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, None, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_disable(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': False}}
+ cc_ssh.handle("name", cfg, cloud, None, None)
+ self.assertFalse(cloud.datasource.publish_host_keys.call_args_list)
+ cloud.datasource.publish_host_keys.assert_not_called()
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_blacklist(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True,
+ 'blacklist': ['dsa', 'rsa']}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519']]
+ cc_ssh.handle("name", cfg, cloud, None, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_empty_blacklist(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True,
+ 'blacklist': []}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['dsa', 'ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, None, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)