summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py44
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py3
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py3
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py1
-rw-r--r--tests/unittests/test_datasource/test_ovf.py8
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
-rw-r--r--tests/unittests/test_ds_identify.py1
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v3.py11
-rw-r--r--tests/unittests/test_handler/test_handler_bootcmd.py10
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py18
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py8
-rw-r--r--tests/unittests/test_handler/test_schema.py12
-rw-r--r--tests/unittests/test_net.py18
-rw-r--r--tests/unittests/test_util.py27
15 files changed, 170 insertions, 90 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 3253f3ad..ff35904e 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
- cmd_pass = ['true']
- cmd_fail = ['false']
- cmd_not_found = ['bogus bad command']
-
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
+ self.mount_dir = self.tmp_dir()
_write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
- dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy']
- dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle',
- '--quiet', '--timeout=5']
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
+ 'm_modprobe_floppy', return_value=None)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
+ 'm_udevadm_settle', return_value=('', ''))
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
+ 'm_mount_cb')
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "Failed modprobe")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ self.m_udevadm_settle.side_effect = util.ProcessExecutionError(
+ "Failed settle.")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ self.m_udevadm_settle.side_effect = OSError("No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index f6a59b6b..380ad1b5 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -42,6 +42,9 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceCloudSigma.util.is_container",
+ "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 7e6fcbbf..b0abfc5f 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -224,6 +224,9 @@ class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ "m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index cdbd1e1a..21931eb7 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'read_dmi_data', return_value=None))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 36b4d771..61591017 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
+ allowed_subp = ['bash']
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index fc4eb36e..9d52eb99 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase):
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': None},
+ {'util.read_dmi_data': None,
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase):
paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware'},
+ {'util.read_dmi_data': 'vmware',
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index e08e7908..46778e95 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -89,6 +89,7 @@ CallReturn = namedtuple('CallReturn',
class DsIdentifyBase(CiTestCase):
dsid_path = os.path.realpath('tools/ds-identify')
+ allowed_subp = ['sh']
def call(self, rootd=None, mocks=None, func="main", args=None, files=None,
policy_dmi=DI_DEFAULT_POLICY,
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py
index 7a64c230..a81c67c7 100644
--- a/tests/unittests/test_handler/test_handler_apt_source_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py
@@ -48,6 +48,10 @@ ADD_APT_REPO_MATCH = r"^[\w-]+:\w"
TARGET = None
+MOCK_LSB_RELEASE_DATA = {
+ 'id': 'Ubuntu', 'description': 'Ubuntu 18.04.1 LTS',
+ 'release': '18.04', 'codename': 'bionic'}
+
class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
"""TestAptSourceConfig
@@ -64,6 +68,9 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
self.join = os.path.join
self.matcher = re.compile(ADD_APT_REPO_MATCH).search
+ self.add_patch(
+ 'cloudinit.config.cc_apt_configure.util.lsb_release',
+ 'm_lsb_release', return_value=MOCK_LSB_RELEASE_DATA.copy())
@staticmethod
def _add_apt_sources(*args, **kwargs):
@@ -76,7 +83,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
Get the most basic default mrror and release info to be used in tests
"""
params = {}
- params['RELEASE'] = util.lsb_release()['codename']
+ params['RELEASE'] = MOCK_LSB_RELEASE_DATA['release']
arch = 'amd64'
params['MIRROR'] = cc_apt_configure.\
get_default_mirrors(arch)["PRIMARY"]
@@ -464,7 +471,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
'uri':
'http://testsec.ubuntu.com/%s/' % component}]}
post = ("%s_dists_%s-updates_InRelease" %
- (component, util.lsb_release()['codename']))
+ (component, MOCK_LSB_RELEASE_DATA['codename']))
fromfn = ("%s/%s_%s" % (pre, archive, post))
tofn = ("%s/test.ubuntu.com_%s" % (pre, post))
diff --git a/tests/unittests/test_handler/test_handler_bootcmd.py b/tests/unittests/test_handler/test_handler_bootcmd.py
index b1375269..a76760fa 100644
--- a/tests/unittests/test_handler/test_handler_bootcmd.py
+++ b/tests/unittests/test_handler/test_handler_bootcmd.py
@@ -118,7 +118,8 @@ class TestBootcmd(CiTestCase):
'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- handle('cc_bootcmd', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ handle('cc_bootcmd', valid_config, cc, LOG, [])
self.assertEqual(my_id + ' iid-datasource-none\n',
util.load_file(out_file))
@@ -128,12 +129,13 @@ class TestBootcmd(CiTestCase):
valid_config = {'bootcmd': ['exit 1']} # Script with error
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
- handle('does-not-matter', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ with self.assertRaises(util.ProcessExecutionError) as ctxt:
+ handle('does-not-matter', valid_config, cc, LOG, [])
self.assertIn(
'Unexpected error while running command.\n'
"Command: ['/bin/sh',",
- str(ctxt_manager.exception))
+ str(ctxt.exception))
self.assertIn(
'Failed to run bootcmd module does-not-matter',
self.logs.getvalue())
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index f4bbd66d..b16532ea 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -36,13 +36,21 @@ class TestInstallChefOmnibus(HttprettyTestCase):
@mock.patch("cloudinit.config.cc_chef.OMNIBUS_URL", OMNIBUS_URL_HTTP)
def test_install_chef_from_omnibus_runs_chef_url_content(self):
- """install_chef_from_omnibus runs downloaded OMNIBUS_URL as script."""
- chef_outfile = self.tmp_path('chef.out', self.new_root)
- response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
+ """install_chef_from_omnibus calls subp_blob_in_tempfile."""
+ response = b'#!/bin/bash\necho "Hi Mom"'
httpretty.register_uri(
httpretty.GET, cc_chef.OMNIBUS_URL, body=response, status=200)
- cc_chef.install_chef_from_omnibus()
- self.assertEqual('Hi Mom\n', util.load_file(chef_outfile))
+ ret = (None, None) # stdout, stderr but capture=False
+
+ with mock.patch("cloudinit.config.cc_chef.util.subp_blob_in_tempfile",
+ return_value=ret) as m_subp_blob:
+ cc_chef.install_chef_from_omnibus()
+ # admittedly whitebox, but assuming subp_blob_in_tempfile works
+ # this should be fine.
+ self.assertEqual(
+ [mock.call(blob=response, args=[], basename='chef-omnibus-install',
+ capture=False)],
+ m_subp_blob.call_args_list)
@mock.patch('cloudinit.config.cc_chef.url_helper.readurl')
@mock.patch('cloudinit.config.cc_chef.util.subp_blob_in_tempfile')
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index f92175fd..feca56c2 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -150,10 +150,12 @@ class TestResizefs(CiTestCase):
self.assertEqual(('growfs', '-y', devpth),
_resize_ufs(mount_point, devpth))
+ @mock.patch('cloudinit.util.is_container', return_value=False)
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.get_device_info_from_zpool')
@mock.patch('cloudinit.util.parse_mount')
- def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount):
+ def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount,
+ is_container):
devpth = 'vmzroot/ROOT/freebsd'
disk = 'gpt/system'
fs_type = 'zfs'
@@ -354,8 +356,10 @@ class TestMaybeGetDevicePathAsWritableBlock(CiTestCase):
('btrfs', 'filesystem', 'resize', 'max', '/'),
_resize_btrfs("/", "/dev/sda1"))
+ @mock.patch('cloudinit.util.is_container', return_value=True)
@mock.patch('cloudinit.util.is_FreeBSD')
- def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd):
+ def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd,
+ m_is_container):
freebsd.return_value = True
info = 'dev=gpt/system mnt_point=/ path=/'
devpth = maybe_get_writable_device_path('gpt/system', info, LOG)
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index fb266faf..1bad07f6 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -4,7 +4,7 @@ from cloudinit.config.schema import (
CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
get_schema_doc, get_schema, validate_cloudconfig_file,
validate_cloudconfig_schema, main)
-from cloudinit.util import subp, write_file
+from cloudinit.util import write_file
from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJsonSchema
@@ -406,8 +406,14 @@ class CloudTestsIntegrationTest(CiTestCase):
integration_testdir = os.path.sep.join(
[testsdir, 'cloud_tests', 'testcases'])
errors = []
- out, _ = subp(['find', integration_testdir, '-name', '*yaml'])
- for filename in out.splitlines():
+
+ yaml_files = []
+ for root, _dirnames, filenames in os.walk(integration_testdir):
+ yaml_files.extend([os.path.join(root, f)
+ for f in filenames if f.endswith(".yaml")])
+ self.assertTrue(len(yaml_files) > 0)
+
+ for filename in yaml_files:
test_cfg = safe_load(open(filename))
cloud_config = test_cfg.get('cloud_config')
if cloud_config:
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 05d5c72c..f3165da9 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1653,6 +1653,12 @@ def _setup_test(tmp_dir, mock_get_devicelist, mock_read_sys_net,
class TestGenerateFallbackConfig(CiTestCase):
+ def setUp(self):
+ super(TestGenerateFallbackConfig, self).setUp()
+ self.add_patch(
+ "cloudinit.util.get_cmdline", "m_get_cmdline",
+ return_value="root=/dev/sda1")
+
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
@@ -1895,12 +1901,13 @@ class TestRhelSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2183,12 +2190,13 @@ class TestOpenSuseSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2429,12 +2437,13 @@ USERCTL=no
class TestEniNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2482,6 +2491,7 @@ iface eth0 inet dhcp
class TestNetplanNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.netplan._clean_default")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@@ -2489,7 +2499,7 @@ class TestNetplanNetRendering(CiTestCase):
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
mock_sys_dev_path,
- mock_clean_default):
+ mock_clean_default, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 7a203ce2..5a14479a 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -24,6 +24,7 @@ except ImportError:
BASH = util.which('bash')
+BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
class FakeSelinux(object):
@@ -742,6 +743,8 @@ class TestReadSeeded(helpers.TestCase):
class TestSubp(helpers.CiTestCase):
with_logs = True
+ allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE,
+ BOGUS_COMMAND, sys.executable]
stdin2err = [BASH, '-c', 'cat >&2']
stdin2out = ['cat']
@@ -749,7 +752,6 @@ class TestSubp(helpers.CiTestCase):
utf8_valid = b'start \xc3\xa9 end'
utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
- bogus_command = 'this-is-not-expected-to-be-a-program-name'
def printf_cmd(self, *args):
# bash's printf supports \xaa. So does /usr/bin/printf
@@ -848,9 +850,10 @@ class TestSubp(helpers.CiTestCase):
util.write_file(noshebang, 'true\n')
os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- self.assertRaisesRegex(util.ProcessExecutionError,
- r'Missing #! in script\?',
- util.subp, (noshebang,))
+ with self.allow_subp([noshebang]):
+ self.assertRaisesRegex(util.ProcessExecutionError,
+ r'Missing #! in script\?',
+ util.subp, (noshebang,))
def test_subp_combined_stderr_stdout(self):
"""Providing combine_capture as True redirects stderr to stdout."""
@@ -868,14 +871,14 @@ class TestSubp(helpers.CiTestCase):
def test_exception_has_out_err_are_bytes_if_decode_false(self):
"""Raised exc should have stderr, stdout as bytes if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=False)
+ util.subp([BOGUS_COMMAND], decode=False)
self.assertTrue(isinstance(cm.exception.stdout, bytes))
self.assertTrue(isinstance(cm.exception.stderr, bytes))
def test_exception_has_out_err_are_bytes_if_decode_true(self):
"""Raised exc should have stderr, stdout as string if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=True)
+ util.subp([BOGUS_COMMAND], decode=True)
self.assertTrue(isinstance(cm.exception.stdout, six.string_types))
self.assertTrue(isinstance(cm.exception.stderr, six.string_types))
@@ -925,10 +928,10 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp([self.bogus_command], status_cb=status_cb)
+ util.subp([BOGUS_COMMAND], status_cb=status_cb)
expected = [
- 'Begin run command: {cmd}\n'.format(cmd=self.bogus_command),
+ 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
'ERROR: End run command: invalid command provided\n']
self.assertEqual(expected, logs)
@@ -940,13 +943,13 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp(['ls', '/I/dont/exist'], status_cb=status_cb)
- util.subp(['ls'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
expected = [
- 'Begin run command: ls /I/dont/exist\n',
+ 'Begin run command: %s -c exit 2\n' % BASH,
'ERROR: End run command: exit(2)\n',
- 'Begin run command: ls\n',
+ 'Begin run command: %s -c exit 0\n' % BASH,
'End run command: exit(0)\n']
self.assertEqual(expected, logs)