summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/config/cc_seed_random.py8
-rw-r--r--cloudinit/config/cc_ssh_authkey_fingerprints.py2
-rw-r--r--cloudinit/sources/DataSourceOpenNebula.py7
-rw-r--r--cloudinit/sources/DataSourceSmartOS.py11
-rw-r--r--cloudinit/util.py20
-rw-r--r--tests/unittests/test_datasource/test_azure.py28
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py11
-rw-r--r--tests/unittests/test_datasource/test_smartos.py14
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py12
9 files changed, 42 insertions, 71 deletions
diff --git a/cloudinit/config/cc_seed_random.py b/cloudinit/config/cc_seed_random.py
index 981e1b08..bb64b0f5 100644
--- a/cloudinit/config/cc_seed_random.py
+++ b/cloudinit/config/cc_seed_random.py
@@ -38,13 +38,7 @@ def _decode(data, encoding=None):
if not encoding or encoding.lower() in ['raw']:
return data
elif encoding.lower() in ['base64', 'b64']:
- # Try to give us a native string in both Python 2 and 3, and remember
- # that b64decode() returns bytes in Python 3.
- decoded = base64.b64decode(data)
- try:
- return decoded.decode('utf-8')
- except UnicodeDecodeError:
- return decoded
+ return util.b64d(data)
elif encoding.lower() in ['gzip', 'gz']:
return util.decomp_gzip(data, quiet=False)
else:
diff --git a/cloudinit/config/cc_ssh_authkey_fingerprints.py b/cloudinit/config/cc_ssh_authkey_fingerprints.py
index 51580633..6ce831bc 100644
--- a/cloudinit/config/cc_ssh_authkey_fingerprints.py
+++ b/cloudinit/config/cc_ssh_authkey_fingerprints.py
@@ -32,7 +32,7 @@ from cloudinit import util
def _split_hash(bin_hash):
split_up = []
- for i in xrange(0, len(bin_hash), 2):
+ for i in range(0, len(bin_hash), 2):
split_up.append(bin_hash[i:i + 2])
return split_up
diff --git a/cloudinit/sources/DataSourceOpenNebula.py b/cloudinit/sources/DataSourceOpenNebula.py
index a0275cda..61709c1b 100644
--- a/cloudinit/sources/DataSourceOpenNebula.py
+++ b/cloudinit/sources/DataSourceOpenNebula.py
@@ -426,12 +426,7 @@ def read_context_disk_dir(source_dir, asuser=None):
context.get('USER_DATA_ENCODING'))
if encoding == "base64":
try:
- userdata = base64.b64decode(results['userdata'])
- # In Python 3 we still expect a str, but b64decode will return
- # bytes. Convert to str.
- if isinstance(userdata, bytes):
- userdata = userdata.decode('utf-8')
- results['userdata'] = userdata
+ results['userdata'] = util.b64d(results['userdata'])
except TypeError:
LOG.warn("Failed base64 decoding of userdata")
diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py
index f59ad3d6..9d48beab 100644
--- a/cloudinit/sources/DataSourceSmartOS.py
+++ b/cloudinit/sources/DataSourceSmartOS.py
@@ -351,16 +351,7 @@ def query_data(noun, seed_device, seed_timeout, strip=False, default=None,
if b64:
try:
- # Generally, we want native strings in the values. Python 3's
- # b64decode will return bytes though, so decode them to utf-8 if
- # possible. If that fails, return the bytes.
- decoded = base64.b64decode(resp)
- try:
- if isinstance(decoded, bytes):
- return decoded.decode('utf-8')
- except UnicodeDecodeError:
- pass
- return decoded
+ return util.b64d(resp)
# Bogus input produces different errors in Python 2 and 3; catch both.
except (TypeError, binascii.Error):
LOG.warn("Failed base64 decoding key '%s'", noun)
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 766f8e32..8916cc11 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -44,6 +44,7 @@ import sys
import tempfile
import time
+from base64 import b64decode, b64encode
from six.moves.urllib import parse as urlparse
import six
@@ -90,6 +91,25 @@ def encode_text(text, encoding='utf-8'):
return text
return text.encode(encoding)
+
+def b64d(source):
+ # Base64 decode some data, accepting bytes or unicode/str, and returning
+ # str/unicode if the result is utf-8 compatible, otherwise returning bytes.
+ decoded = b64decode(source)
+ if isinstance(decoded, bytes):
+ try:
+ return decoded.decode('utf-8')
+ except UnicodeDecodeError:
+ return decoded
+
+def b64e(source):
+ # Base64 encode some data, accepting bytes or unicode/str, and returning
+ # str/unicode if the result is utf-8 compatible, otherwise returning bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return b64encode(source).decode('utf-8')
+
+
# Path for DMI Data
DMI_SYS_PATH = "/sys/class/dmi/id"
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 97a53bee..965bce4b 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,5 +1,5 @@
from cloudinit import helpers
-from cloudinit.util import load_file
+from cloudinit.util import b64e, load_file
from cloudinit.sources import DataSourceAzure
from ..helpers import TestCase, populate_dir
@@ -12,7 +12,6 @@ try:
except ImportError:
from contextlib2 import ExitStack
-import base64
import crypt
import os
import stat
@@ -22,13 +21,6 @@ import tempfile
import unittest
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return base64.b64encode(source).decode('us-ascii')
-
-
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
if data is None:
data = {'HostName': 'FOOHOST'}
@@ -58,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (b64(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64e(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
@@ -189,7 +181,7 @@ class TestAzureDataSource(TestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
@@ -241,7 +233,7 @@ class TestAzureDataSource(TestCase):
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': b64(mydata)}
+ odata = {'UserData': b64e(mydata)}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -289,7 +281,7 @@ class TestAzureDataSource(TestCase):
'command': 'my-bounce-command',
'hostname_command': 'my-hostname-command'}}
odata = {'HostName': "xhost",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -304,7 +296,7 @@ class TestAzureDataSource(TestCase):
# config specifying set_hostname off should not bounce
cfg = {'set_hostname': False}
odata = {'HostName': "xhost",
- 'dscfg': {'text': b64(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -333,7 +325,7 @@ class TestAzureDataSource(TestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64(yaml.dump(dscfg)),
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
@@ -370,7 +362,7 @@ class TestAzureDataSource(TestCase):
def test_existing_ovf_same(self):
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': b64("SOMEUSERDATA")}
+ odata = {'UserData': b64e("SOMEUSERDATA")}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
populate_dir(self.waagent_d,
@@ -394,9 +386,9 @@ class TestAzureDataSource(TestCase):
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
# if ovf-env.xml differs.
cached_ovfenv = construct_valid_ovf_env(
- {'userdata': b64("FOO_USERDATA")})
+ {'userdata': b64e("FOO_USERDATA")})
new_ovfenv = construct_valid_ovf_env(
- {'userdata': b64("NEW_USERDATA")})
+ {'userdata': b64e("NEW_USERDATA")})
populate_dir(self.waagent_d,
{'ovf-env.xml': cached_ovfenv,
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index e5a4bd18..27adf21b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -3,19 +3,12 @@ from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
from ..helpers import TestCase, populate_dir
-from base64 import b64encode
import os
import pwd
import shutil
import tempfile
import unittest
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return b64encode(source).decode('us-ascii')
-
TEST_VARS = {
'VAR1': 'single',
@@ -186,7 +179,7 @@ class TestOpenNebulaDataSource(TestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64(USER_DATA)
+ b64userdata = util.b64e(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -198,7 +191,7 @@ class TestOpenNebulaDataSource(TestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64(USER_DATA),
+ populate_context_dir(my_d, {k: util.b64e(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index b5ebf94d..8b62b1b1 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,9 +24,9 @@
from __future__ import print_function
-import base64
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
from .. import helpers
import os
import os.path
@@ -36,12 +36,6 @@ import tempfile
import stat
import uuid
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return base64.b64encode(source).decode('us-ascii')
-
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -239,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -260,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -276,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = b64(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index d3f18fa0..0bcdcb31 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -18,7 +18,6 @@
from cloudinit.config import cc_seed_random
-import base64
import gzip
import tempfile
@@ -38,13 +37,6 @@ import logging
LOG = logging.getLogger(__name__)
-def b64(source):
- # In Python 3, b64encode only accepts bytes and returns bytes.
- if not isinstance(source, bytes):
- source = source.encode('utf-8')
- return base64.b64encode(source).decode('us-ascii')
-
-
class TestRandomSeed(t_help.TestCase):
def setUp(self):
super(TestRandomSeed, self).setUp()
@@ -141,7 +133,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("big-toe", contents)
def test_append_random_base64(self):
- data = b64('bubbles')
+ data = util.b64e('bubbles')
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -154,7 +146,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("bubbles", contents)
def test_append_random_b64(self):
- data = b64('kit-kat')
+ data = util.b64e('kit-kat')
cfg = {
'random_seed': {
'file': self._seed_file,