From 9b52405c6f0de5e00d5ee9c1d13540425d8f6bf5 Mon Sep 17 00:00:00 2001 From: Emanuele Giuseppe Esposito Date: Mon, 12 Jul 2021 20:21:02 +0200 Subject: ssh-util: allow cloudinit to merge all ssh keys into a custom user file, defined in AuthorizedKeysFile (#937) This patch aims to fix LP1911680, by analyzing the files provided in sshd_config and merge all keys into an user-specific file. Also introduces additional tests to cover this specific case. The file is picked by analyzing the path given in AuthorizedKeysFile. If it points inside the current user folder (path is /home/user/*), it means it is an user-specific file, so we can copy all user-keys there. If it contains a %u or %h, it means that there will be a specific authorized_keys file for each user, so we can copy all user-keys there. If no path points to an user-specific file, for example when only /etc/ssh/authorized_keys is given, default to ~/.ssh/authorized_keys. Note that if there are more than a single user-specific file, the last one will be picked. Signed-off-by: Emanuele Giuseppe Esposito Co-authored-by: James Falcon LP: #1911680 RHBZ:1862967 --- tests/unittests/test_sshutil.py | 246 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 235 insertions(+), 11 deletions(-) (limited to 'tests/unittests') diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py index fd1d1bac..bcb8044f 100644 --- a/tests/unittests/test_sshutil.py +++ b/tests/unittests/test_sshutil.py @@ -570,20 +570,33 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase): ssh_util.render_authorizedkeysfile_paths( "%h/.keys", "/homedirs/bobby", "bobby")) + def test_all(self): + self.assertEqual( + ["/homedirs/bobby/.keys", "/homedirs/bobby/.secret/keys", + "/keys/path1", "/opt/bobby/keys"], + ssh_util.render_authorizedkeysfile_paths( + "%h/.keys .secret/keys /keys/path1 /opt/%u/keys", + "/homedirs/bobby", "bobby")) + class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase): @patch("cloudinit.ssh_util.pwd.getpwnam") def test_multiple_authorizedkeys_file_order1(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/home2/bobby') + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') m_getpwnam.return_value = fpw - authorized_keys = self.tmp_path('authorized_keys') + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + + # /tmp/home2/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) util.write_file(authorized_keys, VALID_CONTENT['rsa']) - user_keys = self.tmp_path('user_keys') + # /tmp/home2/bobby/.ssh/user_keys = dsa + user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) util.write_file(user_keys, VALID_CONTENT['dsa']) - sshd_config = self.tmp_path('sshd_config') + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") util.write_file( sshd_config, "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys) @@ -593,33 +606,244 @@ class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase): fpw.pw_name, sshd_config) content = ssh_util.update_authorized_keys(auth_key_entries, []) - self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn) + self.assertEqual(user_keys, auth_key_fn) self.assertTrue(VALID_CONTENT['rsa'] in content) self.assertTrue(VALID_CONTENT['dsa'] in content) @patch("cloudinit.ssh_util.pwd.getpwnam") def test_multiple_authorizedkeys_file_order2(self, m_getpwnam): - fpw = FakePwEnt(pw_name='suzie', pw_dir='/home/suzie') + fpw = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') m_getpwnam.return_value = fpw - authorized_keys = self.tmp_path('authorized_keys') + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + + # /tmp/home/suzie/.ssh/authorized_keys = rsa + authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) util.write_file(authorized_keys, VALID_CONTENT['rsa']) - user_keys = self.tmp_path('user_keys') + # /tmp/home/suzie/.ssh/user_keys = dsa + user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) util.write_file(user_keys, VALID_CONTENT['dsa']) - sshd_config = self.tmp_path('sshd_config') + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") util.write_file( sshd_config, - "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys) + "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys) ) (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config + fpw.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(authorized_keys, auth_key_fn) + self.assertTrue(VALID_CONTENT['rsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + def test_multiple_authorizedkeys_file_local_global(self, m_getpwnam): + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') + m_getpwnam.return_value = fpw + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + + # /tmp/home2/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT['rsa']) + + # /tmp/home2/bobby/.ssh/user_keys = dsa + user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) + util.write_file(user_keys, VALID_CONTENT['dsa']) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', + dir="/tmp") + util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") + util.write_file( + sshd_config, + "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, + user_keys, authorized_keys) + ) + + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(authorized_keys, auth_key_fn) + self.assertTrue(VALID_CONTENT['rsa'] in content) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + def test_multiple_authorizedkeys_file_local_global2(self, m_getpwnam): + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') + m_getpwnam.return_value = fpw + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + + # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.tmp_path('authorized_keys2', + dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT['rsa']) + + # /tmp/home2/bobby/.ssh/user_keys3 = dsa + user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) + util.write_file(user_keys, VALID_CONTENT['dsa']) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', + dir="/tmp") + util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") + util.write_file( + sshd_config, + "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, + authorized_keys, user_keys) + ) + + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(user_keys, auth_key_fn) + self.assertTrue(VALID_CONTENT['rsa'] in content) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + def test_multiple_authorizedkeys_file_global(self, m_getpwnam): + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') + m_getpwnam.return_value = fpw + + # /tmp/etc/ssh/authorized_keys = rsa + authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', + dir="/tmp") + util.write_file(authorized_keys_global, VALID_CONTENT['rsa']) + + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config') + util.write_file( + sshd_config, + "AuthorizedKeysFile %s" % (authorized_keys_global) ) + + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw.pw_name, sshd_config) content = ssh_util.update_authorized_keys(auth_key_entries, []) self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn) self.assertTrue(VALID_CONTENT['rsa'] in content) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + def test_multiple_authorizedkeys_file_multiuser(self, m_getpwnam): + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') + m_getpwnam.return_value = fpw + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.tmp_path('authorized_keys2', + dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT['rsa']) + # /tmp/home2/bobby/.ssh/user_keys3 = dsa + user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) + util.write_file(user_keys, VALID_CONTENT['dsa']) + + fpw2 = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') + user_ssh_folder = "%s/.ssh" % fpw2.pw_dir + # /tmp/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com + authorized_keys2 = self.tmp_path('authorized_keys2', + dir=user_ssh_folder) + util.write_file(authorized_keys2, + VALID_CONTENT['ssh-xmss@openssh.com']) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', + dir="/tmp") + util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") + util.write_file( + sshd_config, + "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s" % + (authorized_keys_global, user_keys) + ) + + # process first user + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(user_keys, auth_key_fn) + self.assertTrue(VALID_CONTENT['rsa'] in content) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + self.assertFalse(VALID_CONTENT['ssh-xmss@openssh.com'] in content) + + m_getpwnam.return_value = fpw2 + # process second user + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw2.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(authorized_keys2, auth_key_fn) + self.assertTrue(VALID_CONTENT['ssh-xmss@openssh.com'] in content) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + self.assertFalse(VALID_CONTENT['rsa'] in content) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + def test_multiple_authorizedkeys_file_multiuser2(self, m_getpwnam): + fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home/bobby') + m_getpwnam.return_value = fpw + user_ssh_folder = "%s/.ssh" % fpw.pw_dir + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.tmp_path('authorized_keys2', + dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT['rsa']) + # /tmp/home/bobby/.ssh/user_keys3 = dsa + user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) + util.write_file(user_keys, VALID_CONTENT['dsa']) + + fpw2 = FakePwEnt(pw_name='badguy', pw_dir='/tmp/home/badguy') + user_ssh_folder = "%s/.ssh" % fpw2.pw_dir + # /tmp/home/badguy/home/bobby = "" + authorized_keys2 = self.tmp_path('home/bobby', dir="/tmp/home/badguy") + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', + dir="/tmp") + util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + + # /tmp/sshd_config + sshd_config = self.tmp_path('sshd_config', dir="/tmp") + util.write_file( + sshd_config, + "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s %s" % + (authorized_keys_global, user_keys, authorized_keys2) + ) + + # process first user + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(user_keys, auth_key_fn) + self.assertTrue(VALID_CONTENT['rsa'] in content) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) + self.assertTrue(VALID_CONTENT['dsa'] in content) + + m_getpwnam.return_value = fpw2 + # process second user + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + fpw2.pw_name, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + # badguy should not take the key from the other user! + self.assertEqual(authorized_keys2, auth_key_fn) + self.assertTrue(VALID_CONTENT['ecdsa'] in content) self.assertTrue(VALID_CONTENT['dsa'] in content) + self.assertFalse(VALID_CONTENT['rsa'] in content) # vi: ts=4 expandtab -- cgit v1.2.3