summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py24
-rw-r--r--tests/unittests/test__init__.py6
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py49
-rw-r--r--tests/unittests/test_datasource/test_gce.py5
-rw-r--r--tests/unittests/test_datasource/test_maas.py1
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py26
-rw-r--r--tests/unittests/test_datasource/test_smartos.py4
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py75
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py1
9 files changed, 172 insertions, 19 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 5bed13cc..970eb8cb 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -52,6 +52,30 @@ if PY26:
standardMsg = standardMsg % (value)
self.fail(self._formatMessage(msg, standardMsg))
+ def assertDictContainsSubset(self, expected, actual, msg=None):
+ missing = []
+ mismatched = []
+ for k, v in expected.iteritems():
+ if k not in actual:
+ missing.append(k)
+ elif actual[k] != v:
+ mismatched.append('%r, expected: %r, actual: %r'
+ % (k, v, actual[k]))
+
+ if len(missing) == 0 and len(mismatched) == 0:
+ return
+
+ standardMsg = ''
+ if missing:
+ standardMsg = 'Missing: %r' % ','.join(m for m in missing)
+ if mismatched:
+ if standardMsg:
+ standardMsg += '; '
+ standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
+
+ self.fail(self._formatMessage(msg, standardMsg))
+
+
else:
class TestCase(unittest.TestCase):
pass
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 8c41c1ca..03065c8b 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,14 +1,10 @@
-import logging
import os
-import StringIO
-import sys
-from mocker import MockerTestCase, ANY, ARGS, KWARGS
+from mocker import MockerTestCase, ARGS, KWARGS
from cloudinit import handlers
from cloudinit import helpers
from cloudinit import importer
-from cloudinit import log
from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 3245aba1..eadb3cb7 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -1,9 +1,11 @@
# coding: utf-8
-from unittest import TestCase
+import copy
from cloudinit.cs_utils import Cepko
from cloudinit.sources import DataSourceCloudSigma
+from tests.unittests import helpers as test_helpers
+
SERVER_CONTEXT = {
"cpu": 1000,
@@ -19,21 +21,27 @@ SERVER_CONTEXT = {
"smp": 1,
"tags": ["much server", "very performance"],
"uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e8890",
- "vnc_password": "9e84d6cb49e46379"
+ "vnc_password": "9e84d6cb49e46379",
+ "vendor_data": {
+ "location": "zrh",
+ "cloudinit": "#cloud-config\n\n...",
+ }
}
class CepkoMock(Cepko):
- result = SERVER_CONTEXT
+ def __init__(self, mocked_context):
+ self.result = mocked_context
def all(self):
return self
-class DataSourceCloudSigmaTest(TestCase):
+class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
- self.datasource.cepko = CepkoMock()
+ self.datasource.is_running_in_cloudsigma = lambda: True
+ self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
self.datasource.get_data()
def test_get_hostname(self):
@@ -57,3 +65,34 @@ class DataSourceCloudSigmaTest(TestCase):
def test_user_data(self):
self.assertEqual(self.datasource.userdata_raw,
SERVER_CONTEXT['meta']['cloudinit-user-data'])
+
+ def test_encoded_user_data(self):
+ encoded_context = copy.deepcopy(SERVER_CONTEXT)
+ encoded_context['meta']['base64_fields'] = 'cloudinit-user-data'
+ encoded_context['meta']['cloudinit-user-data'] = 'aGkgd29ybGQK'
+ self.datasource.cepko = CepkoMock(encoded_context)
+ self.datasource.get_data()
+
+ self.assertEqual(self.datasource.userdata_raw, b'hi world\n')
+
+ def test_vendor_data(self):
+ self.assertEqual(self.datasource.vendordata_raw,
+ SERVER_CONTEXT['vendor_data']['cloudinit'])
+
+ def test_lack_of_vendor_data(self):
+ stripped_context = copy.deepcopy(SERVER_CONTEXT)
+ del stripped_context["vendor_data"]
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource.cepko = CepkoMock(stripped_context)
+ self.datasource.get_data()
+
+ self.assertIsNone(self.datasource.vendordata_raw)
+
+ def test_lack_of_cloudinit_key_in_vendor_data(self):
+ stripped_context = copy.deepcopy(SERVER_CONTEXT)
+ del stripped_context["vendor_data"]["cloudinit"]
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource.cepko = CepkoMock(stripped_context)
+ self.datasource.get_data()
+
+ self.assertIsNone(self.datasource.vendordata_raw)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index d91bd531..1979a0de 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -15,7 +15,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import unittest
import httpretty
import re
@@ -25,6 +24,8 @@ from cloudinit import settings
from cloudinit import helpers
from cloudinit.sources import DataSourceGCE
+from tests.unittests import helpers as test_helpers
+
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
@@ -54,7 +55,7 @@ def _request_callback(method, uri, headers):
return (404, headers, '')
-class TestDataSourceGCE(unittest.TestCase):
+class TestDataSourceGCE(test_helpers.TestCase):
def setUp(self):
self.ds = DataSourceGCE.DataSourceGCE(
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index bd5d23fd..73cfadcb 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,7 +3,6 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from cloudinit import util
from tests.unittests.helpers import populate_dir
import mocker
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 6fc5b2ac..ec6b752b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -4,6 +4,7 @@ from cloudinit import util
from mocker import MockerTestCase
from tests.unittests.helpers import populate_dir
+from base64 import b64encode
import os
import pwd
@@ -164,10 +165,31 @@ class TestOpenNebulaDataSource(MockerTestCase):
public_keys.append(SSH_KEY % (c + 1,))
- def test_user_data(self):
+ def test_user_data_plain(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: USER_DATA})
+ populate_context_dir(my_d, {k: USER_DATA,
+ 'USERDATA_ENCODING': ''})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('userdata' in results)
+ self.assertEqual(USER_DATA, results['userdata'])
+
+ def test_user_data_encoding_required_for_decode(self):
+ b64userdata = b64encode(USER_DATA)
+ for k in ('USER_DATA', 'USERDATA'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: b64userdata})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('userdata' in results)
+ self.assertEqual(b64userdata, results['userdata'])
+
+ def test_user_data_base64_encoding(self):
+ for k in ('USER_DATA', 'USERDATA'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ 'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
self.assertTrue('userdata' in results)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 8f9fa27d..f64aea07 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,10 +24,7 @@
import base64
from cloudinit import helpers as c_helpers
-from cloudinit import stages
-from cloudinit import util
from cloudinit.sources import DataSourceSmartOS
-from cloudinit.settings import (PER_INSTANCE)
from tests.unittests import helpers
import os
import os.path
@@ -174,6 +171,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.apply_patches([(mod, 'get_serial', _get_serial)])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
+ self.apply_patches([(mod, 'device_exists', lambda d: True)])
dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
paths=self.paths)
return dsrc
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 2b21ac02..be2fa4a4 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -42,10 +42,32 @@ class TestRandomSeed(t_help.TestCase):
def setUp(self):
super(TestRandomSeed, self).setUp()
self._seed_file = tempfile.mktemp()
+ self.unapply = []
+
+ # by default 'which' has nothing in its path
+ self.apply_patches([(util, 'which', self._which)])
+ self.apply_patches([(util, 'subp', self._subp)])
+ self.subp_called = []
+ self.whichdata = {}
def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
util.del_file(self._seed_file)
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _which(self, program):
+ return self.whichdata.get(program)
+
+ def _subp(self, *args, **kwargs):
+ # supports subp calling with cmd as args or kwargs
+ if 'args' not in kwargs:
+ kwargs['args'] = args[0]
+ self.subp_called.append(kwargs)
+ return
+
def _compress(self, text):
contents = StringIO()
gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
@@ -148,3 +170,56 @@ class TestRandomSeed(t_help.TestCase):
cc_seed_random.handle('test', cfg, c, LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals('tiny-tim-was-here-so-was-josh', contents)
+
+ def test_seed_command_not_provided_pollinate_available(self):
+ c = self._get_cloud('ubuntu', {})
+ self.whichdata = {'pollinate': '/usr/bin/pollinate'}
+ cc_seed_random.handle('test', {}, c, LOG, [])
+
+ subp_args = [f['args'] for f in self.subp_called]
+ self.assertIn(['pollinate', '-q'], subp_args)
+
+ def test_seed_command_not_provided_pollinate_not_available(self):
+ c = self._get_cloud('ubuntu', {})
+ self.whichdata = {}
+ cc_seed_random.handle('test', {}, c, LOG, [])
+
+ # subp should not have been called as which would say not available
+ self.assertEquals(self.subp_called, list())
+
+ def test_unavailable_seed_command_and_required_raises_error(self):
+ c = self._get_cloud('ubuntu', {})
+ self.whichdata = {}
+ self.assertRaises(ValueError, cc_seed_random.handle,
+ 'test', {'random_seed': {'command_required': True}}, c, LOG, [])
+
+ def test_seed_command_and_required(self):
+ c = self._get_cloud('ubuntu', {})
+ self.whichdata = {'foo': 'foo'}
+ cfg = {'random_seed': {'command_required': True, 'command': ['foo']}}
+ cc_seed_random.handle('test', cfg, c, LOG, [])
+
+ self.assertIn(['foo'], [f['args'] for f in self.subp_called])
+
+ def test_file_in_environment_for_command(self):
+ c = self._get_cloud('ubuntu', {})
+ self.whichdata = {'foo': 'foo'}
+ cfg = {'random_seed': {'command_required': True, 'command': ['foo'],
+ 'file': self._seed_file}}
+ cc_seed_random.handle('test', cfg, c, LOG, [])
+
+ # this just instists that the first time subp was called,
+ # RANDOM_SEED_FILE was in the environment set up correctly
+ subp_env = [f['env'] for f in self.subp_called]
+ self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 8df592f9..7c6f7c40 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -1,4 +1,3 @@
-from cloudinit import helpers
from cloudinit import util
from cloudinit.config import cc_yum_add_repo