summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/user_data.py22
-rw-r--r--tests/unittests/test_data.py11
2 files changed, 23 insertions, 10 deletions
diff --git a/cloudinit/user_data.py b/cloudinit/user_data.py
index 8f6aba1e..ed83d2d8 100644
--- a/cloudinit/user_data.py
+++ b/cloudinit/user_data.py
@@ -337,8 +337,10 @@ def is_skippable(part):
# Coverts a raw string into a mime message
def convert_string(raw_data, content_type=NOT_MULTIPART_TYPE):
+ """convert a string (more likely bytes) or a message into
+ a mime message."""
if not raw_data:
- raw_data = ''
+ raw_data = b''
def create_binmsg(data, content_type):
maintype, subtype = content_type.split("/", 1)
@@ -346,15 +348,17 @@ def convert_string(raw_data, content_type=NOT_MULTIPART_TYPE):
msg.set_payload(data)
return msg
- try:
- data = util.decode_binary(util.decomp_gzip(raw_data))
- if "mime-version:" in data[0:4096].lower():
- msg = util.message_from_string(data)
- else:
- msg = create_binmsg(data, content_type)
- except UnicodeDecodeError:
- msg = create_binmsg(raw_data, content_type)
+ if isinstance(raw_data, six.text_type):
+ bdata = raw_data.encode('utf-8')
+ else:
+ bdata = raw_data
+ bdata = util.decomp_gzip(bdata, decode=False)
+ if b"mime-version:" in bdata[0:4096].lower():
+ msg = util.message_from_string(bdata.decode('utf-8'))
+ else:
+ msg = create_binmsg(bdata, content_type)
return msg
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 91d35cb8..3efe7adf 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -606,8 +606,10 @@ class TestUDProcess(helpers.ResourceUsingTestCase):
class TestConvertString(helpers.TestCase):
+
def test_handles_binary_non_utf8_decodable(self):
- blob = b'\x32\x99'
+ """Printable unicode (not utf8-decodable) is safely converted."""
+ blob = b'#!/bin/bash\necho \xc3\x84\n'
msg = ud.convert_string(blob)
self.assertEqual(blob, msg.get_payload(decode=True))
@@ -621,6 +623,13 @@ class TestConvertString(helpers.TestCase):
msg = ud.convert_string(text)
self.assertEqual(text, msg.get_payload(decode=False))
+ def test_handle_mime_parts(self):
+ """Mime parts are properly returned as a mime message."""
+ message = MIMEBase("text", "plain")
+ message.set_payload("Just text")
+ msg = ud.convert_string(str(message))
+ self.assertEqual("Just text", msg.get_payload(decode=False))
+
class TestFetchBaseConfig(helpers.TestCase):
def test_only_builtin_gets_builtin(self):