summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_azure.py28
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py11
-rw-r--r--tests/unittests/test_datasource/test_smartos.py14
3 files changed, 16 insertions, 37 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 97a53bee..965bce4b 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,5 +1,5 @@
from cloudinit import helpers
-from cloudinit.util import load_file
+from cloudinit.util import b64e, load_file
from cloudinit.sources import DataSourceAzure
from ..helpers import TestCase, populate_dir
@@ -12,7 +12,6 @@ try:
except ImportError:
from contextlib2 import ExitStack
-import base64
import crypt
import os
import stat
@@ -22,13 +21,6 @@ import tempfile
import unittest
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return base64.b64encode(source).decode('us-ascii')
-
-
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
if data is None:
data = {'HostName': 'FOOHOST'}
@@ -58,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (b64(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64e(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
@@ -189,7 +181,7 @@ class TestAzureDataSource(TestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
@@ -241,7 +233,7 @@ class TestAzureDataSource(TestCase):
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': b64(mydata)}
+ odata = {'UserData': b64e(mydata)}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -289,7 +281,7 @@ class TestAzureDataSource(TestCase):
'command': 'my-bounce-command',
'hostname_command': 'my-hostname-command'}}
odata = {'HostName': "xhost",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -304,7 +296,7 @@ class TestAzureDataSource(TestCase):
# config specifying set_hostname off should not bounce
cfg = {'set_hostname': False}
odata = {'HostName': "xhost",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -333,7 +325,7 @@ class TestAzureDataSource(TestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64(yaml.dump(dscfg)),
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
@@ -370,7 +362,7 @@ class TestAzureDataSource(TestCase):
def test_existing_ovf_same(self):
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': b64("SOMEUSERDATA")}
+ odata = {'UserData': b64e("SOMEUSERDATA")}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
populate_dir(self.waagent_d,
@@ -394,9 +386,9 @@ class TestAzureDataSource(TestCase):
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
# if ovf-env.xml differs.
cached_ovfenv = construct_valid_ovf_env(
- {'userdata': b64("FOO_USERDATA")})
+ {'userdata': b64e("FOO_USERDATA")})
new_ovfenv = construct_valid_ovf_env(
- {'userdata': b64("NEW_USERDATA")})
+ {'userdata': b64e("NEW_USERDATA")})
populate_dir(self.waagent_d,
{'ovf-env.xml': cached_ovfenv,
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index e5a4bd18..27adf21b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -3,19 +3,12 @@ from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
from ..helpers import TestCase, populate_dir
-from base64 import b64encode
import os
import pwd
import shutil
import tempfile
import unittest
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return b64encode(source).decode('us-ascii')
-
TEST_VARS = {
'VAR1': 'single',
@@ -186,7 +179,7 @@ class TestOpenNebulaDataSource(TestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64(USER_DATA)
+ b64userdata = util.b64e(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -198,7 +191,7 @@ class TestOpenNebulaDataSource(TestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64(USER_DATA),
+ populate_context_dir(my_d, {k: util.b64e(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index b5ebf94d..8b62b1b1 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,9 +24,9 @@
from __future__ import print_function
-import base64
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
from .. import helpers
import os
import os.path
@@ -36,12 +36,6 @@ import tempfile
import stat
import uuid
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return base64.b64encode(source).decode('us-ascii')
-
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -239,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -260,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -276,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()