# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.config.cc_bootcmd import handle, schema
from cloudinit.sources import DataSourceNone
from cloudinit import (distros, helpers, cloud, subp, util)
from cloudinit.tests.helpers import (
    CiTestCase, mock, SchemaTestCaseMixin, skipUnlessJsonSchema)

import logging
import tempfile

LOG = logging.getLogger(__name__)


class FakeExtendedTempFile(object):
    """Context manager patched in at ``TestBootcmd._etmpfile_path``.

    It wraps a ``tempfile.NamedTemporaryFile`` created with ``delete=False``
    so the file stays on disk after the handle is closed; the file is
    removed explicitly when the context exits.
    """

    def __init__(self, suffix):
        # The suffix is recorded but deliberately not applied to the name of
        # the temporary file; only the class-name based prefix is used.
        self.suffix = suffix
        self.handle = tempfile.NamedTemporaryFile(
            prefix="ci-%s." % self.__class__.__name__, delete=False)

    def __enter__(self):
        """Return the open temporary file object."""
        return self.handle

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the temporary file and delete it from disk."""
        self.handle.close()
        util.del_file(self.handle.name)


class TestBootcmd(CiTestCase):
    """Exercise the cc_bootcmd ``handle`` entry point."""

    # Tests below read captured log output from self.logs.
    with_logs = True

    # Patch target that the script-running tests replace with
    # FakeExtendedTempFile.
    _etmpfile_path = ('cloudinit.config.cc_bootcmd.temp_utils.'
                      'ExtendedTemporaryFile')

    def setUp(self):
        super(TestBootcmd, self).setUp()
        self.subp = subp.subp
        self.new_root = self.tmp_dir()

    def _get_cloud(self, distro):
        """Return a ``cloud.Cloud`` for *distro* backed by DataSourceNone.

        Paths, distro config and datasource config are all empty dicts.
        """
        paths = helpers.Paths({})
        cls = distros.fetch(distro)
        mydist = cls(distro, {}, paths)
        myds = DataSourceNone.DataSourceNone({}, mydist, paths)
        paths.datasource = myds
        return cloud.Cloud(myds, paths, {}, mydist, None)

    def test_handler_skip_if_no_bootcmd(self):
        """When the provided config doesn't contain bootcmd, skip it."""
        cfg = {}
        mycloud = self._get_cloud('ubuntu')
        handle('notimportant', cfg, mycloud, LOG, None)
        self.assertIn(
            "Skipping module named notimportant, no 'bootcmd' key",
            self.logs.getvalue())

    def test_handler_invalid_command_set(self):
        """Commands which can't be converted to shell will raise errors."""
        invalid_config = {'bootcmd': 1}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError) as context_manager:
            handle('cc_bootcmd', invalid_config, cc, LOG, [])
        self.assertIn('Failed to shellify bootcmd', self.logs.getvalue())
        self.assertEqual(
            "Input to shellify was type 'int'. Expected list or tuple.",
            str(context_manager.exception))

    @skipUnlessJsonSchema()
    def test_handler_schema_validation_warns_non_array_type(self):
        """Schema validation warns of non-array type for bootcmd key.

        Schema validation is not strict, so bootcmd attempts to shellify the
        invalid content.
        """
        invalid_config = {'bootcmd': 1}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError):
            handle('cc_bootcmd', invalid_config, cc, LOG, [])
        self.assertIn(
            'Invalid config:\nbootcmd: 1 is not of type \'array\'',
            self.logs.getvalue())
        self.assertIn('Failed to shellify', self.logs.getvalue())

    @skipUnlessJsonSchema()
    def test_handler_schema_validation_warns_non_array_item_type(self):
        """Schema validation warns of non-array or string bootcmd items.

        Schema validation is not strict, so bootcmd attempts to shellify the
        invalid content.
        """
        invalid_config = {
            'bootcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError) as context_manager:
            handle('cc_bootcmd', invalid_config, cc, LOG, [])
        # Both warnings are matched through the full "schemas" wording so a
        # truncated or altered message cannot slip through.
        expected_warnings = [
            'bootcmd.1: 20 is not valid under any of the given schemas',
            'bootcmd.3: {\'a\': \'n\'} is not valid under any of the given'
            ' schemas'
        ]
        logs = self.logs.getvalue()
        for warning in expected_warnings:
            self.assertIn(warning, logs)
        self.assertIn('Failed to shellify', logs)
        self.assertEqual(
            ("Unable to shellify type 'int'. Expected list, string, tuple. "
             "Got: 20"),
            str(context_manager.exception))

    def test_handler_creates_and_runs_bootcmd_script_with_instance_id(self):
        """Valid schema runs a bootcmd script with INSTANCE_ID in the env."""
        cc = self._get_cloud('ubuntu')
        out_file = self.tmp_path('bootcmd.out', self.new_root)
        my_id = "b6ea0f59-e27d-49c6-9f87-79f19765a425"
        valid_config = {'bootcmd': [
            'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}

        with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
            with self.allow_subp(['/bin/sh']):
                handle('cc_bootcmd', valid_config, cc, LOG, [])
        # The script echoes my_id followed by the INSTANCE_ID it saw in its
        # environment into out_file.
        self.assertEqual(my_id + ' iid-datasource-none\n',
                         util.load_file(out_file))

    def test_handler_runs_bootcmd_script_with_error(self):
        """When a valid script generates an error, that error is raised."""
        cc = self._get_cloud('ubuntu')
        valid_config = {'bootcmd': ['exit 1']}  # Script with error

        with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
            with self.allow_subp(['/bin/sh']):
                with self.assertRaises(subp.ProcessExecutionError) as ctxt:
                    handle('does-not-matter', valid_config, cc, LOG, [])
        self.assertIn(
            'Unexpected error while running command.\n'
            "Command: ['/bin/sh',",
            str(ctxt.exception))
        self.assertIn(
            'Failed to run bootcmd module does-not-matter',
            self.logs.getvalue())


@skipUnlessJsonSchema()
class TestSchema(CiTestCase, SchemaTestCaseMixin):
    """Directly test schema rather than through handle."""

    schema = schema

    def test_duplicates_are_fine_array_array(self):
        """Duplicated commands array/array entries are allowed."""
        # Each command is itself a list so this covers the list-form entry,
        # leaving the plain-string form to the array/string test below.
        self.assertSchemaValid(
            [["byebye"], ["byebye"]], 'command entries can be duplicate')

    def test_duplicates_are_fine_array_string(self):
        """Duplicated commands array/string entries are allowed."""
        self.assertSchemaValid(
            ["echo bye", "echo bye"], "command entries can be duplicate.")

# vi: ts=4 expandtab