summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py49
-rw-r--r--tests/unittests/test_datasource/test_gce.py5
-rw-r--r--tests/unittests/test_datasource/test_maas.py1
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py26
-rw-r--r--tests/unittests/test_datasource/test_smartos.py4
5 files changed, 72 insertions, 13 deletions
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 3245aba1..eadb3cb7 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -1,9 +1,11 @@
# coding: utf-8
-from unittest import TestCase
+import copy
from cloudinit.cs_utils import Cepko
from cloudinit.sources import DataSourceCloudSigma
+from tests.unittests import helpers as test_helpers
+
SERVER_CONTEXT = {
"cpu": 1000,
@@ -19,21 +21,27 @@ SERVER_CONTEXT = {
"smp": 1,
"tags": ["much server", "very performance"],
"uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e8890",
- "vnc_password": "9e84d6cb49e46379"
+ "vnc_password": "9e84d6cb49e46379",
+ "vendor_data": {
+ "location": "zrh",
+ "cloudinit": "#cloud-config\n\n...",
+ }
}
class CepkoMock(Cepko):
- result = SERVER_CONTEXT
+ def __init__(self, mocked_context):
+ self.result = mocked_context
def all(self):
return self
-class DataSourceCloudSigmaTest(TestCase):
+class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
- self.datasource.cepko = CepkoMock()
+ self.datasource.is_running_in_cloudsigma = lambda: True
+ self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
self.datasource.get_data()
def test_get_hostname(self):
@@ -57,3 +65,34 @@ class DataSourceCloudSigmaTest(TestCase):
def test_user_data(self):
self.assertEqual(self.datasource.userdata_raw,
SERVER_CONTEXT['meta']['cloudinit-user-data'])
+
+ def test_encoded_user_data(self):
+ encoded_context = copy.deepcopy(SERVER_CONTEXT)
+ encoded_context['meta']['base64_fields'] = 'cloudinit-user-data'
+ encoded_context['meta']['cloudinit-user-data'] = 'aGkgd29ybGQK'
+ self.datasource.cepko = CepkoMock(encoded_context)
+ self.datasource.get_data()
+
+ self.assertEqual(self.datasource.userdata_raw, b'hi world\n')
+
+ def test_vendor_data(self):
+ self.assertEqual(self.datasource.vendordata_raw,
+ SERVER_CONTEXT['vendor_data']['cloudinit'])
+
+ def test_lack_of_vendor_data(self):
+ stripped_context = copy.deepcopy(SERVER_CONTEXT)
+ del stripped_context["vendor_data"]
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource.cepko = CepkoMock(stripped_context)
+ self.datasource.get_data()
+
+ self.assertIsNone(self.datasource.vendordata_raw)
+
+ def test_lack_of_cloudinit_key_in_vendor_data(self):
+ stripped_context = copy.deepcopy(SERVER_CONTEXT)
+ del stripped_context["vendor_data"]["cloudinit"]
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource.cepko = CepkoMock(stripped_context)
+ self.datasource.get_data()
+
+ self.assertIsNone(self.datasource.vendordata_raw)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index d91bd531..1979a0de 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -15,7 +15,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import unittest
import httpretty
import re
@@ -25,6 +24,8 @@ from cloudinit import settings
from cloudinit import helpers
from cloudinit.sources import DataSourceGCE
+from tests.unittests import helpers as test_helpers
+
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
@@ -54,7 +55,7 @@ def _request_callback(method, uri, headers):
return (404, headers, '')
-class TestDataSourceGCE(unittest.TestCase):
+class TestDataSourceGCE(test_helpers.TestCase):
def setUp(self):
self.ds = DataSourceGCE.DataSourceGCE(
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index bd5d23fd..73cfadcb 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,7 +3,6 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from cloudinit import util
from tests.unittests.helpers import populate_dir
import mocker
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 6fc5b2ac..ec6b752b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -4,6 +4,7 @@ from cloudinit import util
from mocker import MockerTestCase
from tests.unittests.helpers import populate_dir
+from base64 import b64encode
import os
import pwd
@@ -164,10 +165,31 @@ class TestOpenNebulaDataSource(MockerTestCase):
public_keys.append(SSH_KEY % (c + 1,))
- def test_user_data(self):
+ def test_user_data_plain(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: USER_DATA})
+ populate_context_dir(my_d, {k: USER_DATA,
+ 'USERDATA_ENCODING': ''})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('userdata' in results)
+ self.assertEqual(USER_DATA, results['userdata'])
+
+ def test_user_data_encoding_required_for_decode(self):
+ b64userdata = b64encode(USER_DATA)
+ for k in ('USER_DATA', 'USERDATA'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: b64userdata})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('userdata' in results)
+ self.assertEqual(b64userdata, results['userdata'])
+
+ def test_user_data_base64_encoding(self):
+ for k in ('USER_DATA', 'USERDATA'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ 'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
self.assertTrue('userdata' in results)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 8f9fa27d..f64aea07 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,10 +24,7 @@
import base64
from cloudinit import helpers as c_helpers
-from cloudinit import stages
-from cloudinit import util
from cloudinit.sources import DataSourceSmartOS
-from cloudinit.settings import (PER_INSTANCE)
from tests.unittests import helpers
import os
import os.path
@@ -174,6 +171,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.apply_patches([(mod, 'get_serial', _get_serial)])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
+ self.apply_patches([(mod, 'device_exists', lambda d: True)])
dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
paths=self.paths)
return dsrc