diff options
-rw-r--r-- | cloudinit/user_data.py | 2 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 14 | ||||
-rw-r--r-- | tests/unittests/test_udprocess.py | 30 |
3 files changed, 43 insertions, 3 deletions
diff --git a/cloudinit/user_data.py b/cloudinit/user_data.py index fe343d0c..b11894ce 100644 --- a/cloudinit/user_data.py +++ b/cloudinit/user_data.py @@ -336,7 +336,7 @@ def convert_string(raw_data, headers=None): raw_data = '' if not headers: headers = {} - data = util.decomp_gzip(raw_data) + data = util.decode_binary(util.decomp_gzip(raw_data)) if "mime-version:" in data[0:4096].lower(): msg = email.message_from_string(data) for (key, val) in headers.items(): diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 38d70fcd..8112c69b 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,5 +1,5 @@ from cloudinit import helpers -from cloudinit.util import b64e, load_file +from cloudinit.util import b64e, decode_binary, load_file from cloudinit.sources import DataSourceAzure from ..helpers import TestCase, populate_dir @@ -231,9 +231,19 @@ class TestAzureDataSource(TestCase): self.assertEqual(defuser['passwd'], crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos])) + def test_userdata_plain(self): + mydata = "FOOBAR" + odata = {'UserData': {'text': mydata, 'encoding': 'plain'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(decode_binary(dsrc.userdata_raw), mydata) + def test_userdata_found(self): mydata = "FOOBAR" - odata = {'UserData': b64e(mydata)} + odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) diff --git a/tests/unittests/test_udprocess.py b/tests/unittests/test_udprocess.py new file mode 100644 index 00000000..39adbf9d --- /dev/null +++ b/tests/unittests/test_udprocess.py @@ -0,0 +1,30 @@ +from . import helpers + +from six.moves import filterfalse + +from cloudinit import user_data as ud +from cloudinit import util + +def count_messages(root): + am = 0 + for m in root.walk(): + if ud.is_skippable(m): + continue + am += 1 + return am + + +class TestUDProcess(helpers.ResourceUsingTestCase): + + def testBytesInPayload(self): + msg = b'#cloud-config\napt_update: True\n' + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) + + def testStringInPayload(self): + msg = '#cloud-config\napt_update: True\n' + + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) |