summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py44
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py3
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py3
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py1
-rw-r--r--tests/unittests/test_datasource/test_ovf.py8
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
7 files changed, 97 insertions, 58 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 3253f3ad..ff35904e 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
- cmd_pass = ['true']
- cmd_fail = ['false']
- cmd_not_found = ['bogus bad command']
-
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
+ self.mount_dir = self.tmp_dir()
_write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
- dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy']
- dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle',
- '--quiet', '--timeout=5']
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
+ 'm_modprobe_floppy', return_value=None)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
+ 'm_udevadm_settle', return_value=('', ''))
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
+ 'm_mount_cb')
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "Failed modprobe")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ self.m_udevadm_settle.side_effect = util.ProcessExecutionError(
+ "Failed settle.")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ self.m_udevadm_settle.side_effect = OSError("No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index f6a59b6b..380ad1b5 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -42,6 +42,9 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceCloudSigma.util.is_container",
+ "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 7e6fcbbf..b0abfc5f 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -224,6 +224,9 @@ class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ "m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index cdbd1e1a..21931eb7 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'read_dmi_data', return_value=None))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 36b4d771..61591017 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
+ allowed_subp = ['bash']
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index fc4eb36e..9d52eb99 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase):
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': None},
+ {'util.read_dmi_data': None,
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase):
paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware'},
+ {'util.read_dmi_data': 'vmware',
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running