diff options
| author | Barry Warsaw <barry@python.org> | 2015-01-27 15:03:52 -0500 | 
|---|---|---|
| committer | Barry Warsaw <barry@python.org> | 2015-01-27 15:03:52 -0500 | 
| commit | 6e742d20e9ed56498925c7c850cd5da65d063b4b (patch) | |
| tree | 3e584ae2381d72d2b77c6f2d98c9be93bcb9e413 | |
| parent | 69c64029997599b3f1764ef48fe571094e2ee5f2 (diff) | |
| download | vyos-cloud-init-6e742d20e9ed56498925c7c850cd5da65d063b4b.tar.gz vyos-cloud-init-6e742d20e9ed56498925c7c850cd5da65d063b4b.zip | |
Respond to review:
- Refactor both the base64 encoding and decoding into utility functions.
Also:
- Mechanically fix some other broken untested code.
| -rw-r--r-- | cloudinit/config/cc_seed_random.py | 8 | ||||
| -rw-r--r-- | cloudinit/config/cc_ssh_authkey_fingerprints.py | 2 | ||||
| -rw-r--r-- | cloudinit/sources/DataSourceOpenNebula.py | 7 | ||||
| -rw-r--r-- | cloudinit/sources/DataSourceSmartOS.py | 11 | ||||
| -rw-r--r-- | cloudinit/util.py | 20 | ||||
| -rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 28 | ||||
| -rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 11 | ||||
| -rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 14 | ||||
| -rw-r--r-- | tests/unittests/test_handler/test_handler_seed_random.py | 12 | 
9 files changed, 42 insertions, 71 deletions
| diff --git a/cloudinit/config/cc_seed_random.py b/cloudinit/config/cc_seed_random.py index 981e1b08..bb64b0f5 100644 --- a/cloudinit/config/cc_seed_random.py +++ b/cloudinit/config/cc_seed_random.py @@ -38,13 +38,7 @@ def _decode(data, encoding=None):      if not encoding or encoding.lower() in ['raw']:          return data      elif encoding.lower() in ['base64', 'b64']: -        # Try to give us a native string in both Python 2 and 3, and remember -        # that b64decode() returns bytes in Python 3. -        decoded = base64.b64decode(data) -        try: -            return decoded.decode('utf-8') -        except UnicodeDecodeError: -            return decoded +        return util.b64d(data)      elif encoding.lower() in ['gzip', 'gz']:          return util.decomp_gzip(data, quiet=False)      else: diff --git a/cloudinit/config/cc_ssh_authkey_fingerprints.py b/cloudinit/config/cc_ssh_authkey_fingerprints.py index 51580633..6ce831bc 100644 --- a/cloudinit/config/cc_ssh_authkey_fingerprints.py +++ b/cloudinit/config/cc_ssh_authkey_fingerprints.py @@ -32,7 +32,7 @@ from cloudinit import util  def _split_hash(bin_hash):      split_up = [] -    for i in xrange(0, len(bin_hash), 2): +    for i in range(0, len(bin_hash), 2):          split_up.append(bin_hash[i:i + 2])      return split_up diff --git a/cloudinit/sources/DataSourceOpenNebula.py b/cloudinit/sources/DataSourceOpenNebula.py index a0275cda..61709c1b 100644 --- a/cloudinit/sources/DataSourceOpenNebula.py +++ b/cloudinit/sources/DataSourceOpenNebula.py @@ -426,12 +426,7 @@ def read_context_disk_dir(source_dir, asuser=None):                                 context.get('USER_DATA_ENCODING'))          if encoding == "base64":              try: -                userdata = base64.b64decode(results['userdata']) -                # In Python 3 we still expect a str, but b64decode will return -                # bytes.  Convert to str. -                if isinstance(userdata, bytes): -                    userdata = userdata.decode('utf-8') -                results['userdata'] = userdata +                results['userdata'] = util.b64d(results['userdata'])              except TypeError:                  LOG.warn("Failed base64 decoding of userdata") diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py index f59ad3d6..9d48beab 100644 --- a/cloudinit/sources/DataSourceSmartOS.py +++ b/cloudinit/sources/DataSourceSmartOS.py @@ -351,16 +351,7 @@ def query_data(noun, seed_device, seed_timeout, strip=False, default=None,      if b64:          try: -            # Generally, we want native strings in the values.  Python 3's -            # b64decode will return bytes though, so decode them to utf-8 if -            # possible.  If that fails, return the bytes. -            decoded = base64.b64decode(resp) -            try: -                if isinstance(decoded, bytes): -                    return decoded.decode('utf-8') -            except UnicodeDecodeError: -                pass -            return decoded +            return util.b64d(resp)          # Bogus input produces different errors in Python 2 and 3; catch both.          except (TypeError, binascii.Error):              LOG.warn("Failed base64 decoding key '%s'", noun) diff --git a/cloudinit/util.py b/cloudinit/util.py index 766f8e32..8916cc11 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -44,6 +44,7 @@ import sys  import tempfile  import time +from base64 import b64decode, b64encode  from six.moves.urllib import parse as urlparse  import six @@ -90,6 +91,25 @@ def encode_text(text, encoding='utf-8'):          return text      return text.encode(encoding) + +def b64d(source): +    # Base64 decode some data, accepting bytes or unicode/str, and returning +    # str/unicode if the result is utf-8 compatible, otherwise returning bytes. +    decoded = b64decode(source) +    if isinstance(decoded, bytes): +        try: +            return decoded.decode('utf-8') +        except UnicodeDecodeError: +            return decoded + +def b64e(source): +    # Base64 encode some data, accepting bytes or unicode/str, and returning +    # str/unicode if the result is utf-8 compatible, otherwise returning bytes. +    if not isinstance(source, bytes): +        source = source.encode('utf-8') +    return b64encode(source).decode('utf-8') + +  # Path for DMI Data  DMI_SYS_PATH = "/sys/class/dmi/id" diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 97a53bee..965bce4b 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,5 +1,5 @@  from cloudinit import helpers -from cloudinit.util import load_file +from cloudinit.util import b64e, load_file  from cloudinit.sources import DataSourceAzure  from ..helpers import TestCase, populate_dir @@ -12,7 +12,6 @@ try:  except ImportError:      from contextlib2 import ExitStack -import base64  import crypt  import os  import stat @@ -22,13 +21,6 @@ import tempfile  import unittest -def b64(source): -    # In Python 3, b64encode only accepts bytes and returns bytes. -    if not isinstance(source, bytes): -        source = source.encode('utf-8') -    return base64.b64encode(source).decode('us-ascii') - -  def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):      if data is None:          data = {'HostName': 'FOOHOST'} @@ -58,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):          content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)      if userdata: -        content += "<UserData>%s</UserData>\n" % (b64(userdata)) +        content += "<UserData>%s</UserData>\n" % (b64e(userdata))      if pubkeys:          content += "<SSH><PublicKeys>\n" @@ -189,7 +181,7 @@ class TestAzureDataSource(TestCase):          # set dscfg in via base64 encoded yaml          cfg = {'agent_command': "my_command"}          odata = {'HostName': "myhost", 'UserName': "myuser", -                'dscfg': {'text': b64(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)} @@ -241,7 +233,7 @@ class TestAzureDataSource(TestCase):      def test_userdata_found(self):          mydata = "FOOBAR" -        odata = {'UserData': b64(mydata)} +        odata = {'UserData': b64e(mydata)}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          dsrc = self._get_ds(data) @@ -289,7 +281,7 @@ class TestAzureDataSource(TestCase):                                     'command': 'my-bounce-command',                                     'hostname_command': 'my-hostname-command'}}          odata = {'HostName': "xhost", -                'dscfg': {'text': b64(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          self._get_ds(data).get_data() @@ -304,7 +296,7 @@ class TestAzureDataSource(TestCase):          # config specifying set_hostname off should not bounce          cfg = {'set_hostname': False}          odata = {'HostName': "xhost", -                'dscfg': {'text': b64(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          self._get_ds(data).get_data() @@ -333,7 +325,7 @@ class TestAzureDataSource(TestCase):          # Make sure that user can affect disk aliases          dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}          odata = {'HostName': "myhost", 'UserName': "myuser", -                'dscfg': {'text': b64(yaml.dump(dscfg)), +                'dscfg': {'text': b64e(yaml.dump(dscfg)),                            'encoding': 'base64'}}          usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},                                    'ephemeral0': False}} @@ -370,7 +362,7 @@ class TestAzureDataSource(TestCase):      def test_existing_ovf_same(self):          # waagent/SharedConfig left alone if found ovf-env.xml same as cached -        odata = {'UserData': b64("SOMEUSERDATA")} +        odata = {'UserData': b64e("SOMEUSERDATA")}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          populate_dir(self.waagent_d, @@ -394,9 +386,9 @@ class TestAzureDataSource(TestCase):          # 'get_data' should remove SharedConfig.xml in /var/lib/waagent          # if ovf-env.xml differs.          cached_ovfenv = construct_valid_ovf_env( -            {'userdata': b64("FOO_USERDATA")}) +            {'userdata': b64e("FOO_USERDATA")})          new_ovfenv = construct_valid_ovf_env( -            {'userdata': b64("NEW_USERDATA")}) +            {'userdata': b64e("NEW_USERDATA")})          populate_dir(self.waagent_d,              {'ovf-env.xml': cached_ovfenv, diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index e5a4bd18..27adf21b 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -3,19 +3,12 @@ from cloudinit.sources import DataSourceOpenNebula as ds  from cloudinit import util  from ..helpers import TestCase, populate_dir -from base64 import b64encode  import os  import pwd  import shutil  import tempfile  import unittest -def b64(source): -    # In Python 3, b64encode only accepts bytes and returns bytes. -    if not isinstance(source, bytes): -        source = source.encode('utf-8') -    return b64encode(source).decode('us-ascii') -  TEST_VARS = {      'VAR1': 'single', @@ -186,7 +179,7 @@ class TestOpenNebulaDataSource(TestCase):              self.assertEqual(USER_DATA, results['userdata'])      def test_user_data_encoding_required_for_decode(self): -        b64userdata = b64(USER_DATA) +        b64userdata = util.b64e(USER_DATA)          for k in ('USER_DATA', 'USERDATA'):              my_d = os.path.join(self.tmp, k)              populate_context_dir(my_d, {k: b64userdata}) @@ -198,7 +191,7 @@ class TestOpenNebulaDataSource(TestCase):      def test_user_data_base64_encoding(self):          for k in ('USER_DATA', 'USERDATA'):              my_d = os.path.join(self.tmp, k) -            populate_context_dir(my_d, {k: b64(USER_DATA), +            populate_context_dir(my_d, {k: util.b64e(USER_DATA),                                          'USERDATA_ENCODING': 'base64'})              results = ds.read_context_disk_dir(my_d) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index b5ebf94d..8b62b1b1 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -24,9 +24,9 @@  from __future__ import print_function -import base64  from cloudinit import helpers as c_helpers  from cloudinit.sources import DataSourceSmartOS +from cloudinit.util import b64e  from .. import helpers  import os  import os.path @@ -36,12 +36,6 @@ import tempfile  import stat  import uuid -def b64(source): -    # In Python 3, b64encode only accepts bytes and returns bytes. -    if not isinstance(source, bytes): -        source = source.encode('utf-8') -    return base64.b64encode(source).decode('us-ascii') -  MOCK_RETURNS = {      'hostname': 'test-host', @@ -239,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_all'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = b64(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -260,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns['b64-cloud-init:user-data'] = "true"          my_returns['b64-hostname'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = b64(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -276,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_keys'] = 'hostname,ignored'          for k in ('hostname',): -            my_returns[k] = b64(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py index d3f18fa0..0bcdcb31 100644 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ b/tests/unittests/test_handler/test_handler_seed_random.py @@ -18,7 +18,6 @@  from cloudinit.config import cc_seed_random -import base64  import gzip  import tempfile @@ -38,13 +37,6 @@ import logging  LOG = logging.getLogger(__name__) -def b64(source): -    # In Python 3, b64encode only accepts bytes and returns bytes. -    if not isinstance(source, bytes): -        source = source.encode('utf-8') -    return base64.b64encode(source).decode('us-ascii') - -  class TestRandomSeed(t_help.TestCase):      def setUp(self):          super(TestRandomSeed, self).setUp() @@ -141,7 +133,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("big-toe", contents)      def test_append_random_base64(self): -        data = b64('bubbles') +        data = util.b64e('bubbles')          cfg = {              'random_seed': {                  'file': self._seed_file, @@ -154,7 +146,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("bubbles", contents)      def test_append_random_b64(self): -        data = b64('kit-kat') +        data = util.b64e('kit-kat')          cfg = {              'random_seed': {                  'file': self._seed_file, | 
