diff options
Diffstat (limited to 'cloudinit')
-rw-r--r-- | cloudinit/analyze/tests/test_dump.py | 86 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_status.py | 6 | ||||
-rw-r--r-- | cloudinit/config/tests/test_snap.py | 7 | ||||
-rw-r--r-- | cloudinit/config/tests/test_ubuntu_advantage.py | 7 | ||||
-rw-r--r-- | cloudinit/net/tests/test_init.py | 2 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceAltCloud.py | 24 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceSmartOS.py | 29 | ||||
-rw-r--r-- | cloudinit/tests/helpers.py | 43 |
8 files changed, 119 insertions, 85 deletions
diff --git a/cloudinit/analyze/tests/test_dump.py b/cloudinit/analyze/tests/test_dump.py index f4c42841..db2a667b 100644 --- a/cloudinit/analyze/tests/test_dump.py +++ b/cloudinit/analyze/tests/test_dump.py @@ -5,8 +5,8 @@ from textwrap import dedent from cloudinit.analyze.dump import ( dump_events, parse_ci_logline, parse_timestamp) -from cloudinit.util import subp, write_file -from cloudinit.tests.helpers import CiTestCase +from cloudinit.util import which, write_file +from cloudinit.tests.helpers import CiTestCase, mock, skipIf class TestParseTimestamp(CiTestCase): @@ -15,21 +15,9 @@ class TestParseTimestamp(CiTestCase): """Logs with cloud-init detailed formats will be properly parsed.""" trusty_fmt = '%Y-%m-%d %H:%M:%S,%f' trusty_stamp = '2016-09-12 14:39:20,839' - - parsed = parse_timestamp(trusty_stamp) - - # convert ourselves dt = datetime.strptime(trusty_stamp, trusty_fmt) - expected = float(dt.strftime('%s.%f')) - - # use date(1) - out, _err = subp(['date', '+%s.%3N', '-d', trusty_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(trusty_stamp)) def test_parse_timestamp_handles_syslog_adding_year(self): """Syslog timestamps lack a year. Add year and properly parse.""" @@ -39,17 +27,9 @@ class TestParseTimestamp(CiTestCase): # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(syslog_stamp) - - # use date(1) - out, _ = subp(['date', '+%s.%3N', '-d', syslog_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), + parse_timestamp(syslog_stamp)) def test_parse_timestamp_handles_journalctl_format_adding_year(self): """Journalctl precise timestamps lack a year. Add year and parse.""" @@ -59,37 +39,22 @@ class TestParseTimestamp(CiTestCase): # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(journal_stamp) - - # use date(1) - out, _ = subp(['date', '+%s.%6N', '-d', journal_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(journal_stamp)) + @skipIf(not which("date"), "'date' command not available.") def test_parse_unexpected_timestamp_format_with_date_command(self): - """Dump sends unexpected timestamp formats to data for processing.""" + """Dump sends unexpected timestamp formats to date for processing.""" new_fmt = '%H:%M %m/%d %Y' new_stamp = '17:15 08/08' - # convert stamp ourselves by adding the missing year value year = datetime.now().year dt = datetime.strptime(new_stamp + " " + str(year), new_fmt) - expected = float(dt.strftime('%s.%f')) - parsed = parse_timestamp(new_stamp) # use date(1) - out, _ = subp(['date', '+%s.%6N', '-d', new_stamp]) - timestamp = out.strip() - date_ts = float(timestamp) - - self.assertEqual(expected, parsed) - self.assertEqual(expected, date_ts) - self.assertEqual(date_ts, parsed) + with self.allow_subp(["date"]): + self.assertEqual( + float(dt.strftime('%s.%f')), parse_timestamp(new_stamp)) class TestParseCILogLine(CiTestCase): @@ -135,7 +100,9 @@ class TestParseCILogLine(CiTestCase): 'timestamp': timestamp} self.assertEqual(expected, parse_ci_logline(line)) - def test_parse_logline_returns_event_for_finish_events(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_parse_logline_returns_event_for_finish_events(self, + m_parse_from_date): """parse_ci_logline returns a finish event for a parsed log line.""" line = ('2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]' ' handlers.py[DEBUG]: finish: modules-final: SUCCESS: running' @@ -147,7 +114,10 @@ class TestParseCILogLine(CiTestCase): 'origin': 'cloudinit', 'result': 'SUCCESS', 'timestamp': 1472594005.972} + m_parse_from_date.return_value = "1472594005.972" self.assertEqual(expected, parse_ci_logline(line)) + m_parse_from_date.assert_has_calls( + [mock.call("2016-08-30 21:53:25.972325+00:00")]) SAMPLE_LOGS = dedent("""\ @@ -162,10 +132,16 @@ Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\ class TestDumpEvents(CiTestCase): maxDiff = None - def test_dump_events_with_rawdata(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_dump_events_with_rawdata(self, m_parse_from_date): """Rawdata is split and parsed into a tuple of events and data""" + m_parse_from_date.return_value = "1472594005.972" events, data = dump_events(rawdata=SAMPLE_LOGS) expected_data = SAMPLE_LOGS.splitlines() + self.assertEqual( + [mock.call("2016-08-30 21:53:25.972325+00:00")], + m_parse_from_date.call_args_list) + self.assertEqual(expected_data, data) year = datetime.now().year dt1 = datetime.strptime( 'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y') @@ -183,12 +159,14 @@ class TestDumpEvents(CiTestCase): 'result': 'SUCCESS', 'timestamp': 1472594005.972}] self.assertEqual(expected_events, events) - self.assertEqual(expected_data, data) - def test_dump_events_with_cisource(self): + @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date") + def test_dump_events_with_cisource(self, m_parse_from_date): """Cisource file is read and parsed into a tuple of events and data.""" tmpfile = self.tmp_path('logfile') write_file(tmpfile, SAMPLE_LOGS) + m_parse_from_date.return_value = 1472594005.972 + events, data = dump_events(cisource=open(tmpfile)) year = datetime.now().year dt1 = datetime.strptime( @@ -208,3 +186,5 @@ class TestDumpEvents(CiTestCase): 'timestamp': 1472594005.972}] self.assertEqual(expected_events, events) self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data]) + m_parse_from_date.assert_has_calls( + [mock.call("2016-08-30 21:53:25.972325+00:00")]) diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py index 37a89936..aded8580 100644 --- a/cloudinit/cmd/tests/test_status.py +++ b/cloudinit/cmd/tests/test_status.py @@ -39,7 +39,8 @@ class TestStatus(CiTestCase): ensure_file(self.disable_file) # Create the ignored disable file (is_disabled, reason) = wrap_and_call( 'cloudinit.cmd.status', - {'uses_systemd': False}, + {'uses_systemd': False, + 'get_cmdline': "root=/dev/my-root not-important"}, status._is_cloudinit_disabled, self.disable_file, self.paths) self.assertFalse( is_disabled, 'expected enabled cloud-init on sysvinit') @@ -50,7 +51,8 @@ class TestStatus(CiTestCase): ensure_file(self.disable_file) # Create observed disable file (is_disabled, reason) = wrap_and_call( 'cloudinit.cmd.status', - {'uses_systemd': True}, + {'uses_systemd': True, + 'get_cmdline': "root=/dev/my-root not-important"}, status._is_cloudinit_disabled, self.disable_file, self.paths) self.assertTrue(is_disabled, 'expected disabled cloud-init') self.assertEqual( diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py index 34c80f1e..3c472891 100644 --- a/cloudinit/config/tests/test_snap.py +++ b/cloudinit/config/tests/test_snap.py @@ -162,6 +162,7 @@ class TestAddAssertions(CiTestCase): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -424,8 +425,10 @@ class TestHandle(CiTestCase): 'snap': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = 'cloudinit.config.cc_snap.sys.stderr' - with mock.patch(mock_path, new_callable=StringIO): - handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None) + self.assertEqual('HI\nMOM\n', util.load_file(outfile)) @mock.patch('cloudinit.config.cc_snap.util.subp') diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py index f1beeff8..b7cf9bee 100644 --- a/cloudinit/config/tests/test_ubuntu_advantage.py +++ b/cloudinit/config/tests/test_ubuntu_advantage.py @@ -23,6 +23,7 @@ class FakeCloud(object): class TestRunCommands(CiTestCase): with_logs = True + allowed_subp = [CiTestCase.SUBP_SHELL_TRUE] def setUp(self): super(TestRunCommands, self).setUp() @@ -234,8 +235,10 @@ class TestHandle(CiTestCase): 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile, 'echo "MOM" >> %s' % outfile]}} mock_path = '%s.sys.stderr' % MPATH - with mock.patch(mock_path, new_callable=StringIO): - handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None) + with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]): + with mock.patch(mock_path, new_callable=StringIO): + handle('nomatter', cfg=cfg, cloud=None, log=self.logger, + args=None) self.assertEqual('HI\nMOM\n', util.load_file(outfile)) diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py index 5c017d15..8b444f14 100644 --- a/cloudinit/net/tests/test_init.py +++ b/cloudinit/net/tests/test_init.py @@ -199,6 +199,8 @@ class TestGenerateFallbackConfig(CiTestCase): self.sysdir = self.tmp_dir() + '/' self.m_sys_path.return_value = self.sysdir self.addCleanup(sys_mock.stop) + self.add_patch('cloudinit.net.util.is_container', 'm_is_container', + return_value=False) self.add_patch('cloudinit.net.util.udevadm_settle', 'm_settle') def test_generate_fallback_finds_connected_eth_with_mac(self): diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py index 24fd65ff..8cd312d0 100644 --- a/cloudinit/sources/DataSourceAltCloud.py +++ b/cloudinit/sources/DataSourceAltCloud.py @@ -181,27 +181,18 @@ class DataSourceAltCloud(sources.DataSource): # modprobe floppy try: - cmd = CMD_PROBE_FLOPPY - (cmd_out, _err) = util.subp(cmd) - LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out) + modprobe_floppy() except ProcessExecutionError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) - return False - except OSError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) + util.logexc(LOG, 'Failed modprobe: %s', e) return False floppy_dev = '/dev/fd0' # udevadm settle for floppy device try: - (cmd_out, _err) = util.udevadm_settle(exists=floppy_dev, timeout=5) - LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out) - except ProcessExecutionError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) - return False - except OSError as e: - util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e) + util.udevadm_settle(exists=floppy_dev, timeout=5) + except (ProcessExecutionError, OSError) as e: + util.logexc(LOG, 'Failed udevadm_settle: %s\n', e) return False try: @@ -258,6 +249,11 @@ class DataSourceAltCloud(sources.DataSource): return False +def modprobe_floppy(): + out, _err = util.subp(CMD_PROBE_FLOPPY) + LOG.debug('Command: %s\nOutput%s', ' '.join(CMD_PROBE_FLOPPY), out) + + # Used to match classes to dependencies # Source DataSourceAltCloud does not really depend on networking. # In the future 'dsmode' like behavior can be added to offer user diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py index ad8cfb9c..593ac91a 100644 --- a/cloudinit/sources/DataSourceSmartOS.py +++ b/cloudinit/sources/DataSourceSmartOS.py @@ -683,6 +683,18 @@ def jmc_client_factory( raise ValueError("Unknown value for smartos_type: %s" % smartos_type) +def identify_file(content_f): + cmd = ["file", "--brief", "--mime-type", content_f] + f_type = None + try: + (f_type, _err) = util.subp(cmd) + LOG.debug("script %s mime type is %s", content_f, f_type) + except util.ProcessExecutionError as e: + util.logexc( + LOG, ("Failed to identify script type for %s" % content_f, e)) + return None if f_type is None else f_type.strip() + + def write_boot_content(content, content_f, link=None, shebang=False, mode=0o400): """ @@ -715,18 +727,11 @@ def write_boot_content(content, content_f, link=None, shebang=False, util.write_file(content_f, content, mode=mode) if shebang and not content.startswith("#!"): - try: - cmd = ["file", "--brief", "--mime-type", content_f] - (f_type, _err) = util.subp(cmd) - LOG.debug("script %s mime type is %s", content_f, f_type) - if f_type.strip() == "text/plain": - new_content = "\n".join(["#!/bin/bash", content]) - util.write_file(content_f, new_content, mode=mode) - LOG.debug("added shebang to file %s", content_f) - - except Exception as e: - util.logexc(LOG, ("Failed to identify script type for %s" % - content_f, e)) + f_type = identify_file(content_f) + if f_type == "text/plain": + util.write_file( + content_f, "\n".join(["#!/bin/bash", content]), mode=mode) + LOG.debug("added shebang to file %s", content_f) if link: try: diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py index 9a21426e..e7e4182f 100644 --- a/cloudinit/tests/helpers.py +++ b/cloudinit/tests/helpers.py @@ -33,6 +33,8 @@ from cloudinit import helpers as ch from cloudinit.sources import DataSourceNone from cloudinit import util +_real_subp = util.subp + # Used for skipping tests SkipTest = unittest2.SkipTest skipIf = unittest2.skipIf @@ -143,6 +145,17 @@ class CiTestCase(TestCase): # Subclass overrides for specific test behavior # Whether or not a unit test needs logfile setup with_logs = False + allowed_subp = False + SUBP_SHELL_TRUE = "shell=true" + + @contextmanager + def allow_subp(self, allowed_subp): + orig = self.allowed_subp + try: + self.allowed_subp = allowed_subp + yield + finally: + self.allowed_subp = orig def setUp(self): super(CiTestCase, self).setUp() @@ -155,11 +168,41 @@ class CiTestCase(TestCase): handler.setFormatter(formatter) self.old_handlers = self.logger.handlers self.logger.handlers = [handler] + if self.allowed_subp is True: + util.subp = _real_subp + else: + util.subp = self._fake_subp + + def _fake_subp(self, *args, **kwargs): + if 'args' in kwargs: + cmd = kwargs['args'] + else: + cmd = args[0] + + if not isinstance(cmd, six.string_types): + cmd = cmd[0] + pass_through = False + if not isinstance(self.allowed_subp, (list, bool)): + raise TypeError("self.allowed_subp supports list or bool.") + if isinstance(self.allowed_subp, bool): + pass_through = self.allowed_subp + else: + pass_through = ( + (cmd in self.allowed_subp) or + (self.SUBP_SHELL_TRUE in self.allowed_subp and + kwargs.get('shell'))) + if pass_through: + return _real_subp(*args, **kwargs) + raise Exception( + "called subp. set self.allowed_subp=True to allow\n subp(%s)" % + ', '.join([str(repr(a)) for a in args] + + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()])) def tearDown(self): if self.with_logs: # Remove the handler we setup logging.getLogger().handlers = self.old_handlers + util.subp = _real_subp super(CiTestCase, self).tearDown() def tmp_dir(self, dir=None, cleanup=True): |