diff options
author | Scott Moser <smoser@ubuntu.com> | 2018-09-05 16:02:25 +0000 |
---|---|---|
committer | Server Team CI Bot <josh.powers+server-team-bot@canonical.com> | 2018-09-05 16:02:25 +0000 |
commit | a8dcad9ac62bb1d2a4f7489960395bad6cac9382 (patch) | |
tree | 837df7fce2bdc2c05ce7c3bf497fc8a5be5cb99f /cloudinit | |
parent | db50bc0d999e3a90136864a774f85e4e15b144e8 (diff) | |
download | vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.tar.gz vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.zip |
tests: Disallow use of util.subp except for where needed.
In many cases, cloud-init uses 'util.subp' to run a subprocess.
This is not really desirable in our unit tests as it makes the tests
dependent upon existance of those utilities.
The change here is to modify the base test case class (CiTestCase) to
raise exception any time subp is called. Then, fix all callers.
For cases where subp is necessary or actually desired, we can use it
via
a.) context hander CiTestCase.allow_subp(value)
b.) class level self.allowed_subp = value
Both cases the value is a list of acceptable executable names that
will be called (essentially argv[0]).
Some cleanups in AltCloud were done as the code was being updated.
Diffstat (limited to 'cloudinit')
-rw-r--r-- | cloudinit/analyze/tests/test_dump.py | 86 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_status.py | 6 | ||||
-rw-r--r-- | cloudinit/config/tests/test_snap.py | 7 | ||||
-rw-r--r-- | cloudinit/config/tests/test_ubuntu_advantage.py | 7 | ||||
-rw-r--r-- | cloudinit/net/tests/test_init.py | 2 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceAltCloud.py | 24 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceSmartOS.py | 29 | ||||
-rw-r--r-- | cloudinit/tests/helpers.py | 43 |
8 files changed, 119 insertions, 85 deletions
diff --git a/cloudinit/analyze/tests/test_dump.py b/cloudinit/analyze/tests/test_dump.py index f4c42841..db2a667b 100644 --- a/cloudinit/analyze/tests/test_dump.py +++ b/cloudinit/analyze/tests/test_dump.py @@ -5,8 +5,8 @@ from textwrap import dedent from cloudinit.analyze.dump import ( dump_events, parse_ci_logline, parse_timestamp) -from cloudinit.util import subp, write_file -from cloudinit.tests.helpers import CiTestCase +from cloudinit.util import which, write_file +from cloudinit.tests.helpers import CiTestCase, mock, skipIf class TestParseTimestamp(CiTestCase): @@ -15,21 +15,9 @@ class TestParseTimestamp(CiTestCase): """Logs with cloud-init detailed formats will be properly parsed.""" trusty_fmt = '%Y-%m-%d %H:%M:%S,%f' trusty_stamp = '2016-09-12 14:39:20,839' - - parsed = parse_timestamp(trusty_stamp) - - # convert ourselves dt = datetime.strptime(trusty_stamp, trusty_fmt) - expected = float(dt.strftime('%s.%f')) - - # use date(1) - out, _err = subp(['date', '+%s.%3N', '-d', trusty_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(trusty_stamp)) def test_parse_timestamp_handles_syslog_adding_year(self): """Syslog timestamps lack a year. Add year and properly parse.""" @@ -39,17 +27,9 @@ class TestParseTimestamp(CiTestCase): # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(syslog_stamp) - - # use date(1) - out, _ = subp(['date', '+%s.%3N', '-d', syslog_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), + parse_timestamp(syslog_stamp)) def test_parse_timestamp_handles_journalctl_format_adding_year(self): """Journalctl precise timestamps lack a year. Add year and parse.""" @@ -59,37 +39,22 @@ class TestParseTimestamp(CiTestCase): # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(journal_stamp) - - # use date(1) - out, _ = subp(['date', '+%s.%6N', '-d', journal_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(journal_stamp)) + @skipIf(not which("date"), "'date' command not available.") def test_parse_unexpected_timestamp_format_with_date_command(self): - """Dump sends unexpected timestamp formats to data for processing.""" + """Dump sends unexpected timestamp formats to date for processing.""" new_fmt = '%H:%M %m/%d %Y' new_stamp = '17:15 08/08' - # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(new_stamp + " " + str(year), new_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(new_stamp) # use date(1) - out, _ = subp(['date', '+%s.%6N', '-d', new_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + with self.allow_subp(["date"]): + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(new_stamp)) class TestParseCILogLine(CiTestCase): @@ -135,7 +100,9 @@ class TestParseCILogLine(CiTestCase): 'timestamp': timestamp} self.assertEqual(expected, parse_ci_logline(line)) - def test_parse_logline_returns_event_for_finish_events(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_parse_logline_returns_event_for_finish_events(self, + m_parse_from_date): """parse_ci_logline returns a finish event for a parsed log line.""" line = ('2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]' ' handlers.py[DEBUG]: finish: modules-final: SUCCESS: running' @@ -147,7 +114,10 @@ class TestParseCILogLine(CiTestCase): 'origin': 'cloudinit', 'result': 'SUCCESS', 'timestamp': 1472594005.972} + m_parse_from_date.return_value = "1472594005.972" self.assertEqual(expected, parse_ci_logline(line)) + m_parse_from_date.assert_has_calls( + [mock.call("2016-08-30 21:53:25.972325+00:00")]) SAMPLE_LOGS = dedent("""\ @@ -162,10 +132,16 @@ Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\ class TestDumpEvents(CiTestCase): maxDiff = None - def test_dump_events_with_rawdata(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_dump_events_with_rawdata(self, m_parse_from_date): """Rawdata is split and parsed into a tuple of events and data""" + m_parse_from_date.return_value = "1472594005.972" events, data = dump_events(rawdata=SAMPLE_LOGS) expected_data = SAMPLE_LOGS.splitlines() + self.assertEqual( + [mock.call("2016-08-30 21:53:25.972325+00:00")], + m_parse_from_date.call_args_list) + self.assertEqual(expected_data, data) year = datetime.now().year dt1 = datetime.strptime( 'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y') @@ -183,12 +159,14 @@ class TestDumpEvents(CiTestCase): 'result': 'SUCCESS', 'timestamp': 1472594005.972}] self.assertEqual(expected_events, events) - self.assertEqual(expected_data, data) - def test_dump_events_with_cisource(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_dump_events_with_cisource(self, m_parse_from_date): """Cisource file is read and parsed into a tuple of events and data.""" tmpfile = self.tmp_path('logfile') write_file(tmpfile, SAMPLE_LOGS) + m_parse_from_date.return_value = 1472594005.972 + events, data = dump_events(cisource=open(tmpfile)) year = datetime.now().year dt1 = datetime.strptime( @@ -208,3 +186,5 @@ class TestDumpEvents(CiTestCase): 'timestamp': 1472594005.972}] self.assertEqual(expected_events, events) self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data]) + m_parse_from_date.assert_has_calls( + [mock.call("2016-08-30 21:53:25.972325+00:00")]) diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py index 37a89936..aded8580 100644 --- a/cloudinit/cmd/tests/test_status.py +++ b/cloudinit/cmd/tests/test_status.py @@ -39,7 +39,8 @@ class TestStatus(CiTestCase): ensure_file(self.disable_file) # Create the ignored disable file (is_disabled, reason) = wrap_and_call( 'cloudinit.cmd.status', - {'uses_systemd': False}, + {'uses_systemd': False, + 'get_cmdline': "root=/dev/my-root not-important"}, status._is_cloudinit_disabled, self.disable_file, self.paths) self.assertFalse( is_disabled, 'expected enabled cloud-init on sysvinit') @@ -50,7 +51,8 @@ class TestStatus(CiTestCase): ensure_file(self.disable_file) # Create observed disable file (is_disabled, reason) = wrap_and_call( 'cloudinit.cmd.status', - {'uses_systemd': True}, + {'uses_systemd': True, + 'get_cmdline': "root=/dev/my-root not-important"}, status._is_cloudinit_disabled, self.disable_file, self.paths) self.assertTrue(is_disabled, 'expected disabled cloud-init') self.assertEqual( diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py index 34c80f1e..3c472891 100644 --- a/cloudinit/config/tests/test_snap.py +++ b/cloudinit/config/tests/test_snap.py @@ -162,6 +162,7 @@ class TestAddAssertions(CiTestCase): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -424,8 +425,10 @@ class TestHandle(CiTestCase): 'snap': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = 'cloudinit.config.cc_snap.sys.stderr' - with mock.patch(mock_path, new_callable=StringIO): - handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + self.assertEqual('HI\nMOM\n', util.load_file(outfile)) @mock.patch('cloudinit.config.cc_snap.util.subp') diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py index f1beeff8..b7cf9bee 100644 --- a/cloudinit/config/tests/test_ubuntu_advantage.py +++ b/cloudinit/config/tests/test_ubuntu_advantage.py @@ -23,6 +23,7 @@ class FakeCloud(object): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -234,8 +235,10 @@ class TestHandle(CiTestCase): 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = '%s.sys.stderr' % MPATH - with mock.patch(mock_path, new_callable=StringIO): - handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('nomatter', cfg=cfg, cloud=None, log=self.logger, + args=None) self.assertEqual('HI\nMOM\n', util.load_file(outfile)) diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py index 5c017d15..8b444f14 100644 --- a/cloudinit/net/tests/test_init.py +++ b/cloudinit/net/tests/test_init.py @@ -199,6 +199,8 @@ class TestGenerateFallbackConfig(CiTestCase): self.sysdir = self.tmp_dir() + '/' self.m_sys_path.return_value = self.sysdir self.addCleanup(sys_mock.stop) + self.add_patch('cloudinit.net.util.is_container', 'm_is_container', + return_value=False) self.add_patch('cloudinit.net.util.udevadm_settle', 'm_settle') def test_generate_fallback_finds_connected_eth_with_mac(self): diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py index 24fd65ff..8cd312d0 100644 --- a/cloudinit/sources/DataSourceAltCloud.py +++ b/cloudinit/sources/DataSourceAltCloud.py @@ -181,27 +181,18 @@ class DataSourceAltCloud(sources.DataSource): # modprobe floppy try: - cmd = CMD_PROBE_FLOPPY - (cmd_out, _err) = util.subp(cmd) - LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out) + modprobe_floppy() except ProcessExecutionError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) - return False - except OSError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) + util.logexc(LOG, 'Failed modprobe: %s', e) return False floppy_dev = '/dev/fd0' # udevadm settle for floppy device try: - (cmd_out, _err) = util.udevadm_settle(exists=floppy_dev, timeout=5) - LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out) - except ProcessExecutionError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) - return False - except OSError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) + util.udevadm_settle(exists=floppy_dev, timeout=5) + except (ProcessExecutionError, OSError) as e: + util.logexc(LOG, 'Failed udevadm_settle: %s\n', e) return False try: @@ -258,6 +249,11 @@ class DataSourceAltCloud(sources.DataSource): return False +def modprobe_floppy(): + out, _err = util.subp(CMD_PROBE_FLOPPY) + LOG.debug('Command: %s\nOutput%s', ' '.join(CMD_PROBE_FLOPPY), out) + + # Used to match classes to dependencies # Source DataSourceAltCloud does not really depend on networking. # In the future 'dsmode' like behavior can be added to offer user diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py index ad8cfb9c..593ac91a 100644 --- a/cloudinit/sources/DataSourceSmartOS.py +++ b/cloudinit/sources/DataSourceSmartOS.py @@ -683,6 +683,18 @@ def jmc_client_factory( raise ValueError("Unknown value for smartos_type: %s" % smartos_type) +def identify_file(content_f): + cmd = ["file", "--brief", "--mime-type", content_f] + f_type = None + try: + (f_type, _err) = util.subp(cmd) + LOG.debug("script %s mime type is %s", content_f, f_type) + except util.ProcessExecutionError as e: + util.logexc( + LOG, ("Failed to identify script type for %s" % content_f, e)) + return None if f_type is None else f_type.strip() + + def write_boot_content(content, content_f, link=None, shebang=False, mode=0o400): """ @@ -715,18 +727,11 @@ def write_boot_content(content, content_f, link=None, shebang=False, util.write_file(content_f, content, mode=mode) if shebang and not content.startswith("#!"): - try: - cmd = ["file", "--brief", "--mime-type", content_f] - (f_type, _err) = util.subp(cmd) - LOG.debug("script %s mime type is %s", content_f, f_type) - if f_type.strip() == "text/plain": - new_content = "\n".join(["#!/bin/bash", content]) - util.write_file(content_f, new_content, mode=mode) - LOG.debug("added shebang to file %s", content_f) - - except Exception as e: - util.logexc(LOG, ("Failed to identify script type for %s" % - content_f, e)) + f_type = identify_file(content_f) + if f_type == "text/plain": + util.write_file( + content_f, "\n".join(["#!/bin/bash", content]), mode=mode) + LOG.debug("added shebang to file %s", content_f) if link: try: diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py index 9a21426e..e7e4182f 100644 --- a/cloudinit/tests/helpers.py +++ b/cloudinit/tests/helpers.py @@ -33,6 +33,8 @@ from cloudinit import helpers as ch from cloudinit.sources import DataSourceNone from cloudinit import util +_real_subp = util.subp + # Used for skipping tests SkipTest = unittest2.SkipTest skipIf = unittest2.skipIf @@ -143,6 +145,17 @@ class CiTestCase(TestCase): # Subclass overrides for specific test behavior # Whether or not a unit test needs logfile setup with_logs = False + allowed_subp = False + SUBP_SHELL_TRUE = "shell=true" + + @contextmanager + def allow_subp(self, allowed_subp): + orig = self.allowed_subp + try: + self.allowed_subp = allowed_subp + yield + finally: + self.allowed_subp = orig def setUp(self): super(CiTestCase, self).setUp() @@ -155,11 +168,41 @@ class CiTestCase(TestCase): handler.setFormatter(formatter) self.old_handlers = self.logger.handlers self.logger.handlers = [handler] + if self.allowed_subp is True: + util.subp = _real_subp + else: + util.subp = self._fake_subp + + def _fake_subp(self, *args, **kwargs): + if 'args' in kwargs: + cmd = kwargs['args'] + else: + cmd = args[0] + + if not isinstance(cmd, six.string_types): + cmd = cmd[0] + pass_through = False + if not isinstance(self.allowed_subp, (list, bool)): + raise TypeError("self.allowed_subp supports list or bool.") + if isinstance(self.allowed_subp, bool): + pass_through = self.allowed_subp + else: + pass_through = ( + (cmd in self.allowed_subp) or + (self.SUBP_SHELL_TRUE in self.allowed_subp and + kwargs.get('shell'))) + if pass_through: + return _real_subp(*args, **kwargs) + raise Exception( + "called subp. set self.allowed_subp=True to allow\n subp(%s)" % + ', '.join([str(repr(a)) for a in args] + + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()])) def tearDown(self): if self.with_logs: # Remove the handler we setup logging.getLogger().handlers = self.old_handlers + util.subp = _real_subp super(CiTestCase, self).tearDown() def tmp_dir(self, dir=None, cleanup=True): |