summaryrefslogtreecommitdiff
path: root/cloudinit/tests
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2017-10-06 13:22:54 -0600
committerChad Smith <chad.smith@canonical.com>2017-10-06 13:22:54 -0600
commit9fd022780ae516df3499b17b2d69b72fc502917c (patch)
treebc33ac6296f374414ccb15dce233a4293b8633d3 /cloudinit/tests
parent89630a6658c099d59f2766493a35c2ad266a8f42 (diff)
parent45d361cb0b7f5e4e7d79522bd285871898358623 (diff)
downloadvyos-cloud-init-9fd022780ae516df3499b17b2d69b72fc502917c.tar.gz
vyos-cloud-init-9fd022780ae516df3499b17b2d69b72fc502917c.zip
merge from master at 17.1-17-g45d361cb
Diffstat (limited to 'cloudinit/tests')
-rw-r--r--cloudinit/tests/__init__.py0
-rw-r--r--cloudinit/tests/helpers.py405
-rw-r--r--cloudinit/tests/test_simpletable.py100
-rw-r--r--cloudinit/tests/test_temp_utils.py101
-rw-r--r--cloudinit/tests/test_url_helper.py40
5 files changed, 646 insertions, 0 deletions
diff --git a/cloudinit/tests/__init__.py b/cloudinit/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/cloudinit/tests/__init__.py
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
new file mode 100644
index 00000000..6f88a5b7
--- /dev/null
+++ b/cloudinit/tests/helpers.py
@@ -0,0 +1,405 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from __future__ import print_function
+
+import functools
+import json
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+import mock
+import six
+import unittest2
+
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
+from cloudinit import helpers as ch
+from cloudinit import util
+
+# Used for skipping tests
+SkipTest = unittest2.SkipTest
+
+# Used for detecting different python versions
+PY2 = False
+PY26 = False
+PY27 = False
+PY3 = False
+
+_PY_VER = sys.version_info
+_PY_MAJOR, _PY_MINOR, _PY_MICRO = _PY_VER[0:3]
+if (_PY_MAJOR, _PY_MINOR) <= (2, 6):
+ if (_PY_MAJOR, _PY_MINOR) == (2, 6):
+ PY26 = True
+ if (_PY_MAJOR, _PY_MINOR) >= (2, 0):
+ PY2 = True
+else:
+ if (_PY_MAJOR, _PY_MINOR) == (2, 7):
+ PY27 = True
+ PY2 = True
+ if (_PY_MAJOR, _PY_MINOR) >= (3, 0):
+ PY3 = True
+
+
+# Makes the old path start
+# with new base instead of whatever
+# it previously had
+def rebase_path(old_path, new_base):
+ if old_path.startswith(new_base):
+ # Already handled...
+ return old_path
+ # Retarget the base of that path
+ # to the new base instead of the
+ # old one...
+ path = os.path.join(new_base, old_path.lstrip("/"))
+ path = os.path.abspath(path)
+ return path
+
+
+# Can work on anything that takes a path as arguments
+def retarget_many_wrapper(new_base, am, old_func):
+ def wrapper(*args, **kwds):
+ n_args = list(args)
+ nam = am
+ if am == -1:
+ nam = len(n_args)
+ for i in range(0, nam):
+ path = args[i]
+ # patchOS() wraps various os and os.path functions, however in
+ # Python 3 some of these now accept file-descriptors (integers).
+ # That breaks rebase_path() so in lieu of a better solution, just
+ # don't rebase if we get a fd.
+ if isinstance(path, six.string_types):
+ n_args[i] = rebase_path(path, new_base)
+ return old_func(*n_args, **kwds)
+ return wrapper
+
+
+class TestCase(unittest2.TestCase):
+
+ def reset_global_state(self):
+ """Reset any global state to its original settings.
+
+ cloudinit caches some values in cloudinit.util. Unit tests that
+ involved those cached paths were then subject to failure if the order
+ of invocation changed (LP: #1703697).
+
+ This function resets any of these global state variables to their
+ initial state.
+
+ In the future this should really be done with some registry that
+ can then be cleaned in a more obvious way.
+ """
+ util.PROC_CMDLINE = None
+ util._DNS_REDIRECT_IP = None
+ util._LSB_RELEASE = {}
+
+ def setUp(self):
+ super(TestCase, self).setUp()
+ self.reset_global_state()
+
+ def add_patch(self, target, attr, **kwargs):
+ """Patches specified target object and sets it as attr on test
+ instance also schedules cleanup"""
+ if 'autospec' not in kwargs:
+ kwargs['autospec'] = True
+ m = mock.patch(target, **kwargs)
+ p = m.start()
+ self.addCleanup(m.stop)
+ setattr(self, attr, p)
+
+
+class CiTestCase(TestCase):
+ """This is the preferred test case base class unless user
+ needs other test case classes below."""
+
+ # Subclass overrides for specific test behavior
+ # Whether or not a unit test needs logfile setup
+ with_logs = False
+
+ def setUp(self):
+ super(CiTestCase, self).setUp()
+ if self.with_logs:
+ # Create a log handler so unit tests can search expected logs.
+ self.logger = logging.getLogger()
+ self.logs = six.StringIO()
+ formatter = logging.Formatter('%(levelname)s: %(message)s')
+ handler = logging.StreamHandler(self.logs)
+ handler.setFormatter(formatter)
+ self.old_handlers = self.logger.handlers
+ self.logger.handlers = [handler]
+
+ def tearDown(self):
+ if self.with_logs:
+ # Remove the handler we setup
+ logging.getLogger().handlers = self.old_handlers
+ super(CiTestCase, self).tearDown()
+
+ def tmp_dir(self, dir=None, cleanup=True):
+ # return a full path to a temporary directory that will be cleaned up.
+ if dir is None:
+ tmpd = tempfile.mkdtemp(
+ prefix="ci-%s." % self.__class__.__name__)
+ else:
+ tmpd = tempfile.mkdtemp(dir=dir)
+ self.addCleanup(functools.partial(shutil.rmtree, tmpd))
+ return tmpd
+
+ def tmp_path(self, path, dir=None):
+ # return an absolute path to 'path' under dir.
+ # if dir is None, one will be created with tmp_dir()
+ # the file is not created or modified.
+ if dir is None:
+ dir = self.tmp_dir()
+ return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
+
+
+class ResourceUsingTestCase(CiTestCase):
+
+ def setUp(self):
+ super(ResourceUsingTestCase, self).setUp()
+ self.resource_path = None
+
+ def resourceLocation(self, subname=None):
+ if self.resource_path is None:
+ paths = [
+ os.path.join('tests', 'data'),
+ os.path.join('data'),
+ os.path.join(os.pardir, 'tests', 'data'),
+ os.path.join(os.pardir, 'data'),
+ ]
+ for p in paths:
+ if os.path.isdir(p):
+ self.resource_path = p
+ break
+ self.assertTrue((self.resource_path and
+ os.path.isdir(self.resource_path)),
+ msg="Unable to locate test resource data path!")
+ if not subname:
+ return self.resource_path
+ return os.path.join(self.resource_path, subname)
+
+ def readResource(self, name):
+ where = self.resourceLocation(name)
+ with open(where, 'r') as fh:
+ return fh.read()
+
+ def getCloudPaths(self, ds=None):
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
+ cp = ch.Paths({'cloud_dir': tmpdir,
+ 'templates_dir': self.resourceLocation()},
+ ds=ds)
+ return cp
+
+
+class FilesystemMockingTestCase(ResourceUsingTestCase):
+
+ def setUp(self):
+ super(FilesystemMockingTestCase, self).setUp()
+ self.patched_funcs = ExitStack()
+
+ def tearDown(self):
+ self.patched_funcs.close()
+ ResourceUsingTestCase.tearDown(self)
+
+ def replicateTestRoot(self, example_root, target_root):
+ real_root = self.resourceLocation()
+ real_root = os.path.join(real_root, 'roots', example_root)
+ for (dir_path, _dirnames, filenames) in os.walk(real_root):
+ real_path = dir_path
+ make_path = rebase_path(real_path[len(real_root):], target_root)
+ util.ensure_dir(make_path)
+ for f in filenames:
+ real_path = util.abs_join(real_path, f)
+ make_path = util.abs_join(make_path, f)
+ shutil.copy(real_path, make_path)
+
+ def patchUtils(self, new_root):
+ patch_funcs = {
+ util: [('write_file', 1),
+ ('append_file', 1),
+ ('load_file', 1),
+ ('ensure_dir', 1),
+ ('chmod', 1),
+ ('delete_dir_contents', 1),
+ ('del_file', 1),
+ ('sym_link', -1),
+ ('copy', -1)],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for (f, am) in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, am, func)
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
+
+ # Handle subprocess calls
+ func = getattr(util, 'subp')
+
+ def nsubp(*_args, **_kwargs):
+ return ('', '')
+
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', nsubp))
+
+ def null_func(*_args, **_kwargs):
+ return None
+
+ for f in ['chownbyid', 'chownbyname']:
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, f, null_func))
+
+ def patchOS(self, new_root):
+ patch_funcs = {
+ os.path: [('isfile', 1), ('exists', 1),
+ ('islink', 1), ('isdir', 1)],
+ os: [('listdir', 1), ('mkdir', 1),
+ ('lstat', 1), ('symlink', 2)],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for f, nargs in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, nargs, func)
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
+
+ def patchOpen(self, new_root):
+ trap_func = retarget_many_wrapper(new_root, 1, open)
+ name = 'builtins.open' if PY3 else '__builtin__.open'
+ self.patched_funcs.enter_context(mock.patch(name, trap_func))
+
+ def patchStdoutAndStderr(self, stdout=None, stderr=None):
+ if stdout is not None:
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, 'stdout', stdout))
+ if stderr is not None:
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, 'stderr', stderr))
+
+ def reRoot(self, root=None):
+ if root is None:
+ root = self.tmp_dir()
+ self.patchUtils(root)
+ self.patchOS(root)
+ return root
+
+
+class HttprettyTestCase(CiTestCase):
+ # necessary as http_proxy gets in the way of httpretty
+ # https://github.com/gabrielfalcao/HTTPretty/issues/122
+
+ def setUp(self):
+ self.restore_proxy = os.environ.get('http_proxy')
+ if self.restore_proxy is not None:
+ del os.environ['http_proxy']
+ super(HttprettyTestCase, self).setUp()
+
+ def tearDown(self):
+ if self.restore_proxy:
+ os.environ['http_proxy'] = self.restore_proxy
+ super(HttprettyTestCase, self).tearDown()
+
+
+def populate_dir(path, files):
+ if not os.path.exists(path):
+ os.makedirs(path)
+ ret = []
+ for (name, content) in files.items():
+ p = os.path.sep.join([path, name])
+ util.ensure_dir(os.path.dirname(p))
+ with open(p, "wb") as fp:
+ if isinstance(content, six.binary_type):
+ fp.write(content)
+ else:
+ fp.write(content.encode('utf-8'))
+ fp.close()
+ ret.append(p)
+
+ return ret
+
+
+def dir2dict(startdir, prefix=None):
+ flist = {}
+ if prefix is None:
+ prefix = startdir
+ for root, dirs, files in os.walk(startdir):
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ key = fpath[len(prefix):]
+ flist[key] = util.load_file(fpath)
+ return flist
+
+
+def json_dumps(data):
+ # print data in nicely formatted json.
+ return json.dumps(data, indent=1, sort_keys=True,
+ separators=(',', ': '))
+
+
+def wrap_and_call(prefix, mocks, func, *args, **kwargs):
+ """
+ call func(args, **kwargs) with mocks applied, then unapplies mocks
+ nicer to read than repeating dectorators on each function
+
+ prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None
+ mocks: dictionary of names (under 'prefix') to mock and either
+ a return value or a dictionary to pass to the mock.patch call
+ func: function to call with mocks applied
+ *args,**kwargs: arguments for 'func'
+
+ return_value: return from 'func'
+ """
+ delim = '.'
+ if prefix is None:
+ prefix = ''
+ prefix = prefix.rstrip(delim)
+ unwraps = []
+ for fname, kw in mocks.items():
+ if prefix:
+ fname = delim.join((prefix, fname))
+ if not isinstance(kw, dict):
+ kw = {'return_value': kw}
+ p = mock.patch(fname, **kw)
+ p.start()
+ unwraps.append(p)
+ try:
+ return func(*args, **kwargs)
+ finally:
+ for p in unwraps:
+ p.stop()
+
+
+try:
+ skipIf = unittest.skipIf
+except AttributeError:
+ # Python 2.6. Doesn't have to be high fidelity.
+ def skipIf(condition, reason):
+ def decorator(func):
+ def wrapper(*args, **kws):
+ if condition:
+ return func(*args, **kws)
+ else:
+ print(reason, file=sys.stderr)
+ return wrapper
+ return decorator
+
+
+# older versions of mock do not have the useful 'assert_not_called'
+if not hasattr(mock.Mock, 'assert_not_called'):
+ def __mock_assert_not_called(mmock):
+ if mmock.call_count != 0:
+ msg = ("[citest] Expected '%s' to not have been called. "
+ "Called %s times." %
+ (mmock._mock_name or 'mock', mmock.call_count))
+ raise AssertionError(msg)
+ mock.Mock.assert_not_called = __mock_assert_not_called
+
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_simpletable.py b/cloudinit/tests/test_simpletable.py
new file mode 100644
index 00000000..96bc24cf
--- /dev/null
+++ b/cloudinit/tests/test_simpletable.py
@@ -0,0 +1,100 @@
+# Copyright (C) 2017 Amazon.com, Inc. or its affiliates
+#
+# Author: Andrew Jorgensen <ajorgens@amazon.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Tests that SimpleTable works just like PrettyTable for cloud-init.
+
+Not all possible PrettyTable cases are tested because we're not trying to
+reimplement the entire library, only the minimal parts we actually use.
+"""
+
+from cloudinit.simpletable import SimpleTable
+from cloudinit.tests.helpers import CiTestCase
+
+# Examples rendered by cloud-init using PrettyTable
+NET_DEVICE_FIELDS = (
+ 'Device', 'Up', 'Address', 'Mask', 'Scope', 'Hw-Address')
+NET_DEVICE_ROWS = (
+ ('ens3', True, '172.31.4.203', '255.255.240.0', '.', '0a:1f:07:15:98:70'),
+ ('ens3', True, 'fe80::81f:7ff:fe15:9870/64', '.', 'link',
+ '0a:1f:07:15:98:70'),
+ ('lo', True, '127.0.0.1', '255.0.0.0', '.', '.'),
+ ('lo', True, '::1/128', '.', 'host', '.'),
+)
+NET_DEVICE_TABLE = """\
++--------+------+----------------------------+---------------+-------+-------------------+
+| Device | Up | Address | Mask | Scope | Hw-Address |
++--------+------+----------------------------+---------------+-------+-------------------+
+| ens3 | True | 172.31.4.203 | 255.255.240.0 | . | 0a:1f:07:15:98:70 |
+| ens3 | True | fe80::81f:7ff:fe15:9870/64 | . | link | 0a:1f:07:15:98:70 |
+| lo | True | 127.0.0.1 | 255.0.0.0 | . | . |
+| lo | True | ::1/128 | . | host | . |
++--------+------+----------------------------+---------------+-------+-------------------+""" # noqa: E501
+ROUTE_IPV4_FIELDS = (
+ 'Route', 'Destination', 'Gateway', 'Genmask', 'Interface', 'Flags')
+ROUTE_IPV4_ROWS = (
+ ('0', '0.0.0.0', '172.31.0.1', '0.0.0.0', 'ens3', 'UG'),
+ ('1', '169.254.0.0', '0.0.0.0', '255.255.0.0', 'ens3', 'U'),
+ ('2', '172.31.0.0', '0.0.0.0', '255.255.240.0', 'ens3', 'U'),
+)
+ROUTE_IPV4_TABLE = """\
++-------+-------------+------------+---------------+-----------+-------+
+| Route | Destination | Gateway | Genmask | Interface | Flags |
++-------+-------------+------------+---------------+-----------+-------+
+| 0 | 0.0.0.0 | 172.31.0.1 | 0.0.0.0 | ens3 | UG |
+| 1 | 169.254.0.0 | 0.0.0.0 | 255.255.0.0 | ens3 | U |
+| 2 | 172.31.0.0 | 0.0.0.0 | 255.255.240.0 | ens3 | U |
++-------+-------------+------------+---------------+-----------+-------+"""
+
+AUTHORIZED_KEYS_FIELDS = (
+ 'Keytype', 'Fingerprint (md5)', 'Options', 'Comment')
+AUTHORIZED_KEYS_ROWS = (
+ ('ssh-rsa', '24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36', '-',
+ 'ajorgens'),
+)
+AUTHORIZED_KEYS_TABLE = """\
++---------+-------------------------------------------------+---------+----------+
+| Keytype | Fingerprint (md5) | Options | Comment |
++---------+-------------------------------------------------+---------+----------+
+| ssh-rsa | 24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36 | - | ajorgens |
++---------+-------------------------------------------------+---------+----------+""" # noqa: E501
+
+# from prettytable import PrettyTable
+# pt = PrettyTable(('HEADER',))
+# print(pt)
+NO_ROWS_FIELDS = ('HEADER',)
+NO_ROWS_TABLE = """\
++--------+
+| HEADER |
++--------+
++--------+"""
+
+
+class TestSimpleTable(CiTestCase):
+
+ def test_no_rows(self):
+ """An empty table is rendered as PrettyTable would have done it."""
+ table = SimpleTable(NO_ROWS_FIELDS)
+ self.assertEqual(str(table), NO_ROWS_TABLE)
+
+ def test_net_dev(self):
+ """Net device info is rendered as it was with PrettyTable."""
+ table = SimpleTable(NET_DEVICE_FIELDS)
+ for row in NET_DEVICE_ROWS:
+ table.add_row(row)
+ self.assertEqual(str(table), NET_DEVICE_TABLE)
+
+ def test_route_ipv4(self):
+ """Route IPv4 info is rendered as it was with PrettyTable."""
+ table = SimpleTable(ROUTE_IPV4_FIELDS)
+ for row in ROUTE_IPV4_ROWS:
+ table.add_row(row)
+ self.assertEqual(str(table), ROUTE_IPV4_TABLE)
+
+ def test_authorized_keys(self):
+ """SSH authorized keys are rendered as they were with PrettyTable."""
+ table = SimpleTable(AUTHORIZED_KEYS_FIELDS)
+ for row in AUTHORIZED_KEYS_ROWS:
+ table.add_row(row)
+ self.assertEqual(str(table), AUTHORIZED_KEYS_TABLE)
diff --git a/cloudinit/tests/test_temp_utils.py b/cloudinit/tests/test_temp_utils.py
new file mode 100644
index 00000000..ffbb92cd
--- /dev/null
+++ b/cloudinit/tests/test_temp_utils.py
@@ -0,0 +1,101 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloudinit.temp_utils"""
+
+from cloudinit.temp_utils import mkdtemp, mkstemp
+from cloudinit.tests.helpers import CiTestCase, wrap_and_call
+
+
+class TestTempUtils(CiTestCase):
+
+ def test_mkdtemp_default_non_root(self):
+ """mkdtemp creates a dir under /tmp for the unprivileged."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/tmp'}], calls)
+
+ def test_mkdtemp_default_non_root_needs_exe(self):
+ """mkdtemp creates a dir under /var/tmp/cloud-init when needs_exe."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp, needs_exe=True)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/var/tmp/cloud-init'}], calls)
+
+ def test_mkdtemp_default_root(self):
+ """mkdtemp creates a dir under /run/cloud-init for the privileged."""
+ calls = []
+
+ def fake_mkdtemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 0,
+ 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkdtemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
+
+ def test_mkstemp_default_non_root(self):
+ """mkstemp creates secure tempfile under /tmp for the unprivileged."""
+ calls = []
+
+ def fake_mkstemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 1000,
+ 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkstemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/tmp'}], calls)
+
+ def test_mkstemp_default_root(self):
+ """mkstemp creates a secure tempfile in /run/cloud-init for root."""
+ calls = []
+
+ def fake_mkstemp(*args, **kwargs):
+ calls.append(kwargs)
+ return '/fake/return/path'
+
+ retval = wrap_and_call(
+ 'cloudinit.temp_utils',
+ {'os.getuid': 0,
+ 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
+ '_TMPDIR': {'new': None},
+ 'os.path.isdir': True},
+ mkstemp)
+ self.assertEqual('/fake/return/path', retval)
+ self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_url_helper.py b/cloudinit/tests/test_url_helper.py
new file mode 100644
index 00000000..b778a3a7
--- /dev/null
+++ b/cloudinit/tests/test_url_helper.py
@@ -0,0 +1,40 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.url_helper import oauth_headers
+from cloudinit.tests.helpers import CiTestCase, mock, skipIf
+
+
+try:
+ import oauthlib
+ assert oauthlib # avoid pyflakes error F401: import unused
+ _missing_oauthlib_dep = False
+except ImportError:
+ _missing_oauthlib_dep = True
+
+
+class TestOAuthHeaders(CiTestCase):
+
+ def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self):
+ """oauth_headers raises a NotImplemented error when oauth absent."""
+ with mock.patch.dict('sys.modules', {'oauthlib': None}):
+ with self.assertRaises(NotImplementedError) as context_manager:
+ oauth_headers(1, 2, 3, 4, 5)
+ self.assertEqual(
+ 'oauth support is not available',
+ str(context_manager.exception))
+
+ @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency")
+ @mock.patch('oauthlib.oauth1.Client')
+ def test_oauth_headers_calls_oathlibclient_when_available(self, m_client):
+ """oauth_headers calls oaut1.hClient.sign with the provided url."""
+ class fakeclient(object):
+ def sign(self, url):
+ # The first and 3rd item of the client.sign tuple are ignored
+ return ('junk', url, 'junk2')
+
+ m_client.return_value = fakeclient()
+
+ return_value = oauth_headers(
+ 'url', 'consumer_key', 'token_key', 'token_secret',
+ 'consumer_secret')
+ self.assertEqual('url', return_value)