diff options
Diffstat (limited to 'tests/unittests/test_sshutil.py')
-rw-r--r-- | tests/unittests/test_sshutil.py | 1199 |
1 files changed, 1072 insertions, 127 deletions
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py index 88a111e3..d614350e 100644 --- a/tests/unittests/test_sshutil.py +++ b/tests/unittests/test_sshutil.py @@ -1,25 +1,65 @@ # This file is part of cloud-init. See LICENSE file for license information. +import os from collections import namedtuple +from functools import partial from unittest.mock import patch -from cloudinit import ssh_util -from cloudinit.tests import helpers as test_helpers -from cloudinit import util +from cloudinit import ssh_util, util +from tests.unittests import helpers as test_helpers # https://stackoverflow.com/questions/11351032/ FakePwEnt = namedtuple( - 'FakePwEnt', - ['pw_dir', 'pw_gecos', 'pw_name', 'pw_passwd', 'pw_shell', 'pwd_uid']) + "FakePwEnt", + [ + "pw_name", + "pw_passwd", + "pw_uid", + "pw_gid", + "pw_gecos", + "pw_dir", + "pw_shell", + ], +) FakePwEnt.__new__.__defaults__ = tuple( - "UNSET_%s" % n for n in FakePwEnt._fields) + "UNSET_%s" % n for n in FakePwEnt._fields +) + + +def mock_get_owner(updated_permissions, value): + try: + return updated_permissions[value][0] + except ValueError: + return util.get_owner(value) + + +def mock_get_group(updated_permissions, value): + try: + return updated_permissions[value][1] + except ValueError: + return util.get_group(value) + + +def mock_get_user_groups(username): + return username + + +def mock_get_permissions(updated_permissions, value): + try: + return updated_permissions[value][2] + except ValueError: + return util.get_permissions(value) + + +def mock_getpwnam(users, username): + return users[username] # Do not use these public keys, most of them are fetched from # the testdata for OpenSSH, and their private keys are available # https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata VALID_CONTENT = { - 'dsa': ( + "dsa": ( "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" @@ -31,12 +71,12 @@ VALID_CONTENT = { "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" "5z7u2rVAlDw==" ), - 'ecdsa': ( + "ecdsa": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" "Ql7lc2leWL7CY=" ), - 'rsa': ( + "rsa": ( "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" @@ -44,11 +84,10 @@ VALID_CONTENT = { "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" ), - 'ed25519': ( - "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb" - "8lnDd" + "ed25519": ( + "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb8lnDd" ), - 'ecdsa-sha2-nistp256-cert-v01@openssh.com': ( + "ecdsa-sha2-nistp256-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "gQIfwT/+UX68/hlKsdKuaOuAVB6ftTg03SlP/uH4OBEwAAAAIbmlzdHAyNTYAAA" "BBBEjA0gjJmPM6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXw" @@ -63,12 +102,12 @@ VALID_CONTENT = { "2tM3QXkDcwdP0SxSEW5yy4XV5oAAAAhANNMm1cdVlAt3hmycQgdD82zPlg5YvVO" "iN0SQTbgVD8i" ), - 'ecdsa-sha2-nistp256': ( + "ecdsa-sha2-nistp256": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEjA0gjJmPM" "6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXwHvNxplYBwdPlk" "2zEecvf9Cs2BM=" ), - 'ecdsa-sha2-nistp384-cert-v01@openssh.com': ( + "ecdsa-sha2-nistp384-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHAzODQtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "grnSvDsK1EnCZndO1IyGWcGkVgVSkPWi/XO2ybPFyLVUAAAAIbmlzdHAzODQAAA" "BhBAaYSQs+8TT0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaY" @@ -85,12 +124,12 @@ VALID_CONTENT = { "RVYqYQgAAADAiit0UCMDAUbjD+R2x4LvU3x/t8G3sdqDLRNfMRpjZpvcS8AwC+Y" "VFVSQNn0AyzW0=" ), - 'ecdsa-sha2-nistp384': ( + "ecdsa-sha2-nistp384": ( "AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBAaYSQs+8TT" "0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaYByhXtAJiPOMqL" "U5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm7q6BOA==" ), - 'ecdsa-sha2-nistp521-cert-v01@openssh.com': ( + "ecdsa-sha2-nistp521-cert-v01@openssh.com": ( "AAAAKGVjZHNhLXNoYTItbmlzdHA1MjEtY2VydC12MDFAb3BlbnNzaC5jb20AAAA" "gGmRzkkMvRFk1V5U3m3mQ2nfW20SJVXk1NKnT5iZGDcEAAAAIbmlzdHA1MjEAAA" "CFBAHosAOHAI1ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94C" @@ -109,13 +148,13 @@ VALID_CONTENT = { "AAAQgEzkIpX3yKXPaPcK17mNx40ujEDitm4ARmbhAge0sFhZtf7YIgI55b6vkI8" "JvMJkzQCBF1cpNOaIpVh1nFZNBphMQ==" ), - 'ecdsa-sha2-nistp521': ( + "ecdsa-sha2-nistp521": ( "AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAHosAOHAI1" "ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94CQ8yyNHcby87zF" "ZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnRnxprkcQ0rfCCd" "agkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxA==" ), - 'sk-ecdsa-sha2-nistp256-cert-v01@openssh.com': ( + "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" @@ -124,12 +163,12 @@ VALID_CONTENT = { "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), - 'sk-ecdsa-sha2-nistp256@openssh.com': ( + "sk-ecdsa-sha2-nistp256@openssh.com": ( "AAAAInNrLWVjZHNhLXNoYTItbmlzdHAyNTZAb3BlbnNzaC5jb20AAAAIbmlzdHA" "yNTYAAABBBIELQJ2DgvaX1yQlKFokfWM2suuaCFI2qp0eJodHyg6O4ifxc3XpRK" "d1OS8dNYQtE/YjdXSrA+AOnMF5ns2Nkx4AAAAEc3NoOg==" ), - 'sk-ssh-ed25519-cert-v01@openssh.com': ( + "sk-ssh-ed25519-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" @@ -138,11 +177,11 @@ VALID_CONTENT = { "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), - 'sk-ssh-ed25519@openssh.com': ( + "sk-ssh-ed25519@openssh.com": ( "AAAAGnNrLXNzaC1lZDI1NTE5QG9wZW5zc2guY29tAAAAICFo/k5LU8863u66YC9" "eUO2170QduohPURkQnbLa/dczAAAABHNzaDo=" ), - 'ssh-dss-cert-v01@openssh.com': ( + "ssh-dss-cert-v01@openssh.com": ( "AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgdTlbNU9Hn9Qng3F" "HxwH971bxCIoq1ern/QWFFDWXgmYAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0c" "Fn1zYd/JGvtabKnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4" @@ -159,7 +198,7 @@ VALID_CONTENT = { "+F7SMGQAAAFMAAAALc3NoLWVkMjU1MTkAAABAh/z1LIdNL1b66tQ8t9DY9BTB3B" "QKpTKmc7ezyFKLwl96yaIniZwD9Ticdbe/8i/Li3uCFE3EAt8NAIv9zff8Bg==" ), - 'ssh-dss': ( + "ssh-dss": ( "AAAAB3NzaC1kc3MAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0cFn1zYd/JGvtab" "KnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4yLB+6vCtHcJF7" "rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DDjMF0k5emWKCsa" @@ -171,7 +210,7 @@ VALID_CONTENT = { "GIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1UuyQMcUtb34+I0u9" "Ycnyhp2mSFsQt" ), - 'ssh-ed25519-cert-v01@openssh.com': ( + "ssh-ed25519-cert-v01@openssh.com": ( "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u" "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX" "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd" @@ -180,11 +219,10 @@ VALID_CONTENT = { "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd" "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ==" ), - 'ssh-ed25519': ( - "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34" - "XtIwZ" + "ssh-ed25519": ( + "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34XtIwZ" ), - 'ssh-rsa-cert-v01@openssh.com': ( + "ssh-rsa-cert-v01@openssh.com": ( "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAg98LhS2EHxLOWCLo" "pZPwHdg/RJXusnkOqQXSc9R7aITkAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGh" "EZzpoojjEW5y8+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yM" @@ -195,13 +233,13 @@ VALID_CONTENT = { "he0jBkAAABTAAAAC3NzaC1lZDI1NTE5AAAAQI3QGlUCzC07KorupxpDkkGy6tni" "aZ8EvBflzvv+itXWNchGvfUeHmVT6aX0sRqehdz/lR+GmXRoZBhofwh0qAM=" ), - 'ssh-rsa': ( + "ssh-rsa": ( "AAAAB3NzaC1yc2EAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGhEZzpoojjEW5y8" "+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yMrW6wb84gbq8C3" "1Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXecXylxIUOL0jE+u" "0rU1967pDJx+w==" ), - 'ssh-xmss-cert-v01@openssh.com': ( + "ssh-xmss-cert-v01@openssh.com": ( "AAAAHXNzaC14bXNzLWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIM2UD0IH+Igsekq" "xjTO5f36exX4WGRMCtDGPjwfbXblxAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0gxMA" "AAAEDI83/K5JMOy0BMJgQypRdz35ApAnoQinMJ8ZMoZPaEJF8Z4rANQlfzaAXum" @@ -267,7 +305,7 @@ VALID_CONTENT = { "rNYClh8fQEQ8XuOCDpomMWu58YOTfbZNMDWs/Ou7RfCjX+VNwjPShDK9joMwWKc" "Jy3QalZbaoWtcyyvXxR2sqhVR9F7Cmasq4=" ), - 'ssh-xmss@openssh.com': ( + "ssh-xmss@openssh.com": ( "AAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0g" "xMAAAAECqptWnK94d+Sj2xcdTu8gz+75lawZoLSZFqC5IhbYuT/Z3oBZCim6yt+" "HAmk6MKldl3Fg+74v4sR/SII0I0Jv/" @@ -278,19 +316,25 @@ KEY_TYPES = list(VALID_CONTENT.keys()) TEST_OPTIONS = ( "no-port-forwarding,no-agent-forwarding,no-X11-forwarding," - 'command="echo \'Please login as the user \"ubuntu\" rather than the' - 'user \"root\".\';echo;sleep 10"') + 'command="echo \'Please login as the user "ubuntu" rather than the' + 'user "root".\';echo;sleep 10"' +) class TestAuthKeyLineParser(test_helpers.CiTestCase): - def test_simple_parse(self): # test key line with common 3 fields (keytype, base64, comment) parser = ssh_util.AuthKeyLineParser() for ktype in KEY_TYPES: content = VALID_CONTENT[ktype] - comment = 'user-%s@host' % ktype - line = ' '.join((ktype, content, comment,)) + comment = "user-%s@host" % ktype + line = " ".join( + ( + ktype, + content, + comment, + ) + ) key = parser.parse(line) self.assertEqual(key.base64, content) @@ -303,7 +347,12 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase): parser = ssh_util.AuthKeyLineParser() for ktype in KEY_TYPES: content = VALID_CONTENT[ktype] - line = ' '.join((ktype, content,)) + line = " ".join( + ( + ktype, + content, + ) + ) key = parser.parse(line) self.assertEqual(key.base64, content) @@ -317,8 +366,15 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase): options = TEST_OPTIONS for ktype in KEY_TYPES: content = VALID_CONTENT[ktype] - comment = 'user-%s@host' % ktype - line = ' '.join((options, ktype, content, comment,)) + comment = "user-%s@host" % ktype + line = " ".join( + ( + options, + ktype, + content, + comment, + ) + ) key = parser.parse(line) self.assertEqual(key.base64, content) @@ -330,7 +386,7 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase): # test key line with key type and base64 only parser = ssh_util.AuthKeyLineParser() - baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host")) + baseline = " ".join(("rsa", VALID_CONTENT["rsa"], "user@host")) myopts = "no-port-forwarding,no-agent-forwarding" key = parser.parse("allowedopt" + " " + baseline) @@ -341,59 +397,62 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase): def test_parse_invalid_keytype(self): parser = ssh_util.AuthKeyLineParser() - key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']])) + key = parser.parse(" ".join(["badkeytype", VALID_CONTENT["rsa"]])) self.assertFalse(key.valid()) class TestUpdateAuthorizedKeys(test_helpers.CiTestCase): - def test_new_keys_replace(self): """new entries with the same base64 should replace old.""" orig_entries = [ - ' '.join(('rsa', VALID_CONTENT['rsa'], 'orig_comment1')), - ' '.join(('dsa', VALID_CONTENT['dsa'], 'orig_comment2'))] + " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")), + " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")), + ] new_entries = [ - ' '.join(('rsa', VALID_CONTENT['rsa'], 'new_comment1')), ] + " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")), + ] - expected = '\n'.join([new_entries[0], orig_entries[1]]) + '\n' + expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n" parser = ssh_util.AuthKeyLineParser() found = ssh_util.update_authorized_keys( [parser.parse(p) for p in orig_entries], - [parser.parse(p) for p in new_entries]) + [parser.parse(p) for p in new_entries], + ) self.assertEqual(expected, found) def test_new_invalid_keys_are_ignored(self): """new entries that are invalid should be skipped.""" orig_entries = [ - ' '.join(('rsa', VALID_CONTENT['rsa'], 'orig_comment1')), - ' '.join(('dsa', VALID_CONTENT['dsa'], 'orig_comment2'))] + " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")), + " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")), + ] new_entries = [ - ' '.join(('rsa', VALID_CONTENT['rsa'], 'new_comment1')), - 'xxx-invalid-thing1', - 'xxx-invalid-blob2' + " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")), + "xxx-invalid-thing1", + "xxx-invalid-blob2", ] - expected = '\n'.join([new_entries[0], orig_entries[1]]) + '\n' + expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n" parser = ssh_util.AuthKeyLineParser() found = ssh_util.update_authorized_keys( [parser.parse(p) for p in orig_entries], - [parser.parse(p) for p in new_entries]) + [parser.parse(p) for p in new_entries], + ) self.assertEqual(expected, found) class TestParseSSHConfig(test_helpers.CiTestCase): - def setUp(self): - self.load_file_patch = patch('cloudinit.ssh_util.util.load_file') + self.load_file_patch = patch("cloudinit.ssh_util.util.load_file") self.load_file = self.load_file_patch.start() - self.isfile_patch = patch('cloudinit.ssh_util.os.path.isfile') + self.isfile_patch = patch("cloudinit.ssh_util.os.path.isfile") self.isfile = self.isfile_patch.start() self.isfile.return_value = True @@ -404,60 +463,61 @@ class TestParseSSHConfig(test_helpers.CiTestCase): def test_not_a_file(self): self.isfile.return_value = False self.load_file.side_effect = IOError - ret = ssh_util.parse_ssh_config('not a real file') + ret = ssh_util.parse_ssh_config("not a real file") self.assertEqual([], ret) def test_empty_file(self): - self.load_file.return_value = '' - ret = ssh_util.parse_ssh_config('some real file') + self.load_file.return_value = "" + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual([], ret) def test_comment_line(self): - comment_line = '# This is a comment' + comment_line = "# This is a comment" self.load_file.return_value = comment_line - ret = ssh_util.parse_ssh_config('some real file') + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(1, len(ret)) self.assertEqual(comment_line, ret[0].line) def test_blank_lines(self): - lines = ['', '\t', ' '] - self.load_file.return_value = '\n'.join(lines) - ret = ssh_util.parse_ssh_config('some real file') + lines = ["", "\t", " "] + self.load_file.return_value = "\n".join(lines) + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(len(lines), len(ret)) for line in ret: - self.assertEqual('', line.line) + self.assertEqual("", line.line) def test_lower_case_config(self): - self.load_file.return_value = 'foo bar' - ret = ssh_util.parse_ssh_config('some real file') + self.load_file.return_value = "foo bar" + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(1, len(ret)) - self.assertEqual('foo', ret[0].key) - self.assertEqual('bar', ret[0].value) + self.assertEqual("foo", ret[0].key) + self.assertEqual("bar", ret[0].value) def test_upper_case_config(self): - self.load_file.return_value = 'Foo Bar' - ret = ssh_util.parse_ssh_config('some real file') + self.load_file.return_value = "Foo Bar" + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(1, len(ret)) - self.assertEqual('foo', ret[0].key) - self.assertEqual('Bar', ret[0].value) + self.assertEqual("foo", ret[0].key) + self.assertEqual("Bar", ret[0].value) def test_lower_case_with_equals(self): - self.load_file.return_value = 'foo=bar' - ret = ssh_util.parse_ssh_config('some real file') + self.load_file.return_value = "foo=bar" + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(1, len(ret)) - self.assertEqual('foo', ret[0].key) - self.assertEqual('bar', ret[0].value) + self.assertEqual("foo", ret[0].key) + self.assertEqual("bar", ret[0].value) def test_upper_case_with_equals(self): - self.load_file.return_value = 'Foo=bar' - ret = ssh_util.parse_ssh_config('some real file') + self.load_file.return_value = "Foo=bar" + ret = ssh_util.parse_ssh_config("some real file") self.assertEqual(1, len(ret)) - self.assertEqual('foo', ret[0].key) - self.assertEqual('bar', ret[0].value) + self.assertEqual("foo", ret[0].key) + self.assertEqual("bar", ret[0].value) class TestUpdateSshConfigLines(test_helpers.CiTestCase): """Test the update_ssh_config_lines method.""" + exlines = [ "#PasswordAuthentication yes", "UsePAM yes", @@ -476,8 +536,8 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase): def test_new_option_added(self): """A single update of non-existing option.""" lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) - result = ssh_util.update_ssh_config_lines(lines, {'MyKey': 'MyVal'}) - self.assertEqual(['MyKey'], result) + result = ssh_util.update_ssh_config_lines(lines, {"MyKey": "MyVal"}) + self.assertEqual(["MyKey"], result) self.check_line(lines[-1], "MyKey", "MyVal") def test_commented_out_not_updated_but_appended(self): @@ -487,6 +547,14 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase): self.assertEqual([self.pwauth], result) self.check_line(lines[-1], self.pwauth, "no") + def test_option_without_value(self): + """Implementation only accepts key-value pairs.""" + extended_exlines = self.exlines.copy() + denyusers_opt = "DenyUsers" + extended_exlines.append(denyusers_opt) + lines = ssh_util.parse_ssh_config_lines(list(extended_exlines)) + self.assertNotIn(denyusers_opt, str(lines)) + def test_single_option_updated(self): """A single update should have change made and line updated.""" opt, val = ("UsePAM", "no") @@ -497,8 +565,12 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase): def test_multiple_updates_with_add(self): """Verify multiple updates some added some changed, some not.""" - updates = {"UsePAM": "no", "X11Forwarding": "no", "NewOpt": "newval", - "AcceptEnv": "LANG ADD LC_*"} + updates = { + "UsePAM": "no", + "X11Forwarding": "no", + "NewOpt": "newval", + "AcceptEnv": "LANG ADD LC_*", + } lines = ssh_util.parse_ssh_config_lines(list(self.exlines)) result = ssh_util.update_ssh_config_lines(lines, updates) self.assertEqual(set(["UsePAM", "NewOpt", "AcceptEnv"]), set(result)) @@ -523,7 +595,7 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase): class TestUpdateSshConfig(test_helpers.CiTestCase): - cfgdata = '\n'.join(["#Option val", "MyKey ORIG_VAL", ""]) + cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""]) def test_modified(self): mycfg = self.tmp_path("ssh_config_1") @@ -533,7 +605,7 @@ class TestUpdateSshConfig(test_helpers.CiTestCase): found = util.load_file(mycfg) self.assertEqual(self.cfgdata.replace("ORIG_VAL", "NEW_VAL"), found) # assert there is a newline at end of file (LP: #1677205) - self.assertEqual('\n', found[-1]) + self.assertEqual("\n", found[-1]) def test_not_modified(self): mycfg = self.tmp_path("ssh_config_2") @@ -550,76 +622,949 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase): self.assertEqual( ["/opt/bobby/keys"], ssh_util.render_authorizedkeysfile_paths( - "/opt/%u/keys", "/home/bobby", "bobby")) + "/opt/%u/keys", "/home/bobby", "bobby" + ), + ) + + def test_user_file(self): + self.assertEqual( + ["/opt/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/opt/%u", "/home/bobby", "bobby" + ), + ) + + def test_user_file2(self): + self.assertEqual( + ["/opt/bobby/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/opt/%u/%u", "/home/bobby", "bobby" + ), + ) def test_multiple(self): self.assertEqual( ["/keys/path1", "/keys/path2"], ssh_util.render_authorizedkeysfile_paths( - "/keys/path1 /keys/path2", "/home/bobby", "bobby")) + "/keys/path1 /keys/path2", "/home/bobby", "bobby" + ), + ) + + def test_multiple2(self): + self.assertEqual( + ["/keys/path1", "/keys/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/keys/path1 /keys/%u", "/home/bobby", "bobby" + ), + ) def test_relative(self): self.assertEqual( ["/home/bobby/.secret/keys"], ssh_util.render_authorizedkeysfile_paths( - ".secret/keys", "/home/bobby", "bobby")) + ".secret/keys", "/home/bobby", "bobby" + ), + ) def test_home(self): self.assertEqual( ["/homedirs/bobby/.keys"], ssh_util.render_authorizedkeysfile_paths( - "%h/.keys", "/homedirs/bobby", "bobby")) + "%h/.keys", "/homedirs/bobby", "bobby" + ), + ) + + def test_all(self): + self.assertEqual( + [ + "/homedirs/bobby/.keys", + "/homedirs/bobby/.secret/keys", + "/keys/path1", + "/opt/bobby/keys", + ], + ssh_util.render_authorizedkeysfile_paths( + "%h/.keys .secret/keys /keys/path1 /opt/%u/keys", + "/homedirs/bobby", + "bobby", + ), + ) class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase): + def create_fake_users( + self, + names, + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ): + homes = [] + + root = "/tmp/root" + fpw = FakePwEnt(pw_name="root", pw_dir=root) + users["root"] = fpw + + for name in names: + home = "/tmp/home/" + name + fpw = FakePwEnt(pw_name=name, pw_dir=home) + users[name] = fpw + homes.append(home) + + m_get_permissions.side_effect = partial( + mock_get_permissions, mock_permissions + ) + m_get_owner.side_effect = partial(mock_get_owner, mock_permissions) + m_get_group.side_effect = partial(mock_get_group, mock_permissions) + m_getpwnam.side_effect = partial(mock_getpwnam, users) + return homes + + def create_user_authorized_file(self, home, filename, content_key, keys): + user_ssh_folder = "%s/.ssh" % home + # /tmp/home/<user>/.ssh/authorized_keys = content_key + authorized_keys = self.tmp_path(filename, dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT[content_key]) + keys[authorized_keys] = content_key + return authorized_keys + + def create_global_authorized_file(self, filename, content_key, keys): + authorized_keys = self.tmp_path(filename, dir="/tmp") + util.write_file(authorized_keys, VALID_CONTENT[content_key]) + keys[authorized_keys] = content_key + return authorized_keys + + def create_sshd_config(self, authorized_keys_files): + sshd_config = self.tmp_path("sshd_config", dir="/tmp") + util.write_file( + sshd_config, "AuthorizedKeysFile " + authorized_keys_files + ) + return sshd_config + + def execute_and_check( + self, user, sshd_config, solution, keys, delete_keys=True + ): + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( + user, sshd_config + ) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + + self.assertEqual(auth_key_fn, solution) + for path, key in keys.items(): + if path == solution: + self.assertTrue(VALID_CONTENT[key] in content) + else: + self.assertFalse(VALID_CONTENT[key] in content) + + if delete_keys and os.path.isdir("/tmp/home/"): + util.delete_dir_contents("/tmp/home/") + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_two_local_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = "bobby" + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + } + + homes = self.create_fake_users( + [user_bobby], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home = homes[0] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, "authorized_keys", "rsa", keys + ) + + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, "user_keys", "dsa", keys + ) + + # /tmp/sshd_config + options = "%s %s" % (authorized_keys, user_keys) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_order1(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/home2/bobby') - m_getpwnam.return_value = fpw - authorized_keys = self.tmp_path('authorized_keys') - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_two_local_files_inverted( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = "bobby" + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + } + + homes = self.create_fake_users( + [user_bobby], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home = homes[0] - user_keys = self.tmp_path('user_keys') - util.write_file(user_keys, VALID_CONTENT['dsa']) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, "authorized_keys", "rsa", keys + ) - sshd_config = self.tmp_path('sshd_config') - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, "user_keys", "dsa", keys ) - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + # /tmp/sshd_config + options = "%s %s" % (user_keys, authorized_keys) + sshd_config = self.create_sshd_config(options) - self.assertEqual(authorized_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + self.execute_and_check(user_bobby, sshd_config, user_keys, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_order2(self, m_getpwnam): - fpw = FakePwEnt(pw_name='suzie', pw_dir='/home/suzie') - m_getpwnam.return_value = fpw - authorized_keys = self.tmp_path('authorized_keys') - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_local_global_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = "bobby" + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + } + + homes = self.create_fake_users( + [user_bobby], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home = homes[0] - user_keys = self.tmp_path('user_keys') - util.write_file(user_keys, VALID_CONTENT['dsa']) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, "authorized_keys", "rsa", keys + ) - sshd_config = self.tmp_path('sshd_config') - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, "user_keys", "dsa", keys ) - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys", "ecdsa", keys ) - content = ssh_util.update_authorized_keys(auth_key_entries, []) - self.assertEqual(user_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + options = "%s %s %s" % ( + authorized_keys_global, + user_keys, + authorized_keys, + ) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check(user_bobby, sshd_config, user_keys, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_local_global_files_inverted( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = "bobby" + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600), + } + + homes = self.create_fake_users( + [user_bobby], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home = homes[0] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, "authorized_keys2", "rsa", keys + ) + + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, "user_keys3", "dsa", keys + ) + + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys", "ecdsa", keys + ) + + options = "%s %s %s" % ( + authorized_keys_global, + authorized_keys, + user_keys, + ) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_global_file( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = "bobby" + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + } + + homes = self.create_fake_users( + [user_bobby], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home = homes[0] + + # /tmp/etc/ssh/authorized_keys = rsa + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys", "rsa", keys + ) + + options = "%s" % authorized_keys_global + sshd_config = self.create_sshd_config(options) + + default = "%s/.ssh/authorized_keys" % home + self.execute_and_check(user_bobby, sshd_config, default, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_file_standard( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + options = ".ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_file_custom( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys2": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys2", "rsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys2 = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys + ) + + options = ".ssh/authorized_keys2" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_global_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys2": ("suzie", "suzie", 0o600), + "/tmp/home/suzie/.ssh/user_keys3": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + self.create_user_authorized_file( + home_bobby, "authorized_keys2", "rsa", keys + ) + # /tmp/home/bobby/.ssh/user_keys3 = dsa + user_keys = self.create_user_authorized_file( + home_bobby, "user_keys3", "dsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys2 = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys + ) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys2", "ecdsa", keys + ) + + options = "%s %s %%h/.ssh/authorized_keys2" % ( + authorized_keys_global, + user_keys, + ) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, user_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_global_files_badguy( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600), + "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600), + "/tmp/home/badguy": ("root", "root", 0o755), + "/tmp/home/badguy/home": ("root", "root", 0o755), + "/tmp/home/badguy/home/bobby": ("root", "root", 0o655), + } + + user_bobby = "bobby" + user_badguy = "badguy" + home_bobby, *_ = self.create_fake_users( + [user_bobby, user_badguy], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys2", "rsa", keys + ) + # /tmp/home/bobby/.ssh/user_keys3 = dsa + user_keys = self.create_user_authorized_file( + home_bobby, "user_keys3", "dsa", keys + ) + + # /tmp/home/badguy/home/bobby = "" + authorized_keys2 = self.tmp_path("home/bobby", dir="/tmp/home/badguy") + util.write_file(authorized_keys2, "") + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys2", "ecdsa", keys + ) + + # /tmp/sshd_config + options = "%s %%h/.ssh/authorized_keys2 %s %s" % ( + authorized_keys2, + authorized_keys_global, + user_keys, + ) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys + ) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_unaccessible_file( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/etc": ("root", "root", 0o755), + "/tmp/etc/ssh": ("root", "root", 0o755), + "/tmp/etc/ssh/userkeys": ("root", "root", 0o700), + "/tmp/etc/ssh/userkeys/bobby": ("bobby", "bobby", 0o600), + "/tmp/etc/ssh/userkeys/badguy": ("badguy", "badguy", 0o600), + "/tmp/home/badguy": ("badguy", "badguy", 0o700), + "/tmp/home/badguy/.ssh": ("badguy", "badguy", 0o700), + "/tmp/home/badguy/.ssh/authorized_keys": ( + "badguy", + "badguy", + 0o600, + ), + } + + user_bobby = "bobby" + user_badguy = "badguy" + homes = self.create_fake_users( + [user_bobby, user_badguy], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + m_get_user_groups.side_effect = mock_get_user_groups + home_bobby = homes[0] + home_badguy = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + # /tmp/etc/ssh/userkeys/bobby = dsa + # assume here that we can bypass userkeys, despite permissions + self.create_global_authorized_file( + "etc/ssh/userkeys/bobby", "dsa", keys + ) + + # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + # /tmp/etc/ssh/userkeys/badguy = ecdsa + self.create_global_authorized_file( + "etc/ssh/userkeys/badguy", "ecdsa", keys + ) + + # /tmp/sshd_config + options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys + ) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_accessible_file( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/etc": ("root", "root", 0o755), + "/tmp/etc/ssh": ("root", "root", 0o755), + "/tmp/etc/ssh/userkeys": ("root", "root", 0o755), + "/tmp/etc/ssh/userkeys/bobby": ("bobby", "bobby", 0o600), + "/tmp/etc/ssh/userkeys/badguy": ("badguy", "badguy", 0o600), + "/tmp/home/badguy": ("badguy", "badguy", 0o700), + "/tmp/home/badguy/.ssh": ("badguy", "badguy", 0o700), + "/tmp/home/badguy/.ssh/authorized_keys": ( + "badguy", + "badguy", + 0o600, + ), + } + + user_bobby = "bobby" + user_badguy = "badguy" + homes = self.create_fake_users( + [user_bobby, user_badguy], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + m_get_user_groups.side_effect = mock_get_user_groups + home_bobby = homes[0] + home_badguy = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + # /tmp/etc/ssh/userkeys/bobby = dsa + # assume here that we can bypass userkeys, despite permissions + authorized_keys = self.create_global_authorized_file( + "etc/ssh/userkeys/bobby", "dsa", keys + ) + + # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com + self.create_user_authorized_file( + home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + # /tmp/etc/ssh/userkeys/badguy = ecdsa + authorized_keys2 = self.create_global_authorized_file( + "etc/ssh/userkeys/badguy", "ecdsa", keys + ) + + # /tmp/sshd_config + options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys + ) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_single_user_file( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + self.create_user_authorized_file( + home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + # /tmp/sshd_config + options = "%s" % (authorized_keys) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + default = "%s/.ssh/authorized_keys" % home_suzie + self.execute_and_check(user_suzie, sshd_config, default, keys) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_single_user_file_inverted( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + # /tmp/sshd_config + options = "%s" % (authorized_keys2) + sshd_config = self.create_sshd_config(options) + + default = "%s/.ssh/authorized_keys" % home_bobby + self.execute_and_check( + user_bobby, sshd_config, default, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_user_files( + self, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + m_get_user_groups, + ): + keys = {} + users = {} + mock_permissions = { + "/tmp/home/bobby": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700), + "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600), + "/tmp/home/suzie": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700), + "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600), + } + + user_bobby = "bobby" + user_suzie = "suzie" + homes = self.create_fake_users( + [user_bobby, user_suzie], + mock_permissions, + m_get_group, + m_get_owner, + m_get_permissions, + m_getpwnam, + users, + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, "authorized_keys", "rsa", keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys + ) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.create_global_authorized_file( + "etc/ssh/authorized_keys", "ecdsa", keys + ) + + # /tmp/sshd_config + options = "%s %s %s" % ( + authorized_keys_global, + authorized_keys, + authorized_keys2, + ) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + # vi: ts=4 expandtab |