"""Tests for handling of userdata within cloud init"""

import logging
import StringIO

from mocker import MockerTestCase

import cloudinit
from cloudinit.DataSource import DataSource


# Instance id reported by the fake datasource and used to build the
# per-instance paths the tests expect to be written.
instance_id = "i-testing"


class FakeDataSource(DataSource):
    """DataSource stand-in with fixed metadata and caller-supplied userdata."""

    def __init__(self, userdata):
        """Store `userdata` as the raw userdata for this datasource.

        Only the `metadata` and `userdata_raw` attributes are populated.
        """
        # NOTE(review): DataSource.__init__ is not invoked here; confirm the
        # base class needs no initialisation of its own for these tests.
        self.metadata = {'instance-id': instance_id}
        self.userdata_raw = userdata


class TestConsumeUserData(MockerTestCase):
    """Exercise CloudInit.consume_userdata() against a mocked write_file."""

    # Class-level defaults so tearDown stays safe even when capture_log()
    # has not run (for example, a subclass that overrides setUp).
    _log = None
    _log_handler = None
    log_file = None

    def setUp(self):
        """Mock out cloudinit.util.write_file and start capturing the log."""
        # passthrough=False: calls go to the mock, not the real write_file.
        self.mock_write = self.mocker.replace("cloudinit.util.write_file",
                                              passthrough=False)
        # Expectation shared by every test: an empty cloud_config file is
        # written for the instance.  0o600 is the same value as the old
        # Python-2-only spelling 0600, but also parses on Python 3.
        self.mock_write(self.get_ipath("cloud_config"), "", 0o600)
        self.capture_log()

    def tearDown(self):
        """Detach the log handler installed by capture_log(), if any."""
        if self._log is not None and self._log_handler is not None:
            self._log.removeHandler(self._log_handler)

    @staticmethod
    def get_ipath(name):
        """Return the path for `name` under the test instance's directory."""
        return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
                                      cloudinit.pathmap[name])

    def capture_log(self):
        """Route the cloudinit logger's output into `self.log_file`."""
        self.log_file = StringIO.StringIO()
        self._log_handler = logging.StreamHandler(self.log_file)
        self._log_handler.setLevel(logging.DEBUG)
        self._log = logging.getLogger(cloudinit.logger_name)
        self._log.addHandler(self._log_handler)

    def test_script(self):
        """A '#!' script is written as scripts/part-001 and logs nothing."""
        script = "#!/bin/sh\necho hello\n"
        outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
        # Expect the script to be written with mode 0o700.
        self.mock_write(outpath, script, 0o700)
        self.mocker.replay()
        ci = cloudinit.CloudInit()
        ci.datasource = FakeDataSource(script)
        ci.consume_userdata()
        self.assertEqual("", self.log_file.getvalue())

    def test_unhandled_type_warning(self):
        """Plain text userdata logs an 'Unhandled userdata part' message."""
        self.mocker.replay()
        ci = cloudinit.CloudInit()
        ci.datasource = FakeDataSource("arbitrary text\n")
        ci.consume_userdata()
        self.assertIn("Unhandled userdata part", self.log_file.getvalue())