diff options
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r-- | tests/unittests/test_data.py | 537 |
1 files changed, 304 insertions, 233 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index fb2b55e8..a5018a42 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -5,39 +5,33 @@ import gzip import logging import os -from io import BytesIO, StringIO -from unittest import mock - from email import encoders from email.mime.application import MIMEApplication from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart +from io import BytesIO, StringIO +from unittest import mock import httpretty from cloudinit import handlers from cloudinit import helpers as c_helpers -from cloudinit import log -from cloudinit.settings import (PER_INSTANCE) -from cloudinit import sources -from cloudinit import stages +from cloudinit import log, safeyaml, sources, stages from cloudinit import user_data as ud -from cloudinit import safeyaml from cloudinit import util - -from cloudinit.tests import helpers - +from cloudinit.settings import PER_INSTANCE +from tests.unittests import helpers INSTANCE_ID = "i-testing" class FakeDataSource(sources.DataSource): - - def __init__(self, userdata=None, vendordata=None): + def __init__(self, userdata=None, vendordata=None, vendordata2=None): sources.DataSource.__init__(self, {}, None, None) - self.metadata = {'instance-id': INSTANCE_ID} + self.metadata = {"instance-id": INSTANCE_ID} self.userdata_raw = userdata self.vendordata_raw = vendordata + self.vendordata2_raw = vendordata2 def count_messages(root): @@ -51,7 +45,7 @@ def count_messages(root): def gzip_text(text): contents = BytesIO() - f = gzip.GzipFile(fileobj=contents, mode='wb') + f = gzip.GzipFile(fileobj=contents, mode="wb") f.write(util.encode_text(text)) f.flush() f.close() @@ -61,7 +55,6 @@ def gzip_text(text): # FIXME: these tests shouldn't be checking log output?? # Weirddddd... class TestConsumeUserData(helpers.FilesystemMockingTestCase): - def setUp(self): super(TestConsumeUserData, self).setUp() self._log = None @@ -86,13 +79,13 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): return log_file def test_simple_jsonp(self): - blob = ''' + blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "qux" }, { "op": "add", "path": "/bar", "value": "qux2" } ] -''' +""" ci = stages.Init() ci.datasource = FakeDataSource(blob) @@ -102,64 +95,84 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) self.assertEqual(2, len(cc)) - self.assertEqual('qux', cc['baz']) - self.assertEqual('qux2', cc['bar']) + self.assertEqual("qux", cc["baz"]) + self.assertEqual("qux2", cc["bar"]) - def test_simple_jsonp_vendor_and_user(self): + def test_simple_jsonp_vendor_and_vendor2_and_user(self): # test that user-data wins over vendor - user_blob = ''' + user_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "qux" }, - { "op": "add", "path": "/bar", "value": "qux2" } + { "op": "add", "path": "/bar", "value": "qux2" }, + { "op": "add", "path": "/foobar", "value": "qux3" } ] -''' - vendor_blob = ''' +""" + vendor_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "quxA" }, { "op": "add", "path": "/bar", "value": "quxB" }, - { "op": "add", "path": "/foo", "value": "quxC" } + { "op": "add", "path": "/foo", "value": "quxC" }, + { "op": "add", "path": "/corge", "value": "quxEE" } +] +""" + vendor2_blob = """ +#cloud-config-jsonp +[ + { "op": "add", "path": "/corge", "value": "quxD" }, + { "op": "add", "path": "/grault", "value": "quxFF" }, + { "op": "add", "path": "/foobar", "value": "quxGG" } ] -''' +""" self.reRoot() initer = stages.Init() - initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) + initer.datasource = FakeDataSource( + user_blob, vendordata=vendor_blob, vendordata2=vendor2_blob + ) initer.read_cfg() initer.initialize() initer.fetch() initer.instancify() initer.update() - initer.cloudify().run('consume_data', - initer.consume_data, - args=[PER_INSTANCE], - freq=PER_INSTANCE) + initer.cloudify().run( + "consume_data", + initer.consume_data, + args=[PER_INSTANCE], + freq=PER_INSTANCE, + ) mods = stages.Modules(initer) - (_which_ran, _failures) = mods.run_section('cloud_init_modules') + (_which_ran, _failures) = mods.run_section("cloud_init_modules") cfg = mods.cfg - self.assertIn('vendor_data', cfg) - self.assertEqual('qux', cfg['baz']) - self.assertEqual('qux2', cfg['bar']) - self.assertEqual('quxC', cfg['foo']) + self.assertIn("vendor_data", cfg) + self.assertIn("vendor_data2", cfg) + # Confirm that vendordata2 overrides vendordata, and that + # userdata overrides both + self.assertEqual("qux", cfg["baz"]) + self.assertEqual("qux2", cfg["bar"]) + self.assertEqual("qux3", cfg["foobar"]) + self.assertEqual("quxC", cfg["foo"]) + self.assertEqual("quxD", cfg["corge"]) + self.assertEqual("quxFF", cfg["grault"]) def test_simple_jsonp_no_vendor_consumed(self): # make sure that vendor data is not consumed - user_blob = ''' + user_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "qux" }, { "op": "add", "path": "/bar", "value": "qux2" }, { "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}} ] -''' - vendor_blob = ''' +""" + vendor_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "quxA" }, { "op": "add", "path": "/bar", "value": "quxB" }, { "op": "add", "path": "/foo", "value": "quxC" } ] -''' +""" self.reRoot() initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -168,35 +181,37 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): initer.fetch() initer.instancify() initer.update() - initer.cloudify().run('consume_data', - initer.consume_data, - args=[PER_INSTANCE], - freq=PER_INSTANCE) + initer.cloudify().run( + "consume_data", + initer.consume_data, + args=[PER_INSTANCE], + freq=PER_INSTANCE, + ) mods = stages.Modules(initer) - (_which_ran, _failures) = mods.run_section('cloud_init_modules') + (_which_ran, _failures) = mods.run_section("cloud_init_modules") cfg = mods.cfg - self.assertEqual('qux', cfg['baz']) - self.assertEqual('qux2', cfg['bar']) - self.assertNotIn('foo', cfg) + self.assertEqual("qux", cfg["baz"]) + self.assertEqual("qux2", cfg["bar"]) + self.assertNotIn("foo", cfg) def test_mixed_cloud_config(self): - blob_cc = ''' + blob_cc = """ #cloud-config a: b c: d -''' +""" message_cc = MIMEBase("text", "cloud-config") message_cc.set_payload(blob_cc) - blob_jp = ''' + blob_jp = """ #cloud-config-jsonp [ { "op": "replace", "path": "/a", "value": "c" }, { "op": "remove", "path": "/c" } ] -''' +""" - message_jp = MIMEBase('text', "cloud-config-jsonp") + message_jp = MIMEBase("text", "cloud-config-jsonp") message_jp.set_payload(blob_jp) message = MIMEMultipart() @@ -211,26 +226,26 @@ c: d cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) self.assertEqual(1, len(cc)) - self.assertEqual('c', cc['a']) + self.assertEqual("c", cc["a"]) def test_cloud_config_as_x_shell_script(self): - blob_cc = ''' + blob_cc = """ #cloud-config a: b c: d -''' +""" message_cc = MIMEBase("text", "x-shellscript") message_cc.set_payload(blob_cc) - blob_jp = ''' + blob_jp = """ #cloud-config-jsonp [ { "op": "replace", "path": "/a", "value": "c" }, { "op": "remove", "path": "/c" } ] -''' +""" - message_jp = MIMEBase('text', "cloud-config-jsonp") + message_jp = MIMEBase("text", "cloud-config-jsonp") message_jp.set_payload(blob_jp) message = MIMEMultipart() @@ -245,19 +260,19 @@ c: d cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) self.assertEqual(1, len(cc)) - self.assertEqual('c', cc['a']) + self.assertEqual("c", cc["a"]) def test_vendor_user_yaml_cloud_config(self): - vendor_blob = ''' + vendor_blob = """ #cloud-config a: b name: vendor run: - x - y -''' +""" - user_blob = ''' + user_blob = """ #cloud-config a: c vendor_data: @@ -266,7 +281,7 @@ vendor_data: name: user run: - z -''' +""" self.reRoot() initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -275,108 +290,122 @@ run: initer.fetch() initer.instancify() initer.update() - initer.cloudify().run('consume_data', - initer.consume_data, - args=[PER_INSTANCE], - freq=PER_INSTANCE) + initer.cloudify().run( + "consume_data", + initer.consume_data, + args=[PER_INSTANCE], + freq=PER_INSTANCE, + ) mods = stages.Modules(initer) - (_which_ran, _failures) = mods.run_section('cloud_init_modules') + (_which_ran, _failures) = mods.run_section("cloud_init_modules") cfg = mods.cfg - self.assertIn('vendor_data', cfg) - self.assertEqual('c', cfg['a']) - self.assertEqual('user', cfg['name']) - self.assertNotIn('x', cfg['run']) - self.assertNotIn('y', cfg['run']) - self.assertIn('z', cfg['run']) + self.assertIn("vendor_data", cfg) + self.assertEqual("c", cfg["a"]) + self.assertEqual("user", cfg["name"]) + self.assertNotIn("x", cfg["run"]) + self.assertNotIn("y", cfg["run"]) + self.assertIn("z", cfg["run"]) def test_vendordata_script(self): - vendor_blob = ''' + vendor_blob = """ #!/bin/bash echo "test" -''' +""" + vendor2_blob = """ +#!/bin/bash +echo "dynamic test" +""" - user_blob = ''' + user_blob = """ #cloud-config vendor_data: enabled: True prefix: /bin/true -''' +""" new_root = self.reRoot() initer = stages.Init() - initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) + initer.datasource = FakeDataSource( + user_blob, vendordata=vendor_blob, vendordata2=vendor2_blob + ) initer.read_cfg() initer.initialize() initer.fetch() initer.instancify() initer.update() - initer.cloudify().run('consume_data', - initer.consume_data, - args=[PER_INSTANCE], - freq=PER_INSTANCE) + initer.cloudify().run( + "consume_data", + initer.consume_data, + args=[PER_INSTANCE], + freq=PER_INSTANCE, + ) mods = stages.Modules(initer) - (_which_ran, _failures) = mods.run_section('cloud_init_modules') - vendor_script = initer.paths.get_ipath_cur('vendor_scripts') + (_which_ran, _failures) = mods.run_section("cloud_init_modules") + vendor_script = initer.paths.get_ipath_cur("vendor_scripts") vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script) self.assertTrue(os.path.exists(vendor_script_fns)) def test_merging_cloud_config(self): - blob = ''' + blob = """ #cloud-config a: b e: f run: - b - c -''' +""" message1 = MIMEBase("text", "cloud-config") message1.set_payload(blob) - blob2 = ''' + blob2 = """ #cloud-config a: e e: g run: - stuff - morestuff -''' +""" message2 = MIMEBase("text", "cloud-config") - message2['X-Merge-Type'] = ('dict(recurse_array,' - 'recurse_str)+list(append)+str(append)') + message2[ + "X-Merge-Type" + ] = "dict(recurse_array,recurse_str)+list(append)+str(append)" message2.set_payload(blob2) - blob3 = ''' + blob3 = """ #cloud-config e: - 1 - 2 - 3 p: 1 -''' +""" message3 = MIMEBase("text", "cloud-config") message3.set_payload(blob3) messages = [message1, message2, message3] - paths = c_helpers.Paths({}, ds=FakeDataSource('')) + paths = c_helpers.Paths({}, ds=FakeDataSource("")) cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) self.reRoot() - cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, - None) + cloud_cfg.handle_part( + None, handlers.CONTENT_START, None, None, None, None + ) for i, m in enumerate(messages): headers = dict(m) fn = "part-%s" % (i + 1) payload = m.get_payload(decode=True) - cloud_cfg.handle_part(None, headers['Content-Type'], - fn, payload, None, headers) - cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None, - None) - contents = util.load_file(paths.get_ipath('cloud_config')) + cloud_cfg.handle_part( + None, headers["Content-Type"], fn, payload, None, headers + ) + cloud_cfg.handle_part( + None, handlers.CONTENT_END, None, None, None, None + ) + contents = util.load_file(paths.get_ipath("cloud_config")) contents = util.load_yaml(contents) - self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff']) - self.assertEqual(contents['a'], 'be') - self.assertEqual(contents['e'], [1, 2, 3]) - self.assertEqual(contents['p'], 1) + self.assertEqual(contents["run"], ["b", "c", "stuff", "morestuff"]) + self.assertEqual(contents["a"], "be") + self.assertEqual(contents["e"], [1, 2, 3]) + self.assertEqual(contents["p"], 1) def test_unhandled_type_warning(self): """Raw text without magic is ignored but shows warning.""" @@ -385,35 +414,37 @@ p: 1 data = "arbitrary text\n" ci.datasource = FakeDataSource(data) - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertIn( "Unhandled non-multipart (text/x-not-multipart) userdata:", - log_file.getvalue()) + log_file.getvalue(), + ) mockobj.assert_called_once_with( - ci.paths.get_ipath("cloud_config"), "", 0o600) + ci.paths.get_ipath("cloud_config"), "", 0o600 + ) def test_mime_gzip_compressed(self): """Tests that individual message gzip encoding works.""" def gzip_part(text): - return MIMEApplication(gzip_text(text), 'gzip') + return MIMEApplication(gzip_text(text), "gzip") - base_content1 = ''' + base_content1 = """ #cloud-config a: 2 -''' +""" - base_content2 = ''' + base_content2 = """ #cloud-config b: 3 c: 4 -''' +""" - message = MIMEMultipart('test') + message = MIMEMultipart("test") message.attach(gzip_part(base_content1)) message.attach(gzip_part(base_content2)) ci = stages.Init() @@ -425,9 +456,9 @@ c: 4 contents = util.load_yaml(contents) self.assertTrue(isinstance(contents, dict)) self.assertEqual(3, len(contents)) - self.assertEqual(2, contents['a']) - self.assertEqual(3, contents['b']) - self.assertEqual(4, contents['c']) + self.assertEqual(2, contents["a"]) + self.assertEqual(3, contents["b"]) + self.assertEqual(4, contents["c"]) def test_mime_text_plain(self): """Mime message of type text/plain is ignored but shows warning.""" @@ -437,15 +468,17 @@ c: 4 message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string().encode()) - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertIn( "Unhandled unknown content-type (text/plain)", - log_file.getvalue()) + log_file.getvalue(), + ) mockobj.assert_called_once_with( - ci.paths.get_ipath("cloud_config"), "", 0o600) + ci.paths.get_ipath("cloud_config"), "", 0o600 + ) def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script.""" @@ -456,15 +489,18 @@ c: 4 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertEqual("", log_file.getvalue()) - mockobj.assert_has_calls([ - mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) + mockobj.assert_has_calls( + [ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ] + ) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script.""" @@ -477,15 +513,18 @@ c: 4 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertEqual("", log_file.getvalue()) - mockobj.assert_has_calls([ - mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) + mockobj.assert_has_calls( + [ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ] + ) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -498,41 +537,48 @@ c: 4 outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertEqual("", log_file.getvalue()) - mockobj.assert_has_calls([ - mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) + mockobj.assert_has_calls( + [ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ] + ) def test_mime_application_octet_stream(self): """Mime type application/octet-stream is ignored but shows warning.""" self.reRoot() ci = stages.Init() message = MIMEBase("application", "octet-stream") - message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc') + message.set_payload(b"\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc") encoders.encode_base64(message) ci.datasource = FakeDataSource(message.as_string().encode()) - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: log_file = self.capture_log(logging.WARNING) ci.fetch() ci.consume_data() self.assertIn( "Unhandled unknown content-type (application/octet-stream)", - log_file.getvalue()) + log_file.getvalue(), + ) mockobj.assert_called_once_with( - ci.paths.get_ipath("cloud_config"), "", 0o600) + ci.paths.get_ipath("cloud_config"), "", 0o600 + ) def test_cloud_config_archive(self): - non_decodable = b'\x11\xc9\xb4gTH\xee\x12' - data = [{'content': '#cloud-config\npassword: gocubs\n'}, - {'content': '#cloud-config\nlocale: chicago\n'}, - {'content': non_decodable}] - message = b'#cloud-config-archive\n' + safeyaml.dumps(data).encode() + non_decodable = b"\x11\xc9\xb4gTH\xee\x12" + data = [ + {"content": "#cloud-config\npassword: gocubs\n"}, + {"content": "#cloud-config\nlocale: chicago\n"}, + {"content": non_decodable}, + ] + message = b"#cloud-config-archive\n" + safeyaml.dumps(data).encode() self.reRoot() ci = stages.Init() @@ -545,35 +591,35 @@ c: 4 # consuming the user-data provided should write 'cloud_config' file # which will have our yaml in it. - with mock.patch('cloudinit.util.write_file') as mockobj: + with mock.patch("cloudinit.util.write_file") as mockobj: mockobj.side_effect = fsstore ci.fetch() ci.consume_data() cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")]) - self.assertEqual(cfg.get('password'), 'gocubs') - self.assertEqual(cfg.get('locale'), 'chicago') + self.assertEqual(cfg.get("password"), "gocubs") + self.assertEqual(cfg.get("locale"), "chicago") - @mock.patch('cloudinit.util.read_conf_with_confd') + @mock.patch("cloudinit.util.read_conf_with_confd") def test_dont_allow_user_data(self, mock_cfg): mock_cfg.return_value = {"allow_userdata": False} # test that user-data is ignored but vendor-data is kept - user_blob = ''' + user_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "qux" }, { "op": "add", "path": "/bar", "value": "qux2" } ] -''' - vendor_blob = ''' +""" + vendor_blob = """ #cloud-config-jsonp [ { "op": "add", "path": "/baz", "value": "quxA" }, { "op": "add", "path": "/bar", "value": "quxB" }, { "op": "add", "path": "/foo", "value": "quxC" } ] -''' +""" self.reRoot() initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -582,21 +628,22 @@ c: 4 initer.fetch() initer.instancify() initer.update() - initer.cloudify().run('consume_data', - initer.consume_data, - args=[PER_INSTANCE], - freq=PER_INSTANCE) + initer.cloudify().run( + "consume_data", + initer.consume_data, + args=[PER_INSTANCE], + freq=PER_INSTANCE, + ) mods = stages.Modules(initer) - (_which_ran, _failures) = mods.run_section('cloud_init_modules') + (_which_ran, _failures) = mods.run_section("cloud_init_modules") cfg = mods.cfg - self.assertIn('vendor_data', cfg) - self.assertEqual('quxA', cfg['baz']) - self.assertEqual('quxB', cfg['bar']) - self.assertEqual('quxC', cfg['foo']) + self.assertIn("vendor_data", cfg) + self.assertEqual("quxA", cfg["baz"]) + self.assertEqual("quxB", cfg["bar"]) + self.assertEqual("quxC", cfg["foo"]) class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): - def setUp(self): TestConsumeUserData.setUp(self) helpers.HttprettyTestCase.setUp(self) @@ -605,14 +652,14 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): TestConsumeUserData.tearDown(self) helpers.HttprettyTestCase.tearDown(self) - @mock.patch('cloudinit.url_helper.time.sleep') + @mock.patch("cloudinit.url_helper.time.sleep") def test_include(self, mock_sleep): """Test #include.""" - included_url = 'http://hostname/path' - included_data = '#cloud-config\nincluded: true\n' + included_url = "http://hostname/path" + included_data = "#cloud-config\nincluded: true\n" httpretty.register_uri(httpretty.GET, included_url, included_data) - blob = '#include\n%s\n' % included_url + blob = "#include\n%s\n" % included_url self.reRoot() ci = stages.Init() @@ -621,20 +668,20 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): ci.consume_data() cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) - self.assertTrue(cc.get('included')) + self.assertTrue(cc.get("included")) - @mock.patch('cloudinit.url_helper.time.sleep') + @mock.patch("cloudinit.url_helper.time.sleep") def test_include_bad_url(self, mock_sleep): """Test #include with a bad URL.""" - bad_url = 'http://bad/forbidden' - bad_data = '#cloud-config\nbad: true\n' + bad_url = "http://bad/forbidden" + bad_data = "#cloud-config\nbad: true\n" httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403) - included_url = 'http://hostname/path' - included_data = '#cloud-config\nincluded: true\n' + included_url = "http://hostname/path" + included_data = "#cloud-config\nincluded: true\n" httpretty.register_uri(httpretty.GET, included_url, included_data) - blob = '#include\n%s\n%s' % (bad_url, included_url) + blob = "#include\n%s\n%s" % (bad_url, included_url) self.reRoot() ci = stages.Init() @@ -642,26 +689,26 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): ci.fetch() with self.assertRaises(Exception) as context: ci.consume_data() - self.assertIn('403', str(context.exception)) + self.assertIn("403", str(context.exception)) with self.assertRaises(FileNotFoundError): util.load_file(ci.paths.get_ipath("cloud_config")) - @mock.patch('cloudinit.url_helper.time.sleep') + @mock.patch("cloudinit.url_helper.time.sleep") @mock.patch( "cloudinit.user_data.features.ERROR_ON_USER_DATA_FAILURE", False ) def test_include_bad_url_no_fail(self, mock_sleep): """Test #include with a bad URL and failure disabled""" - bad_url = 'http://bad/forbidden' - bad_data = '#cloud-config\nbad: true\n' + bad_url = "http://bad/forbidden" + bad_data = "#cloud-config\nbad: true\n" httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403) - included_url = 'http://hostname/path' - included_data = '#cloud-config\nincluded: true\n' + included_url = "http://hostname/path" + included_data = "#cloud-config\nincluded: true\n" httpretty.register_uri(httpretty.GET, included_url, included_data) - blob = '#include\n%s\n%s' % (bad_url, included_url) + blob = "#include\n%s\n%s" % (bad_url, included_url) self.reRoot() ci = stages.Init() @@ -670,32 +717,33 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): ci.fetch() ci.consume_data() - self.assertIn("403 Client Error: Forbidden for url: %s" % bad_url, - log_file.getvalue()) + self.assertIn( + "403 Client Error: Forbidden for url: %s" % bad_url, + log_file.getvalue(), + ) cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) - self.assertIsNone(cc.get('bad')) - self.assertTrue(cc.get('included')) + self.assertIsNone(cc.get("bad")) + self.assertTrue(cc.get("included")) class TestUDProcess(helpers.ResourceUsingTestCase): - def test_bytes_in_userdata(self): - msg = b'#cloud-config\napt_update: True\n' + msg = b"#cloud-config\napt_update: True\n" ud_proc = ud.UserDataProcessor(self.getCloudPaths()) message = ud_proc.process(msg) self.assertTrue(count_messages(message) == 1) def test_string_in_userdata(self): - msg = '#cloud-config\napt_update: True\n' + msg = "#cloud-config\napt_update: True\n" ud_proc = ud.UserDataProcessor(self.getCloudPaths()) message = ud_proc.process(msg) self.assertTrue(count_messages(message) == 1) def test_compressed_in_userdata(self): - msg = gzip_text('#cloud-config\napt_update: True\n') + msg = gzip_text("#cloud-config\napt_update: True\n") ud_proc = ud.UserDataProcessor(self.getCloudPaths()) message = ud_proc.process(msg) @@ -703,15 +751,14 @@ class TestUDProcess(helpers.ResourceUsingTestCase): class TestConvertString(helpers.TestCase): - def test_handles_binary_non_utf8_decodable(self): """Printable unicode (not utf8-decodable) is safely converted.""" - blob = b'#!/bin/bash\necho \xc3\x84\n' + blob = b"#!/bin/bash\necho \xc3\x84\n" msg = ud.convert_string(blob) self.assertEqual(blob, msg.get_payload(decode=True)) def test_handles_binary_utf8_decodable(self): - blob = b'\x32\x32' + blob = b"\x32\x32" msg = ud.convert_string(blob) self.assertEqual(blob, msg.get_payload(decode=True)) @@ -731,24 +778,31 @@ class TestConvertString(helpers.TestCase): class TestFetchBaseConfig(helpers.TestCase): def test_only_builtin_gets_builtin(self): ret = helpers.wrap_and_call( - 'cloudinit.stages', - {'util.read_conf_with_confd': None, - 'util.read_conf_from_cmdline': None, - 'read_runtime_config': {'return_value': {}}}, - stages.fetch_base_config) + "cloudinit.stages", + { + "util.read_conf_with_confd": None, + "util.read_conf_from_cmdline": None, + "read_runtime_config": {"return_value": {}}, + }, + stages.fetch_base_config, + ) self.assertEqual(util.get_builtin_cfg(), ret) def test_conf_d_overrides_defaults(self): builtin = util.get_builtin_cfg() test_key = sorted(builtin)[0] - test_value = 'test' + test_value = "test" ret = helpers.wrap_and_call( - 'cloudinit.stages', - {'util.read_conf_with_confd': - {'return_value': {test_key: test_value}}, - 'util.read_conf_from_cmdline': None, - 'read_runtime_config': {'return_value': {}}}, - stages.fetch_base_config) + "cloudinit.stages", + { + "util.read_conf_with_confd": { + "return_value": {test_key: test_value} + }, + "util.read_conf_from_cmdline": None, + "read_runtime_config": {"return_value": {}}, + }, + stages.fetch_base_config, + ) self.assertEqual(ret.get(test_key), test_value) builtin[test_key] = test_value self.assertEqual(ret, builtin) @@ -756,47 +810,64 @@ class TestFetchBaseConfig(helpers.TestCase): def test_cmdline_overrides_defaults(self): builtin = util.get_builtin_cfg() test_key = sorted(builtin)[0] - test_value = 'test' + test_value = "test" cmdline = {test_key: test_value} ret = helpers.wrap_and_call( - 'cloudinit.stages', - {'util.read_conf_from_cmdline': {'return_value': cmdline}, - 'util.read_conf_with_confd': None, - 'read_runtime_config': None}, - stages.fetch_base_config) + "cloudinit.stages", + { + "util.read_conf_from_cmdline": {"return_value": cmdline}, + "util.read_conf_with_confd": None, + "read_runtime_config": None, + }, + stages.fetch_base_config, + ) self.assertEqual(ret.get(test_key), test_value) builtin[test_key] = test_value self.assertEqual(ret, builtin) def test_cmdline_overrides_confd_runtime_and_defaults(self): - builtin = {'key1': 'value0', 'key3': 'other2'} - conf_d = {'key1': 'value1', 'key2': 'other1'} - cmdline = {'key3': 'other3', 'key2': 'other2'} - runtime = {'key3': 'runtime3'} + builtin = {"key1": "value0", "key3": "other2"} + conf_d = {"key1": "value1", "key2": "other1"} + cmdline = {"key3": "other3", "key2": "other2"} + runtime = {"key3": "runtime3"} ret = helpers.wrap_and_call( - 'cloudinit.stages', - {'util.read_conf_with_confd': {'return_value': conf_d}, - 'util.get_builtin_cfg': {'return_value': builtin}, - 'read_runtime_config': {'return_value': runtime}, - 'util.read_conf_from_cmdline': {'return_value': cmdline}}, - stages.fetch_base_config) - self.assertEqual(ret, {'key1': 'value1', 'key2': 'other2', - 'key3': 'other3'}) + "cloudinit.stages", + { + "util.read_conf_with_confd": {"return_value": conf_d}, + "util.get_builtin_cfg": {"return_value": builtin}, + "read_runtime_config": {"return_value": runtime}, + "util.read_conf_from_cmdline": {"return_value": cmdline}, + }, + stages.fetch_base_config, + ) + self.assertEqual( + ret, {"key1": "value1", "key2": "other2", "key3": "other3"} + ) def test_order_precedence_is_builtin_system_runtime_cmdline(self): - builtin = {'key1': 'builtin0', 'key3': 'builtin3'} - conf_d = {'key1': 'confd1', 'key2': 'confd2', 'keyconfd1': 'kconfd1'} - runtime = {'key1': 'runtime1', 'key2': 'runtime2'} - cmdline = {'key1': 'cmdline1'} + builtin = {"key1": "builtin0", "key3": "builtin3"} + conf_d = {"key1": "confd1", "key2": "confd2", "keyconfd1": "kconfd1"} + runtime = {"key1": "runtime1", "key2": "runtime2"} + cmdline = {"key1": "cmdline1"} ret = helpers.wrap_and_call( - 'cloudinit.stages', - {'util.read_conf_with_confd': {'return_value': conf_d}, - 'util.get_builtin_cfg': {'return_value': builtin}, - 'util.read_conf_from_cmdline': {'return_value': cmdline}, - 'read_runtime_config': {'return_value': runtime}, - }, - stages.fetch_base_config) - self.assertEqual(ret, {'key1': 'cmdline1', 'key2': 'runtime2', - 'key3': 'builtin3', 'keyconfd1': 'kconfd1'}) + "cloudinit.stages", + { + "util.read_conf_with_confd": {"return_value": conf_d}, + "util.get_builtin_cfg": {"return_value": builtin}, + "util.read_conf_from_cmdline": {"return_value": cmdline}, + "read_runtime_config": {"return_value": runtime}, + }, + stages.fetch_base_config, + ) + self.assertEqual( + ret, + { + "key1": "cmdline1", + "key2": "runtime2", + "key3": "builtin3", + "keyconfd1": "kconfd1", + }, + ) + # vi: ts=4 expandtab |