diff options
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r-- | tests/unittests/test_data.py | 52 |
1 files changed, 45 insertions, 7 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index e5b227f8..48475515 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -23,6 +23,7 @@ from cloudinit import log from cloudinit.settings import (PER_INSTANCE) from cloudinit import sources from cloudinit import stages +from cloudinit import user_data as ud from cloudinit import util INSTANCE_ID = "i-testing" @@ -39,6 +40,25 @@ class FakeDataSource(sources.DataSource): self.vendordata_raw = vendordata +def count_messages(root): + am = 0 + for m in root.walk(): + if ud.is_skippable(m): + continue + am += 1 + return am + + +def gzip_text(text): + contents = BytesIO() + f = gzip.GzipFile(fileobj=contents, mode='wb') + f.write(util.encode_text(text)) + f.flush() + f.close() + return contents.getvalue() + + + # FIXME: these tests shouldn't be checking log output?? # Weirddddd... class TestConsumeUserData(helpers.FilesystemMockingTestCase): @@ -363,12 +383,7 @@ p: 1 """Tests that individual message gzip encoding works.""" def gzip_part(text): - contents = BytesIO() - f = gzip.GzipFile(fileobj=contents, mode='wb') - f.write(util.encode_text(text)) - f.flush() - f.close() - return MIMEApplication(contents.getvalue(), 'gzip') + return MIMEApplication(gzip_text(text), 'gzip') base_content1 = ''' #cloud-config @@ -405,7 +420,7 @@ c: 4 ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") - ci.datasource = FakeDataSource(message.as_string()) + ci.datasource = FakeDataSource(message.as_string().encode()) with mock.patch('cloudinit.util.write_file') as mockobj: log_file = self.capture_log(logging.WARNING) @@ -477,3 +492,26 @@ c: 4 mock.call(outpath, script, 0o700), mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), ]) + + +class TestUDProcess(helpers.ResourceUsingTestCase): + + def test_bytes_in_userdata(self): + msg = b'#cloud-config\napt_update: True\n' + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) + + def test_string_in_userdata(self): + msg = '#cloud-config\napt_update: True\n' + + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) + + def test_compressed_in_userdata(self): + msg = gzip_text('#cloud-config\napt_update: True\n') + + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) |