summaryrefslogtreecommitdiff
path: root/tests/unittests/cmd/test_status.py
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /tests/unittests/cmd/test_status.py
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'tests/unittests/cmd/test_status.py')
-rw-r--r--tests/unittests/cmd/test_status.py391
1 files changed, 391 insertions, 0 deletions
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py
new file mode 100644
index 00000000..49eae043
--- /dev/null
+++ b/tests/unittests/cmd/test_status.py
@@ -0,0 +1,391 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from collections import namedtuple
+import os
+from io import StringIO
+from textwrap import dedent
+
+from cloudinit.atomic_helper import write_json
+from cloudinit.cmd import status
+from cloudinit.util import ensure_file
+from tests.unittests.helpers import CiTestCase, wrap_and_call, mock
+
+mypaths = namedtuple('MyPaths', 'run_dir')
+myargs = namedtuple('MyArgs', 'long wait')
+
+
+class TestStatus(CiTestCase):
+
+ def setUp(self):
+ super(TestStatus, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.status_file = self.tmp_path('status.json', self.new_root)
+ self.disable_file = self.tmp_path('cloudinit-disable', self.new_root)
+ self.paths = mypaths(run_dir=self.new_root)
+
+ class FakeInit(object):
+ paths = self.paths
+
+ def __init__(self, ds_deps):
+ pass
+
+ def read_cfg(self):
+ pass
+
+ self.init_class = FakeInit
+
+ def test__is_cloudinit_disabled_false_on_sysvinit(self):
+ '''When not in an environment using systemd, return False.'''
+ ensure_file(self.disable_file) # Create the ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': False,
+ 'get_cmdline': "root=/dev/my-root not-important"},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertFalse(
+ is_disabled, 'expected enabled cloud-init on sysvinit')
+ self.assertEqual('Cloud-init enabled on sysvinit', reason)
+
+ def test__is_cloudinit_disabled_true_on_disable_file(self):
+ '''When using systemd and disable_file is present return disabled.'''
+ ensure_file(self.disable_file) # Create observed disable file
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': True,
+ 'get_cmdline': "root=/dev/my-root not-important"},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertTrue(is_disabled, 'expected disabled cloud-init')
+ self.assertEqual(
+ 'Cloud-init disabled by {0}'.format(self.disable_file), reason)
+
+ def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
+ '''Not disabled when using systemd and enabled via commandline.'''
+ ensure_file(self.disable_file) # Create ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': True,
+ 'get_cmdline': 'something cloud-init=enabled else'},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertFalse(is_disabled, 'expected enabled cloud-init')
+ self.assertEqual(
+ 'Cloud-init enabled by kernel command line cloud-init=enabled',
+ reason)
+
+ def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
+ '''When using systemd and disable_file is present return disabled.'''
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': True,
+ 'get_cmdline': 'something cloud-init=disabled else'},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertTrue(is_disabled, 'expected disabled cloud-init')
+ self.assertEqual(
+ 'Cloud-init disabled by kernel parameter cloud-init=disabled',
+ reason)
+
+ def test__is_cloudinit_disabled_true_when_generator_disables(self):
+ '''When cloud-init-generator doesn't write enabled file return True.'''
+ enabled_file = os.path.join(self.paths.run_dir, 'enabled')
+ self.assertFalse(os.path.exists(enabled_file))
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': True,
+ 'get_cmdline': 'something'},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertTrue(is_disabled, 'expected disabled cloud-init')
+ self.assertEqual('Cloud-init disabled by cloud-init-generator', reason)
+
+ def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
+ '''Report enabled when systemd generator creates the enabled file.'''
+ enabled_file = os.path.join(self.paths.run_dir, 'enabled')
+ ensure_file(enabled_file)
+ (is_disabled, reason) = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'uses_systemd': True,
+ 'get_cmdline': 'something ignored'},
+ status._is_cloudinit_disabled, self.disable_file, self.paths)
+ self.assertFalse(is_disabled, 'expected enabled cloud-init')
+ self.assertEqual(
+ 'Cloud-init enabled by systemd cloud-init-generator', reason)
+
+ def test_status_returns_not_run(self):
+ '''When status.json does not exist yet, return 'not run'.'''
+ self.assertFalse(
+ os.path.exists(self.status_file), 'Unexpected status.json found')
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual('status: not run\n', m_stdout.getvalue())
+
+ def test_status_returns_disabled_long_on_presence_of_disable_file(self):
+ '''When cloudinit is disabled, return disabled reason.'''
+
+ checked_files = []
+
+ def fakeexists(filepath):
+ checked_files.append(filepath)
+ status_file = os.path.join(self.paths.run_dir, 'status.json')
+ return bool(not filepath == status_file)
+
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'os.path.exists': {'side_effect': fakeexists},
+ '_is_cloudinit_disabled': (True, 'disabled for some reason'),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual(
+ [os.path.join(self.paths.run_dir, 'status.json')],
+ checked_files)
+ expected = dedent('''\
+ status: disabled
+ detail:
+ disabled for some reason
+ ''')
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_returns_running_on_no_results_json(self):
+ '''Report running when status.json exists but result.json does not.'''
+ result_file = self.tmp_path('result.json', self.new_root)
+ write_json(self.status_file, {})
+ self.assertFalse(
+ os.path.exists(result_file), 'Unexpected result.json found')
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual('status: running\n', m_stdout.getvalue())
+
+ def test_status_returns_running(self):
+ '''Report running when status exists with an unfinished stage.'''
+ ensure_file(self.tmp_path('result.json', self.new_root))
+ write_json(self.status_file,
+ {'v1': {'init': {'start': 1, 'finished': None}}})
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual('status: running\n', m_stdout.getvalue())
+
+ def test_status_returns_done(self):
+ '''Report done results.json exists no stages are unfinished.'''
+ ensure_file(self.tmp_path('result.json', self.new_root))
+ write_json(
+ self.status_file,
+ {'v1': {'stage': None, # No current stage running
+ 'datasource': (
+ 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
+ '[dsmode=net]'),
+ 'blah': {'finished': 123.456},
+ 'init': {'errors': [], 'start': 124.567,
+ 'finished': 125.678},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual('status: done\n', m_stdout.getvalue())
+
+ def test_status_returns_done_long(self):
+ '''Long format of done status includes datasource info.'''
+ ensure_file(self.tmp_path('result.json', self.new_root))
+ write_json(
+ self.status_file,
+ {'v1': {'stage': None,
+ 'datasource': (
+ 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
+ '[dsmode=net]'),
+ 'init': {'start': 124.567, 'finished': 125.678},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ expected = dedent('''\
+ status: done
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
+ ''')
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_on_errors(self):
+ '''Reports error when any stage has errors.'''
+ write_json(
+ self.status_file,
+ {'v1': {'stage': None,
+ 'blah': {'errors': [], 'finished': 123.456},
+ 'init': {'errors': ['error1'], 'start': 124.567,
+ 'finished': 125.678},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(1, retcode)
+ self.assertEqual('status: error\n', m_stdout.getvalue())
+
+ def test_status_on_errors_long(self):
+ '''Long format of error status includes all error messages.'''
+ write_json(
+ self.status_file,
+ {'v1': {'stage': None,
+ 'datasource': (
+ 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
+ '[dsmode=net]'),
+ 'init': {'errors': ['error1'], 'start': 124.567,
+ 'finished': 125.678},
+ 'init-local': {'errors': ['error2', 'error3'],
+ 'start': 123.45, 'finished': 123.46}}})
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(1, retcode)
+ expected = dedent('''\
+ status: error
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ error1
+ error2
+ error3
+ ''')
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_returns_running_long_format(self):
+ '''Long format reports the stage in which we are running.'''
+ write_json(
+ self.status_file,
+ {'v1': {'stage': 'init',
+ 'init': {'start': 124.456, 'finished': None},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ expected = dedent('''\
+ status: running
+ time: Thu, 01 Jan 1970 00:02:04 +0000
+ detail:
+ Running in stage: init
+ ''')
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_done(self):
+ '''Specifying wait will poll every 1/4 second until done state.'''
+ running_json = {
+ 'v1': {'stage': 'init',
+ 'init': {'start': 124.456, 'finished': None},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ done_json = {
+ 'v1': {'stage': None,
+ 'init': {'start': 124.456, 'finished': 125.678},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}}
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, done_json)
+ result_file = self.tmp_path('result.json', self.new_root)
+ ensure_file(result_file)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'sleep': {'side_effect': fake_sleep},
+ '_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual('....\nstatus: done\n', m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_error(self):
+ '''Specifying wait will poll every 1/4 second until error state.'''
+ running_json = {
+ 'v1': {'stage': 'init',
+ 'init': {'start': 124.456, 'finished': None},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ error_json = {
+ 'v1': {'stage': None,
+ 'init': {'errors': ['error1'], 'start': 124.456,
+ 'finished': 125.678},
+ 'init-local': {'start': 123.45, 'finished': 123.46}}}
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, error_json)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'sleep': {'side_effect': fake_sleep},
+ '_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.handle_status_args, 'ignored', cmdargs)
+ self.assertEqual(1, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual('....\nstatus: error\n', m_stdout.getvalue())
+
+ def test_status_main(self):
+ '''status.main can be run as a standalone script.'''
+ write_json(self.status_file,
+ {'v1': {'init': {'start': 1, 'finished': None}}})
+ with self.assertRaises(SystemExit) as context_manager:
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ wrap_and_call(
+ 'cloudinit.cmd.status',
+ {'sys.argv': {'new': ['status']},
+ '_is_cloudinit_disabled': (False, ''),
+ 'Init': {'side_effect': self.init_class}},
+ status.main)
+ self.assertEqual(0, context_manager.exception.code)
+ self.assertEqual('status: running\n', m_stdout.getvalue())
+
+# vi: ts=4 expandtab syntax=python