summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_configdrive.py
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2016-03-14 14:16:49 -0400
committerScott Moser <smoser@ubuntu.com>2016-03-14 14:16:49 -0400
commit92db1b884bf34339a4536a20123c45b01c9c49ce (patch)
tree22be0de3d0496212d26015c3b30423da9338aa5c /tests/unittests/test_datasource/test_configdrive.py
parent91ccf1b55b5b79694449446b029dd7c4570517a5 (diff)
parent72f826bff694b612d54b177635ca7e0dc83aed2f (diff)
downloadvyos-cloud-init-92db1b884bf34339a4536a20123c45b01c9c49ce.tar.gz
vyos-cloud-init-92db1b884bf34339a4536a20123c45b01c9c49ce.zip
merge with trunk
Diffstat (limited to 'tests/unittests/test_datasource/test_configdrive.py')
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py154
1 files changed, 87 insertions, 67 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..bfd787d1 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,18 @@
from copy import copy
import json
import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import six
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
@@ -12,7 +20,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from .. import helpers as unit_helpers
+from ..helpers import TestCase
+
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -37,7 +46,7 @@ EC2_META = {
'reservation-id': 'r-iru5qm4m',
'security-groups': ['default']
}
-USER_DATA = '#!/bin/sh\necho This is user data\n'
+USER_DATA = b'#!/bin/sh\necho This is user data\n'
OSTACK_META = {
'availability_zone': 'nova',
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
@@ -48,27 +57,28 @@ OSTACK_META = {
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
-CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
CFG_DRIVE_FILES_V2 = {
- 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
- 'ec2/2009-04-04/user-data': USER_DATA,
- 'ec2/latest/meta-data.json': json.dumps(EC2_META),
- 'ec2/latest/user-data': USER_DATA,
- 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/2012-08-10/user_data': USER_DATA,
- 'openstack/content/0000': CONTENT_0,
- 'openstack/content/0001': CONTENT_1,
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA}
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(MockerTestCase):
+class TestConfigDriveDataSource(TestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +101,29 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[provided_name]))
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+
+ def exists_side_effect():
+ yield False
+ yield True
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ self.assertEqual(exists_mock.call_count, 2)
+
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +139,19 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ with ExitStack() as mocks:
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[dev_name]))
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ return_value=True))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ exists_mock.assert_called_once_with(mock.ANY)
+
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +172,21 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker(verify_calls=False) as my_mock:
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ with mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ # We don't assert the call count for os.path.exists() because
+ # not all of the entries in name_tests results in two calls to
+ # that function. Specifically, 'root2k' doesn't seem to call
+ # it at all.
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +194,6 @@ class TestConfigDriveDataSource(MockerTestCase):
None,
helpers.Paths({}))
found = ds.read_config_drive(self.tmp)
- exists_mock = self.mocker.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(True)
- self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +208,9 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ with mock.patch.object(os.path, 'exists', return_value=True):
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -277,9 +293,8 @@ class TestConfigDriveDataSource(MockerTestCase):
util.is_partition = my_is_partition
devs_with_answers = {"TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"]}
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -290,9 +305,10 @@ class TestConfigDriveDataSource(MockerTestCase):
# verify that partitions are considered, that have correct label.
devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
- "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ "TYPE=iso9660": [],
+ "LABEL=config-2": ["/dev/vdb3"]}
self.assertEqual(["/dev/vdb3"],
- ds.find_candidate_devs())
+ ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
@@ -303,7 +319,7 @@ class TestConfigDriveDataSource(MockerTestCase):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
- [OSTACK_META['public_keys']['mykey']])
+ [OSTACK_META['public_keys']['mykey']])
def cfg_ds_from_dir(seed_d):
@@ -326,13 +342,17 @@ def populate_ds_from_read_config(cfg_ds, source, results):
def populate_dir(seed_dir, files):
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
- with open(path, "w") as fp:
+ if isinstance(content, six.text_type):
+ mode = "w"
+ else:
+ mode = "wb"
+
+ with open(path, mode) as fp:
fp.write(content)
- fp.close()
# vi: ts=4 expandtab