summaryrefslogtreecommitdiff
path: root/tests/unittests/analyze
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/analyze')
-rw-r--r--tests/unittests/analyze/test_boot.py174
-rw-r--r--tests/unittests/analyze/test_dump.py247
2 files changed, 421 insertions, 0 deletions
diff --git a/tests/unittests/analyze/test_boot.py b/tests/unittests/analyze/test_boot.py
new file mode 100644
index 00000000..68db69ec
--- /dev/null
+++ b/tests/unittests/analyze/test_boot.py
@@ -0,0 +1,174 @@
+import os
+
+from cloudinit.analyze.__main__ import analyze_boot, get_parser
+from cloudinit.analyze.show import (
+ CONTAINER_CODE,
+ FAIL_CODE,
+ SystemctlReader,
+ dist_check_timestamp,
+)
+from tests.unittests.helpers import CiTestCase, mock
+
+err_code = (FAIL_CODE, -1, -1, -1)
+
+
+class TestDistroChecker(CiTestCase):
+ def test_blank_distro(self):
+ self.assertEqual(err_code, dist_check_timestamp())
+
+ @mock.patch("cloudinit.util.is_FreeBSD", return_value=True)
+ def test_freebsd_gentoo_cant_find(self, m_is_FreeBSD):
+ self.assertEqual(err_code, dist_check_timestamp())
+
+ @mock.patch("cloudinit.subp.subp", return_value=(0, 1))
+ def test_subp_fails(self, m_subp):
+ self.assertEqual(err_code, dist_check_timestamp())
+
+
+class TestSystemCtlReader(CiTestCase):
+ def test_systemctl_invalid_property(self):
+ reader = SystemctlReader("dummyProperty")
+ with self.assertRaises(RuntimeError):
+ reader.parse_epoch_as_float()
+
+ def test_systemctl_invalid_parameter(self):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ with self.assertRaises(RuntimeError):
+ reader.parse_epoch_as_float()
+
+ @mock.patch("cloudinit.subp.subp", return_value=("U=1000000", None))
+ def test_systemctl_works_correctly_threshold(self, m_subp):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ self.assertEqual(1.0, reader.parse_epoch_as_float())
+ thresh = 1.0 - reader.parse_epoch_as_float()
+ self.assertTrue(thresh < 1e-6)
+ self.assertTrue(thresh > (-1 * 1e-6))
+
+ @mock.patch("cloudinit.subp.subp", return_value=("U=0", None))
+ def test_systemctl_succeed_zero(self, m_subp):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ self.assertEqual(0.0, reader.parse_epoch_as_float())
+
+ @mock.patch("cloudinit.subp.subp", return_value=("U=1", None))
+ def test_systemctl_succeed_distinct(self, m_subp):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ val1 = reader.parse_epoch_as_float()
+ m_subp.return_value = ("U=2", None)
+ reader2 = SystemctlReader("dummyProperty", "dummyParameter")
+ val2 = reader2.parse_epoch_as_float()
+ self.assertNotEqual(val1, val2)
+
+ @mock.patch("cloudinit.subp.subp", return_value=("100", None))
+ def test_systemctl_epoch_not_splittable(self, m_subp):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ with self.assertRaises(IndexError):
+ reader.parse_epoch_as_float()
+
+ @mock.patch("cloudinit.subp.subp", return_value=("U=foobar", None))
+ def test_systemctl_cannot_convert_epoch_to_float(self, m_subp):
+ reader = SystemctlReader("dummyProperty", "dummyParameter")
+ with self.assertRaises(ValueError):
+ reader.parse_epoch_as_float()
+
+
+class TestAnalyzeBoot(CiTestCase):
+ def set_up_dummy_file_ci(self, path, log_path):
+ infh = open(path, "w+")
+ infh.write(
+ "2019-07-08 17:40:49,601 - util.py[DEBUG]: Cloud-init v. "
+ "19.1-1-gbaa47854-0ubuntu1~18.04.1 running 'init-local' "
+ "at Mon, 08 Jul 2019 17:40:49 +0000. Up 18.84 seconds."
+ )
+ infh.close()
+ outfh = open(log_path, "w+")
+ outfh.close()
+
+ def set_up_dummy_file(self, path, log_path):
+ infh = open(path, "w+")
+ infh.write("dummy data")
+ infh.close()
+ outfh = open(log_path, "w+")
+ outfh.close()
+
+ def remove_dummy_file(self, path, log_path):
+ if os.path.isfile(path):
+ os.remove(path)
+ if os.path.isfile(log_path):
+ os.remove(log_path)
+
+ @mock.patch(
+ "cloudinit.analyze.show.dist_check_timestamp", return_value=err_code
+ )
+ def test_boot_invalid_distro(self, m_dist_check_timestamp):
+
+ path = os.path.dirname(os.path.abspath(__file__))
+ log_path = path + "/boot-test.log"
+ path += "/dummy.log"
+ self.set_up_dummy_file(path, log_path)
+
+ parser = get_parser()
+ args = parser.parse_args(args=["boot", "-i", path, "-o", log_path])
+ name_default = ""
+ analyze_boot(name_default, args)
+ # now args have been tested, go into outfile and make sure error
+ # message is in the outfile
+ outfh = open(args.outfile, "r")
+ data = outfh.read()
+ err_string = (
+ "Your Linux distro or container does not support this "
+ "functionality.\nYou must be running a Kernel "
+ "Telemetry supported distro.\nPlease check "
+ "https://cloudinit.readthedocs.io/en/latest/topics"
+ "/analyze.html for more information on supported "
+ "distros.\n"
+ )
+
+ self.remove_dummy_file(path, log_path)
+ self.assertEqual(err_string, data)
+
+ @mock.patch("cloudinit.util.is_container", return_value=True)
+ @mock.patch("cloudinit.subp.subp", return_value=("U=1000000", None))
+ def test_container_no_ci_log_line(self, m_is_container, m_subp):
+ path = os.path.dirname(os.path.abspath(__file__))
+ log_path = path + "/boot-test.log"
+ path += "/dummy.log"
+ self.set_up_dummy_file(path, log_path)
+
+ parser = get_parser()
+ args = parser.parse_args(args=["boot", "-i", path, "-o", log_path])
+ name_default = ""
+
+ finish_code = analyze_boot(name_default, args)
+
+ self.remove_dummy_file(path, log_path)
+ self.assertEqual(FAIL_CODE, finish_code)
+
+ @mock.patch("cloudinit.util.is_container", return_value=True)
+ @mock.patch("cloudinit.subp.subp", return_value=("U=1000000", None))
+ @mock.patch(
+ "cloudinit.analyze.__main__._get_events",
+ return_value=[
+ {
+ "name": "init-local",
+ "description": "starting search",
+ "timestamp": 100000,
+ }
+ ],
+ )
+ @mock.patch(
+ "cloudinit.analyze.show.dist_check_timestamp",
+ return_value=(CONTAINER_CODE, 1, 1, 1),
+ )
+ def test_container_ci_log_line(self, m_is_container, m_subp, m_get, m_g):
+ path = os.path.dirname(os.path.abspath(__file__))
+ log_path = path + "/boot-test.log"
+ path += "/dummy.log"
+ self.set_up_dummy_file_ci(path, log_path)
+
+ parser = get_parser()
+ args = parser.parse_args(args=["boot", "-i", path, "-o", log_path])
+ name_default = ""
+ finish_code = analyze_boot(name_default, args)
+
+ self.remove_dummy_file(path, log_path)
+ self.assertEqual(CONTAINER_CODE, finish_code)
diff --git a/tests/unittests/analyze/test_dump.py b/tests/unittests/analyze/test_dump.py
new file mode 100644
index 00000000..56bbf97f
--- /dev/null
+++ b/tests/unittests/analyze/test_dump.py
@@ -0,0 +1,247 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from datetime import datetime
+from textwrap import dedent
+
+from cloudinit.analyze.dump import (
+ dump_events,
+ parse_ci_logline,
+ parse_timestamp,
+)
+from cloudinit.subp import which
+from cloudinit.util import write_file
+from tests.unittests.helpers import CiTestCase, mock, skipIf
+
+
+class TestParseTimestamp(CiTestCase):
+ def test_parse_timestamp_handles_cloud_init_default_format(self):
+ """Logs with cloud-init detailed formats will be properly parsed."""
+ trusty_fmt = "%Y-%m-%d %H:%M:%S,%f"
+ trusty_stamp = "2016-09-12 14:39:20,839"
+ dt = datetime.strptime(trusty_stamp, trusty_fmt)
+ self.assertEqual(
+ float(dt.strftime("%s.%f")), parse_timestamp(trusty_stamp)
+ )
+
+ def test_parse_timestamp_handles_syslog_adding_year(self):
+ """Syslog timestamps lack a year. Add year and properly parse."""
+ syslog_fmt = "%b %d %H:%M:%S %Y"
+ syslog_stamp = "Aug 08 15:12:51"
+
+ # convert stamp ourselves by adding the missing year value
+ year = datetime.now().year
+ dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt)
+ self.assertEqual(
+ float(dt.strftime("%s.%f")), parse_timestamp(syslog_stamp)
+ )
+
+ def test_parse_timestamp_handles_journalctl_format_adding_year(self):
+ """Journalctl precise timestamps lack a year. Add year and parse."""
+ journal_fmt = "%b %d %H:%M:%S.%f %Y"
+ journal_stamp = "Aug 08 17:15:50.606811"
+
+ # convert stamp ourselves by adding the missing year value
+ year = datetime.now().year
+ dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt)
+ self.assertEqual(
+ float(dt.strftime("%s.%f")), parse_timestamp(journal_stamp)
+ )
+
+ @skipIf(not which("date"), "'date' command not available.")
+ def test_parse_unexpected_timestamp_format_with_date_command(self):
+ """Dump sends unexpected timestamp formats to date for processing."""
+ new_fmt = "%H:%M %m/%d %Y"
+ new_stamp = "17:15 08/08"
+ # convert stamp ourselves by adding the missing year value
+ year = datetime.now().year
+ dt = datetime.strptime(new_stamp + " " + str(year), new_fmt)
+
+ # use date(1)
+ with self.allow_subp(["date"]):
+ self.assertEqual(
+ float(dt.strftime("%s.%f")), parse_timestamp(new_stamp)
+ )
+
+
+class TestParseCILogLine(CiTestCase):
+ def test_parse_logline_returns_none_without_separators(self):
+ """When no separators are found, parse_ci_logline returns None."""
+ expected_parse_ignores = [
+ "",
+ "-",
+ "adsf-asdf",
+ "2017-05-22 18:02:01,088",
+ "CLOUDINIT",
+ ]
+ for parse_ignores in expected_parse_ignores:
+ self.assertIsNone(parse_ci_logline(parse_ignores))
+
+ def test_parse_logline_returns_event_for_cloud_init_logs(self):
+ """parse_ci_logline returns an event parse from cloud-init format."""
+ line = (
+ "2017-08-08 20:05:07,147 - util.py[DEBUG]: Cloud-init v. 0.7.9"
+ " running 'init-local' at Tue, 08 Aug 2017 20:05:07 +0000. Up"
+ " 6.26 seconds."
+ )
+ dt = datetime.strptime(
+ "2017-08-08 20:05:07,147", "%Y-%m-%d %H:%M:%S,%f"
+ )
+ timestamp = float(dt.strftime("%s.%f"))
+ expected = {
+ "description": "starting search for local datasources",
+ "event_type": "start",
+ "name": "init-local",
+ "origin": "cloudinit",
+ "timestamp": timestamp,
+ }
+ self.assertEqual(expected, parse_ci_logline(line))
+
+ def test_parse_logline_returns_event_for_journalctl_logs(self):
+ """parse_ci_logline returns an event parse from journalctl format."""
+ line = (
+ "Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT]"
+ " util.py[DEBUG]: Cloud-init v. 0.7.8 running 'init-local' at"
+ " Thu, 03 Nov 2016 06:51:06 +0000. Up 1.0 seconds."
+ )
+ year = datetime.now().year
+ dt = datetime.strptime(
+ "Nov 03 06:51:06.074410 %d" % year, "%b %d %H:%M:%S.%f %Y"
+ )
+ timestamp = float(dt.strftime("%s.%f"))
+ expected = {
+ "description": "starting search for local datasources",
+ "event_type": "start",
+ "name": "init-local",
+ "origin": "cloudinit",
+ "timestamp": timestamp,
+ }
+ self.assertEqual(expected, parse_ci_logline(line))
+
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_parse_logline_returns_event_for_finish_events(
+ self, m_parse_from_date
+ ):
+ """parse_ci_logline returns a finish event for a parsed log line."""
+ line = (
+ "2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]"
+ " handlers.py[DEBUG]: finish: modules-final: SUCCESS: running"
+ " modules for final"
+ )
+ expected = {
+ "description": "running modules for final",
+ "event_type": "finish",
+ "name": "modules-final",
+ "origin": "cloudinit",
+ "result": "SUCCESS",
+ "timestamp": 1472594005.972,
+ }
+ m_parse_from_date.return_value = "1472594005.972"
+ self.assertEqual(expected, parse_ci_logline(line))
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")]
+ )
+
+ def test_parse_logline_returns_event_for_amazon_linux_2_line(self):
+ line = (
+ "Apr 30 19:39:11 cloud-init[2673]: handlers.py[DEBUG]: start:"
+ " init-local/check-cache: attempting to read from cache [check]"
+ )
+ # Generate the expected value using `datetime`, so that TZ
+ # determination is consistent with the code under test.
+ timestamp_dt = datetime.strptime(
+ "Apr 30 19:39:11", "%b %d %H:%M:%S"
+ ).replace(year=datetime.now().year)
+ expected = {
+ "description": "attempting to read from cache [check]",
+ "event_type": "start",
+ "name": "init-local/check-cache",
+ "origin": "cloudinit",
+ "timestamp": timestamp_dt.timestamp(),
+ }
+ self.assertEqual(expected, parse_ci_logline(line))
+
+
+SAMPLE_LOGS = dedent(
+ """\
+Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\
+ Cloud-init v. 0.7.8 running 'init-local' at Thu, 03 Nov 2016\
+ 06:51:06 +0000. Up 1.0 seconds.
+2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT] handlers.py[DEBUG]: finish:\
+ modules-final: SUCCESS: running modules for final
+"""
+)
+
+
+class TestDumpEvents(CiTestCase):
+ maxDiff = None
+
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_rawdata(self, m_parse_from_date):
+ """Rawdata is split and parsed into a tuple of events and data"""
+ m_parse_from_date.return_value = "1472594005.972"
+ events, data = dump_events(rawdata=SAMPLE_LOGS)
+ expected_data = SAMPLE_LOGS.splitlines()
+ self.assertEqual(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")],
+ m_parse_from_date.call_args_list,
+ )
+ self.assertEqual(expected_data, data)
+ year = datetime.now().year
+ dt1 = datetime.strptime(
+ "Nov 03 06:51:06.074410 %d" % year, "%b %d %H:%M:%S.%f %Y"
+ )
+ timestamp1 = float(dt1.strftime("%s.%f"))
+ expected_events = [
+ {
+ "description": "starting search for local datasources",
+ "event_type": "start",
+ "name": "init-local",
+ "origin": "cloudinit",
+ "timestamp": timestamp1,
+ },
+ {
+ "description": "running modules for final",
+ "event_type": "finish",
+ "name": "modules-final",
+ "origin": "cloudinit",
+ "result": "SUCCESS",
+ "timestamp": 1472594005.972,
+ },
+ ]
+ self.assertEqual(expected_events, events)
+
+ @mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
+ def test_dump_events_with_cisource(self, m_parse_from_date):
+ """Cisource file is read and parsed into a tuple of events and data."""
+ tmpfile = self.tmp_path("logfile")
+ write_file(tmpfile, SAMPLE_LOGS)
+ m_parse_from_date.return_value = 1472594005.972
+
+ events, data = dump_events(cisource=open(tmpfile))
+ year = datetime.now().year
+ dt1 = datetime.strptime(
+ "Nov 03 06:51:06.074410 %d" % year, "%b %d %H:%M:%S.%f %Y"
+ )
+ timestamp1 = float(dt1.strftime("%s.%f"))
+ expected_events = [
+ {
+ "description": "starting search for local datasources",
+ "event_type": "start",
+ "name": "init-local",
+ "origin": "cloudinit",
+ "timestamp": timestamp1,
+ },
+ {
+ "description": "running modules for final",
+ "event_type": "finish",
+ "name": "modules-final",
+ "origin": "cloudinit",
+ "result": "SUCCESS",
+ "timestamp": 1472594005.972,
+ },
+ ]
+ self.assertEqual(expected_events, events)
+ self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data])
+ m_parse_from_date.assert_has_calls(
+ [mock.call("2016-08-30 21:53:25.972325+00:00")]
+ )