From 86fe289ceb9b292ea91dbca056e0159e74091e47 Mon Sep 17 00:00:00 2001 From: Scott Moser Date: Fri, 1 Mar 2013 14:19:54 -0500 Subject: add some unit tests, fix an issue or two * drop the parsing of options into csv, as we were only exploding them back. That can only result in error. Just do minimal parsing. * change the parsing of key lines to: if entry is valid: * use it else try taking off options: if good, use it else fail --- cloudinit/ssh_util.py | 97 +++++++++++++++++++---------------------- tests/unittests/test_sshutil.py | 94 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 51 deletions(-) create mode 100644 tests/unittests/test_sshutil.py diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py index dd6b742f..863a63e7 100644 --- a/cloudinit/ssh_util.py +++ b/cloudinit/ssh_util.py @@ -107,62 +107,57 @@ class AuthKeyLineParser(object): i = i + 1 options = ent[0:i] - options_lst = [] - - # Now use a csv parser to pull the options - # out of the above string that we just found an endpoint for. - # - # No quoting so we don't mess up any of the quoting that - # is already there. - reader = csv.reader(StringIO(options), quoting=csv.QUOTE_NONE) - for row in reader: - for e in row: - # Only keep non-empty csv options - e = e.strip() - if e: - options_lst.append(e) - - # Now take the rest of the items before the string - # as long as there is room to do this... - toks = [] - if i + 1 < len(ent): - rest = ent[i + 1:] - toks = rest.split(None, 2) - return (options_lst, toks) - - def _form_components(self, src_line, toks, options=None): - components = {} - if len(toks) == 1: - components['base64'] = toks[0] - elif len(toks) == 2: - components['base64'] = toks[0] - components['comment'] = toks[1] - elif len(toks) == 3: - components['keytype'] = toks[0] - components['base64'] = toks[1] - components['comment'] = toks[2] - components['options'] = options - if not components: - return AuthKeyLine(src_line) - else: - return AuthKeyLine(src_line, **components) + + # Return the rest of the string in 'remain' + remain = ent[i:].lstrip() + return (options, remain) def parse(self, src_line, def_opt=None): + # modeled after opensshes auth2-pubkey.c:user_key_allowed2 line = src_line.rstrip("\r\n") if line.startswith("#") or line.strip() == '': return AuthKeyLine(src_line) - else: - ent = line.strip() - toks = ent.split(None, 3) - if len(toks) < 4: - return self._form_components(src_line, toks, def_opt) - else: - (options, toks) = self._extract_options(ent) - if options: - options = ",".join(options) - else: - options = def_opt - return self._form_components(src_line, toks, options) + + def parse_ssh_key(ent): + # return ketype, key, [comment] + toks = ent.split(None, 2) + if len(toks) < 2: + raise TypeError("To few fields: %s" % len(toks)) + if not _is_valid_ssh_keytype(toks[0]): + raise TypeError("Invalid keytype %s" % toks[0]) + + # valid key type and 2 or 3 fields: + if len(toks) == 2: + # no comment in line + toks.append("") + + return toks + + ent = line.strip() + options = None + try: + (keytype, base64, comment) = parse_ssh_key(ent) + options = def_opt + except TypeError as e: + (options, remain) = self._extract_options(ent) + try: + (keytype, base64, comment) = parse_ssh_key(remain) + except TypeError as e: + return AuthKeyLine(src_line) + + return AuthKeyLine(src_line, keytype=keytype, base64=base64, + comment=comment, options=options) + + +def _is_valid_ssh_keytype(key): + valid = ("rsa", "dsa", "ssh-rsa", "ssh-dss", "ecdsa", + "ssh-rsa-cert-v00@openssh.com", "ssh-dss-cert-v00@openssh.com", + "ssh-rsa-cert-v01@openssh.com", "ssh-dss-cert-v01@openssh.com", + "ecdsa-sha2-nistp256-cert-v01@openssh.com", + "ecdsa-sha2-nistp384-cert-v01@openssh.com", + "ecdsa-sha2-nistp521-cert-v01@openssh.com") + + return key in valid def parse_authorized_keys(fname): diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py new file mode 100644 index 00000000..4564d9be --- /dev/null +++ b/tests/unittests/test_sshutil.py @@ -0,0 +1,94 @@ +from unittest import TestCase +from cloudinit import ssh_util + + +VALID_CONTENT = { + 'dsa': ( + "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" + "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" + "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" + "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv" + "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4" + "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7" + "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P" + "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE" + "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" + "5z7u2rVAlDw==" + ), + 'ecdsa': ( + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" + "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" + "Ql7lc2leWL7CY=" + ), + 'rsa': ( + "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" + "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" + "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" + "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT" + "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" + "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" + ), +} + +TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," + 'command="echo \'Please login as the user \"ubuntu\" rather than the' + 'user \"root\".\';echo;sleep 10"') + +class TestAuthKeyLineParser(TestCase): + def test_simple_parse(self): + # test key line with common 3 fields (keytype, base64, comment) + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_no_comment(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + line = ' '.join((ktype, content,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertFalse(key.comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_options(self): + # test key line with options in it + parser = ssh_util.AuthKeyLineParser() + options = TEST_OPTIONS + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((options, ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertEqual(key.options, options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_defopt(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + line = ' '.join((ktype, content,)) + myopts = "no-port-forwarding,no-agent-forwarding" + key = parser.parse(line, myopts) + + self.assertEqual(key.base64, content) + self.assertEqual(key.options, myopts) + self.assertFalse(key.comment) + self.assertEqual(key.keytype, ktype) + +# vi: ts=4 expandtab -- cgit v1.2.3