diff options
| author | zsdc <taras@vyos.io> | 2022-03-25 20:58:01 +0200 | 
|---|---|---|
| committer | zsdc <taras@vyos.io> | 2022-03-25 21:42:00 +0200 | 
| commit | 31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (patch) | |
| tree | 349631a02467dae0158f6f663cc8aa8537974a97 /tests/unittests/test_data.py | |
| parent | 5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff) | |
| parent | 8537237d80a48c8f0cbf8e66aa4826bbc882b022 (diff) | |
| download | vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.tar.gz vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.zip | |
T2117: Cloud-init updated to 22.1
Merged with 22.1 tag from the upstream Cloud-init repository.
Our modules were slightly modified for compatibility with the new
version.
Diffstat (limited to 'tests/unittests/test_data.py')
| -rw-r--r-- | tests/unittests/test_data.py | 537 | 
1 files changed, 304 insertions, 233 deletions
| diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index fb2b55e8..a5018a42 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -5,39 +5,33 @@  import gzip  import logging  import os -from io import BytesIO, StringIO -from unittest import mock -  from email import encoders  from email.mime.application import MIMEApplication  from email.mime.base import MIMEBase  from email.mime.multipart import MIMEMultipart +from io import BytesIO, StringIO +from unittest import mock  import httpretty  from cloudinit import handlers  from cloudinit import helpers as c_helpers -from cloudinit import log -from cloudinit.settings import (PER_INSTANCE) -from cloudinit import sources -from cloudinit import stages +from cloudinit import log, safeyaml, sources, stages  from cloudinit import user_data as ud -from cloudinit import safeyaml  from cloudinit import util - -from cloudinit.tests import helpers - +from cloudinit.settings import PER_INSTANCE +from tests.unittests import helpers  INSTANCE_ID = "i-testing"  class FakeDataSource(sources.DataSource): - -    def __init__(self, userdata=None, vendordata=None): +    def __init__(self, userdata=None, vendordata=None, vendordata2=None):          sources.DataSource.__init__(self, {}, None, None) -        self.metadata = {'instance-id': INSTANCE_ID} +        self.metadata = {"instance-id": INSTANCE_ID}          self.userdata_raw = userdata          self.vendordata_raw = vendordata +        self.vendordata2_raw = vendordata2  def count_messages(root): @@ -51,7 +45,7 @@ def count_messages(root):  def gzip_text(text):      contents = BytesIO() -    f = gzip.GzipFile(fileobj=contents, mode='wb') +    f = gzip.GzipFile(fileobj=contents, mode="wb")      f.write(util.encode_text(text))      f.flush()      f.close() @@ -61,7 +55,6 @@ def gzip_text(text):  # FIXME: these tests shouldn't be checking log output??  # Weirddddd...  class TestConsumeUserData(helpers.FilesystemMockingTestCase): -      def setUp(self):          super(TestConsumeUserData, self).setUp()          self._log = None @@ -86,13 +79,13 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):          return log_file      def test_simple_jsonp(self): -        blob = ''' +        blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "qux" },       { "op": "add", "path": "/bar", "value": "qux2" }  ] -''' +"""          ci = stages.Init()          ci.datasource = FakeDataSource(blob) @@ -102,64 +95,84 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):          cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))          cc = util.load_yaml(cc_contents)          self.assertEqual(2, len(cc)) -        self.assertEqual('qux', cc['baz']) -        self.assertEqual('qux2', cc['bar']) +        self.assertEqual("qux", cc["baz"]) +        self.assertEqual("qux2", cc["bar"]) -    def test_simple_jsonp_vendor_and_user(self): +    def test_simple_jsonp_vendor_and_vendor2_and_user(self):          # test that user-data wins over vendor -        user_blob = ''' +        user_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "qux" }, -     { "op": "add", "path": "/bar", "value": "qux2" } +     { "op": "add", "path": "/bar", "value": "qux2" }, +     { "op": "add", "path": "/foobar", "value": "qux3" }  ] -''' -        vendor_blob = ''' +""" +        vendor_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "quxA" },       { "op": "add", "path": "/bar", "value": "quxB" }, -     { "op": "add", "path": "/foo", "value": "quxC" } +     { "op": "add", "path": "/foo", "value": "quxC" }, +     { "op": "add", "path": "/corge", "value": "quxEE" } +] +""" +        vendor2_blob = """ +#cloud-config-jsonp +[ +     { "op": "add", "path": "/corge", "value": "quxD" }, +     { "op": "add", "path": "/grault", "value": "quxFF" }, +     { "op": "add", "path": "/foobar", "value": "quxGG" }  ] -''' +"""          self.reRoot()          initer = stages.Init() -        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) +        initer.datasource = FakeDataSource( +            user_blob, vendordata=vendor_blob, vendordata2=vendor2_blob +        )          initer.read_cfg()          initer.initialize()          initer.fetch()          initer.instancify()          initer.update() -        initer.cloudify().run('consume_data', -                              initer.consume_data, -                              args=[PER_INSTANCE], -                              freq=PER_INSTANCE) +        initer.cloudify().run( +            "consume_data", +            initer.consume_data, +            args=[PER_INSTANCE], +            freq=PER_INSTANCE, +        )          mods = stages.Modules(initer) -        (_which_ran, _failures) = mods.run_section('cloud_init_modules') +        (_which_ran, _failures) = mods.run_section("cloud_init_modules")          cfg = mods.cfg -        self.assertIn('vendor_data', cfg) -        self.assertEqual('qux', cfg['baz']) -        self.assertEqual('qux2', cfg['bar']) -        self.assertEqual('quxC', cfg['foo']) +        self.assertIn("vendor_data", cfg) +        self.assertIn("vendor_data2", cfg) +        # Confirm that vendordata2 overrides vendordata, and that +        #  userdata overrides both +        self.assertEqual("qux", cfg["baz"]) +        self.assertEqual("qux2", cfg["bar"]) +        self.assertEqual("qux3", cfg["foobar"]) +        self.assertEqual("quxC", cfg["foo"]) +        self.assertEqual("quxD", cfg["corge"]) +        self.assertEqual("quxFF", cfg["grault"])      def test_simple_jsonp_no_vendor_consumed(self):          # make sure that vendor data is not consumed -        user_blob = ''' +        user_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "qux" },       { "op": "add", "path": "/bar", "value": "qux2" },       { "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}}  ] -''' -        vendor_blob = ''' +""" +        vendor_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "quxA" },       { "op": "add", "path": "/bar", "value": "quxB" },       { "op": "add", "path": "/foo", "value": "quxC" }  ] -''' +"""          self.reRoot()          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -168,35 +181,37 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):          initer.fetch()          initer.instancify()          initer.update() -        initer.cloudify().run('consume_data', -                              initer.consume_data, -                              args=[PER_INSTANCE], -                              freq=PER_INSTANCE) +        initer.cloudify().run( +            "consume_data", +            initer.consume_data, +            args=[PER_INSTANCE], +            freq=PER_INSTANCE, +        )          mods = stages.Modules(initer) -        (_which_ran, _failures) = mods.run_section('cloud_init_modules') +        (_which_ran, _failures) = mods.run_section("cloud_init_modules")          cfg = mods.cfg -        self.assertEqual('qux', cfg['baz']) -        self.assertEqual('qux2', cfg['bar']) -        self.assertNotIn('foo', cfg) +        self.assertEqual("qux", cfg["baz"]) +        self.assertEqual("qux2", cfg["bar"]) +        self.assertNotIn("foo", cfg)      def test_mixed_cloud_config(self): -        blob_cc = ''' +        blob_cc = """  #cloud-config  a: b  c: d -''' +"""          message_cc = MIMEBase("text", "cloud-config")          message_cc.set_payload(blob_cc) -        blob_jp = ''' +        blob_jp = """  #cloud-config-jsonp  [       { "op": "replace", "path": "/a", "value": "c" },       { "op": "remove", "path": "/c" }  ] -''' +""" -        message_jp = MIMEBase('text', "cloud-config-jsonp") +        message_jp = MIMEBase("text", "cloud-config-jsonp")          message_jp.set_payload(blob_jp)          message = MIMEMultipart() @@ -211,26 +226,26 @@ c: d          cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))          cc = util.load_yaml(cc_contents)          self.assertEqual(1, len(cc)) -        self.assertEqual('c', cc['a']) +        self.assertEqual("c", cc["a"])      def test_cloud_config_as_x_shell_script(self): -        blob_cc = ''' +        blob_cc = """  #cloud-config  a: b  c: d -''' +"""          message_cc = MIMEBase("text", "x-shellscript")          message_cc.set_payload(blob_cc) -        blob_jp = ''' +        blob_jp = """  #cloud-config-jsonp  [       { "op": "replace", "path": "/a", "value": "c" },       { "op": "remove", "path": "/c" }  ] -''' +""" -        message_jp = MIMEBase('text', "cloud-config-jsonp") +        message_jp = MIMEBase("text", "cloud-config-jsonp")          message_jp.set_payload(blob_jp)          message = MIMEMultipart() @@ -245,19 +260,19 @@ c: d          cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))          cc = util.load_yaml(cc_contents)          self.assertEqual(1, len(cc)) -        self.assertEqual('c', cc['a']) +        self.assertEqual("c", cc["a"])      def test_vendor_user_yaml_cloud_config(self): -        vendor_blob = ''' +        vendor_blob = """  #cloud-config  a: b  name: vendor  run:   - x   - y -''' +""" -        user_blob = ''' +        user_blob = """  #cloud-config  a: c  vendor_data: @@ -266,7 +281,7 @@ vendor_data:  name: user  run:   - z -''' +"""          self.reRoot()          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -275,108 +290,122 @@ run:          initer.fetch()          initer.instancify()          initer.update() -        initer.cloudify().run('consume_data', -                              initer.consume_data, -                              args=[PER_INSTANCE], -                              freq=PER_INSTANCE) +        initer.cloudify().run( +            "consume_data", +            initer.consume_data, +            args=[PER_INSTANCE], +            freq=PER_INSTANCE, +        )          mods = stages.Modules(initer) -        (_which_ran, _failures) = mods.run_section('cloud_init_modules') +        (_which_ran, _failures) = mods.run_section("cloud_init_modules")          cfg = mods.cfg -        self.assertIn('vendor_data', cfg) -        self.assertEqual('c', cfg['a']) -        self.assertEqual('user', cfg['name']) -        self.assertNotIn('x', cfg['run']) -        self.assertNotIn('y', cfg['run']) -        self.assertIn('z', cfg['run']) +        self.assertIn("vendor_data", cfg) +        self.assertEqual("c", cfg["a"]) +        self.assertEqual("user", cfg["name"]) +        self.assertNotIn("x", cfg["run"]) +        self.assertNotIn("y", cfg["run"]) +        self.assertIn("z", cfg["run"])      def test_vendordata_script(self): -        vendor_blob = ''' +        vendor_blob = """  #!/bin/bash  echo "test" -''' +""" +        vendor2_blob = """ +#!/bin/bash +echo "dynamic test" +""" -        user_blob = ''' +        user_blob = """  #cloud-config  vendor_data:    enabled: True    prefix: /bin/true -''' +"""          new_root = self.reRoot()          initer = stages.Init() -        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) +        initer.datasource = FakeDataSource( +            user_blob, vendordata=vendor_blob, vendordata2=vendor2_blob +        )          initer.read_cfg()          initer.initialize()          initer.fetch()          initer.instancify()          initer.update() -        initer.cloudify().run('consume_data', -                              initer.consume_data, -                              args=[PER_INSTANCE], -                              freq=PER_INSTANCE) +        initer.cloudify().run( +            "consume_data", +            initer.consume_data, +            args=[PER_INSTANCE], +            freq=PER_INSTANCE, +        )          mods = stages.Modules(initer) -        (_which_ran, _failures) = mods.run_section('cloud_init_modules') -        vendor_script = initer.paths.get_ipath_cur('vendor_scripts') +        (_which_ran, _failures) = mods.run_section("cloud_init_modules") +        vendor_script = initer.paths.get_ipath_cur("vendor_scripts")          vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)          self.assertTrue(os.path.exists(vendor_script_fns))      def test_merging_cloud_config(self): -        blob = ''' +        blob = """  #cloud-config  a: b  e: f  run:   - b   - c -''' +"""          message1 = MIMEBase("text", "cloud-config")          message1.set_payload(blob) -        blob2 = ''' +        blob2 = """  #cloud-config  a: e  e: g  run:   - stuff   - morestuff -''' +"""          message2 = MIMEBase("text", "cloud-config") -        message2['X-Merge-Type'] = ('dict(recurse_array,' -                                    'recurse_str)+list(append)+str(append)') +        message2[ +            "X-Merge-Type" +        ] = "dict(recurse_array,recurse_str)+list(append)+str(append)"          message2.set_payload(blob2) -        blob3 = ''' +        blob3 = """  #cloud-config  e:   - 1   - 2   - 3  p: 1 -''' +"""          message3 = MIMEBase("text", "cloud-config")          message3.set_payload(blob3)          messages = [message1, message2, message3] -        paths = c_helpers.Paths({}, ds=FakeDataSource('')) +        paths = c_helpers.Paths({}, ds=FakeDataSource(""))          cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)          self.reRoot() -        cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, -                              None) +        cloud_cfg.handle_part( +            None, handlers.CONTENT_START, None, None, None, None +        )          for i, m in enumerate(messages):              headers = dict(m)              fn = "part-%s" % (i + 1)              payload = m.get_payload(decode=True) -            cloud_cfg.handle_part(None, headers['Content-Type'], -                                  fn, payload, None, headers) -        cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None, -                              None) -        contents = util.load_file(paths.get_ipath('cloud_config')) +            cloud_cfg.handle_part( +                None, headers["Content-Type"], fn, payload, None, headers +            ) +        cloud_cfg.handle_part( +            None, handlers.CONTENT_END, None, None, None, None +        ) +        contents = util.load_file(paths.get_ipath("cloud_config"))          contents = util.load_yaml(contents) -        self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff']) -        self.assertEqual(contents['a'], 'be') -        self.assertEqual(contents['e'], [1, 2, 3]) -        self.assertEqual(contents['p'], 1) +        self.assertEqual(contents["run"], ["b", "c", "stuff", "morestuff"]) +        self.assertEqual(contents["a"], "be") +        self.assertEqual(contents["e"], [1, 2, 3]) +        self.assertEqual(contents["p"], 1)      def test_unhandled_type_warning(self):          """Raw text without magic is ignored but shows warning.""" @@ -385,35 +414,37 @@ p: 1          data = "arbitrary text\n"          ci.datasource = FakeDataSource(data) -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertIn(                  "Unhandled non-multipart (text/x-not-multipart) userdata:", -                log_file.getvalue()) +                log_file.getvalue(), +            )          mockobj.assert_called_once_with( -            ci.paths.get_ipath("cloud_config"), "", 0o600) +            ci.paths.get_ipath("cloud_config"), "", 0o600 +        )      def test_mime_gzip_compressed(self):          """Tests that individual message gzip encoding works."""          def gzip_part(text): -            return MIMEApplication(gzip_text(text), 'gzip') +            return MIMEApplication(gzip_text(text), "gzip") -        base_content1 = ''' +        base_content1 = """  #cloud-config  a: 2 -''' +""" -        base_content2 = ''' +        base_content2 = """  #cloud-config  b: 3  c: 4 -''' +""" -        message = MIMEMultipart('test') +        message = MIMEMultipart("test")          message.attach(gzip_part(base_content1))          message.attach(gzip_part(base_content2))          ci = stages.Init() @@ -425,9 +456,9 @@ c: 4          contents = util.load_yaml(contents)          self.assertTrue(isinstance(contents, dict))          self.assertEqual(3, len(contents)) -        self.assertEqual(2, contents['a']) -        self.assertEqual(3, contents['b']) -        self.assertEqual(4, contents['c']) +        self.assertEqual(2, contents["a"]) +        self.assertEqual(3, contents["b"]) +        self.assertEqual(4, contents["c"])      def test_mime_text_plain(self):          """Mime message of type text/plain is ignored but shows warning.""" @@ -437,15 +468,17 @@ c: 4          message.set_payload("Just text")          ci.datasource = FakeDataSource(message.as_string().encode()) -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertIn(                  "Unhandled unknown content-type (text/plain)", -                log_file.getvalue()) +                log_file.getvalue(), +            )          mockobj.assert_called_once_with( -            ci.paths.get_ipath("cloud_config"), "", 0o600) +            ci.paths.get_ipath("cloud_config"), "", 0o600 +        )      def test_shellscript(self):          """Raw text starting #!/bin/sh is treated as script.""" @@ -456,15 +489,18 @@ c: 4          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertEqual("", log_file.getvalue()) -        mockobj.assert_has_calls([ -            mock.call(outpath, script, 0o700), -            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) +        mockobj.assert_has_calls( +            [ +                mock.call(outpath, script, 0o700), +                mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ] +        )      def test_mime_text_x_shellscript(self):          """Mime message of type text/x-shellscript is treated as script.""" @@ -477,15 +513,18 @@ c: 4          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertEqual("", log_file.getvalue()) -        mockobj.assert_has_calls([ -            mock.call(outpath, script, 0o700), -            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) +        mockobj.assert_has_calls( +            [ +                mock.call(outpath, script, 0o700), +                mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ] +        )      def test_mime_text_plain_shell(self):          """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -498,41 +537,48 @@ c: 4          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertEqual("", log_file.getvalue()) -        mockobj.assert_has_calls([ -            mock.call(outpath, script, 0o700), -            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) +        mockobj.assert_has_calls( +            [ +                mock.call(outpath, script, 0o700), +                mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ] +        )      def test_mime_application_octet_stream(self):          """Mime type application/octet-stream is ignored but shows warning."""          self.reRoot()          ci = stages.Init()          message = MIMEBase("application", "octet-stream") -        message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc') +        message.set_payload(b"\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc")          encoders.encode_base64(message)          ci.datasource = FakeDataSource(message.as_string().encode()) -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              log_file = self.capture_log(logging.WARNING)              ci.fetch()              ci.consume_data()              self.assertIn(                  "Unhandled unknown content-type (application/octet-stream)", -                log_file.getvalue()) +                log_file.getvalue(), +            )          mockobj.assert_called_once_with( -            ci.paths.get_ipath("cloud_config"), "", 0o600) +            ci.paths.get_ipath("cloud_config"), "", 0o600 +        )      def test_cloud_config_archive(self): -        non_decodable = b'\x11\xc9\xb4gTH\xee\x12' -        data = [{'content': '#cloud-config\npassword: gocubs\n'}, -                {'content': '#cloud-config\nlocale: chicago\n'}, -                {'content': non_decodable}] -        message = b'#cloud-config-archive\n' + safeyaml.dumps(data).encode() +        non_decodable = b"\x11\xc9\xb4gTH\xee\x12" +        data = [ +            {"content": "#cloud-config\npassword: gocubs\n"}, +            {"content": "#cloud-config\nlocale: chicago\n"}, +            {"content": non_decodable}, +        ] +        message = b"#cloud-config-archive\n" + safeyaml.dumps(data).encode()          self.reRoot()          ci = stages.Init() @@ -545,35 +591,35 @@ c: 4          # consuming the user-data provided should write 'cloud_config' file          # which will have our yaml in it. -        with mock.patch('cloudinit.util.write_file') as mockobj: +        with mock.patch("cloudinit.util.write_file") as mockobj:              mockobj.side_effect = fsstore              ci.fetch()              ci.consume_data()          cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")]) -        self.assertEqual(cfg.get('password'), 'gocubs') -        self.assertEqual(cfg.get('locale'), 'chicago') +        self.assertEqual(cfg.get("password"), "gocubs") +        self.assertEqual(cfg.get("locale"), "chicago") -    @mock.patch('cloudinit.util.read_conf_with_confd') +    @mock.patch("cloudinit.util.read_conf_with_confd")      def test_dont_allow_user_data(self, mock_cfg):          mock_cfg.return_value = {"allow_userdata": False}          # test that user-data is ignored but vendor-data is kept -        user_blob = ''' +        user_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "qux" },       { "op": "add", "path": "/bar", "value": "qux2" }  ] -''' -        vendor_blob = ''' +""" +        vendor_blob = """  #cloud-config-jsonp  [       { "op": "add", "path": "/baz", "value": "quxA" },       { "op": "add", "path": "/bar", "value": "quxB" },       { "op": "add", "path": "/foo", "value": "quxC" }  ] -''' +"""          self.reRoot()          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -582,21 +628,22 @@ c: 4          initer.fetch()          initer.instancify()          initer.update() -        initer.cloudify().run('consume_data', -                              initer.consume_data, -                              args=[PER_INSTANCE], -                              freq=PER_INSTANCE) +        initer.cloudify().run( +            "consume_data", +            initer.consume_data, +            args=[PER_INSTANCE], +            freq=PER_INSTANCE, +        )          mods = stages.Modules(initer) -        (_which_ran, _failures) = mods.run_section('cloud_init_modules') +        (_which_ran, _failures) = mods.run_section("cloud_init_modules")          cfg = mods.cfg -        self.assertIn('vendor_data', cfg) -        self.assertEqual('quxA', cfg['baz']) -        self.assertEqual('quxB', cfg['bar']) -        self.assertEqual('quxC', cfg['foo']) +        self.assertIn("vendor_data", cfg) +        self.assertEqual("quxA", cfg["baz"]) +        self.assertEqual("quxB", cfg["bar"]) +        self.assertEqual("quxC", cfg["foo"])  class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase): -      def setUp(self):          TestConsumeUserData.setUp(self)          helpers.HttprettyTestCase.setUp(self) @@ -605,14 +652,14 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):          TestConsumeUserData.tearDown(self)          helpers.HttprettyTestCase.tearDown(self) -    @mock.patch('cloudinit.url_helper.time.sleep') +    @mock.patch("cloudinit.url_helper.time.sleep")      def test_include(self, mock_sleep):          """Test #include.""" -        included_url = 'http://hostname/path' -        included_data = '#cloud-config\nincluded: true\n' +        included_url = "http://hostname/path" +        included_data = "#cloud-config\nincluded: true\n"          httpretty.register_uri(httpretty.GET, included_url, included_data) -        blob = '#include\n%s\n' % included_url +        blob = "#include\n%s\n" % included_url          self.reRoot()          ci = stages.Init() @@ -621,20 +668,20 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):          ci.consume_data()          cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))          cc = util.load_yaml(cc_contents) -        self.assertTrue(cc.get('included')) +        self.assertTrue(cc.get("included")) -    @mock.patch('cloudinit.url_helper.time.sleep') +    @mock.patch("cloudinit.url_helper.time.sleep")      def test_include_bad_url(self, mock_sleep):          """Test #include with a bad URL.""" -        bad_url = 'http://bad/forbidden' -        bad_data = '#cloud-config\nbad: true\n' +        bad_url = "http://bad/forbidden" +        bad_data = "#cloud-config\nbad: true\n"          httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403) -        included_url = 'http://hostname/path' -        included_data = '#cloud-config\nincluded: true\n' +        included_url = "http://hostname/path" +        included_data = "#cloud-config\nincluded: true\n"          httpretty.register_uri(httpretty.GET, included_url, included_data) -        blob = '#include\n%s\n%s' % (bad_url, included_url) +        blob = "#include\n%s\n%s" % (bad_url, included_url)          self.reRoot()          ci = stages.Init() @@ -642,26 +689,26 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):          ci.fetch()          with self.assertRaises(Exception) as context:              ci.consume_data() -        self.assertIn('403', str(context.exception)) +        self.assertIn("403", str(context.exception))          with self.assertRaises(FileNotFoundError):              util.load_file(ci.paths.get_ipath("cloud_config")) -    @mock.patch('cloudinit.url_helper.time.sleep') +    @mock.patch("cloudinit.url_helper.time.sleep")      @mock.patch(          "cloudinit.user_data.features.ERROR_ON_USER_DATA_FAILURE", False      )      def test_include_bad_url_no_fail(self, mock_sleep):          """Test #include with a bad URL and failure disabled""" -        bad_url = 'http://bad/forbidden' -        bad_data = '#cloud-config\nbad: true\n' +        bad_url = "http://bad/forbidden" +        bad_data = "#cloud-config\nbad: true\n"          httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403) -        included_url = 'http://hostname/path' -        included_data = '#cloud-config\nincluded: true\n' +        included_url = "http://hostname/path" +        included_data = "#cloud-config\nincluded: true\n"          httpretty.register_uri(httpretty.GET, included_url, included_data) -        blob = '#include\n%s\n%s' % (bad_url, included_url) +        blob = "#include\n%s\n%s" % (bad_url, included_url)          self.reRoot()          ci = stages.Init() @@ -670,32 +717,33 @@ class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):          ci.fetch()          ci.consume_data() -        self.assertIn("403 Client Error: Forbidden for url: %s" % bad_url, -                      log_file.getvalue()) +        self.assertIn( +            "403 Client Error: Forbidden for url: %s" % bad_url, +            log_file.getvalue(), +        )          cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))          cc = util.load_yaml(cc_contents) -        self.assertIsNone(cc.get('bad')) -        self.assertTrue(cc.get('included')) +        self.assertIsNone(cc.get("bad")) +        self.assertTrue(cc.get("included"))  class TestUDProcess(helpers.ResourceUsingTestCase): -      def test_bytes_in_userdata(self): -        msg = b'#cloud-config\napt_update: True\n' +        msg = b"#cloud-config\napt_update: True\n"          ud_proc = ud.UserDataProcessor(self.getCloudPaths())          message = ud_proc.process(msg)          self.assertTrue(count_messages(message) == 1)      def test_string_in_userdata(self): -        msg = '#cloud-config\napt_update: True\n' +        msg = "#cloud-config\napt_update: True\n"          ud_proc = ud.UserDataProcessor(self.getCloudPaths())          message = ud_proc.process(msg)          self.assertTrue(count_messages(message) == 1)      def test_compressed_in_userdata(self): -        msg = gzip_text('#cloud-config\napt_update: True\n') +        msg = gzip_text("#cloud-config\napt_update: True\n")          ud_proc = ud.UserDataProcessor(self.getCloudPaths())          message = ud_proc.process(msg) @@ -703,15 +751,14 @@ class TestUDProcess(helpers.ResourceUsingTestCase):  class TestConvertString(helpers.TestCase): -      def test_handles_binary_non_utf8_decodable(self):          """Printable unicode (not utf8-decodable) is safely converted.""" -        blob = b'#!/bin/bash\necho \xc3\x84\n' +        blob = b"#!/bin/bash\necho \xc3\x84\n"          msg = ud.convert_string(blob)          self.assertEqual(blob, msg.get_payload(decode=True))      def test_handles_binary_utf8_decodable(self): -        blob = b'\x32\x32' +        blob = b"\x32\x32"          msg = ud.convert_string(blob)          self.assertEqual(blob, msg.get_payload(decode=True)) @@ -731,24 +778,31 @@ class TestConvertString(helpers.TestCase):  class TestFetchBaseConfig(helpers.TestCase):      def test_only_builtin_gets_builtin(self):          ret = helpers.wrap_and_call( -            'cloudinit.stages', -            {'util.read_conf_with_confd': None, -             'util.read_conf_from_cmdline': None, -             'read_runtime_config': {'return_value': {}}}, -            stages.fetch_base_config) +            "cloudinit.stages", +            { +                "util.read_conf_with_confd": None, +                "util.read_conf_from_cmdline": None, +                "read_runtime_config": {"return_value": {}}, +            }, +            stages.fetch_base_config, +        )          self.assertEqual(util.get_builtin_cfg(), ret)      def test_conf_d_overrides_defaults(self):          builtin = util.get_builtin_cfg()          test_key = sorted(builtin)[0] -        test_value = 'test' +        test_value = "test"          ret = helpers.wrap_and_call( -            'cloudinit.stages', -            {'util.read_conf_with_confd': -                {'return_value': {test_key: test_value}}, -             'util.read_conf_from_cmdline': None, -             'read_runtime_config': {'return_value': {}}}, -            stages.fetch_base_config) +            "cloudinit.stages", +            { +                "util.read_conf_with_confd": { +                    "return_value": {test_key: test_value} +                }, +                "util.read_conf_from_cmdline": None, +                "read_runtime_config": {"return_value": {}}, +            }, +            stages.fetch_base_config, +        )          self.assertEqual(ret.get(test_key), test_value)          builtin[test_key] = test_value          self.assertEqual(ret, builtin) @@ -756,47 +810,64 @@ class TestFetchBaseConfig(helpers.TestCase):      def test_cmdline_overrides_defaults(self):          builtin = util.get_builtin_cfg()          test_key = sorted(builtin)[0] -        test_value = 'test' +        test_value = "test"          cmdline = {test_key: test_value}          ret = helpers.wrap_and_call( -            'cloudinit.stages', -            {'util.read_conf_from_cmdline': {'return_value': cmdline}, -             'util.read_conf_with_confd': None, -             'read_runtime_config': None}, -            stages.fetch_base_config) +            "cloudinit.stages", +            { +                "util.read_conf_from_cmdline": {"return_value": cmdline}, +                "util.read_conf_with_confd": None, +                "read_runtime_config": None, +            }, +            stages.fetch_base_config, +        )          self.assertEqual(ret.get(test_key), test_value)          builtin[test_key] = test_value          self.assertEqual(ret, builtin)      def test_cmdline_overrides_confd_runtime_and_defaults(self): -        builtin = {'key1': 'value0', 'key3': 'other2'} -        conf_d = {'key1': 'value1', 'key2': 'other1'} -        cmdline = {'key3': 'other3', 'key2': 'other2'} -        runtime = {'key3': 'runtime3'} +        builtin = {"key1": "value0", "key3": "other2"} +        conf_d = {"key1": "value1", "key2": "other1"} +        cmdline = {"key3": "other3", "key2": "other2"} +        runtime = {"key3": "runtime3"}          ret = helpers.wrap_and_call( -            'cloudinit.stages', -            {'util.read_conf_with_confd': {'return_value': conf_d}, -             'util.get_builtin_cfg': {'return_value': builtin}, -             'read_runtime_config': {'return_value': runtime}, -             'util.read_conf_from_cmdline': {'return_value': cmdline}}, -            stages.fetch_base_config) -        self.assertEqual(ret, {'key1': 'value1', 'key2': 'other2', -                               'key3': 'other3'}) +            "cloudinit.stages", +            { +                "util.read_conf_with_confd": {"return_value": conf_d}, +                "util.get_builtin_cfg": {"return_value": builtin}, +                "read_runtime_config": {"return_value": runtime}, +                "util.read_conf_from_cmdline": {"return_value": cmdline}, +            }, +            stages.fetch_base_config, +        ) +        self.assertEqual( +            ret, {"key1": "value1", "key2": "other2", "key3": "other3"} +        )      def test_order_precedence_is_builtin_system_runtime_cmdline(self): -        builtin = {'key1': 'builtin0', 'key3': 'builtin3'} -        conf_d = {'key1': 'confd1', 'key2': 'confd2', 'keyconfd1': 'kconfd1'} -        runtime = {'key1': 'runtime1', 'key2': 'runtime2'} -        cmdline = {'key1': 'cmdline1'} +        builtin = {"key1": "builtin0", "key3": "builtin3"} +        conf_d = {"key1": "confd1", "key2": "confd2", "keyconfd1": "kconfd1"} +        runtime = {"key1": "runtime1", "key2": "runtime2"} +        cmdline = {"key1": "cmdline1"}          ret = helpers.wrap_and_call( -            'cloudinit.stages', -            {'util.read_conf_with_confd': {'return_value': conf_d}, -             'util.get_builtin_cfg': {'return_value': builtin}, -             'util.read_conf_from_cmdline': {'return_value': cmdline}, -             'read_runtime_config': {'return_value': runtime}, -             }, -            stages.fetch_base_config) -        self.assertEqual(ret, {'key1': 'cmdline1', 'key2': 'runtime2', -                               'key3': 'builtin3', 'keyconfd1': 'kconfd1'}) +            "cloudinit.stages", +            { +                "util.read_conf_with_confd": {"return_value": conf_d}, +                "util.get_builtin_cfg": {"return_value": builtin}, +                "util.read_conf_from_cmdline": {"return_value": cmdline}, +                "read_runtime_config": {"return_value": runtime}, +            }, +            stages.fetch_base_config, +        ) +        self.assertEqual( +            ret, +            { +                "key1": "cmdline1", +                "key2": "runtime2", +                "key3": "builtin3", +                "keyconfd1": "kconfd1", +            }, +        ) +  # vi: ts=4 expandtab | 
