summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2018-09-05 16:02:25 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-09-05 16:02:25 +0000
commita8dcad9ac62bb1d2a4f7489960395bad6cac9382 (patch)
tree837df7fce2bdc2c05ce7c3bf497fc8a5be5cb99f
parentdb50bc0d999e3a90136864a774f85e4e15b144e8 (diff)
downloadvyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.tar.gz
vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.zip
tests: Disallow use of util.subp except for where needed.
In many cases, cloud-init uses 'util.subp' to run a subprocess. This is not really desirable in our unit tests as it makes the tests dependent upon existance of those utilities. The change here is to modify the base test case class (CiTestCase) to raise exception any time subp is called. Then, fix all callers. For cases where subp is necessary or actually desired, we can use it via   a.) context hander CiTestCase.allow_subp(value)   b.) class level self.allowed_subp = value Both cases the value is a list of acceptable executable names that will be called (essentially argv[0]). Some cleanups in AltCloud were done as the code was being updated.
-rw-r--r--cloudinit/analyze/tests/test_dump.py86
-rw-r--r--cloudinit/cmd/tests/test_status.py6
-rw-r--r--cloudinit/config/tests/test_snap.py7
-rw-r--r--cloudinit/config/tests/test_ubuntu_advantage.py7
-rw-r--r--cloudinit/net/tests/test_init.py2
-rw-r--r--cloudinit/sources/DataSourceAltCloud.py24
-rw-r--r--cloudinit/sources/DataSourceSmartOS.py29
-rw-r--r--cloudinit/tests/helpers.py43
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py44
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py3
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py3
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py1
-rw-r--r--tests/unittests/test_datasource/test_ovf.py8
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
-rw-r--r--tests/unittests/test_ds_identify.py1
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v3.py11
-rw-r--r--tests/unittests/test_handler/test_handler_bootcmd.py10
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py18
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py8
-rw-r--r--tests/unittests/test_handler/test_schema.py12
-rw-r--r--tests/unittests/test_net.py18
-rw-r--r--tests/unittests/test_util.py27
23 files changed, 289 insertions, 175 deletions
diff --git a/cloudinit/analyze/tests/test_dump.py b/cloudinit/analyze/tests/test_dump.py
index f4c42841..db2a667b 100644
--- a/cloudinit/analyze/tests/test_dump.py
+++ b/cloudinit/analyze/tests/test_dump.py
@@ -5,8 +5,8 @@ from textwrap import dedent
from cloudinit.analyze.dump import (
dump_events, parse_ci_logline, parse_timestamp)
-from cloudinit.util import subp, write_file
-from cloudinit.tests.helpers import CiTestCase
+from cloudinit.util import which, write_file
+from cloudinit.tests.helpers import CiTestCase, mock, skipIf
class TestParseTimestamp(CiTestCase):
@@ -15,21 +15,9 @@ class TestParseTimestamp(CiTestCase):
"""Logs with cloud-init detailed formats will be properly parsed."""
trusty_fmt = '%Y-%m-%d %H:%M:%S,%f'
trusty_stamp = '2016-09-12 14:39:20,839'
-
- parsed = parse_timestamp(trusty_stamp)
-
- # convert ourselves
dt = datetime.strptime(trusty_stamp, trusty_fmt)
- expected = float(dt.strftime('%s.%f'))
-
- # use date(1)
- out, _err = subp(['date', '+%s.%3N', '-d', trusty_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(trusty_stamp))
def test_parse_timestamp_handles_syslog_adding_year(self):
"""Syslog timestamps lack a year. Add year and properly parse."""
@@ -39,17 +27,9 @@ class TestParseTimestamp(CiTestCase):
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(syslog_stamp)
-
- # use date(1)
- out, _ = subp(['date', '+%s.%3N', '-d', syslog_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')),
+ parse_timestamp(syslog_stamp))
def test_parse_timestamp_handles_journalctl_format_adding_year(self):
"""Journalctl precise timestamps lack a year. Add year and parse."""
@@ -59,37 +39,22 @@ class TestParseTimestamp(CiTestCase):
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(journal_stamp)
-
- # use date(1)
- out, _ = subp(['date', '+%s.%6N', '-d', journal_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(journal_stamp))
+ @skipIf(not which("date"), "'date' command not available.")
def test_parse_unexpected_timestamp_format_with_date_command(self):
- """Dump sends unexpected timestamp formats to data for processing."""
+ """Dump sends unexpected timestamp formats to date for processing."""
new_fmt = '%H:%M %m/%d %Y'
new_stamp = '17:15 08/08'
-
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(new_stamp + " " + str(year), new_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(new_stamp)
# use date(1)
- out, _ = subp(['date', '+%s.%6N', '-d', new_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ with self.allow_subp(["date"]):
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(new_stamp))
class TestParseCILogLine(CiTestCase):
@@ -135,7 +100,9 @@ class TestParseCILogLine(CiTestCase):
'timestamp': timestamp}
self.assertEqual(expected, parse_ci_logline(line))
- def test_parse_logline_returns_event_for_finish_events(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_parse_logline_returns_event_for_finish_events(self,
+ m_parse_from_date):
"""parse_ci_logline returns a finish event for a parsed log line."""
line = ('2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]'
' handlers.py[DEBUG]: finish: modules-final: SUCCESS: running'
@@ -147,7 +114,10 @@ class TestParseCILogLine(CiTestCase):
'origin': 'cloudinit',
'result': 'SUCCESS',
'timestamp': 1472594005.972}
+ m_parse_from_date.return_value = "1472594005.972"
self.assertEqual(expected, parse_ci_logline(line))
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")])
SAMPLE_LOGS = dedent("""\
@@ -162,10 +132,16 @@ Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\
class TestDumpEvents(CiTestCase):
maxDiff = None
- def test_dump_events_with_rawdata(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_rawdata(self, m_parse_from_date):
"""Rawdata is split and parsed into a tuple of events and data"""
+ m_parse_from_date.return_value = "1472594005.972"
events, data = dump_events(rawdata=SAMPLE_LOGS)
expected_data = SAMPLE_LOGS.splitlines()
+ self.assertEqual(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")],
+ m_parse_from_date.call_args_list)
+ self.assertEqual(expected_data, data)
year = datetime.now().year
dt1 = datetime.strptime(
'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y')
@@ -183,12 +159,14 @@ class TestDumpEvents(CiTestCase):
'result': 'SUCCESS',
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
- self.assertEqual(expected_data, data)
- def test_dump_events_with_cisource(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_cisource(self, m_parse_from_date):
"""Cisource file is read and parsed into a tuple of events and data."""
tmpfile = self.tmp_path('logfile')
write_file(tmpfile, SAMPLE_LOGS)
+ m_parse_from_date.return_value = 1472594005.972
+
events, data = dump_events(cisource=open(tmpfile))
year = datetime.now().year
dt1 = datetime.strptime(
@@ -208,3 +186,5 @@ class TestDumpEvents(CiTestCase):
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data])
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")])
diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py
index 37a89936..aded8580 100644
--- a/cloudinit/cmd/tests/test_status.py
+++ b/cloudinit/cmd/tests/test_status.py
@@ -39,7 +39,8 @@ class TestStatus(CiTestCase):
ensure_file(self.disable_file) # Create the ignored disable file
(is_disabled, reason) = wrap_and_call(
'cloudinit.cmd.status',
- {'uses_systemd': False},
+ {'uses_systemd': False,
+ 'get_cmdline': "root=/dev/my-root not-important"},
status._is_cloudinit_disabled, self.disable_file, self.paths)
self.assertFalse(
is_disabled, 'expected enabled cloud-init on sysvinit')
@@ -50,7 +51,8 @@ class TestStatus(CiTestCase):
ensure_file(self.disable_file) # Create observed disable file
(is_disabled, reason) = wrap_and_call(
'cloudinit.cmd.status',
- {'uses_systemd': True},
+ {'uses_systemd': True,
+ 'get_cmdline': "root=/dev/my-root not-important"},
status._is_cloudinit_disabled, self.disable_file, self.paths)
self.assertTrue(is_disabled, 'expected disabled cloud-init')
self.assertEqual(
diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py
index 34c80f1e..3c472891 100644
--- a/cloudinit/config/tests/test_snap.py
+++ b/cloudinit/config/tests/test_snap.py
@@ -162,6 +162,7 @@ class TestAddAssertions(CiTestCase):
class TestRunCommands(CiTestCase):
with_logs = True
+ allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestRunCommands, self).setUp()
@@ -424,8 +425,10 @@ class TestHandle(CiTestCase):
'snap': {'commands': ['echo "HI" >> %s' % outfile,
'echo "MOM" >> %s' % outfile]}}
mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with mock.patch(mock_path, new_callable=StringIO):
- handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
+ with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
+ with mock.patch(mock_path, new_callable=StringIO):
+ handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
+
self.assertEqual('HI\nMOM\n', util.load_file(outfile))
@mock.patch('cloudinit.config.cc_snap.util.subp')
diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py
index f1beeff8..b7cf9bee 100644
--- a/cloudinit/config/tests/test_ubuntu_advantage.py
+++ b/cloudinit/config/tests/test_ubuntu_advantage.py
@@ -23,6 +23,7 @@ class FakeCloud(object):
class TestRunCommands(CiTestCase):
with_logs = True
+ allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestRunCommands, self).setUp()
@@ -234,8 +235,10 @@ class TestHandle(CiTestCase):
'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile,
'echo "MOM" >> %s' % outfile]}}
mock_path = '%s.sys.stderr' % MPATH
- with mock.patch(mock_path, new_callable=StringIO):
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
+ with mock.patch(mock_path, new_callable=StringIO):
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger,
+ args=None)
self.assertEqual('HI\nMOM\n', util.load_file(outfile))
diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py
index 5c017d15..8b444f14 100644
--- a/cloudinit/net/tests/test_init.py
+++ b/cloudinit/net/tests/test_init.py
@@ -199,6 +199,8 @@ class TestGenerateFallbackConfig(CiTestCase):
self.sysdir = self.tmp_dir() + '/'
self.m_sys_path.return_value = self.sysdir
self.addCleanup(sys_mock.stop)
+ self.add_patch('cloudinit.net.util.is_container', 'm_is_container',
+ return_value=False)
self.add_patch('cloudinit.net.util.udevadm_settle', 'm_settle')
def test_generate_fallback_finds_connected_eth_with_mac(self):
diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py
index 24fd65ff..8cd312d0 100644
--- a/cloudinit/sources/DataSourceAltCloud.py
+++ b/cloudinit/sources/DataSourceAltCloud.py
@@ -181,27 +181,18 @@ class DataSourceAltCloud(sources.DataSource):
# modprobe floppy
try:
- cmd = CMD_PROBE_FLOPPY
- (cmd_out, _err) = util.subp(cmd)
- LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out)
+ modprobe_floppy()
except ProcessExecutionError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
- return False
- except OSError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
+ util.logexc(LOG, 'Failed modprobe: %s', e)
return False
floppy_dev = '/dev/fd0'
# udevadm settle for floppy device
try:
- (cmd_out, _err) = util.udevadm_settle(exists=floppy_dev, timeout=5)
- LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out)
- except ProcessExecutionError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
- return False
- except OSError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
+ util.udevadm_settle(exists=floppy_dev, timeout=5)
+ except (ProcessExecutionError, OSError) as e:
+ util.logexc(LOG, 'Failed udevadm_settle: %s\n', e)
return False
try:
@@ -258,6 +249,11 @@ class DataSourceAltCloud(sources.DataSource):
return False
+def modprobe_floppy():
+ out, _err = util.subp(CMD_PROBE_FLOPPY)
+ LOG.debug('Command: %s\nOutput%s', ' '.join(CMD_PROBE_FLOPPY), out)
+
+
# Used to match classes to dependencies
# Source DataSourceAltCloud does not really depend on networking.
# In the future 'dsmode' like behavior can be added to offer user
diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py
index ad8cfb9c..593ac91a 100644
--- a/cloudinit/sources/DataSourceSmartOS.py
+++ b/cloudinit/sources/DataSourceSmartOS.py
@@ -683,6 +683,18 @@ def jmc_client_factory(
raise ValueError("Unknown value for smartos_type: %s" % smartos_type)
+def identify_file(content_f):
+ cmd = ["file", "--brief", "--mime-type", content_f]
+ f_type = None
+ try:
+ (f_type, _err) = util.subp(cmd)
+ LOG.debug("script %s mime type is %s", content_f, f_type)
+ except util.ProcessExecutionError as e:
+ util.logexc(
+ LOG, ("Failed to identify script type for %s" % content_f, e))
+ return None if f_type is None else f_type.strip()
+
+
def write_boot_content(content, content_f, link=None, shebang=False,
mode=0o400):
"""
@@ -715,18 +727,11 @@ def write_boot_content(content, content_f, link=None, shebang=False,
util.write_file(content_f, content, mode=mode)
if shebang and not content.startswith("#!"):
- try:
- cmd = ["file", "--brief", "--mime-type", content_f]
- (f_type, _err) = util.subp(cmd)
- LOG.debug("script %s mime type is %s", content_f, f_type)
- if f_type.strip() == "text/plain":
- new_content = "\n".join(["#!/bin/bash", content])
- util.write_file(content_f, new_content, mode=mode)
- LOG.debug("added shebang to file %s", content_f)
-
- except Exception as e:
- util.logexc(LOG, ("Failed to identify script type for %s" %
- content_f, e))
+ f_type = identify_file(content_f)
+ if f_type == "text/plain":
+ util.write_file(
+ content_f, "\n".join(["#!/bin/bash", content]), mode=mode)
+ LOG.debug("added shebang to file %s", content_f)
if link:
try:
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 9a21426e..e7e4182f 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -33,6 +33,8 @@ from cloudinit import helpers as ch
from cloudinit.sources import DataSourceNone
from cloudinit import util
+_real_subp = util.subp
+
# Used for skipping tests
SkipTest = unittest2.SkipTest
skipIf = unittest2.skipIf
@@ -143,6 +145,17 @@ class CiTestCase(TestCase):
# Subclass overrides for specific test behavior
# Whether or not a unit test needs logfile setup
with_logs = False
+ allowed_subp = False
+ SUBP_SHELL_TRUE = "shell=true"
+
+ @contextmanager
+ def allow_subp(self, allowed_subp):
+ orig = self.allowed_subp
+ try:
+ self.allowed_subp = allowed_subp
+ yield
+ finally:
+ self.allowed_subp = orig
def setUp(self):
super(CiTestCase, self).setUp()
@@ -155,11 +168,41 @@ class CiTestCase(TestCase):
handler.setFormatter(formatter)
self.old_handlers = self.logger.handlers
self.logger.handlers = [handler]
+ if self.allowed_subp is True:
+ util.subp = _real_subp
+ else:
+ util.subp = self._fake_subp
+
+ def _fake_subp(self, *args, **kwargs):
+ if 'args' in kwargs:
+ cmd = kwargs['args']
+ else:
+ cmd = args[0]
+
+ if not isinstance(cmd, six.string_types):
+ cmd = cmd[0]
+ pass_through = False
+ if not isinstance(self.allowed_subp, (list, bool)):
+ raise TypeError("self.allowed_subp supports list or bool.")
+ if isinstance(self.allowed_subp, bool):
+ pass_through = self.allowed_subp
+ else:
+ pass_through = (
+ (cmd in self.allowed_subp) or
+ (self.SUBP_SHELL_TRUE in self.allowed_subp and
+ kwargs.get('shell')))
+ if pass_through:
+ return _real_subp(*args, **kwargs)
+ raise Exception(
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
+ ', '.join([str(repr(a)) for a in args] +
+ ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
def tearDown(self):
if self.with_logs:
# Remove the handler we setup
logging.getLogger().handlers = self.old_handlers
+ util.subp = _real_subp
super(CiTestCase, self).tearDown()
def tmp_dir(self, dir=None, cleanup=True):
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 3253f3ad..ff35904e 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
- cmd_pass = ['true']
- cmd_fail = ['false']
- cmd_not_found = ['bogus bad command']
-
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
+ self.mount_dir = self.tmp_dir()
_write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
- dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy']
- dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle',
- '--quiet', '--timeout=5']
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
+ 'm_modprobe_floppy', return_value=None)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
+ 'm_udevadm_settle', return_value=('', ''))
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
+ 'm_mount_cb')
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "Failed modprobe")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ self.m_udevadm_settle.side_effect = util.ProcessExecutionError(
+ "Failed settle.")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ self.m_udevadm_settle.side_effect = OSError("No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index f6a59b6b..380ad1b5 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -42,6 +42,9 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceCloudSigma.util.is_container",
+ "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 7e6fcbbf..b0abfc5f 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -224,6 +224,9 @@ class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ "m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index cdbd1e1a..21931eb7 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'read_dmi_data', return_value=None))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 36b4d771..61591017 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
+ allowed_subp = ['bash']
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index fc4eb36e..9d52eb99 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase):
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': None},
+ {'util.read_dmi_data': None,
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase):
paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware'},
+ {'util.read_dmi_data': 'vmware',
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index e08e7908..46778e95 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -89,6 +89,7 @@ CallReturn = namedtuple('CallReturn',
class DsIdentifyBase(CiTestCase):
dsid_path = os.path.realpath('tools/ds-identify')
+ allowed_subp = ['sh']
def call(self, rootd=None, mocks=None, func="main", args=None, files=None,
policy_dmi=DI_DEFAULT_POLICY,
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py
index 7a64c230..a81c67c7 100644
--- a/tests/unittests/test_handler/test_handler_apt_source_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py
@@ -48,6 +48,10 @@ ADD_APT_REPO_MATCH = r"^[\w-]+:\w"
TARGET = None
+MOCK_LSB_RELEASE_DATA = {
+ 'id': 'Ubuntu', 'description': 'Ubuntu 18.04.1 LTS',
+ 'release': '18.04', 'codename': 'bionic'}
+
class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
"""TestAptSourceConfig
@@ -64,6 +68,9 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
self.join = os.path.join
self.matcher = re.compile(ADD_APT_REPO_MATCH).search
+ self.add_patch(
+ 'cloudinit.config.cc_apt_configure.util.lsb_release',
+ 'm_lsb_release', return_value=MOCK_LSB_RELEASE_DATA.copy())
@staticmethod
def _add_apt_sources(*args, **kwargs):
@@ -76,7 +83,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
Get the most basic default mrror and release info to be used in tests
"""
params = {}
- params['RELEASE'] = util.lsb_release()['codename']
+ params['RELEASE'] = MOCK_LSB_RELEASE_DATA['release']
arch = 'amd64'
params['MIRROR'] = cc_apt_configure.\
get_default_mirrors(arch)["PRIMARY"]
@@ -464,7 +471,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
'uri':
'http://testsec.ubuntu.com/%s/' % component}]}
post = ("%s_dists_%s-updates_InRelease" %
- (component, util.lsb_release()['codename']))
+ (component, MOCK_LSB_RELEASE_DATA['codename']))
fromfn = ("%s/%s_%s" % (pre, archive, post))
tofn = ("%s/test.ubuntu.com_%s" % (pre, post))
diff --git a/tests/unittests/test_handler/test_handler_bootcmd.py b/tests/unittests/test_handler/test_handler_bootcmd.py
index b1375269..a76760fa 100644
--- a/tests/unittests/test_handler/test_handler_bootcmd.py
+++ b/tests/unittests/test_handler/test_handler_bootcmd.py
@@ -118,7 +118,8 @@ class TestBootcmd(CiTestCase):
'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- handle('cc_bootcmd', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ handle('cc_bootcmd', valid_config, cc, LOG, [])
self.assertEqual(my_id + ' iid-datasource-none\n',
util.load_file(out_file))
@@ -128,12 +129,13 @@ class TestBootcmd(CiTestCase):
valid_config = {'bootcmd': ['exit 1']} # Script with error
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
- with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
- handle('does-not-matter', valid_config, cc, LOG, [])
+ with self.allow_subp(['/bin/sh']):
+ with self.assertRaises(util.ProcessExecutionError) as ctxt:
+ handle('does-not-matter', valid_config, cc, LOG, [])
self.assertIn(
'Unexpected error while running command.\n'
"Command: ['/bin/sh',",
- str(ctxt_manager.exception))
+ str(ctxt.exception))
self.assertIn(
'Failed to run bootcmd module does-not-matter',
self.logs.getvalue())
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index f4bbd66d..b16532ea 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -36,13 +36,21 @@ class TestInstallChefOmnibus(HttprettyTestCase):
@mock.patch("cloudinit.config.cc_chef.OMNIBUS_URL", OMNIBUS_URL_HTTP)
def test_install_chef_from_omnibus_runs_chef_url_content(self):
- """install_chef_from_omnibus runs downloaded OMNIBUS_URL as script."""
- chef_outfile = self.tmp_path('chef.out', self.new_root)
- response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
+ """install_chef_from_omnibus calls subp_blob_in_tempfile."""
+ response = b'#!/bin/bash\necho "Hi Mom"'
httpretty.register_uri(
httpretty.GET, cc_chef.OMNIBUS_URL, body=response, status=200)
- cc_chef.install_chef_from_omnibus()
- self.assertEqual('Hi Mom\n', util.load_file(chef_outfile))
+ ret = (None, None) # stdout, stderr but capture=False
+
+ with mock.patch("cloudinit.config.cc_chef.util.subp_blob_in_tempfile",
+ return_value=ret) as m_subp_blob:
+ cc_chef.install_chef_from_omnibus()
+ # admittedly whitebox, but assuming subp_blob_in_tempfile works
+ # this should be fine.
+ self.assertEqual(
+ [mock.call(blob=response, args=[], basename='chef-omnibus-install',
+ capture=False)],
+ m_subp_blob.call_args_list)
@mock.patch('cloudinit.config.cc_chef.url_helper.readurl')
@mock.patch('cloudinit.config.cc_chef.util.subp_blob_in_tempfile')
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index f92175fd..feca56c2 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -150,10 +150,12 @@ class TestResizefs(CiTestCase):
self.assertEqual(('growfs', '-y', devpth),
_resize_ufs(mount_point, devpth))
+ @mock.patch('cloudinit.util.is_container', return_value=False)
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.get_device_info_from_zpool')
@mock.patch('cloudinit.util.parse_mount')
- def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount):
+ def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount,
+ is_container):
devpth = 'vmzroot/ROOT/freebsd'
disk = 'gpt/system'
fs_type = 'zfs'
@@ -354,8 +356,10 @@ class TestMaybeGetDevicePathAsWritableBlock(CiTestCase):
('btrfs', 'filesystem', 'resize', 'max', '/'),
_resize_btrfs("/", "/dev/sda1"))
+ @mock.patch('cloudinit.util.is_container', return_value=True)
@mock.patch('cloudinit.util.is_FreeBSD')
- def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd):
+ def test_maybe_get_writable_device_path_zfs_freebsd(self, freebsd,
+ m_is_container):
freebsd.return_value = True
info = 'dev=gpt/system mnt_point=/ path=/'
devpth = maybe_get_writable_device_path('gpt/system', info, LOG)
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index fb266faf..1bad07f6 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -4,7 +4,7 @@ from cloudinit.config.schema import (
CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
get_schema_doc, get_schema, validate_cloudconfig_file,
validate_cloudconfig_schema, main)
-from cloudinit.util import subp, write_file
+from cloudinit.util import write_file
from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJsonSchema
@@ -406,8 +406,14 @@ class CloudTestsIntegrationTest(CiTestCase):
integration_testdir = os.path.sep.join(
[testsdir, 'cloud_tests', 'testcases'])
errors = []
- out, _ = subp(['find', integration_testdir, '-name', '*yaml'])
- for filename in out.splitlines():
+
+ yaml_files = []
+ for root, _dirnames, filenames in os.walk(integration_testdir):
+ yaml_files.extend([os.path.join(root, f)
+ for f in filenames if f.endswith(".yaml")])
+ self.assertTrue(len(yaml_files) > 0)
+
+ for filename in yaml_files:
test_cfg = safe_load(open(filename))
cloud_config = test_cfg.get('cloud_config')
if cloud_config:
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 05d5c72c..f3165da9 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1653,6 +1653,12 @@ def _setup_test(tmp_dir, mock_get_devicelist, mock_read_sys_net,
class TestGenerateFallbackConfig(CiTestCase):
+ def setUp(self):
+ super(TestGenerateFallbackConfig, self).setUp()
+ self.add_patch(
+ "cloudinit.util.get_cmdline", "m_get_cmdline",
+ return_value="root=/dev/sda1")
+
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
@@ -1895,12 +1901,13 @@ class TestRhelSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2183,12 +2190,13 @@ class TestOpenSuseSysConfigRendering(CiTestCase):
if missing:
raise AssertionError("Missing headers in: %s" % missing)
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2429,12 +2437,13 @@ USERCTL=no
class TestEniNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@mock.patch("cloudinit.net.get_devicelist")
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
- mock_sys_dev_path):
+ mock_sys_dev_path, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
@@ -2482,6 +2491,7 @@ iface eth0 inet dhcp
class TestNetplanNetRendering(CiTestCase):
+ @mock.patch("cloudinit.net.util.get_cmdline", return_value="root=myroot")
@mock.patch("cloudinit.net.netplan._clean_default")
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.read_sys_net")
@@ -2489,7 +2499,7 @@ class TestNetplanNetRendering(CiTestCase):
def test_default_generation(self, mock_get_devicelist,
mock_read_sys_net,
mock_sys_dev_path,
- mock_clean_default):
+ mock_clean_default, m_get_cmdline):
tmp_dir = self.tmp_dir()
_setup_test(tmp_dir, mock_get_devicelist,
mock_read_sys_net, mock_sys_dev_path)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 7a203ce2..5a14479a 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -24,6 +24,7 @@ except ImportError:
BASH = util.which('bash')
+BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
class FakeSelinux(object):
@@ -742,6 +743,8 @@ class TestReadSeeded(helpers.TestCase):
class TestSubp(helpers.CiTestCase):
with_logs = True
+ allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE,
+ BOGUS_COMMAND, sys.executable]
stdin2err = [BASH, '-c', 'cat >&2']
stdin2out = ['cat']
@@ -749,7 +752,6 @@ class TestSubp(helpers.CiTestCase):
utf8_valid = b'start \xc3\xa9 end'
utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
- bogus_command = 'this-is-not-expected-to-be-a-program-name'
def printf_cmd(self, *args):
# bash's printf supports \xaa. So does /usr/bin/printf
@@ -848,9 +850,10 @@ class TestSubp(helpers.CiTestCase):
util.write_file(noshebang, 'true\n')
os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- self.assertRaisesRegex(util.ProcessExecutionError,
- r'Missing #! in script\?',
- util.subp, (noshebang,))
+ with self.allow_subp([noshebang]):
+ self.assertRaisesRegex(util.ProcessExecutionError,
+ r'Missing #! in script\?',
+ util.subp, (noshebang,))
def test_subp_combined_stderr_stdout(self):
"""Providing combine_capture as True redirects stderr to stdout."""
@@ -868,14 +871,14 @@ class TestSubp(helpers.CiTestCase):
def test_exception_has_out_err_are_bytes_if_decode_false(self):
"""Raised exc should have stderr, stdout as bytes if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=False)
+ util.subp([BOGUS_COMMAND], decode=False)
self.assertTrue(isinstance(cm.exception.stdout, bytes))
self.assertTrue(isinstance(cm.exception.stderr, bytes))
def test_exception_has_out_err_are_bytes_if_decode_true(self):
"""Raised exc should have stderr, stdout as string if no decode."""
with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([self.bogus_command], decode=True)
+ util.subp([BOGUS_COMMAND], decode=True)
self.assertTrue(isinstance(cm.exception.stdout, six.string_types))
self.assertTrue(isinstance(cm.exception.stderr, six.string_types))
@@ -925,10 +928,10 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp([self.bogus_command], status_cb=status_cb)
+ util.subp([BOGUS_COMMAND], status_cb=status_cb)
expected = [
- 'Begin run command: {cmd}\n'.format(cmd=self.bogus_command),
+ 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
'ERROR: End run command: invalid command provided\n']
self.assertEqual(expected, logs)
@@ -940,13 +943,13 @@ class TestSubp(helpers.CiTestCase):
logs.append(log)
with self.assertRaises(util.ProcessExecutionError):
- util.subp(['ls', '/I/dont/exist'], status_cb=status_cb)
- util.subp(['ls'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
+ util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
expected = [
- 'Begin run command: ls /I/dont/exist\n',
+ 'Begin run command: %s -c exit 2\n' % BASH,
'ERROR: End run command: exit(2)\n',
- 'Begin run command: ls\n',
+ 'Begin run command: %s -c exit 0\n' % BASH,
'End run command: exit(0)\n']
self.assertEqual(expected, logs)