# This file is part of cloud-init. See LICENSE file for license information. # Copyright (C) 2013 Hewlett-Packard Development Company, L.P. # # Author: Juerg Haefliger # # Based on test_handler_set_hostname.py # # This file is part of cloud-init. See LICENSE file for license information. import gzip import logging import tempfile from io import BytesIO from cloudinit import subp from cloudinit import util from cloudinit.config import cc_seed_random from tests.unittests import helpers as t_help from tests.unittests.util import get_cloud LOG = logging.getLogger(__name__) class TestRandomSeed(t_help.TestCase): def setUp(self): super(TestRandomSeed, self).setUp() self._seed_file = tempfile.mktemp() self.unapply = [] # by default 'which' has nothing in its path self.apply_patches([(subp, 'which', self._which)]) self.apply_patches([(subp, 'subp', self._subp)]) self.subp_called = [] self.whichdata = {} def tearDown(self): apply_patches([i for i in reversed(self.unapply)]) util.del_file(self._seed_file) def apply_patches(self, patches): ret = apply_patches(patches) self.unapply += ret def _which(self, program): return self.whichdata.get(program) def _subp(self, *args, **kwargs): # supports subp calling with cmd as args or kwargs if 'args' not in kwargs: kwargs['args'] = args[0] self.subp_called.append(kwargs) return def _compress(self, text): contents = BytesIO() gz_fh = gzip.GzipFile(mode='wb', fileobj=contents) gz_fh.write(text) gz_fh.close() return contents.getvalue() def test_append_random(self): cfg = { 'random_seed': { 'file': self._seed_file, 'data': 'tiny-tim-was-here', } } cc_seed_random.handle('test', cfg, get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("tiny-tim-was-here", contents) def test_append_random_unknown_encoding(self): data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'special_encoding', } } self.assertRaises(IOError, cc_seed_random.handle, 'test', cfg, get_cloud('ubuntu'), LOG, []) def test_append_random_gzip(self): data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'gzip', } } cc_seed_random.handle('test', cfg, get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("tiny-toe", contents) def test_append_random_gz(self): data = self._compress(b"big-toe") cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'gz', } } cc_seed_random.handle('test', cfg, get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("big-toe", contents) def test_append_random_base64(self): data = util.b64e('bubbles') cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'base64', } } cc_seed_random.handle('test', cfg, get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("bubbles", contents) def test_append_random_b64(self): data = util.b64e('kit-kat') cfg = { 'random_seed': { 'file': self._seed_file, 'data': data, 'encoding': 'b64', } } cc_seed_random.handle('test', cfg, get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) self.assertEqual("kit-kat", contents) def test_append_random_metadata(self): cfg = { 'random_seed': { 'file': self._seed_file, 'data': 'tiny-tim-was-here', } } c = get_cloud('ubuntu', metadata={'random_seed': '-so-was-josh'}) cc_seed_random.handle('test', cfg, c, LOG, []) contents = util.load_file(self._seed_file) self.assertEqual('tiny-tim-was-here-so-was-josh', contents) def test_seed_command_provided_and_available(self): c = get_cloud('ubuntu') self.whichdata = {'pollinate': '/usr/bin/pollinate'} cfg = {'random_seed': {'command': ['pollinate', '-q']}} cc_seed_random.handle('test', cfg, c, LOG, []) subp_args = [f['args'] for f in self.subp_called] self.assertIn(['pollinate', '-q'], subp_args) def test_seed_command_not_provided(self): c = get_cloud('ubuntu') self.whichdata = {} cc_seed_random.handle('test', {}, c, LOG, []) # subp should not have been called as which would say not available self.assertFalse(self.subp_called) def test_unavailable_seed_command_and_required_raises_error(self): c = get_cloud('ubuntu') self.whichdata = {} cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'], 'command_required': True}} self.assertRaises(ValueError, cc_seed_random.handle, 'test', cfg, c, LOG, []) def test_seed_command_and_required(self): c = get_cloud('ubuntu') self.whichdata = {'foo': 'foo'} cfg = {'random_seed': {'command_required': True, 'command': ['foo']}} cc_seed_random.handle('test', cfg, c, LOG, []) self.assertIn(['foo'], [f['args'] for f in self.subp_called]) def test_file_in_environment_for_command(self): c = get_cloud('ubuntu') self.whichdata = {'foo': 'foo'} cfg = {'random_seed': {'command_required': True, 'command': ['foo'], 'file': self._seed_file}} cc_seed_random.handle('test', cfg, c, LOG, []) # this just instists that the first time subp was called, # RANDOM_SEED_FILE was in the environment set up correctly subp_env = [f['env'] for f in self.subp_called] self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file) def apply_patches(patches): ret = [] for (ref, name, replace) in patches: if replace is None: continue orig = getattr(ref, name) setattr(ref, name, replace) ret.append((ref, name, orig)) return ret # vi: ts=4 expandtab