summaryrefslogtreecommitdiff
path: root/tests/unittests/test_data.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r--tests/unittests/test_data.py52
1 files changed, 45 insertions, 7 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index e5b227f8..48475515 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -23,6 +23,7 @@ from cloudinit import log
from cloudinit.settings import (PER_INSTANCE)
from cloudinit import sources
from cloudinit import stages
+from cloudinit import user_data as ud
from cloudinit import util
INSTANCE_ID = "i-testing"
@@ -39,6 +40,25 @@ class FakeDataSource(sources.DataSource):
self.vendordata_raw = vendordata
+def count_messages(root):
+ am = 0
+ for m in root.walk():
+ if ud.is_skippable(m):
+ continue
+ am += 1
+ return am
+
+
+def gzip_text(text):
+ contents = BytesIO()
+ f = gzip.GzipFile(fileobj=contents, mode='wb')
+ f.write(util.encode_text(text))
+ f.flush()
+ f.close()
+ return contents.getvalue()
+
+
+
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
@@ -363,12 +383,7 @@ p: 1
"""Tests that individual message gzip encoding works."""
def gzip_part(text):
- contents = BytesIO()
- f = gzip.GzipFile(fileobj=contents, mode='wb')
- f.write(util.encode_text(text))
- f.flush()
- f.close()
- return MIMEApplication(contents.getvalue(), 'gzip')
+ return MIMEApplication(gzip_text(text), 'gzip')
base_content1 = '''
#cloud-config
@@ -405,7 +420,7 @@ c: 4
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
- ci.datasource = FakeDataSource(message.as_string())
+ ci.datasource = FakeDataSource(message.as_string().encode())
with mock.patch('cloudinit.util.write_file') as mockobj:
log_file = self.capture_log(logging.WARNING)
@@ -477,3 +492,26 @@ c: 4
mock.call(outpath, script, 0o700),
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
])
+
+
+class TestUDProcess(helpers.ResourceUsingTestCase):
+
+ def test_bytes_in_userdata(self):
+ msg = b'#cloud-config\napt_update: True\n'
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)
+
+ def test_string_in_userdata(self):
+ msg = '#cloud-config\napt_update: True\n'
+
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)
+
+ def test_compressed_in_userdata(self):
+ msg = gzip_text('#cloud-config\napt_update: True\n')
+
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(msg)
+ self.assertTrue(count_messages(message) == 1)