summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/analyze/tests/test_dump.py86
-rw-r--r--cloudinit/cmd/tests/test_status.py6
-rw-r--r--cloudinit/config/tests/test_snap.py7
-rw-r--r--cloudinit/config/tests/test_ubuntu_advantage.py7
-rw-r--r--cloudinit/net/tests/test_init.py2
-rw-r--r--cloudinit/sources/DataSourceAltCloud.py24
-rw-r--r--cloudinit/sources/DataSourceSmartOS.py29
-rw-r--r--cloudinit/tests/helpers.py43
8 files changed, 119 insertions, 85 deletions
diff --git a/cloudinit/analyze/tests/test_dump.py b/cloudinit/analyze/tests/test_dump.py
index f4c42841..db2a667b 100644
--- a/cloudinit/analyze/tests/test_dump.py
+++ b/cloudinit/analyze/tests/test_dump.py
@@ -5,8 +5,8 @@ from textwrap import dedent
from cloudinit.analyze.dump import (
dump_events, parse_ci_logline, parse_timestamp)
-from cloudinit.util import subp, write_file
-from cloudinit.tests.helpers import CiTestCase
+from cloudinit.util import which, write_file
+from cloudinit.tests.helpers import CiTestCase, mock, skipIf
class TestParseTimestamp(CiTestCase):
@@ -15,21 +15,9 @@ class TestParseTimestamp(CiTestCase):
"""Logs with cloud-init detailed formats will be properly parsed."""
trusty_fmt = '%Y-%m-%d %H:%M:%S,%f'
trusty_stamp = '2016-09-12 14:39:20,839'
-
- parsed = parse_timestamp(trusty_stamp)
-
- # convert ourselves
dt = datetime.strptime(trusty_stamp, trusty_fmt)
- expected = float(dt.strftime('%s.%f'))
-
- # use date(1)
- out, _err = subp(['date', '+%s.%3N', '-d', trusty_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(trusty_stamp))
def test_parse_timestamp_handles_syslog_adding_year(self):
"""Syslog timestamps lack a year. Add year and properly parse."""
@@ -39,17 +27,9 @@ class TestParseTimestamp(CiTestCase):
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(syslog_stamp)
-
- # use date(1)
- out, _ = subp(['date', '+%s.%3N', '-d', syslog_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')),
+ parse_timestamp(syslog_stamp))
def test_parse_timestamp_handles_journalctl_format_adding_year(self):
"""Journalctl precise timestamps lack a year. Add year and parse."""
@@ -59,37 +39,22 @@ class TestParseTimestamp(CiTestCase):
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(journal_stamp)
-
- # use date(1)
- out, _ = subp(['date', '+%s.%6N', '-d', journal_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(journal_stamp))
+ @skipIf(not which("date"), "'date' command not available.")
def test_parse_unexpected_timestamp_format_with_date_command(self):
- """Dump sends unexpected timestamp formats to data for processing."""
+ """Dump sends unexpected timestamp formats to date for processing."""
new_fmt = '%H:%M %m/%d %Y'
new_stamp = '17:15 08/08'
-
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(new_stamp + " " + str(year), new_fmt)
- expected = float(dt.strftime('%s.%f'))
- parsed = parse_timestamp(new_stamp)
# use date(1)
- out, _ = subp(['date', '+%s.%6N', '-d', new_stamp])
- timestamp = out.strip()
- date_ts = float(timestamp)
-
- self.assertEqual(expected, parsed)
- self.assertEqual(expected, date_ts)
- self.assertEqual(date_ts, parsed)
+ with self.allow_subp(["date"]):
+ self.assertEqual(
+ float(dt.strftime('%s.%f')), parse_timestamp(new_stamp))
class TestParseCILogLine(CiTestCase):
@@ -135,7 +100,9 @@ class TestParseCILogLine(CiTestCase):
'timestamp': timestamp}
self.assertEqual(expected, parse_ci_logline(line))
- def test_parse_logline_returns_event_for_finish_events(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_parse_logline_returns_event_for_finish_events(self,
+ m_parse_from_date):
"""parse_ci_logline returns a finish event for a parsed log line."""
line = ('2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]'
' handlers.py[DEBUG]: finish: modules-final: SUCCESS: running'
@@ -147,7 +114,10 @@ class TestParseCILogLine(CiTestCase):
'origin': 'cloudinit',
'result': 'SUCCESS',
'timestamp': 1472594005.972}
+ m_parse_from_date.return_value = "1472594005.972"
self.assertEqual(expected, parse_ci_logline(line))
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")])
SAMPLE_LOGS = dedent("""\
@@ -162,10 +132,16 @@ Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\
class TestDumpEvents(CiTestCase):
maxDiff = None
- def test_dump_events_with_rawdata(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_rawdata(self, m_parse_from_date):
"""Rawdata is split and parsed into a tuple of events and data"""
+ m_parse_from_date.return_value = "1472594005.972"
events, data = dump_events(rawdata=SAMPLE_LOGS)
expected_data = SAMPLE_LOGS.splitlines()
+ self.assertEqual(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")],
+ m_parse_from_date.call_args_list)
+ self.assertEqual(expected_data, data)
year = datetime.now().year
dt1 = datetime.strptime(
'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y')
@@ -183,12 +159,14 @@ class TestDumpEvents(CiTestCase):
'result': 'SUCCESS',
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
- self.assertEqual(expected_data, data)
- def test_dump_events_with_cisource(self):
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_cisource(self, m_parse_from_date):
"""Cisource file is read and parsed into a tuple of events and data."""
tmpfile = self.tmp_path('logfile')
write_file(tmpfile, SAMPLE_LOGS)
+ m_parse_from_date.return_value = 1472594005.972
+
events, data = dump_events(cisource=open(tmpfile))
year = datetime.now().year
dt1 = datetime.strptime(
@@ -208,3 +186,5 @@ class TestDumpEvents(CiTestCase):
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data])
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")])
diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py
index 37a89936..aded8580 100644
--- a/cloudinit/cmd/tests/test_status.py
+++ b/cloudinit/cmd/tests/test_status.py
@@ -39,7 +39,8 @@ class TestStatus(CiTestCase):
ensure_file(self.disable_file) # Create the ignored disable file
(is_disabled, reason) = wrap_and_call(
'cloudinit.cmd.status',
- {'uses_systemd': False},
+ {'uses_systemd': False,
+ 'get_cmdline': "root=/dev/my-root not-important"},
status._is_cloudinit_disabled, self.disable_file, self.paths)
self.assertFalse(
is_disabled, 'expected enabled cloud-init on sysvinit')
@@ -50,7 +51,8 @@ class TestStatus(CiTestCase):
ensure_file(self.disable_file) # Create observed disable file
(is_disabled, reason) = wrap_and_call(
'cloudinit.cmd.status',
- {'uses_systemd': True},
+ {'uses_systemd': True,
+ 'get_cmdline': "root=/dev/my-root not-important"},
status._is_cloudinit_disabled, self.disable_file, self.paths)
self.assertTrue(is_disabled, 'expected disabled cloud-init')
self.assertEqual(
diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py
index 34c80f1e..3c472891 100644
--- a/cloudinit/config/tests/test_snap.py
+++ b/cloudinit/config/tests/test_snap.py
@@ -162,6 +162,7 @@ class TestAddAssertions(CiTestCase):
class TestRunCommands(CiTestCase):
with_logs = True
+ allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestRunCommands, self).setUp()
@@ -424,8 +425,10 @@ class TestHandle(CiTestCase):
'snap': {'commands': ['echo "HI" >> %s' % outfile,
'echo "MOM" >> %s' % outfile]}}
mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with mock.patch(mock_path, new_callable=StringIO):
- handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
+ with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
+ with mock.patch(mock_path, new_callable=StringIO):
+ handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
+
self.assertEqual('HI\nMOM\n', util.load_file(outfile))
@mock.patch('cloudinit.config.cc_snap.util.subp')
diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py
index f1beeff8..b7cf9bee 100644
--- a/cloudinit/config/tests/test_ubuntu_advantage.py
+++ b/cloudinit/config/tests/test_ubuntu_advantage.py
@@ -23,6 +23,7 @@ class FakeCloud(object):
class TestRunCommands(CiTestCase):
with_logs = True
+ allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestRunCommands, self).setUp()
@@ -234,8 +235,10 @@ class TestHandle(CiTestCase):
'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile,
'echo "MOM" >> %s' % outfile]}}
mock_path = '%s.sys.stderr' % MPATH
- with mock.patch(mock_path, new_callable=StringIO):
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
+ with mock.patch(mock_path, new_callable=StringIO):
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger,
+ args=None)
self.assertEqual('HI\nMOM\n', util.load_file(outfile))
diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py
index 5c017d15..8b444f14 100644
--- a/cloudinit/net/tests/test_init.py
+++ b/cloudinit/net/tests/test_init.py
@@ -199,6 +199,8 @@ class TestGenerateFallbackConfig(CiTestCase):
self.sysdir = self.tmp_dir() + '/'
self.m_sys_path.return_value = self.sysdir
self.addCleanup(sys_mock.stop)
+ self.add_patch('cloudinit.net.util.is_container', 'm_is_container',
+ return_value=False)
self.add_patch('cloudinit.net.util.udevadm_settle', 'm_settle')
def test_generate_fallback_finds_connected_eth_with_mac(self):
diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py
index 24fd65ff..8cd312d0 100644
--- a/cloudinit/sources/DataSourceAltCloud.py
+++ b/cloudinit/sources/DataSourceAltCloud.py
@@ -181,27 +181,18 @@ class DataSourceAltCloud(sources.DataSource):
# modprobe floppy
try:
- cmd = CMD_PROBE_FLOPPY
- (cmd_out, _err) = util.subp(cmd)
- LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out)
+ modprobe_floppy()
except ProcessExecutionError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
- return False
- except OSError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
+ util.logexc(LOG, 'Failed modprobe: %s', e)
return False
floppy_dev = '/dev/fd0'
# udevadm settle for floppy device
try:
- (cmd_out, _err) = util.udevadm_settle(exists=floppy_dev, timeout=5)
- LOG.debug('Command: %s\nOutput%s', ' '.join(cmd), cmd_out)
- except ProcessExecutionError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
- return False
- except OSError as e:
- util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd), e)
+ util.udevadm_settle(exists=floppy_dev, timeout=5)
+ except (ProcessExecutionError, OSError) as e:
+ util.logexc(LOG, 'Failed udevadm_settle: %s\n', e)
return False
try:
@@ -258,6 +249,11 @@ class DataSourceAltCloud(sources.DataSource):
return False
+def modprobe_floppy():
+ out, _err = util.subp(CMD_PROBE_FLOPPY)
+ LOG.debug('Command: %s\nOutput%s', ' '.join(CMD_PROBE_FLOPPY), out)
+
+
# Used to match classes to dependencies
# Source DataSourceAltCloud does not really depend on networking.
# In the future 'dsmode' like behavior can be added to offer user
diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py
index ad8cfb9c..593ac91a 100644
--- a/cloudinit/sources/DataSourceSmartOS.py
+++ b/cloudinit/sources/DataSourceSmartOS.py
@@ -683,6 +683,18 @@ def jmc_client_factory(
raise ValueError("Unknown value for smartos_type: %s" % smartos_type)
+def identify_file(content_f):
+ cmd = ["file", "--brief", "--mime-type", content_f]
+ f_type = None
+ try:
+ (f_type, _err) = util.subp(cmd)
+ LOG.debug("script %s mime type is %s", content_f, f_type)
+ except util.ProcessExecutionError as e:
+ util.logexc(
+ LOG, ("Failed to identify script type for %s" % content_f, e))
+ return None if f_type is None else f_type.strip()
+
+
def write_boot_content(content, content_f, link=None, shebang=False,
mode=0o400):
"""
@@ -715,18 +727,11 @@ def write_boot_content(content, content_f, link=None, shebang=False,
util.write_file(content_f, content, mode=mode)
if shebang and not content.startswith("#!"):
- try:
- cmd = ["file", "--brief", "--mime-type", content_f]
- (f_type, _err) = util.subp(cmd)
- LOG.debug("script %s mime type is %s", content_f, f_type)
- if f_type.strip() == "text/plain":
- new_content = "\n".join(["#!/bin/bash", content])
- util.write_file(content_f, new_content, mode=mode)
- LOG.debug("added shebang to file %s", content_f)
-
- except Exception as e:
- util.logexc(LOG, ("Failed to identify script type for %s" %
- content_f, e))
+ f_type = identify_file(content_f)
+ if f_type == "text/plain":
+ util.write_file(
+ content_f, "\n".join(["#!/bin/bash", content]), mode=mode)
+ LOG.debug("added shebang to file %s", content_f)
if link:
try:
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 9a21426e..e7e4182f 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -33,6 +33,8 @@ from cloudinit import helpers as ch
from cloudinit.sources import DataSourceNone
from cloudinit import util
+_real_subp = util.subp
+
# Used for skipping tests
SkipTest = unittest2.SkipTest
skipIf = unittest2.skipIf
@@ -143,6 +145,17 @@ class CiTestCase(TestCase):
# Subclass overrides for specific test behavior
# Whether or not a unit test needs logfile setup
with_logs = False
+ allowed_subp = False
+ SUBP_SHELL_TRUE = "shell=true"
+
+ @contextmanager
+ def allow_subp(self, allowed_subp):
+ orig = self.allowed_subp
+ try:
+ self.allowed_subp = allowed_subp
+ yield
+ finally:
+ self.allowed_subp = orig
def setUp(self):
super(CiTestCase, self).setUp()
@@ -155,11 +168,41 @@ class CiTestCase(TestCase):
handler.setFormatter(formatter)
self.old_handlers = self.logger.handlers
self.logger.handlers = [handler]
+ if self.allowed_subp is True:
+ util.subp = _real_subp
+ else:
+ util.subp = self._fake_subp
+
+ def _fake_subp(self, *args, **kwargs):
+ if 'args' in kwargs:
+ cmd = kwargs['args']
+ else:
+ cmd = args[0]
+
+ if not isinstance(cmd, six.string_types):
+ cmd = cmd[0]
+ pass_through = False
+ if not isinstance(self.allowed_subp, (list, bool)):
+ raise TypeError("self.allowed_subp supports list or bool.")
+ if isinstance(self.allowed_subp, bool):
+ pass_through = self.allowed_subp
+ else:
+ pass_through = (
+ (cmd in self.allowed_subp) or
+ (self.SUBP_SHELL_TRUE in self.allowed_subp and
+ kwargs.get('shell')))
+ if pass_through:
+ return _real_subp(*args, **kwargs)
+ raise Exception(
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
+ ', '.join([str(repr(a)) for a in args] +
+ ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
def tearDown(self):
if self.with_logs:
# Remove the handler we setup
logging.getLogger().handlers = self.old_handlers
+ util.subp = _real_subp
super(CiTestCase, self).tearDown()
def tmp_dir(self, dir=None, cleanup=True):