diff options
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | cloudinit/config/cc_ssh.py | 4 | ||||
-rw-r--r-- | cloudinit/ssh_util.py | 112 | ||||
-rw-r--r-- | tests/unittests/test_sshutil.py | 100 |
4 files changed, 154 insertions, 63 deletions
@@ -501,4 +501,5 @@ - make the message on 'disable_root' more clear (LP: #672417) - do not require public key if private is given in ssh cloud-config (LP: #648905) + - fix issue when writing ssh keys to .ssh/authorized_keys (LP: #1136343) # vi: syntax=text textwidth=79 diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py index b623d476..7ef20d9f 100644 --- a/cloudinit/config/cc_ssh.py +++ b/cloudinit/config/cc_ssh.py @@ -126,7 +126,7 @@ def apply_credentials(keys, user, disable_root, disable_root_opts): keys = set(keys) if user: - ssh_util.setup_user_keys(keys, user, '') + ssh_util.setup_user_keys(keys, user) if disable_root: if not user: @@ -135,4 +135,4 @@ def apply_credentials(keys, user, disable_root, disable_root_opts): else: key_prefix = '' - ssh_util.setup_user_keys(keys, 'root', key_prefix) + ssh_util.setup_user_keys(keys, 'root', options=key_prefix) diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py index dd6b742f..65fab117 100644 --- a/cloudinit/ssh_util.py +++ b/cloudinit/ssh_util.py @@ -33,6 +33,14 @@ LOG = logging.getLogger(__name__) # See: man sshd_config DEF_SSHD_CFG = "/etc/ssh/sshd_config" +# taken from openssh source key.c/key_type_from_name +VALID_KEY_TYPES = ("rsa", "dsa", "ssh-rsa", "ssh-dss", "ecdsa", + "ssh-rsa-cert-v00@openssh.com", "ssh-dss-cert-v00@openssh.com", + "ssh-rsa-cert-v00@openssh.com", "ssh-dss-cert-v00@openssh.com", + "ssh-rsa-cert-v01@openssh.com", "ssh-dss-cert-v01@openssh.com", + "ecdsa-sha2-nistp256-cert-v01@openssh.com", + "ecdsa-sha2-nistp384-cert-v01@openssh.com", + "ecdsa-sha2-nistp521-cert-v01@openssh.com") class AuthKeyLine(object): def __init__(self, source, keytype=None, base64=None, @@ -43,11 +51,8 @@ class AuthKeyLine(object): self.keytype = keytype self.source = source - def empty(self): - if (not self.base64 and - not self.comment and not self.keytype and not self.options): - return True - return False + def valid(self): + return (self.base64 and self.keytype) def __str__(self): toks = [] @@ -107,62 +112,47 @@ class AuthKeyLineParser(object): i = i + 1 options = ent[0:i] - options_lst = [] - - # Now use a csv parser to pull the options - # out of the above string that we just found an endpoint for. - # - # No quoting so we don't mess up any of the quoting that - # is already there. - reader = csv.reader(StringIO(options), quoting=csv.QUOTE_NONE) - for row in reader: - for e in row: - # Only keep non-empty csv options - e = e.strip() - if e: - options_lst.append(e) - - # Now take the rest of the items before the string - # as long as there is room to do this... - toks = [] - if i + 1 < len(ent): - rest = ent[i + 1:] - toks = rest.split(None, 2) - return (options_lst, toks) - - def _form_components(self, src_line, toks, options=None): - components = {} - if len(toks) == 1: - components['base64'] = toks[0] - elif len(toks) == 2: - components['base64'] = toks[0] - components['comment'] = toks[1] - elif len(toks) == 3: - components['keytype'] = toks[0] - components['base64'] = toks[1] - components['comment'] = toks[2] - components['options'] = options - if not components: - return AuthKeyLine(src_line) - else: - return AuthKeyLine(src_line, **components) - def parse(self, src_line, def_opt=None): + # Return the rest of the string in 'remain' + remain = ent[i:].lstrip() + return (options, remain) + + def parse(self, src_line, options=None): + # modeled after opensshes auth2-pubkey.c:user_key_allowed2 line = src_line.rstrip("\r\n") if line.startswith("#") or line.strip() == '': return AuthKeyLine(src_line) - else: - ent = line.strip() - toks = ent.split(None, 3) - if len(toks) < 4: - return self._form_components(src_line, toks, def_opt) - else: - (options, toks) = self._extract_options(ent) - if options: - options = ",".join(options) - else: - options = def_opt - return self._form_components(src_line, toks, options) + + def parse_ssh_key(ent): + # return ketype, key, [comment] + toks = ent.split(None, 2) + if len(toks) < 2: + raise TypeError("To few fields: %s" % len(toks)) + if toks[0] not in VALID_KEY_TYPES: + raise TypeError("Invalid keytype %s" % toks[0]) + + # valid key type and 2 or 3 fields: + if len(toks) == 2: + # no comment in line + toks.append("") + + return toks + + ent = line.strip() + try: + (keytype, base64, comment) = parse_ssh_key(ent) + except TypeError as e: + (keyopts, remain) = self._extract_options(ent) + if options is None: + options = keyopts + + try: + (keytype, base64, comment) = parse_ssh_key(remain) + except TypeError as e: + return AuthKeyLine(src_line) + + return AuthKeyLine(src_line, keytype=keytype, base64=base64, + comment=comment, options=options) def parse_authorized_keys(fname): @@ -186,11 +176,11 @@ def update_authorized_keys(old_entries, keys): for i in range(0, len(old_entries)): ent = old_entries[i] - if ent.empty() or not ent.base64: + if not ent.valid(): continue # Replace those with the same base64 for k in keys: - if k.empty() or not k.base64: + if not ent.valid(): continue if k.base64 == ent.base64: # Replace it with our better one @@ -249,7 +239,7 @@ def extract_authorized_keys(username): return (auth_key_fn, parse_authorized_keys(auth_key_fn)) -def setup_user_keys(keys, username, key_prefix): +def setup_user_keys(keys, username, options=None): # Make sure the users .ssh dir is setup accordingly (ssh_dir, pwent) = users_ssh_info(username) if not os.path.isdir(ssh_dir): @@ -260,7 +250,7 @@ def setup_user_keys(keys, username, key_prefix): parser = AuthKeyLineParser() key_entries = [] for k in keys: - key_entries.append(parser.parse(str(k), def_opt=key_prefix)) + key_entries.append(parser.parse(str(k), options=options)) # Extract the old and make the new (auth_key_fn, auth_key_entries) = extract_authorized_keys(username) diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py new file mode 100644 index 00000000..2415d06f --- /dev/null +++ b/tests/unittests/test_sshutil.py @@ -0,0 +1,100 @@ +from unittest import TestCase +from cloudinit import ssh_util + + +VALID_CONTENT = { + 'dsa': ( + "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" + "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" + "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" + "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv" + "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4" + "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7" + "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P" + "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE" + "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" + "5z7u2rVAlDw==" + ), + 'ecdsa': ( + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" + "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" + "Ql7lc2leWL7CY=" + ), + 'rsa': ( + "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" + "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" + "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" + "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT" + "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" + "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" + ), +} + +TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," + 'command="echo \'Please login as the user \"ubuntu\" rather than the' + 'user \"root\".\';echo;sleep 10"') + +class TestAuthKeyLineParser(TestCase): + def test_simple_parse(self): + # test key line with common 3 fields (keytype, base64, comment) + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_no_comment(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + line = ' '.join((ktype, content,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertFalse(key.comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_keyoptions(self): + # test key line with options in it + parser = ssh_util.AuthKeyLineParser() + options = TEST_OPTIONS + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((options, ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertEqual(key.options, options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_options_passed_in(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + + baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host")) + myopts = "no-port-forwarding,no-agent-forwarding" + + key = parser.parse("allowedopt" + " " + baseline) + self.assertEqual(key.options, "allowedopt") + + key = parser.parse("overridden_opt " + baseline, options=myopts) + self.assertEqual(key.options, myopts) + + def test_parse_invalid_keytype(self): + parser = ssh_util.AuthKeyLineParser() + key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']])) + + self.assertFalse(key.valid()) + + +# vi: ts=4 expandtab |