From 155847209e6a3ed5face91a133d8488a703f3f93 Mon Sep 17 00:00:00 2001 From: Rick Wright Date: Fri, 9 Aug 2019 17:11:05 +0000 Subject: Add support for publishing host keys to GCE guest attributes This adds an empty publish_host_keys() method to the default datasource that is called by cc_ssh.py. This feature can be controlled by the 'ssh_publish_hostkeys' config option. It is enabled by default but can be disabled by setting 'enabled' to false. Also, a blacklist of key types is supported. In addition, this change implements ssh_publish_hostkeys() for the GCE datasource, attempting to write the hostkeys to the instance's guest attributes. Using these hostkeys for ssh connections is currently supported by the alpha version of Google's 'gcloud' command-line tool. (On Google Compute Engine, this feature will be enabled by setting the 'enable-guest-attributes' metadata key to 'true' for the project/instance that you would like to use this feature for. When connecting to the instance for the first time using 'gcloud compute ssh' the hostkeys will be read from the guest attributes for the instance and written to the user's local known_hosts file for Google Compute Engine instances.) --- cloudinit/config/tests/test_ssh.py | 166 +++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) (limited to 'cloudinit/config/tests') diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py index c8a4271f..e7789842 100644 --- a/cloudinit/config/tests/test_ssh.py +++ b/cloudinit/config/tests/test_ssh.py @@ -1,5 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. +import os.path from cloudinit.config import cc_ssh from cloudinit import ssh_util @@ -12,6 +13,25 @@ MODPATH = "cloudinit.config.cc_ssh." class TestHandleSsh(CiTestCase): """Test cc_ssh handling of ssh config.""" + def _publish_hostkey_test_setup(self): + self.test_hostkeys = { + 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'), + 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'), + 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'), + 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'), + } + self.test_hostkey_files = [] + hostkey_tmpdir = self.tmp_dir() + for key_type in ['dsa', 'ecdsa', 'ed25519', 'rsa']: + key_data = self.test_hostkeys[key_type] + filename = 'ssh_host_%s_key.pub' % key_type + filepath = os.path.join(hostkey_tmpdir, filename) + self.test_hostkey_files.append(filepath) + with open(filepath, 'w') as f: + f.write(' '.join(key_data)) + + cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key') + def test_apply_credentials_with_user(self, m_setup_keys): """Apply keys for the given user and root.""" keys = ["key1"] @@ -64,6 +84,7 @@ class TestHandleSsh(CiTestCase): # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ([], {}) + cc_ssh.PUBLISH_HOST_KEYS = False cloud = self.tmp_cloud( distro='ubuntu', metadata={'public-keys': keys}) cc_ssh.handle("name", cfg, cloud, None, None) @@ -149,3 +170,148 @@ class TestHandleSsh(CiTestCase): self.assertEqual([mock.call(set(keys), user), mock.call(set(keys), "root", options="")], m_setup_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_publish_hostkeys_default( + self, m_path_exists, m_nug, m_glob, m_setup_keys): + """Test handle with various configs for ssh_publish_hostkeys.""" + self._publish_hostkey_test_setup() + cc_ssh.PUBLISH_HOST_KEYS = True + keys = ["key1"] + user = "clouduser" + # Return no matching keys for first glob, test keys for second. + m_glob.side_effect = iter([ + [], + self.test_hostkey_files, + ]) + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.datasource.publish_host_keys = mock.Mock() + + cfg = {} + expected_call = [self.test_hostkeys[key_type] for key_type + in ['ecdsa', 'ed25519', 'rsa']] + cc_ssh.handle("name", cfg, cloud, None, None) + self.assertEqual([mock.call(expected_call)], + cloud.datasource.publish_host_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_publish_hostkeys_config_enable( + self, m_path_exists, m_nug, m_glob, m_setup_keys): + """Test handle with various configs for ssh_publish_hostkeys.""" + self._publish_hostkey_test_setup() + cc_ssh.PUBLISH_HOST_KEYS = False + keys = ["key1"] + user = "clouduser" + # Return no matching keys for first glob, test keys for second. + m_glob.side_effect = iter([ + [], + self.test_hostkey_files, + ]) + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.datasource.publish_host_keys = mock.Mock() + + cfg = {'ssh_publish_hostkeys': {'enabled': True}} + expected_call = [self.test_hostkeys[key_type] for key_type + in ['ecdsa', 'ed25519', 'rsa']] + cc_ssh.handle("name", cfg, cloud, None, None) + self.assertEqual([mock.call(expected_call)], + cloud.datasource.publish_host_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_publish_hostkeys_config_disable( + self, m_path_exists, m_nug, m_glob, m_setup_keys): + """Test handle with various configs for ssh_publish_hostkeys.""" + self._publish_hostkey_test_setup() + cc_ssh.PUBLISH_HOST_KEYS = True + keys = ["key1"] + user = "clouduser" + # Return no matching keys for first glob, test keys for second. + m_glob.side_effect = iter([ + [], + self.test_hostkey_files, + ]) + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.datasource.publish_host_keys = mock.Mock() + + cfg = {'ssh_publish_hostkeys': {'enabled': False}} + cc_ssh.handle("name", cfg, cloud, None, None) + self.assertFalse(cloud.datasource.publish_host_keys.call_args_list) + cloud.datasource.publish_host_keys.assert_not_called() + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_publish_hostkeys_config_blacklist( + self, m_path_exists, m_nug, m_glob, m_setup_keys): + """Test handle with various configs for ssh_publish_hostkeys.""" + self._publish_hostkey_test_setup() + cc_ssh.PUBLISH_HOST_KEYS = True + keys = ["key1"] + user = "clouduser" + # Return no matching keys for first glob, test keys for second. + m_glob.side_effect = iter([ + [], + self.test_hostkey_files, + ]) + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.datasource.publish_host_keys = mock.Mock() + + cfg = {'ssh_publish_hostkeys': {'enabled': True, + 'blacklist': ['dsa', 'rsa']}} + expected_call = [self.test_hostkeys[key_type] for key_type + in ['ecdsa', 'ed25519']] + cc_ssh.handle("name", cfg, cloud, None, None) + self.assertEqual([mock.call(expected_call)], + cloud.datasource.publish_host_keys.call_args_list) + + @mock.patch(MODPATH + "glob.glob") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_publish_hostkeys_empty_blacklist( + self, m_path_exists, m_nug, m_glob, m_setup_keys): + """Test handle with various configs for ssh_publish_hostkeys.""" + self._publish_hostkey_test_setup() + cc_ssh.PUBLISH_HOST_KEYS = True + keys = ["key1"] + user = "clouduser" + # Return no matching keys for first glob, test keys for second. + m_glob.side_effect = iter([ + [], + self.test_hostkey_files, + ]) + # Mock os.path.exits to True to short-circuit the key writing logic + m_path_exists.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = self.tmp_cloud( + distro='ubuntu', metadata={'public-keys': keys}) + cloud.datasource.publish_host_keys = mock.Mock() + + cfg = {'ssh_publish_hostkeys': {'enabled': True, + 'blacklist': []}} + expected_call = [self.test_hostkeys[key_type] for key_type + in ['dsa', 'ecdsa', 'ed25519', 'rsa']] + cc_ssh.handle("name", cfg, cloud, None, None) + self.assertEqual([mock.call(expected_call)], + cloud.datasource.publish_host_keys.call_args_list) -- cgit v1.2.3