summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2018-09-05 16:02:25 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-09-05 16:02:25 +0000
commita8dcad9ac62bb1d2a4f7489960395bad6cac9382 (patch)
tree837df7fce2bdc2c05ce7c3bf497fc8a5be5cb99f /tests/unittests
parentdb50bc0d999e3a90136864a774f85e4e15b144e8 (diff)
downloadvyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.tar.gz
vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.zip
tests: Disallow use of util.subp except for where needed.
In many cases, cloud-init uses 'util.subp' to run a subprocess. This is not really desirable in our unit tests as it makes the tests dependent upon existance of those utilities. The change here is to modify the base test case class (CiTestCase) to raise exception any time subp is called. Then, fix all callers. For cases where subp is necessary or actually desired, we can use it via   a.) context hander CiTestCase.allow_subp(value)   b.) class level self.allowed_subp = value Both cases the value is a list of acceptable executable names that will be called (essentially argv[0]). Some cleanups in AltCloud were done as the code was being updated.
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py44
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py3
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py3
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py1
-rw-r--r--tests/unittests/test_datasource/test_ovf.py8
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
-rw-r--r--tests/unittests/test_ds_identify.py1
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v3.py11
-rw-r--r--tests/unittests/test_handler/test_handler_bootcmd.py10
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py18
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py8
-rw-r--r--tests/unittests/test_handler/test_schema.py12
-rw-r--r--tests/unittests/test_net.py18
-rw-r--r--tests/unittests/test_util.py27
15 files changed, 170 insertions, 90 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 3253f3ad..ff35904e 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
- cmd_pass = ['true']
- cmd_fail = ['false']
- cmd_not_found = ['bogus bad command']
-
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
+ self.mount_dir = self.tmp_dir()
_write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
- dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy']
- dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle',
- '--quiet', '--timeout=5']
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
+ 'm_modprobe_floppy', return_value=None)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
+ 'm_udevadm_settle', return_value=('', ''))
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
+ 'm_mount_cb')
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "Failed modprobe")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ self.m_udevadm_settle.side_effect = util.ProcessExecutionError(
+ "Failed settle.")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ self.m_udevadm_settle.side_effect = OSError("No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index f6a59b6b..380ad1b5 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -42,6 +42,9 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceCloudSigma.util.is_container",
+ "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 7e6fcbbf..b0abfc5f 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -224,6 +224,9 @@ class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ "m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index cdbd1e1a..21931eb7 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'read_dmi_data', return_value=None))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 36b4d771..61591017 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
+ allowed_subp = ['bash']
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index fc4eb36e..9d52eb99 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase):
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': None},
+ {'util.read_dmi_data': None,
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase):
paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware'},
+ {'util.read_dmi_data': 'vmware',
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index e08e7908..46778e95 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -89,6 +89,7 @@ CallReturn = namedtuple('CallReturn',
class DsIdentifyBase(CiTestCase):
dsid_path = os.path.realpath('tools/ds-identify')
+ allowed_subp = ['sh']
def call(self, rootd=None, mocks=None, func="main", args=None, files=None,
policy_dmi=DI_DEFAULT_POLICY,
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py
index 7a64c230..a81c67c7 100644
--- a/tests/unittests/test_handler/test_handler_apt_source_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py
@@ -48,6 +48,10 @@ ADD_APT_REPO_MATCH = r"^[\w-]+:\w"
TARGET = None
+MOCK_LSB_RELEASE_DATA = {
+ 'id': 'Ubuntu', 'description': 'Ubuntu 18.04.1 LTS',
+ 'release': '18.04', 'codename': 'bionic'}
+
class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
"""TestAptSourceConfig
@@ -64,6 +68,9 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
self.join = os.path.join
self.matcher = re.compile(ADD_APT_REPO_MATCH).search
+ self.add_patch(
+ 'cloudinit.config.cc_apt_configure.util.lsb_release',
+ 'm_lsb_release', return_value=MOCK_LSB_RELEASE_DATA.copy())
@staticmethod
def _add_apt_sources(*args, **kwargs):
@@ -76,7 +83,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
Get the most basic default mrror and release info to be used in tests
"""
params = {}
- params['RELEASE'] = util.lsb_release()['codename']
+ params['RELEASE'] = MOCK_LSB_RELEASE_DATA['release']
arch = 'amd64'
params['MIRROR'] = cc_apt_configure.\
get_default_mirrors(arch)["PRIMARY"]
@@ -464,7 +471,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
'uri':
'http://testsec.ubuntu.com/%s/' % component}]}
post = ("%s_dists_%s-updates_InRelease" %
- (component, util.lsb_release()['codename']))
+ (component, MOCK_LSB_RELEASE_DATA['codename']))
fromfn = ("%s/%s_%s" % (pre, archive, post))
tofn = ("%s/test.ubuntu.com_%s" % (pre, post))
diff --git a/tests/unittests/test_handler/test_handler_bootcmd.py b/tests/unittests/test_handler/test_handler_bootcmd.py
index b1375269..a76760fa 100644
--- a/tests/unittests/test_handler/test_handler_bootcmd.py
+++ b/tests/unittests/test_handler/test_handler_bootcmd.py
@@ -118,7 +118,8 @@ class TestBootcmd(CiTestCase):
'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- handle('cc_bootcmd', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ handle('cc_bootcmd', valid_config, cc, LOG, [])
self.assertEqual(my_id + ' iid-datasource-none\n',
util.load_file(out_file))
@@ -128,12 +129,13 @@ class TestBootcmd(CiTestCase):
valid_config = {'bootcmd': ['exit 1']} # Script with error
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
- handle('does-not-matter', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ with self.assertRaises(util.ProcessExecutionError) as ctxt:
+ handle('does-not-matter', valid_config, cc, LOG, [])
self.assertIn(
'Unexpected error while running command.\n'
"Command: ['/bin/sh',",
- str(ctxt_manager.exception))
+ str(ctxt.exception))
self.assertIn(
'Failed to run bootcmd module does-not-matter',
self.logs.getvalue())
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index f4bbd66d..b16532ea 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -36,13 +36,21 @@ class TestInstallChefOmnibus(HttprettyTestCase):
@mock.patch("cloudinit.config.cc_chef.OMNIBUS_URL", OMNIBUS_URL_HTTP)
def test_install_chef_from_omnibus_runs_chef_url_content(self):
- """install_chef_from_omnibus runs downloaded OMNIBUS_URL as script."""
- chef_outfile = self.tmp_path('chef.out', self.new_root)
- response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
+ """install_chef_from_omnibus calls subp_blob_in_tempfile."""
+ response = b'#!/bin/bash\necho "Hi Mom"'
httpretty.register_uri(
httpretty.GET, cc_chef.OMNIBUS_URL, body=response, status=200)
- cc_chef.install_chef_from_omnibus()
- self.assertEqual('Hi Mom\n', util.load_file(chef_outfile))
+ ret = (None, None) # stdout, stderr but capture=False
+
+ with mock.patch("cloudinit.config.cc_chef.util.subp_blob_in_tempfile",
+ return_value=ret) as m_subp_blob:
+ cc_chef.install_chef_from_omnibus()
+ # admittedly whitebox, but assuming subp_blob_in_tempfile works
+ # this should be fine.
+ self.assertEqual(
+ [mock.call(blob=response, args=[], basename='chef-omnibus-install',
+ capture=False)],
+ m_subp_blob.call_args_list)
@mock.patch('cloudinit.config.cc_chef.url_helper.readurl')
@mock.patch('cloudinit.config.cc_chef.util.subp_blob_in_tempfile')
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index f92175fd..feca56c2 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -150,10 +150,12 @@ class TestResizefs(CiTestCase):
self.assertEqual(('growfs', '-y', devpth),
_resize_ufs(mount_point, devpth))
+ @mock.patch('cloudinit.util.is_container', return_value=False)
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.get_device_info_from_zpool')
@mock.patch('cloudinit.util.parse_mount')
- def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount):
+ def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount,
+ is_container):
devpth = 'vmzroot/ROOT/freebsd'
disk = 'gpt/system'
fs_type = 'zfs'
@@ -354,8 +356,10 @@ class TestMaybeGetDevicePathAsWritableBlock(CiTestCase):
('btrfs', 'filesystem', 'resize', 'max', '/'),
_resize_btrfs("/", "/dev/sda1"))
+ @mock.patch('cloudinit.util.is_container', return_value=True)
@mock.patch('cloudinit.util.is_FreeBSD')
- def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd):
+ def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd,
+ m_is_container):
freebsd.return_value = True
info = 'dev=gpt/system mnt_point=/ path=/'
devpth = maybe_get_writable_device_path('gpt/system', info, LOG)
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index fb266faf..1bad07f6 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -4,7 +4,7 @@ from cloudinit.config.schema import (
CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
get_schema_doc, get_schema, validate_cloudconfig_file,
validate_cloudconfig_schema, main)
-from cloudinit.util import subp, write_file
+from cloudinit.util import write_file
from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJsonSchema
@@ -406,8 +406,14 @@ class CloudTestsIntegrationTest(CiTestCase):
integration_testdir = os.path.sep.join(
[testsdir, 'cloud_tests', 'testcases'])
errors = []
- out, _ = subp(['find', integration_testdir, '-name', '*yaml'])
- for filename in out.splitlines():
+
+ yaml_files = []
+ for root, _dirnames, filenames in os.walk(integration_testdir):
+ yaml_files.extend([os.path.join(root, f)
+ for f in filenames if f.endswith(".yaml")])
+ self.assertTrue(len(yaml_files) > 0)
+
+ for filename in yaml_files:
test_cfg = safe_load(open(filename))
cloud_config = test_cfg.get('cloud_config')
if cloud_config:
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 05d5c72c..f3165da9 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1653,6 +1653,12 @@ def _setup_test(tmp_dir, mock_get_devicelist, mock_read_sys_net,
class TestGenerateFallbackConfig(CiTestCase):
+ def setUp(self):
+ super(TestGenerateFallbackConfig, self).setUp()
+ self.add_patch(
+ "cloudinit.util.get_cmdline", "m_get_cmdline",
+ return_value="root=/dev/sda1")
+
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
@@ -1895,12 +1901,13 @@ class TestRhelSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2183,12 +2190,13 @@ class TestOpenSuseSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2429,12 +2437,13 @@ USERCTL=no
class TestEniNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2482,6 +2491,7 @@ iface eth0 inet dhcp
class TestNetplanNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.netplan._clean_default")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@@ -2489,7 +2499,7 @@ class TestNetplanNetRendering(CiTestCase):
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
mock_sys_dev_path,
- mock_clean_default):
+ mock_clean_default, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 7a203ce2..5a14479a 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -24,6 +24,7 @@ except ImportError:
BASH = util.which('bash')
+BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
class FakeSelinux(object):
@@ -742,6 +743,8 @@ class TestReadSeeded(helpers.TestCase):
class TestSubp(helpers.CiTestCase):
with_logs = True
+ allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE,
+ BOGUS_COMMAND, sys.executable]
stdin2err = [BASH, '-c', 'cat >&2']
stdin2out = ['cat']
@@ -749,7 +752,6 @@ class TestSubp(helpers.CiTestCase):
utf8_valid = b'start \xc3\xa9 end'
utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
- bogus_command = 'this-is-not-expected-to-be-a-program-name'
def printf_cmd(self, *args):
# bash's printf supports \xaa. So does /usr/bin/printf
@@ -848,9 +850,10 @@ class TestSubp(helpers.CiTestCase):
util.write_file(noshebang, 'true\n')
os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- self.assertRaisesRegex(util.ProcessExecutionError,
- r'Missing #! in script\?',
- util.subp, (noshebang,))
+ with self.allow_subp([noshebang]):
+ self.assertRaisesRegex(util.ProcessExecutionError,
+ r'Missing #! in script\?',
+ util.subp, (noshebang,))
def test_subp_combined_stderr_stdout(self):
"""Providing combine_capture as True redirects stderr to stdout."""
@@ -868,14 +871,14 @@ class TestSubp(helpers.CiTestCase):
def test_exception_has_out_err_are_bytes_if_decode_false(self):
"""Raised exc should have stderr, stdout as bytes if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=False)
+ util.subp([BOGUS_COMMAND], decode=False)
self.assertTrue(isinstance(cm.exception.stdout, bytes))
self.assertTrue(isinstance(cm.exception.stderr, bytes))
def test_exception_has_out_err_are_bytes_if_decode_true(self):
"""Raised exc should have stderr, stdout as string if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=True)
+ util.subp([BOGUS_COMMAND], decode=True)
self.assertTrue(isinstance(cm.exception.stdout, six.string_types))
self.assertTrue(isinstance(cm.exception.stderr, six.string_types))
@@ -925,10 +928,10 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp([self.bogus_command], status_cb=status_cb)
+ util.subp([BOGUS_COMMAND], status_cb=status_cb)
expected = [
- 'Begin run command: {cmd}\n'.format(cmd=self.bogus_command),
+ 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
'ERROR: End run command: invalid command provided\n']
self.assertEqual(expected, logs)
@@ -940,13 +943,13 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp(['ls', '/I/dont/exist'], status_cb=status_cb)
- util.subp(['ls'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
expected = [
- 'Begin run command: ls /I/dont/exist\n',
+ 'Begin run command: %s -c exit 2\n' % BASH,
'ERROR: End run command: exit(2)\n',
- 'Begin run command: ls\n',
+ 'Begin run command: %s -c exit 0\n' % BASH,
'End run command: exit(0)\n']
self.assertEqual(expected, logs)