summaryrefslogtreecommitdiff
path: root/tests/unittests/test_sshutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_sshutil.py')
-rw-r--r--tests/unittests/test_sshutil.py1199
1 files changed, 1072 insertions, 127 deletions
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index 88a111e3..d614350e 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -1,25 +1,65 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import os
from collections import namedtuple
+from functools import partial
from unittest.mock import patch
-from cloudinit import ssh_util
-from cloudinit.tests import helpers as test_helpers
-from cloudinit import util
+from cloudinit import ssh_util, util
+from tests.unittests import helpers as test_helpers
# https://stackoverflow.com/questions/11351032/
FakePwEnt = namedtuple(
- 'FakePwEnt',
- ['pw_dir', 'pw_gecos', 'pw_name', 'pw_passwd', 'pw_shell', 'pwd_uid'])
+ "FakePwEnt",
+ [
+ "pw_name",
+ "pw_passwd",
+ "pw_uid",
+ "pw_gid",
+ "pw_gecos",
+ "pw_dir",
+ "pw_shell",
+ ],
+)
FakePwEnt.__new__.__defaults__ = tuple(
- "UNSET_%s" % n for n in FakePwEnt._fields)
+ "UNSET_%s" % n for n in FakePwEnt._fields
+)
+
+
+def mock_get_owner(updated_permissions, value):
+ try:
+ return updated_permissions[value][0]
+ except ValueError:
+ return util.get_owner(value)
+
+
+def mock_get_group(updated_permissions, value):
+ try:
+ return updated_permissions[value][1]
+ except ValueError:
+ return util.get_group(value)
+
+
+def mock_get_user_groups(username):
+ return username
+
+
+def mock_get_permissions(updated_permissions, value):
+ try:
+ return updated_permissions[value][2]
+ except ValueError:
+ return util.get_permissions(value)
+
+
+def mock_getpwnam(users, username):
+ return users[username]
# Do not use these public keys, most of them are fetched from
# the testdata for OpenSSH, and their private keys are available
# https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata
VALID_CONTENT = {
- 'dsa': (
+ "dsa": (
"AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
"W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
"A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
@@ -31,12 +71,12 @@ VALID_CONTENT = {
"JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
"5z7u2rVAlDw=="
),
- 'ecdsa': (
+ "ecdsa": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
"J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
"Ql7lc2leWL7CY="
),
- 'rsa': (
+ "rsa": (
"AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
"emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
"c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
@@ -44,11 +84,10 @@ VALID_CONTENT = {
"YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
"/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
),
- 'ed25519': (
- "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb"
- "8lnDd"
+ "ed25519": (
+ "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb8lnDd"
),
- 'ecdsa-sha2-nistp256-cert-v01@openssh.com': (
+ "ecdsa-sha2-nistp256-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"gQIfwT/+UX68/hlKsdKuaOuAVB6ftTg03SlP/uH4OBEwAAAAIbmlzdHAyNTYAAA"
"BBBEjA0gjJmPM6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXw"
@@ -63,12 +102,12 @@ VALID_CONTENT = {
"2tM3QXkDcwdP0SxSEW5yy4XV5oAAAAhANNMm1cdVlAt3hmycQgdD82zPlg5YvVO"
"iN0SQTbgVD8i"
),
- 'ecdsa-sha2-nistp256': (
+ "ecdsa-sha2-nistp256": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEjA0gjJmPM"
"6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXwHvNxplYBwdPlk"
"2zEecvf9Cs2BM="
),
- 'ecdsa-sha2-nistp384-cert-v01@openssh.com': (
+ "ecdsa-sha2-nistp384-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHAzODQtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"grnSvDsK1EnCZndO1IyGWcGkVgVSkPWi/XO2ybPFyLVUAAAAIbmlzdHAzODQAAA"
"BhBAaYSQs+8TT0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaY"
@@ -85,12 +124,12 @@ VALID_CONTENT = {
"RVYqYQgAAADAiit0UCMDAUbjD+R2x4LvU3x/t8G3sdqDLRNfMRpjZpvcS8AwC+Y"
"VFVSQNn0AyzW0="
),
- 'ecdsa-sha2-nistp384': (
+ "ecdsa-sha2-nistp384": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBAaYSQs+8TT"
"0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaYByhXtAJiPOMqL"
"U5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm7q6BOA=="
),
- 'ecdsa-sha2-nistp521-cert-v01@openssh.com': (
+ "ecdsa-sha2-nistp521-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHA1MjEtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"gGmRzkkMvRFk1V5U3m3mQ2nfW20SJVXk1NKnT5iZGDcEAAAAIbmlzdHA1MjEAAA"
"CFBAHosAOHAI1ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94C"
@@ -109,13 +148,13 @@ VALID_CONTENT = {
"AAAQgEzkIpX3yKXPaPcK17mNx40ujEDitm4ARmbhAge0sFhZtf7YIgI55b6vkI8"
"JvMJkzQCBF1cpNOaIpVh1nFZNBphMQ=="
),
- 'ecdsa-sha2-nistp521': (
+ "ecdsa-sha2-nistp521": (
"AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAHosAOHAI1"
"ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94CQ8yyNHcby87zF"
"ZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnRnxprkcQ0rfCCd"
"agkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxA=="
),
- 'sk-ecdsa-sha2-nistp256-cert-v01@openssh.com': (
+ "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
@@ -124,12 +163,12 @@ VALID_CONTENT = {
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
- 'sk-ecdsa-sha2-nistp256@openssh.com': (
+ "sk-ecdsa-sha2-nistp256@openssh.com": (
"AAAAInNrLWVjZHNhLXNoYTItbmlzdHAyNTZAb3BlbnNzaC5jb20AAAAIbmlzdHA"
"yNTYAAABBBIELQJ2DgvaX1yQlKFokfWM2suuaCFI2qp0eJodHyg6O4ifxc3XpRK"
"d1OS8dNYQtE/YjdXSrA+AOnMF5ns2Nkx4AAAAEc3NoOg=="
),
- 'sk-ssh-ed25519-cert-v01@openssh.com': (
+ "sk-ssh-ed25519-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
@@ -138,11 +177,11 @@ VALID_CONTENT = {
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
- 'sk-ssh-ed25519@openssh.com': (
+ "sk-ssh-ed25519@openssh.com": (
"AAAAGnNrLXNzaC1lZDI1NTE5QG9wZW5zc2guY29tAAAAICFo/k5LU8863u66YC9"
"eUO2170QduohPURkQnbLa/dczAAAABHNzaDo="
),
- 'ssh-dss-cert-v01@openssh.com': (
+ "ssh-dss-cert-v01@openssh.com": (
"AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgdTlbNU9Hn9Qng3F"
"HxwH971bxCIoq1ern/QWFFDWXgmYAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0c"
"Fn1zYd/JGvtabKnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4"
@@ -159,7 +198,7 @@ VALID_CONTENT = {
"+F7SMGQAAAFMAAAALc3NoLWVkMjU1MTkAAABAh/z1LIdNL1b66tQ8t9DY9BTB3B"
"QKpTKmc7ezyFKLwl96yaIniZwD9Ticdbe/8i/Li3uCFE3EAt8NAIv9zff8Bg=="
),
- 'ssh-dss': (
+ "ssh-dss": (
"AAAAB3NzaC1kc3MAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0cFn1zYd/JGvtab"
"KnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4yLB+6vCtHcJF7"
"rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DDjMF0k5emWKCsa"
@@ -171,7 +210,7 @@ VALID_CONTENT = {
"GIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1UuyQMcUtb34+I0u9"
"Ycnyhp2mSFsQt"
),
- 'ssh-ed25519-cert-v01@openssh.com': (
+ "ssh-ed25519-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
@@ -180,11 +219,10 @@ VALID_CONTENT = {
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
- 'ssh-ed25519': (
- "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34"
- "XtIwZ"
+ "ssh-ed25519": (
+ "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34XtIwZ"
),
- 'ssh-rsa-cert-v01@openssh.com': (
+ "ssh-rsa-cert-v01@openssh.com": (
"AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAg98LhS2EHxLOWCLo"
"pZPwHdg/RJXusnkOqQXSc9R7aITkAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGh"
"EZzpoojjEW5y8+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yM"
@@ -195,13 +233,13 @@ VALID_CONTENT = {
"he0jBkAAABTAAAAC3NzaC1lZDI1NTE5AAAAQI3QGlUCzC07KorupxpDkkGy6tni"
"aZ8EvBflzvv+itXWNchGvfUeHmVT6aX0sRqehdz/lR+GmXRoZBhofwh0qAM="
),
- 'ssh-rsa': (
+ "ssh-rsa": (
"AAAAB3NzaC1yc2EAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGhEZzpoojjEW5y8"
"+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yMrW6wb84gbq8C3"
"1Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXecXylxIUOL0jE+u"
"0rU1967pDJx+w=="
),
- 'ssh-xmss-cert-v01@openssh.com': (
+ "ssh-xmss-cert-v01@openssh.com": (
"AAAAHXNzaC14bXNzLWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIM2UD0IH+Igsekq"
"xjTO5f36exX4WGRMCtDGPjwfbXblxAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0gxMA"
"AAAEDI83/K5JMOy0BMJgQypRdz35ApAnoQinMJ8ZMoZPaEJF8Z4rANQlfzaAXum"
@@ -267,7 +305,7 @@ VALID_CONTENT = {
"rNYClh8fQEQ8XuOCDpomMWu58YOTfbZNMDWs/Ou7RfCjX+VNwjPShDK9joMwWKc"
"Jy3QalZbaoWtcyyvXxR2sqhVR9F7Cmasq4="
),
- 'ssh-xmss@openssh.com': (
+ "ssh-xmss@openssh.com": (
"AAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0g"
"xMAAAAECqptWnK94d+Sj2xcdTu8gz+75lawZoLSZFqC5IhbYuT/Z3oBZCim6yt+"
"HAmk6MKldl3Fg+74v4sR/SII0I0Jv/"
@@ -278,19 +316,25 @@ KEY_TYPES = list(VALID_CONTENT.keys())
TEST_OPTIONS = (
"no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
- 'command="echo \'Please login as the user \"ubuntu\" rather than the'
- 'user \"root\".\';echo;sleep 10"')
+ 'command="echo \'Please login as the user "ubuntu" rather than the'
+ 'user "root".\';echo;sleep 10"'
+)
class TestAuthKeyLineParser(test_helpers.CiTestCase):
-
def test_simple_parse(self):
# test key line with common 3 fields (keytype, base64, comment)
parser = ssh_util.AuthKeyLineParser()
for ktype in KEY_TYPES:
content = VALID_CONTENT[ktype]
- comment = 'user-%s@host' % ktype
- line = ' '.join((ktype, content, comment,))
+ comment = "user-%s@host" % ktype
+ line = " ".join(
+ (
+ ktype,
+ content,
+ comment,
+ )
+ )
key = parser.parse(line)
self.assertEqual(key.base64, content)
@@ -303,7 +347,12 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase):
parser = ssh_util.AuthKeyLineParser()
for ktype in KEY_TYPES:
content = VALID_CONTENT[ktype]
- line = ' '.join((ktype, content,))
+ line = " ".join(
+ (
+ ktype,
+ content,
+ )
+ )
key = parser.parse(line)
self.assertEqual(key.base64, content)
@@ -317,8 +366,15 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase):
options = TEST_OPTIONS
for ktype in KEY_TYPES:
content = VALID_CONTENT[ktype]
- comment = 'user-%s@host' % ktype
- line = ' '.join((options, ktype, content, comment,))
+ comment = "user-%s@host" % ktype
+ line = " ".join(
+ (
+ options,
+ ktype,
+ content,
+ comment,
+ )
+ )
key = parser.parse(line)
self.assertEqual(key.base64, content)
@@ -330,7 +386,7 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase):
# test key line with key type and base64 only
parser = ssh_util.AuthKeyLineParser()
- baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host"))
+ baseline = " ".join(("rsa", VALID_CONTENT["rsa"], "user@host"))
myopts = "no-port-forwarding,no-agent-forwarding"
key = parser.parse("allowedopt" + " " + baseline)
@@ -341,59 +397,62 @@ class TestAuthKeyLineParser(test_helpers.CiTestCase):
def test_parse_invalid_keytype(self):
parser = ssh_util.AuthKeyLineParser()
- key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']]))
+ key = parser.parse(" ".join(["badkeytype", VALID_CONTENT["rsa"]]))
self.assertFalse(key.valid())
class TestUpdateAuthorizedKeys(test_helpers.CiTestCase):
-
def test_new_keys_replace(self):
"""new entries with the same base64 should replace old."""
orig_entries = [
- ' '.join(('rsa', VALID_CONTENT['rsa'], 'orig_comment1')),
- ' '.join(('dsa', VALID_CONTENT['dsa'], 'orig_comment2'))]
+ " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")),
+ " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")),
+ ]
new_entries = [
- ' '.join(('rsa', VALID_CONTENT['rsa'], 'new_comment1')), ]
+ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")),
+ ]
- expected = '\n'.join([new_entries[0], orig_entries[1]]) + '\n'
+ expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n"
parser = ssh_util.AuthKeyLineParser()
found = ssh_util.update_authorized_keys(
[parser.parse(p) for p in orig_entries],
- [parser.parse(p) for p in new_entries])
+ [parser.parse(p) for p in new_entries],
+ )
self.assertEqual(expected, found)
def test_new_invalid_keys_are_ignored(self):
"""new entries that are invalid should be skipped."""
orig_entries = [
- ' '.join(('rsa', VALID_CONTENT['rsa'], 'orig_comment1')),
- ' '.join(('dsa', VALID_CONTENT['dsa'], 'orig_comment2'))]
+ " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")),
+ " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")),
+ ]
new_entries = [
- ' '.join(('rsa', VALID_CONTENT['rsa'], 'new_comment1')),
- 'xxx-invalid-thing1',
- 'xxx-invalid-blob2'
+ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")),
+ "xxx-invalid-thing1",
+ "xxx-invalid-blob2",
]
- expected = '\n'.join([new_entries[0], orig_entries[1]]) + '\n'
+ expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n"
parser = ssh_util.AuthKeyLineParser()
found = ssh_util.update_authorized_keys(
[parser.parse(p) for p in orig_entries],
- [parser.parse(p) for p in new_entries])
+ [parser.parse(p) for p in new_entries],
+ )
self.assertEqual(expected, found)
class TestParseSSHConfig(test_helpers.CiTestCase):
-
def setUp(self):
- self.load_file_patch = patch('cloudinit.ssh_util.util.load_file')
+ self.load_file_patch = patch("cloudinit.ssh_util.util.load_file")
self.load_file = self.load_file_patch.start()
- self.isfile_patch = patch('cloudinit.ssh_util.os.path.isfile')
+ self.isfile_patch = patch("cloudinit.ssh_util.os.path.isfile")
self.isfile = self.isfile_patch.start()
self.isfile.return_value = True
@@ -404,60 +463,61 @@ class TestParseSSHConfig(test_helpers.CiTestCase):
def test_not_a_file(self):
self.isfile.return_value = False
self.load_file.side_effect = IOError
- ret = ssh_util.parse_ssh_config('not a real file')
+ ret = ssh_util.parse_ssh_config("not a real file")
self.assertEqual([], ret)
def test_empty_file(self):
- self.load_file.return_value = ''
- ret = ssh_util.parse_ssh_config('some real file')
+ self.load_file.return_value = ""
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual([], ret)
def test_comment_line(self):
- comment_line = '# This is a comment'
+ comment_line = "# This is a comment"
self.load_file.return_value = comment_line
- ret = ssh_util.parse_ssh_config('some real file')
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(1, len(ret))
self.assertEqual(comment_line, ret[0].line)
def test_blank_lines(self):
- lines = ['', '\t', ' ']
- self.load_file.return_value = '\n'.join(lines)
- ret = ssh_util.parse_ssh_config('some real file')
+ lines = ["", "\t", " "]
+ self.load_file.return_value = "\n".join(lines)
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(len(lines), len(ret))
for line in ret:
- self.assertEqual('', line.line)
+ self.assertEqual("", line.line)
def test_lower_case_config(self):
- self.load_file.return_value = 'foo bar'
- ret = ssh_util.parse_ssh_config('some real file')
+ self.load_file.return_value = "foo bar"
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(1, len(ret))
- self.assertEqual('foo', ret[0].key)
- self.assertEqual('bar', ret[0].value)
+ self.assertEqual("foo", ret[0].key)
+ self.assertEqual("bar", ret[0].value)
def test_upper_case_config(self):
- self.load_file.return_value = 'Foo Bar'
- ret = ssh_util.parse_ssh_config('some real file')
+ self.load_file.return_value = "Foo Bar"
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(1, len(ret))
- self.assertEqual('foo', ret[0].key)
- self.assertEqual('Bar', ret[0].value)
+ self.assertEqual("foo", ret[0].key)
+ self.assertEqual("Bar", ret[0].value)
def test_lower_case_with_equals(self):
- self.load_file.return_value = 'foo=bar'
- ret = ssh_util.parse_ssh_config('some real file')
+ self.load_file.return_value = "foo=bar"
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(1, len(ret))
- self.assertEqual('foo', ret[0].key)
- self.assertEqual('bar', ret[0].value)
+ self.assertEqual("foo", ret[0].key)
+ self.assertEqual("bar", ret[0].value)
def test_upper_case_with_equals(self):
- self.load_file.return_value = 'Foo=bar'
- ret = ssh_util.parse_ssh_config('some real file')
+ self.load_file.return_value = "Foo=bar"
+ ret = ssh_util.parse_ssh_config("some real file")
self.assertEqual(1, len(ret))
- self.assertEqual('foo', ret[0].key)
- self.assertEqual('bar', ret[0].value)
+ self.assertEqual("foo", ret[0].key)
+ self.assertEqual("bar", ret[0].value)
class TestUpdateSshConfigLines(test_helpers.CiTestCase):
"""Test the update_ssh_config_lines method."""
+
exlines = [
"#PasswordAuthentication yes",
"UsePAM yes",
@@ -476,8 +536,8 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase):
def test_new_option_added(self):
"""A single update of non-existing option."""
lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
- result = ssh_util.update_ssh_config_lines(lines, {'MyKey': 'MyVal'})
- self.assertEqual(['MyKey'], result)
+ result = ssh_util.update_ssh_config_lines(lines, {"MyKey": "MyVal"})
+ self.assertEqual(["MyKey"], result)
self.check_line(lines[-1], "MyKey", "MyVal")
def test_commented_out_not_updated_but_appended(self):
@@ -487,6 +547,14 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase):
self.assertEqual([self.pwauth], result)
self.check_line(lines[-1], self.pwauth, "no")
+ def test_option_without_value(self):
+ """Implementation only accepts key-value pairs."""
+ extended_exlines = self.exlines.copy()
+ denyusers_opt = "DenyUsers"
+ extended_exlines.append(denyusers_opt)
+ lines = ssh_util.parse_ssh_config_lines(list(extended_exlines))
+ self.assertNotIn(denyusers_opt, str(lines))
+
def test_single_option_updated(self):
"""A single update should have change made and line updated."""
opt, val = ("UsePAM", "no")
@@ -497,8 +565,12 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase):
def test_multiple_updates_with_add(self):
"""Verify multiple updates some added some changed, some not."""
- updates = {"UsePAM": "no", "X11Forwarding": "no", "NewOpt": "newval",
- "AcceptEnv": "LANG ADD LC_*"}
+ updates = {
+ "UsePAM": "no",
+ "X11Forwarding": "no",
+ "NewOpt": "newval",
+ "AcceptEnv": "LANG ADD LC_*",
+ }
lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
result = ssh_util.update_ssh_config_lines(lines, updates)
self.assertEqual(set(["UsePAM", "NewOpt", "AcceptEnv"]), set(result))
@@ -523,7 +595,7 @@ class TestUpdateSshConfigLines(test_helpers.CiTestCase):
class TestUpdateSshConfig(test_helpers.CiTestCase):
- cfgdata = '\n'.join(["#Option val", "MyKey ORIG_VAL", ""])
+ cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""])
def test_modified(self):
mycfg = self.tmp_path("ssh_config_1")
@@ -533,7 +605,7 @@ class TestUpdateSshConfig(test_helpers.CiTestCase):
found = util.load_file(mycfg)
self.assertEqual(self.cfgdata.replace("ORIG_VAL", "NEW_VAL"), found)
# assert there is a newline at end of file (LP: #1677205)
- self.assertEqual('\n', found[-1])
+ self.assertEqual("\n", found[-1])
def test_not_modified(self):
mycfg = self.tmp_path("ssh_config_2")
@@ -550,76 +622,949 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase):
self.assertEqual(
["/opt/bobby/keys"],
ssh_util.render_authorizedkeysfile_paths(
- "/opt/%u/keys", "/home/bobby", "bobby"))
+ "/opt/%u/keys", "/home/bobby", "bobby"
+ ),
+ )
+
+ def test_user_file(self):
+ self.assertEqual(
+ ["/opt/bobby"],
+ ssh_util.render_authorizedkeysfile_paths(
+ "/opt/%u", "/home/bobby", "bobby"
+ ),
+ )
+
+ def test_user_file2(self):
+ self.assertEqual(
+ ["/opt/bobby/bobby"],
+ ssh_util.render_authorizedkeysfile_paths(
+ "/opt/%u/%u", "/home/bobby", "bobby"
+ ),
+ )
def test_multiple(self):
self.assertEqual(
["/keys/path1", "/keys/path2"],
ssh_util.render_authorizedkeysfile_paths(
- "/keys/path1 /keys/path2", "/home/bobby", "bobby"))
+ "/keys/path1 /keys/path2", "/home/bobby", "bobby"
+ ),
+ )
+
+ def test_multiple2(self):
+ self.assertEqual(
+ ["/keys/path1", "/keys/bobby"],
+ ssh_util.render_authorizedkeysfile_paths(
+ "/keys/path1 /keys/%u", "/home/bobby", "bobby"
+ ),
+ )
def test_relative(self):
self.assertEqual(
["/home/bobby/.secret/keys"],
ssh_util.render_authorizedkeysfile_paths(
- ".secret/keys", "/home/bobby", "bobby"))
+ ".secret/keys", "/home/bobby", "bobby"
+ ),
+ )
def test_home(self):
self.assertEqual(
["/homedirs/bobby/.keys"],
ssh_util.render_authorizedkeysfile_paths(
- "%h/.keys", "/homedirs/bobby", "bobby"))
+ "%h/.keys", "/homedirs/bobby", "bobby"
+ ),
+ )
+
+ def test_all(self):
+ self.assertEqual(
+ [
+ "/homedirs/bobby/.keys",
+ "/homedirs/bobby/.secret/keys",
+ "/keys/path1",
+ "/opt/bobby/keys",
+ ],
+ ssh_util.render_authorizedkeysfile_paths(
+ "%h/.keys .secret/keys /keys/path1 /opt/%u/keys",
+ "/homedirs/bobby",
+ "bobby",
+ ),
+ )
class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase):
+ def create_fake_users(
+ self,
+ names,
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ ):
+ homes = []
+
+ root = "/tmp/root"
+ fpw = FakePwEnt(pw_name="root", pw_dir=root)
+ users["root"] = fpw
+
+ for name in names:
+ home = "/tmp/home/" + name
+ fpw = FakePwEnt(pw_name=name, pw_dir=home)
+ users[name] = fpw
+ homes.append(home)
+
+ m_get_permissions.side_effect = partial(
+ mock_get_permissions, mock_permissions
+ )
+ m_get_owner.side_effect = partial(mock_get_owner, mock_permissions)
+ m_get_group.side_effect = partial(mock_get_group, mock_permissions)
+ m_getpwnam.side_effect = partial(mock_getpwnam, users)
+ return homes
+
+ def create_user_authorized_file(self, home, filename, content_key, keys):
+ user_ssh_folder = "%s/.ssh" % home
+ # /tmp/home/<user>/.ssh/authorized_keys = content_key
+ authorized_keys = self.tmp_path(filename, dir=user_ssh_folder)
+ util.write_file(authorized_keys, VALID_CONTENT[content_key])
+ keys[authorized_keys] = content_key
+ return authorized_keys
+
+ def create_global_authorized_file(self, filename, content_key, keys):
+ authorized_keys = self.tmp_path(filename, dir="/tmp")
+ util.write_file(authorized_keys, VALID_CONTENT[content_key])
+ keys[authorized_keys] = content_key
+ return authorized_keys
+
+ def create_sshd_config(self, authorized_keys_files):
+ sshd_config = self.tmp_path("sshd_config", dir="/tmp")
+ util.write_file(
+ sshd_config, "AuthorizedKeysFile " + authorized_keys_files
+ )
+ return sshd_config
+
+ def execute_and_check(
+ self, user, sshd_config, solution, keys, delete_keys=True
+ ):
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ user, sshd_config
+ )
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(auth_key_fn, solution)
+ for path, key in keys.items():
+ if path == solution:
+ self.assertTrue(VALID_CONTENT[key] in content)
+ else:
+ self.assertFalse(VALID_CONTENT[key] in content)
+
+ if delete_keys and os.path.isdir("/tmp/home/"):
+ util.delete_dir_contents("/tmp/home/")
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_two_local_files(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home = homes[0]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys", "dsa", keys
+ )
+
+ # /tmp/sshd_config
+ options = "%s %s" % (authorized_keys, user_keys)
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
- def test_multiple_authorizedkeys_file_order1(self, m_getpwnam):
- fpw = FakePwEnt(pw_name='bobby', pw_dir='/home2/bobby')
- m_getpwnam.return_value = fpw
- authorized_keys = self.tmp_path('authorized_keys')
- util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_two_local_files_inverted(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home = homes[0]
- user_keys = self.tmp_path('user_keys')
- util.write_file(user_keys, VALID_CONTENT['dsa'])
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys", "rsa", keys
+ )
- sshd_config = self.tmp_path('sshd_config')
- util.write_file(
- sshd_config,
- "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys)
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys", "dsa", keys
)
- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
- fpw.pw_name, sshd_config)
- content = ssh_util.update_authorized_keys(auth_key_entries, [])
+ # /tmp/sshd_config
+ options = "%s %s" % (user_keys, authorized_keys)
+ sshd_config = self.create_sshd_config(options)
- self.assertEqual(authorized_keys, auth_key_fn)
- self.assertTrue(VALID_CONTENT['rsa'] in content)
- self.assertTrue(VALID_CONTENT['dsa'] in content)
+ self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
- def test_multiple_authorizedkeys_file_order2(self, m_getpwnam):
- fpw = FakePwEnt(pw_name='suzie', pw_dir='/home/suzie')
- m_getpwnam.return_value = fpw
- authorized_keys = self.tmp_path('authorized_keys')
- util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_local_global_files(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/user_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home = homes[0]
- user_keys = self.tmp_path('user_keys')
- util.write_file(user_keys, VALID_CONTENT['dsa'])
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys", "rsa", keys
+ )
- sshd_config = self.tmp_path('sshd_config')
- util.write_file(
- sshd_config,
- "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys)
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys", "dsa", keys
)
- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
- fpw.pw_name, sshd_config
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "ecdsa", keys
)
- content = ssh_util.update_authorized_keys(auth_key_entries, [])
- self.assertEqual(user_keys, auth_key_fn)
- self.assertTrue(VALID_CONTENT['rsa'] in content)
- self.assertTrue(VALID_CONTENT['dsa'] in content)
+ options = "%s %s %s" % (
+ authorized_keys_global,
+ user_keys,
+ authorized_keys,
+ )
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_local_global_files_inverted(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home = homes[0]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys2", "rsa", keys
+ )
+
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys3", "dsa", keys
+ )
+
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "ecdsa", keys
+ )
+
+ options = "%s %s %s" % (
+ authorized_keys_global,
+ authorized_keys,
+ user_keys,
+ )
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_global_file(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home = homes[0]
+
+ # /tmp/etc/ssh/authorized_keys = rsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "rsa", keys
+ )
+
+ options = "%s" % authorized_keys_global
+ sshd_config = self.create_sshd_config(options)
+
+ default = "%s/.ssh/authorized_keys" % home
+ self.execute_and_check(user_bobby, sshd_config, default, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_file_standard(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ options = ".ssh/authorized_keys"
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_file_custom(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys2": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
+ )
+
+ options = ".ssh/authorized_keys2"
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_global_files(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys2": ("suzie", "suzie", 0o600),
+ "/tmp/home/suzie/.ssh/user_keys3": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+ # /tmp/home/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.create_user_authorized_file(
+ home_bobby, "user_keys3", "dsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys2", "ecdsa", keys
+ )
+
+ options = "%s %s %%h/.ssh/authorized_keys2" % (
+ authorized_keys_global,
+ user_keys,
+ )
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, user_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_global_files_badguy(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys2": ("bobby", "bobby", 0o600),
+ "/tmp/home/bobby/.ssh/user_keys3": ("bobby", "bobby", 0o600),
+ "/tmp/home/badguy": ("root", "root", 0o755),
+ "/tmp/home/badguy/home": ("root", "root", 0o755),
+ "/tmp/home/badguy/home/bobby": ("root", "root", 0o655),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ home_bobby, *_ = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+ # /tmp/home/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.create_user_authorized_file(
+ home_bobby, "user_keys3", "dsa", keys
+ )
+
+ # /tmp/home/badguy/home/bobby = ""
+ authorized_keys2 = self.tmp_path("home/bobby", dir="/tmp/home/badguy")
+ util.write_file(authorized_keys2, "")
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys2", "ecdsa", keys
+ )
+
+ # /tmp/sshd_config
+ options = "%s %%h/.ssh/authorized_keys2 %s %s" % (
+ authorized_keys2,
+ authorized_keys_global,
+ user_keys,
+ )
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_unaccessible_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/etc": ("root", "root", 0o755),
+ "/tmp/etc/ssh": ("root", "root", 0o755),
+ "/tmp/etc/ssh/userkeys": ("root", "root", 0o700),
+ "/tmp/etc/ssh/userkeys/bobby": ("bobby", "bobby", 0o600),
+ "/tmp/etc/ssh/userkeys/badguy": ("badguy", "badguy", 0o600),
+ "/tmp/home/badguy": ("badguy", "badguy", 0o700),
+ "/tmp/home/badguy/.ssh": ("badguy", "badguy", 0o700),
+ "/tmp/home/badguy/.ssh/authorized_keys": (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ homes = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+ home_bobby = homes[0]
+ home_badguy = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+ # /tmp/etc/ssh/userkeys/bobby = dsa
+ # assume here that we can bypass userkeys, despite permissions
+ self.create_global_authorized_file(
+ "etc/ssh/userkeys/bobby", "dsa", keys
+ )
+
+ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/userkeys/badguy = ecdsa
+ self.create_global_authorized_file(
+ "etc/ssh/userkeys/badguy", "ecdsa", keys
+ )
+
+ # /tmp/sshd_config
+ options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys"
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_accessible_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/etc": ("root", "root", 0o755),
+ "/tmp/etc/ssh": ("root", "root", 0o755),
+ "/tmp/etc/ssh/userkeys": ("root", "root", 0o755),
+ "/tmp/etc/ssh/userkeys/bobby": ("bobby", "bobby", 0o600),
+ "/tmp/etc/ssh/userkeys/badguy": ("badguy", "badguy", 0o600),
+ "/tmp/home/badguy": ("badguy", "badguy", 0o700),
+ "/tmp/home/badguy/.ssh": ("badguy", "badguy", 0o700),
+ "/tmp/home/badguy/.ssh/authorized_keys": (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ homes = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+ home_bobby = homes[0]
+ home_badguy = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+ # /tmp/etc/ssh/userkeys/bobby = dsa
+ # assume here that we can bypass userkeys, despite permissions
+ authorized_keys = self.create_global_authorized_file(
+ "etc/ssh/userkeys/bobby", "dsa", keys
+ )
+
+ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
+ self.create_user_authorized_file(
+ home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/userkeys/badguy = ecdsa
+ authorized_keys2 = self.create_global_authorized_file(
+ "etc/ssh/userkeys/badguy", "ecdsa", keys
+ )
+
+ # /tmp/sshd_config
+ options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys"
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_hardcoded_single_user_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
+ self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/sshd_config
+ options = "%s" % (authorized_keys)
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ default = "%s/.ssh/authorized_keys" % home_suzie
+ self.execute_and_check(user_suzie, sshd_config, default, keys)
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_hardcoded_single_user_file_inverted(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/sshd_config
+ options = "%s" % (authorized_keys2)
+ sshd_config = self.create_sshd_config(options)
+
+ default = "%s/.ssh/authorized_keys" % home_bobby
+ self.execute_and_check(
+ user_bobby, sshd_config, default, keys, delete_keys=False
+ )
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_hardcoded_user_files(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ "/tmp/home/bobby": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh": ("bobby", "bobby", 0o700),
+ "/tmp/home/bobby/.ssh/authorized_keys": ("bobby", "bobby", 0o600),
+ "/tmp/home/suzie": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh": ("suzie", "suzie", 0o700),
+ "/tmp/home/suzie/.ssh/authorized_keys": ("suzie", "suzie", 0o600),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "ecdsa", keys
+ )
+
+ # /tmp/sshd_config
+ options = "%s %s %s" % (
+ authorized_keys_global,
+ authorized_keys,
+ authorized_keys2,
+ )
+ sshd_config = self.create_sshd_config(options)
+
+ self.execute_and_check(
+ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+ )
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
# vi: ts=4 expandtab