diff options
author | Scott Moser <smoser@ubuntu.com> | 2014-07-21 14:33:27 -0400 |
---|---|---|
committer | Scott Moser <smoser@ubuntu.com> | 2014-07-21 14:33:27 -0400 |
commit | 0dbd2a0ec71de2f9d8d631ae5241a948fc6a51ee (patch) | |
tree | 658d3a3d0e2675a1738d65cc1260f2594877c360 /tests/unittests | |
parent | f9d18e2ed747a6d44e60547fbcc0bbff780f351f (diff) | |
parent | 6a4976e8a9915680fbc91f90bed8fcfa79cba5cf (diff) | |
download | vyos-cloud-init-0dbd2a0ec71de2f9d8d631ae5241a948fc6a51ee.tar.gz vyos-cloud-init-0dbd2a0ec71de2f9d8d631ae5241a948fc6a51ee.zip |
merge from trunk
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/helpers.py | 24 | ||||
-rw-r--r-- | tests/unittests/test__init__.py | 6 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudsigma.py | 49 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_gce.py | 5 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 1 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 26 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_seed_random.py | 75 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_yum_add_repo.py | 1 |
9 files changed, 172 insertions, 19 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 5bed13cc..970eb8cb 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -52,6 +52,30 @@ if PY26: standardMsg = standardMsg % (value) self.fail(self._formatMessage(msg, standardMsg)) + def assertDictContainsSubset(self, expected, actual, msg=None): + missing = [] + mismatched = [] + for k, v in expected.iteritems(): + if k not in actual: + missing.append(k) + elif actual[k] != v: + mismatched.append('%r, expected: %r, actual: %r' + % (k, v, actual[k])) + + if len(missing) == 0 and len(mismatched) == 0: + return + + standardMsg = '' + if missing: + standardMsg = 'Missing: %r' % ','.join(m for m in missing) + if mismatched: + if standardMsg: + standardMsg += '; ' + standardMsg += 'Mismatched values: %s' % ','.join(mismatched) + + self.fail(self._formatMessage(msg, standardMsg)) + + else: class TestCase(unittest.TestCase): pass diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 8c41c1ca..03065c8b 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,14 +1,10 @@ -import logging import os -import StringIO -import sys -from mocker import MockerTestCase, ANY, ARGS, KWARGS +from mocker import MockerTestCase, ARGS, KWARGS from cloudinit import handlers from cloudinit import helpers from cloudinit import importer -from cloudinit import log from cloudinit import settings from cloudinit import url_helper from cloudinit import util diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index 3245aba1..eadb3cb7 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -1,9 +1,11 @@ # coding: utf-8 -from unittest import TestCase +import copy from cloudinit.cs_utils import Cepko from cloudinit.sources import DataSourceCloudSigma +from tests.unittests import helpers as test_helpers + SERVER_CONTEXT = { "cpu": 1000, @@ -19,21 +21,27 @@ SERVER_CONTEXT = { "smp": 1, "tags": ["much server", "very performance"], "uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e8890", - "vnc_password": "9e84d6cb49e46379" + "vnc_password": "9e84d6cb49e46379", + "vendor_data": { + "location": "zrh", + "cloudinit": "#cloud-config\n\n...", + } } class CepkoMock(Cepko): - result = SERVER_CONTEXT + def __init__(self, mocked_context): + self.result = mocked_context def all(self): return self -class DataSourceCloudSigmaTest(TestCase): +class DataSourceCloudSigmaTest(test_helpers.TestCase): def setUp(self): self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") - self.datasource.cepko = CepkoMock() + self.datasource.is_running_in_cloudsigma = lambda: True + self.datasource.cepko = CepkoMock(SERVER_CONTEXT) self.datasource.get_data() def test_get_hostname(self): @@ -57,3 +65,34 @@ class DataSourceCloudSigmaTest(TestCase): def test_user_data(self): self.assertEqual(self.datasource.userdata_raw, SERVER_CONTEXT['meta']['cloudinit-user-data']) + + def test_encoded_user_data(self): + encoded_context = copy.deepcopy(SERVER_CONTEXT) + encoded_context['meta']['base64_fields'] = 'cloudinit-user-data' + encoded_context['meta']['cloudinit-user-data'] = 'aGkgd29ybGQK' + self.datasource.cepko = CepkoMock(encoded_context) + self.datasource.get_data() + + self.assertEqual(self.datasource.userdata_raw, b'hi world\n') + + def test_vendor_data(self): + self.assertEqual(self.datasource.vendordata_raw, + SERVER_CONTEXT['vendor_data']['cloudinit']) + + def test_lack_of_vendor_data(self): + stripped_context = copy.deepcopy(SERVER_CONTEXT) + del stripped_context["vendor_data"] + self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") + self.datasource.cepko = CepkoMock(stripped_context) + self.datasource.get_data() + + self.assertIsNone(self.datasource.vendordata_raw) + + def test_lack_of_cloudinit_key_in_vendor_data(self): + stripped_context = copy.deepcopy(SERVER_CONTEXT) + del stripped_context["vendor_data"]["cloudinit"] + self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") + self.datasource.cepko = CepkoMock(stripped_context) + self.datasource.get_data() + + self.assertIsNone(self.datasource.vendordata_raw) diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py index d91bd531..1979a0de 100644 --- a/tests/unittests/test_datasource/test_gce.py +++ b/tests/unittests/test_datasource/test_gce.py @@ -15,7 +15,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import unittest import httpretty import re @@ -25,6 +24,8 @@ from cloudinit import settings from cloudinit import helpers from cloudinit.sources import DataSourceGCE +from tests.unittests import helpers as test_helpers + GCE_META = { 'instance/id': '123', 'instance/zone': 'foo/bar', @@ -54,7 +55,7 @@ def _request_callback(method, uri, headers): return (404, headers, '') -class TestDataSourceGCE(unittest.TestCase): +class TestDataSourceGCE(test_helpers.TestCase): def setUp(self): self.ds = DataSourceGCE.DataSourceGCE( diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index bd5d23fd..73cfadcb 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -3,7 +3,6 @@ import os from cloudinit.sources import DataSourceMAAS from cloudinit import url_helper -from cloudinit import util from tests.unittests.helpers import populate_dir import mocker diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index 6fc5b2ac..ec6b752b 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -4,6 +4,7 @@ from cloudinit import util from mocker import MockerTestCase from tests.unittests.helpers import populate_dir +from base64 import b64encode import os import pwd @@ -164,10 +165,31 @@ class TestOpenNebulaDataSource(MockerTestCase): public_keys.append(SSH_KEY % (c + 1,)) - def test_user_data(self): + def test_user_data_plain(self): for k in ('USER_DATA', 'USERDATA'): my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: USER_DATA}) + populate_context_dir(my_d, {k: USER_DATA, + 'USERDATA_ENCODING': ''}) + results = ds.read_context_disk_dir(my_d) + + self.assertTrue('userdata' in results) + self.assertEqual(USER_DATA, results['userdata']) + + def test_user_data_encoding_required_for_decode(self): + b64userdata = b64encode(USER_DATA) + for k in ('USER_DATA', 'USERDATA'): + my_d = os.path.join(self.tmp, k) + populate_context_dir(my_d, {k: b64userdata}) + results = ds.read_context_disk_dir(my_d) + + self.assertTrue('userdata' in results) + self.assertEqual(b64userdata, results['userdata']) + + def test_user_data_base64_encoding(self): + for k in ('USER_DATA', 'USERDATA'): + my_d = os.path.join(self.tmp, k) + populate_context_dir(my_d, {k: b64encode(USER_DATA), + 'USERDATA_ENCODING': 'base64'}) results = ds.read_context_disk_dir(my_d) self.assertTrue('userdata' in results) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 8f9fa27d..f64aea07 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -24,10 +24,7 @@ import base64 from cloudinit import helpers as c_helpers -from cloudinit import stages -from cloudinit import util from cloudinit.sources import DataSourceSmartOS -from cloudinit.settings import (PER_INSTANCE) from tests.unittests import helpers import os import os.path @@ -174,6 +171,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): self.apply_patches([(mod, 'get_serial', _get_serial)]) self.apply_patches([(mod, 'dmi_data', _dmi_data)]) self.apply_patches([(os, 'uname', _os_uname)]) + self.apply_patches([(mod, 'device_exists', lambda d: True)]) dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, paths=self.paths) return dsrc diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py index 2b21ac02..be2fa4a4 100644 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ b/tests/unittests/test_handler/test_handler_seed_random.py @@ -42,10 +42,32 @@ class TestRandomSeed(t_help.TestCase): def setUp(self): super(TestRandomSeed, self).setUp() self._seed_file = tempfile.mktemp() + self.unapply = [] + + # by default 'which' has nothing in its path + self.apply_patches([(util, 'which', self._which)]) + self.apply_patches([(util, 'subp', self._subp)]) + self.subp_called = [] + self.whichdata = {} def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) util.del_file(self._seed_file) + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def _which(self, program): + return self.whichdata.get(program) + + def _subp(self, *args, **kwargs): + # supports subp calling with cmd as args or kwargs + if 'args' not in kwargs: + kwargs['args'] = args[0] + self.subp_called.append(kwargs) + return + def _compress(self, text): contents = StringIO() gz_fh = gzip.GzipFile(mode='wb', fileobj=contents) @@ -148,3 +170,56 @@ class TestRandomSeed(t_help.TestCase): cc_seed_random.handle('test', cfg, c, LOG, []) contents = util.load_file(self._seed_file) self.assertEquals('tiny-tim-was-here-so-was-josh', contents) + + def test_seed_command_not_provided_pollinate_available(self): + c = self._get_cloud('ubuntu', {}) + self.whichdata = {'pollinate': '/usr/bin/pollinate'} + cc_seed_random.handle('test', {}, c, LOG, []) + + subp_args = [f['args'] for f in self.subp_called] + self.assertIn(['pollinate', '-q'], subp_args) + + def test_seed_command_not_provided_pollinate_not_available(self): + c = self._get_cloud('ubuntu', {}) + self.whichdata = {} + cc_seed_random.handle('test', {}, c, LOG, []) + + # subp should not have been called as which would say not available + self.assertEquals(self.subp_called, list()) + + def test_unavailable_seed_command_and_required_raises_error(self): + c = self._get_cloud('ubuntu', {}) + self.whichdata = {} + self.assertRaises(ValueError, cc_seed_random.handle, + 'test', {'random_seed': {'command_required': True}}, c, LOG, []) + + def test_seed_command_and_required(self): + c = self._get_cloud('ubuntu', {}) + self.whichdata = {'foo': 'foo'} + cfg = {'random_seed': {'command_required': True, 'command': ['foo']}} + cc_seed_random.handle('test', cfg, c, LOG, []) + + self.assertIn(['foo'], [f['args'] for f in self.subp_called]) + + def test_file_in_environment_for_command(self): + c = self._get_cloud('ubuntu', {}) + self.whichdata = {'foo': 'foo'} + cfg = {'random_seed': {'command_required': True, 'command': ['foo'], + 'file': self._seed_file}} + cc_seed_random.handle('test', cfg, c, LOG, []) + + # this just instists that the first time subp was called, + # RANDOM_SEED_FILE was in the environment set up correctly + subp_env = [f['env'] for f in self.subp_called] + self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py index 8df592f9..7c6f7c40 100644 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -1,4 +1,3 @@ -from cloudinit import helpers from cloudinit import util from cloudinit.config import cc_yum_add_repo |