# This file is part of cloud-init. See LICENSE file for license information. # Copyright (C) 2013 Hewlett-Packard Development Company, L.P. # # Author: Juerg Haefliger # # Based on test_handler_set_hostname.py # # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.config import cc_seed_random import gzip import tempfile from io import BytesIO from cloudinit import cloud from cloudinit import distros from cloudinit import helpers from cloudinit import subp from cloudinit import util from cloudinit.sources import DataSourceNone from cloudinit.tests import helpers as t_help import logging LOG = logging.getLogger(__name__) class TestRandomSeed(t_help.TestCase): def setUp(self): super(TestRandomSeed, self).setUp() self._seed_file = tempfile.mktemp() self.unapply = [] # by default 'which' has nothing in its path self.apply_patches([(subp, 'which', self._which)]) self.apply_patches([(subp, 'subp', self._subp)]) self.subp_called = [] self.whichdata = {} def tearDown(self): apply_patches([i for i in reversed(self.unapply)]) util.del_file(self._seed_file) def apply_patches(self, patches): ret = apply_patches(patches) self.unapply += ret def _which(self, program): return self.whichdata.get(program) def _subp(self, *args, **kwargs): # supports subp calling with cmd as args or kwargs if 'args' not in kwargs: kwargs['args'] = args[0] self.subp_called.append(kwargs) return def _compress(self, text): contents = BytesIO() gz_fh = gzip.GzipFile(mode='wb', fileobj=contents) gz_fh.write(text) gz_fh.close() return contents.getvalue() def _get_cloud(self, distro, metadata=None): paths = helpers.Paths({}) cls = distros.fetch(distro) ubuntu_distro = cls(distro, {}, paths) ds = DataSourceNone.DataSourceNone({}, ubuntu_distro, paths) if metadata: ds.metadata = metadata return cloud.Cloud(ds, paths, {}, ubuntu_distro, None) def test_append_random(self): cfg = { 'random_seed': { 'file': self._seed_file, 'data': 'tiny-tim-was-here', } } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("tiny-tim-was-here", contents) def test_append_random_unknown_encoding(self): data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'special_encoding', } } self.assertRaises(IOError, cc_seed_random.handle, 'test', cfg, self._get_cloud('ubuntu'), LOG, []) def test_append_random_gzip(self): data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'gzip', } } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("tiny-toe", contents) def test_append_random_gz(self): data = self._compress(b"big-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'gz', } } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("big-toe", contents) def test_append_random_base64(self): data = util.b64e('bubbles') cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'base64', } } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("bubbles", contents) def test_append_random_b64(self): data = util.b64e('kit-kat') cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'b64', } } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("kit-kat", contents) def test_append_random_metadata(self): cfg = { 'random_seed': { 'file': self._seed_file, 'data': 'tiny-tim-was-here', } } c = self._get_cloud('ubuntu', {'random_seed': '-so-was-josh'}) cc_seed_random.handle('test', cfg, c, LOG, []) contents = util.load_file(self._seed_file) self.assertEqual('tiny-tim-was-here-so-was-josh', contents) def test_seed_command_provided_and_available(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {'pollinate': '/usr/bin/pollinate'} cfg = {'random_seed': {'command': ['pollinate', '-q']}} cc_seed_random.handle('test', cfg, c, LOG, []) subp_args = [f['args'] for f in self.subp_called] self.assertIn(['pollinate', '-q'], subp_args) def test_seed_command_not_provided(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {} cc_seed_random.handle('test', {}, c, LOG, []) # subp should not have been called as which would say not available self.assertFalse(self.subp_called) def test_unavailable_seed_command_and_required_raises_error(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {} cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'], 'command_required': True}} self.assertRaises(ValueError, cc_seed_random.handle, 'test', cfg, c, LOG, []) def test_seed_command_and_required(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {'foo': 'foo'} cfg = {'random_seed': {'command_required': True, 'command': ['foo']}} cc_seed_random.handle('test', cfg, c, LOG, []) self.assertIn(['foo'], [f['args'] for f in self.subp_called]) def test_file_in_environment_for_command(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {'foo': 'foo'} cfg = {'random_seed': {'command_required': True, 'command': ['foo'], 'file': self._seed_file}} cc_seed_random.handle('test', cfg, c, LOG, []) # this just instists that the first time subp was called, # RANDOM_SEED_FILE was in the environment set up correctly subp_env = [f['env'] for f in self.subp_called] self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file) def apply_patches(patches): ret = [] for (ref, name, replace) in patches: if replace is None: continue orig = getattr(ref, name) setattr(ref, name, replace) ret.append((ref, name, orig)) return ret # vi: ts=4 expandtab