diff options
Diffstat (limited to 'tests/unittests/cmd')
-rw-r--r-- | tests/unittests/cmd/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_logs.py | 167 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_render.py | 144 | ||||
-rw-r--r-- | tests/unittests/cmd/test_clean.py | 178 | ||||
-rw-r--r-- | tests/unittests/cmd/test_cloud_id.py | 127 | ||||
-rw-r--r-- | tests/unittests/cmd/test_main.py | 188 | ||||
-rw-r--r-- | tests/unittests/cmd/test_query.py | 392 | ||||
-rw-r--r-- | tests/unittests/cmd/test_status.py | 391 |
9 files changed, 1587 insertions, 0 deletions
diff --git a/tests/unittests/cmd/__init__.py b/tests/unittests/cmd/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/cmd/__init__.py diff --git a/tests/unittests/cmd/devel/__init__.py b/tests/unittests/cmd/devel/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/cmd/devel/__init__.py diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py new file mode 100644 index 00000000..18bdcdda --- /dev/null +++ b/tests/unittests/cmd/devel/test_logs.py @@ -0,0 +1,167 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from datetime import datetime +import os +from io import StringIO + +from cloudinit.cmd.devel import logs +from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE +from tests.unittests.helpers import ( + FilesystemMockingTestCase, mock, wrap_and_call) +from cloudinit.subp import subp +from cloudinit.util import ensure_dir, load_file, write_file + + +@mock.patch('cloudinit.cmd.devel.logs.os.getuid') +class TestCollectLogs(FilesystemMockingTestCase): + + def setUp(self): + super(TestCollectLogs, self).setUp() + self.new_root = self.tmp_dir() + self.run_dir = self.tmp_path('run', self.new_root) + + def test_collect_logs_with_userdata_requires_root_user(self, m_getuid): + """collect-logs errors when non-root user collects userdata .""" + m_getuid.return_value = 100 # non-root + output_tarfile = self.tmp_path('logs.tgz') + with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + self.assertEqual( + 1, logs.collect_logs(output_tarfile, include_userdata=True)) + self.assertEqual( + 'To include userdata, root user is required.' + ' Try sudo cloud-init collect-logs\n', + m_stderr.getvalue()) + + def test_collect_logs_creates_tarfile(self, m_getuid): + """collect-logs creates a tarfile with all related cloud-init info.""" + m_getuid.return_value = 100 + log1 = self.tmp_path('cloud-init.log', self.new_root) + write_file(log1, 'cloud-init-log') + log2 = self.tmp_path('cloud-init-output.log', self.new_root) + write_file(log2, 'cloud-init-output-log') + ensure_dir(self.run_dir) + write_file(self.tmp_path('results.json', self.run_dir), 'results') + write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + 'sensitive') + output_tarfile = self.tmp_path('logs.tgz') + + date = datetime.utcnow().date().strftime('%Y-%m-%d') + date_logdir = 'cloud-init-logs-{0}'.format(date) + + version_out = '/usr/bin/cloud-init 18.2fake\n' + expected_subp = { + ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'): + '0.7fake\n', + ('cloud-init', '--version'): version_out, + ('dmesg',): 'dmesg-out\n', + ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n', + ('tar', 'czvf', output_tarfile, date_logdir): '' + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + 'Unexpected command provided to subp: {0}'.format(cmd)) + if cmd == ['tar', 'czvf', output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], '' + + fake_stderr = mock.MagicMock() + + wrap_and_call( + 'cloudinit.cmd.devel.logs', + {'subp': {'side_effect': fake_subp}, + 'sys.stderr': {'new': fake_stderr}, + 'CLOUDINIT_LOGS': {'new': [log1, log2]}, + 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}}, + logs.collect_logs, output_tarfile, include_userdata=False) + # unpack the tarfile and check file contents + subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertFalse( + os.path.exists( + os.path.join(out_logdir, 'run', 'cloud-init', + INSTANCE_JSON_SENSITIVE_FILE)), + 'Unexpected file found: %s' % INSTANCE_JSON_SENSITIVE_FILE) + self.assertEqual( + '0.7fake\n', + load_file(os.path.join(out_logdir, 'dpkg-version'))) + self.assertEqual(version_out, + load_file(os.path.join(out_logdir, 'version'))) + self.assertEqual( + 'cloud-init-log', + load_file(os.path.join(out_logdir, 'cloud-init.log'))) + self.assertEqual( + 'cloud-init-output-log', + load_file(os.path.join(out_logdir, 'cloud-init-output.log'))) + self.assertEqual( + 'dmesg-out\n', + load_file(os.path.join(out_logdir, 'dmesg.txt'))) + self.assertEqual( + 'journal-out\n', + load_file(os.path.join(out_logdir, 'journal.txt'))) + self.assertEqual( + 'results', + load_file( + os.path.join(out_logdir, 'run', 'cloud-init', 'results.json'))) + fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile) + + def test_collect_logs_includes_optional_userdata(self, m_getuid): + """collect-logs include userdata when --include-userdata is set.""" + m_getuid.return_value = 0 + log1 = self.tmp_path('cloud-init.log', self.new_root) + write_file(log1, 'cloud-init-log') + log2 = self.tmp_path('cloud-init-output.log', self.new_root) + write_file(log2, 'cloud-init-output-log') + userdata = self.tmp_path('user-data.txt', self.new_root) + write_file(userdata, 'user-data') + ensure_dir(self.run_dir) + write_file(self.tmp_path('results.json', self.run_dir), 'results') + write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + 'sensitive') + output_tarfile = self.tmp_path('logs.tgz') + + date = datetime.utcnow().date().strftime('%Y-%m-%d') + date_logdir = 'cloud-init-logs-{0}'.format(date) + + version_out = '/usr/bin/cloud-init 18.2fake\n' + expected_subp = { + ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'): + '0.7fake', + ('cloud-init', '--version'): version_out, + ('dmesg',): 'dmesg-out\n', + ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n', + ('tar', 'czvf', output_tarfile, date_logdir): '' + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + 'Unexpected command provided to subp: {0}'.format(cmd)) + if cmd == ['tar', 'czvf', output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], '' + + fake_stderr = mock.MagicMock() + + wrap_and_call( + 'cloudinit.cmd.devel.logs', + {'subp': {'side_effect': fake_subp}, + 'sys.stderr': {'new': fake_stderr}, + 'CLOUDINIT_LOGS': {'new': [log1, log2]}, + 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}, + 'USER_DATA_FILE': {'new': userdata}}, + logs.collect_logs, output_tarfile, include_userdata=True) + # unpack the tarfile and check file contents + subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertEqual( + 'user-data', + load_file(os.path.join(out_logdir, 'user-data.txt'))) + self.assertEqual( + 'sensitive', + load_file(os.path.join(out_logdir, 'run', 'cloud-init', + INSTANCE_JSON_SENSITIVE_FILE))) + fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile) diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py new file mode 100644 index 00000000..c7ddca3d --- /dev/null +++ b/tests/unittests/cmd/devel/test_render.py @@ -0,0 +1,144 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from io import StringIO + +from collections import namedtuple +from cloudinit.cmd.devel import render +from cloudinit.helpers import Paths +from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja +from cloudinit.util import ensure_dir, write_file + + +class TestRender(CiTestCase): + + with_logs = True + + args = namedtuple('renderargs', 'user_data instance_data debug') + + def setUp(self): + super(TestRender, self).setUp() + self.tmp = self.tmp_dir() + + def test_handle_args_error_on_missing_user_data(self): + """When user_data file path does not exist, log an error.""" + absent_file = self.tmp_path('user-data', dir=self.tmp) + instance_data = self.tmp_path('instance-data', dir=self.tmp) + write_file(instance_data, '{}') + args = self.args( + user_data=absent_file, instance_data=instance_data, debug=False) + with mock.patch('sys.stderr', new_callable=StringIO): + self.assertEqual(1, render.handle_args('anyname', args)) + self.assertIn( + 'Missing user-data file: %s' % absent_file, + self.logs.getvalue()) + + def test_handle_args_error_on_missing_instance_data(self): + """When instance_data file path does not exist, log an error.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + absent_file = self.tmp_path('instance-data', dir=self.tmp) + args = self.args( + user_data=user_data, instance_data=absent_file, debug=False) + with mock.patch('sys.stderr', new_callable=StringIO): + self.assertEqual(1, render.handle_args('anyname', args)) + self.assertIn( + 'Missing instance-data.json file: %s' % absent_file, + self.logs.getvalue()) + + def test_handle_args_defaults_instance_data(self): + """When no instance_data argument, default to configured run_dir.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + run_dir = self.tmp_path('run_dir', dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({'run_dir': run_dir}) + self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + self.m_paths.return_value = paths + args = self.args( + user_data=user_data, instance_data=None, debug=False) + with mock.patch('sys.stderr', new_callable=StringIO): + self.assertEqual(1, render.handle_args('anyname', args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + self.assertIn( + 'Missing instance-data.json file: %s' % json_file, + self.logs.getvalue()) + + def test_handle_args_root_fallback_from_sensitive_instance_data(self): + """When root user defaults to sensitive.json.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + run_dir = self.tmp_path('run_dir', dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({'run_dir': run_dir}) + self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + self.m_paths.return_value = paths + args = self.args( + user_data=user_data, instance_data=None, debug=False) + with mock.patch('sys.stderr', new_callable=StringIO): + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(1, render.handle_args('anyname', args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + self.assertIn( + 'WARNING: Missing root-readable %s. Using redacted %s' % ( + json_sensitive, json_file), self.logs.getvalue()) + self.assertIn( + 'ERROR: Missing instance-data.json file: %s' % json_file, + self.logs.getvalue()) + + def test_handle_args_root_uses_sensitive_instance_data(self): + """When root user, and no instance-data arg, use sensitive.json.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + write_file(user_data, '##template: jinja\nrendering: {{ my_var }}') + run_dir = self.tmp_path('run_dir', dir=self.tmp) + ensure_dir(run_dir) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + write_file(json_sensitive, '{"my-var": "jinja worked"}') + paths = Paths({'run_dir': run_dir}) + self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + self.m_paths.return_value = paths + args = self.args( + user_data=user_data, instance_data=None, debug=False) + with mock.patch('sys.stderr', new_callable=StringIO): + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(0, render.handle_args('anyname', args)) + self.assertIn('rendering: jinja worked', m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_renders_instance_data_vars_in_template(self): + """If user_data file is a jinja template render instance-data vars.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + write_file(user_data, '##template: jinja\nrendering: {{ my_var }}') + instance_data = self.tmp_path('instance-data', dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True) + with mock.patch('sys.stderr', new_callable=StringIO) as m_console_err: + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + self.assertEqual(0, render.handle_args('anyname', args)) + self.assertIn( + 'DEBUG: Converted jinja variables\n{', self.logs.getvalue()) + self.assertIn( + 'DEBUG: Converted jinja variables\n{', m_console_err.getvalue()) + self.assertEqual('rendering: jinja worked', m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): + """If user_data file has invalid jinja operations log warnings.""" + user_data = self.tmp_path('user-data', dir=self.tmp) + write_file(user_data, '##template: jinja\nrendering: {{ my-var }}') + instance_data = self.tmp_path('instance-data', dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True) + with mock.patch('sys.stderr', new_callable=StringIO): + self.assertEqual(1, render.handle_args('anyname', args)) + self.assertIn( + 'WARNING: Ignoring jinja template for %s: Undefined jinja' + ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' + ' "my_var"?' % user_data, + self.logs.getvalue()) + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_clean.py b/tests/unittests/cmd/test_clean.py new file mode 100644 index 00000000..81fc930e --- /dev/null +++ b/tests/unittests/cmd/test_clean.py @@ -0,0 +1,178 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.cmd import clean +from cloudinit.util import ensure_dir, sym_link, write_file +from tests.unittests.helpers import CiTestCase, wrap_and_call, mock +from collections import namedtuple +import os +from io import StringIO + +mypaths = namedtuple('MyPaths', 'cloud_dir') + + +class TestClean(CiTestCase): + + def setUp(self): + super(TestClean, self).setUp() + self.new_root = self.tmp_dir() + self.artifact_dir = self.tmp_path('artifacts', self.new_root) + self.log1 = self.tmp_path('cloud-init.log', self.new_root) + self.log2 = self.tmp_path('cloud-init-output.log', self.new_root) + + class FakeInit(object): + cfg = {'def_log_file': self.log1, + 'output': {'all': '|tee -a {0}'.format(self.log2)}} + # Ensure cloud_dir has a trailing slash, to match real behaviour + paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir)) + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + self.init_class = FakeInit + + def test_remove_artifacts_removes_logs(self): + """remove_artifacts removes logs when remove_logs is True.""" + write_file(self.log1, 'cloud-init-log') + write_file(self.log2, 'cloud-init-output-log') + + self.assertFalse( + os.path.exists(self.artifact_dir), 'Unexpected artifacts dir') + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=True) + self.assertFalse(os.path.exists(self.log1), 'Unexpected file') + self.assertFalse(os.path.exists(self.log2), 'Unexpected file') + self.assertEqual(0, retcode) + + def test_remove_artifacts_preserves_logs(self): + """remove_artifacts leaves logs when remove_logs is False.""" + write_file(self.log1, 'cloud-init-log') + write_file(self.log2, 'cloud-init-output-log') + + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=False) + self.assertTrue(os.path.exists(self.log1), 'Missing expected file') + self.assertTrue(os.path.exists(self.log2), 'Missing expected file') + self.assertEqual(0, retcode) + + def test_remove_artifacts_removes_unlinks_symlinks(self): + """remove_artifacts cleans artifacts dir unlinking any symlinks.""" + dir1 = os.path.join(self.artifact_dir, 'dir1') + ensure_dir(dir1) + symlink = os.path.join(self.artifact_dir, 'mylink') + sym_link(dir1, symlink) + + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=False) + self.assertEqual(0, retcode) + for path in (dir1, symlink): + self.assertFalse( + os.path.exists(path), + 'Unexpected {0} dir'.format(path)) + + def test_remove_artifacts_removes_artifacts_skipping_seed(self): + """remove_artifacts cleans artifacts dir with exception of seed dir.""" + dirs = [ + self.artifact_dir, + os.path.join(self.artifact_dir, 'seed'), + os.path.join(self.artifact_dir, 'dir1'), + os.path.join(self.artifact_dir, 'dir2')] + for _dir in dirs: + ensure_dir(_dir) + + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=False) + self.assertEqual(0, retcode) + for expected_dir in dirs[:2]: + self.assertTrue( + os.path.exists(expected_dir), + 'Missing {0} dir'.format(expected_dir)) + for deleted_dir in dirs[2:]: + self.assertFalse( + os.path.exists(deleted_dir), + 'Unexpected {0} dir'.format(deleted_dir)) + + def test_remove_artifacts_removes_artifacts_removes_seed(self): + """remove_artifacts removes seed dir when remove_seed is True.""" + dirs = [ + self.artifact_dir, + os.path.join(self.artifact_dir, 'seed'), + os.path.join(self.artifact_dir, 'dir1'), + os.path.join(self.artifact_dir, 'dir2')] + for _dir in dirs: + ensure_dir(_dir) + + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=False, remove_seed=True) + self.assertEqual(0, retcode) + self.assertTrue( + os.path.exists(self.artifact_dir), 'Missing artifact dir') + for deleted_dir in dirs[1:]: + self.assertFalse( + os.path.exists(deleted_dir), + 'Unexpected {0} dir'.format(deleted_dir)) + + def test_remove_artifacts_returns_one_on_errors(self): + """remove_artifacts returns non-zero on failure and prints an error.""" + ensure_dir(self.artifact_dir) + ensure_dir(os.path.join(self.artifact_dir, 'dir1')) + + with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'del_dir': {'side_effect': OSError('oops')}, + 'Init': {'side_effect': self.init_class}}, + clean.remove_artifacts, remove_logs=False) + self.assertEqual(1, retcode) + self.assertEqual( + 'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir, + m_stderr.getvalue()) + + def test_handle_clean_args_reboots(self): + """handle_clean_args_reboots when reboot arg is provided.""" + + called_cmds = [] + + def fake_subp(cmd, capture): + called_cmds.append((cmd, capture)) + return '', '' + + myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot') + cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True) + retcode = wrap_and_call( + 'cloudinit.cmd.clean', + {'subp': {'side_effect': fake_subp}, + 'Init': {'side_effect': self.init_class}}, + clean.handle_clean_args, name='does not matter', args=cmdargs) + self.assertEqual(0, retcode) + self.assertEqual( + [(['shutdown', '-r', 'now'], False)], called_cmds) + + def test_status_main(self): + '''clean.main can be run as a standalone script.''' + write_file(self.log1, 'cloud-init-log') + with self.assertRaises(SystemExit) as context_manager: + wrap_and_call( + 'cloudinit.cmd.clean', + {'Init': {'side_effect': self.init_class}, + 'sys.argv': {'new': ['clean', '--logs']}}, + clean.main) + + self.assertEqual(0, context_manager.exception.code) + self.assertFalse( + os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1)) + + +# vi: ts=4 expandtab syntax=python diff --git a/tests/unittests/cmd/test_cloud_id.py b/tests/unittests/cmd/test_cloud_id.py new file mode 100644 index 00000000..12fc80e8 --- /dev/null +++ b/tests/unittests/cmd/test_cloud_id.py @@ -0,0 +1,127 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloud-id command line utility.""" + +from cloudinit import util +from collections import namedtuple +from io import StringIO + +from cloudinit.cmd import cloud_id + +from tests.unittests.helpers import CiTestCase, mock + + +class TestCloudId(CiTestCase): + + args = namedtuple('cloudidargs', ('instance_data json long')) + + def setUp(self): + super(TestCloudId, self).setUp() + self.tmp = self.tmp_dir() + self.instance_data = self.tmp_path('instance-data.json', dir=self.tmp) + + def test_cloud_id_arg_parser_defaults(self): + """Validate the argument defaults when not provided by the end-user.""" + cmd = ['cloud-id'] + with mock.patch('sys.argv', cmd): + args = cloud_id.get_parser().parse_args() + self.assertEqual( + '/run/cloud-init/instance-data.json', + args.instance_data) + self.assertEqual(False, args.long) + self.assertEqual(False, args.json) + + def test_cloud_id_arg_parse_overrides(self): + """Override argument defaults by specifying values for each param.""" + util.write_file(self.instance_data, '{}') + cmd = ['cloud-id', '--instance-data', self.instance_data, '--long', + '--json'] + with mock.patch('sys.argv', cmd): + args = cloud_id.get_parser().parse_args() + self.assertEqual(self.instance_data, args.instance_data) + self.assertEqual(True, args.long) + self.assertEqual(True, args.json) + + def test_cloud_id_missing_instance_data_json(self): + """Exit error when the provided instance-data.json does not exist.""" + cmd = ['cloud-id', '--instance-data', self.instance_data] + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(1, context_manager.exception.code) + self.assertIn( + "ERROR: File not found '%s'" % self.instance_data, + m_stderr.getvalue()) + + def test_cloud_id_non_json_instance_data(self): + """Exit error when the provided instance-data.json is not json.""" + cmd = ['cloud-id', '--instance-data', self.instance_data] + util.write_file(self.instance_data, '{') + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(1, context_manager.exception.code) + self.assertIn( + "ERROR: File '%s' is not valid json." % self.instance_data, + m_stderr.getvalue()) + + def test_cloud_id_from_cloud_name_in_instance_data(self): + """Report canonical cloud-id from cloud_name in instance-data.""" + util.write_file( + self.instance_data, + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') + cmd = ['cloud-id', '--instance-data', self.instance_data] + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(0, context_manager.exception.code) + self.assertEqual("mycloud\n", m_stdout.getvalue()) + + def test_cloud_id_long_name_from_instance_data(self): + """Report long cloud-id format from cloud_name and region.""" + util.write_file( + self.instance_data, + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') + cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(0, context_manager.exception.code) + self.assertEqual("mycloud\tsomereg\n", m_stdout.getvalue()) + + def test_cloud_id_lookup_from_instance_data_region(self): + """Report discovered canonical cloud_id when region lookup matches.""" + util.write_file( + self.instance_data, + '{"v1": {"cloud_name": "aws", "region": "cn-north-1",' + ' "platform": "ec2"}}') + cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(0, context_manager.exception.code) + self.assertEqual("aws-china\tcn-north-1\n", m_stdout.getvalue()) + + def test_cloud_id_lookup_json_instance_data_adds_cloud_id_to_json(self): + """Report v1 instance-data content with cloud_id when --json set.""" + util.write_file( + self.instance_data, + '{"v1": {"cloud_name": "unknown", "region": "dfw",' + ' "platform": "openstack", "public_ssh_keys": []}}') + expected = util.json_dumps({ + 'cloud_id': 'openstack', 'cloud_name': 'unknown', + 'platform': 'openstack', 'public_ssh_keys': [], 'region': 'dfw'}) + cmd = ['cloud-id', '--instance-data', self.instance_data, '--json'] + with mock.patch('sys.argv', cmd): + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with self.assertRaises(SystemExit) as context_manager: + cloud_id.main() + self.assertEqual(0, context_manager.exception.code) + self.assertEqual(expected + '\n', m_stdout.getvalue()) + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_main.py b/tests/unittests/cmd/test_main.py new file mode 100644 index 00000000..e1ce682b --- /dev/null +++ b/tests/unittests/cmd/test_main.py @@ -0,0 +1,188 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from collections import namedtuple +import copy +import os +from io import StringIO +from unittest import mock + +import pytest + +from cloudinit.cmd import main +from cloudinit import safeyaml +from cloudinit.util import ( + ensure_dir, load_file, write_file) +from tests.unittests.helpers import ( + FilesystemMockingTestCase, wrap_and_call) + +mypaths = namedtuple('MyPaths', 'run_dir') +myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand') + + +class TestMain(FilesystemMockingTestCase): + with_logs = True + allowed_subp = False + + def setUp(self): + super(TestMain, self).setUp() + self.new_root = self.tmp_dir() + self.cloud_dir = self.tmp_path('var/lib/cloud/', dir=self.new_root) + os.makedirs(self.cloud_dir) + self.replicateTestRoot('simple_ubuntu', self.new_root) + self.cfg = { + 'datasource_list': ['None'], + 'runcmd': ['ls /etc'], # test ALL_DISTROS + 'system_info': {'paths': {'cloud_dir': self.cloud_dir, + 'run_dir': self.new_root}}, + 'write_files': [ + { + 'path': '/etc/blah.ini', + 'content': 'blah', + 'permissions': 0o755, + }, + ], + 'cloud_init_modules': ['write-files', 'runcmd'], + } + cloud_cfg = safeyaml.dumps(self.cfg) + ensure_dir(os.path.join(self.new_root, 'etc', 'cloud')) + self.cloud_cfg_file = os.path.join( + self.new_root, 'etc', 'cloud', 'cloud.cfg') + write_file(self.cloud_cfg_file, cloud_cfg) + self.patchOS(self.new_root) + self.patchUtils(self.new_root) + self.stderr = StringIO() + self.patchStdoutAndStderr(stderr=self.stderr) + + def test_main_init_run_net_stops_on_file_no_net(self): + """When no-net file is present, main_init does not process modules.""" + stop_file = os.path.join(self.cloud_dir, 'data', 'no-net') # stop file + write_file(stop_file, '') + cmdargs = myargs( + debug=False, files=None, force=False, local=False, reporter=None, + subcommand='init') + (_item1, item2) = wrap_and_call( + 'cloudinit.cmd.main', + {'util.close_stdin': True, + 'netinfo.debug_info': 'my net debug info', + 'util.fixup_output': ('outfmt', 'errfmt')}, + main.main_init, 'init', cmdargs) + # We should not run write_files module + self.assertFalse( + os.path.exists(os.path.join(self.new_root, 'etc/blah.ini')), + 'Unexpected run of write_files module produced blah.ini') + self.assertEqual([], item2) + # Instancify is called + instance_id_path = 'var/lib/cloud/data/instance-id' + self.assertFalse( + os.path.exists(os.path.join(self.new_root, instance_id_path)), + 'Unexpected call to datasource.instancify produced instance-id') + expected_logs = [ + "Exiting. stop file ['{stop_file}'] existed\n".format( + stop_file=stop_file), + 'my net debug info' # netinfo.debug_info + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + def test_main_init_run_net_runs_modules(self): + """Modules like write_files are run in 'net' mode.""" + cmdargs = myargs( + debug=False, files=None, force=False, local=False, reporter=None, + subcommand='init') + (_item1, item2) = wrap_and_call( + 'cloudinit.cmd.main', + {'util.close_stdin': True, + 'netinfo.debug_info': 'my net debug info', + 'util.fixup_output': ('outfmt', 'errfmt')}, + main.main_init, 'init', cmdargs) + self.assertEqual([], item2) + # Instancify is called + instance_id_path = 'var/lib/cloud/data/instance-id' + self.assertEqual( + 'iid-datasource-none\n', + os.path.join(load_file( + os.path.join(self.new_root, instance_id_path)))) + # modules are run (including write_files) + self.assertEqual( + 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) + expected_logs = [ + 'network config is disabled by fallback', # apply_network_config + 'my net debug info', # netinfo.debug_info + 'no previous run detected' + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + def test_main_init_run_net_calls_set_hostname_when_metadata_present(self): + """When local-hostname metadata is present, call cc_set_hostname.""" + self.cfg['datasource'] = { + 'None': {'metadata': {'local-hostname': 'md-hostname'}}} + cloud_cfg = safeyaml.dumps(self.cfg) + write_file(self.cloud_cfg_file, cloud_cfg) + cmdargs = myargs( + debug=False, files=None, force=False, local=False, reporter=None, + subcommand='init') + + def set_hostname(name, cfg, cloud, log, args): + self.assertEqual('set-hostname', name) + updated_cfg = copy.deepcopy(self.cfg) + updated_cfg.update( + {'def_log_file': '/var/log/cloud-init.log', + 'log_cfgs': [], + 'syslog_fix_perms': [ + 'syslog:adm', 'root:adm', 'root:wheel', 'root:root' + ], + 'vendor_data': {'enabled': True, 'prefix': []}, + 'vendor_data2': {'enabled': True, 'prefix': []}}) + updated_cfg.pop('system_info') + + self.assertEqual(updated_cfg, cfg) + self.assertEqual(main.LOG, log) + self.assertIsNone(args) + + (_item1, item2) = wrap_and_call( + 'cloudinit.cmd.main', + {'util.close_stdin': True, + 'netinfo.debug_info': 'my net debug info', + 'cc_set_hostname.handle': {'side_effect': set_hostname}, + 'util.fixup_output': ('outfmt', 'errfmt')}, + main.main_init, 'init', cmdargs) + self.assertEqual([], item2) + # Instancify is called + instance_id_path = 'var/lib/cloud/data/instance-id' + self.assertEqual( + 'iid-datasource-none\n', + os.path.join(load_file( + os.path.join(self.new_root, instance_id_path)))) + # modules are run (including write_files) + self.assertEqual( + 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) + expected_logs = [ + 'network config is disabled by fallback', # apply_network_config + 'my net debug info', # netinfo.debug_info + 'no previous run detected' + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + +class TestShouldBringUpInterfaces: + @pytest.mark.parametrize('cfg_disable,args_local,expected', [ + (True, True, False), + (True, False, False), + (False, True, False), + (False, False, True), + ]) + def test_should_bring_up_interfaces( + self, cfg_disable, args_local, expected + ): + init = mock.Mock() + init.cfg = {'disable_network_activation': cfg_disable} + + args = mock.Mock() + args.local = args_local + + result = main._should_bring_up_interfaces(init, args) + assert result == expected + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py new file mode 100644 index 00000000..b3f1d98d --- /dev/null +++ b/tests/unittests/cmd/test_query.py @@ -0,0 +1,392 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import errno +import gzip +from io import BytesIO +import json +from textwrap import dedent + +import pytest + +from collections import namedtuple +from cloudinit.cmd import query +from cloudinit.helpers import Paths +from cloudinit.sources import ( + REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE) +from tests.unittests.helpers import mock + +from cloudinit.util import b64e, write_file + + +def _gzip_data(data): + with BytesIO() as iobuf: + with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp: + gzfp.write(data) + return iobuf.getvalue() + + +@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") +class TestQuery: + + args = namedtuple( + 'queryargs', + ('debug dump_all format instance_data list_keys user_data vendor_data' + ' varname')) + + def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): + """Write userdata and vendordata into a tmpdir. + + Return: + 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path) + """ + if ud_val: + user_data = tmpdir.join('user-data') + write_file(user_data.strpath, ud_val) + else: + user_data = None + if vd_val: + vendor_data = tmpdir.join('vendor-data') + write_file(vendor_data.strpath, vd_val) + else: + vendor_data = None + run_dir = tmpdir.join('run_dir') + run_dir.ensure_dir() + return ( + Paths({'run_dir': run_dir.strpath}), + run_dir, + user_data, + vendor_data + ) + + def test_handle_args_error_on_missing_param(self, caplog, capsys): + """Error when missing required parameters and print usage.""" + args = self.args( + debug=False, dump_all=False, format=None, instance_data=None, + list_keys=False, user_data=None, vendor_data=None, varname=None) + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ) as m_cli_log: + assert 1 == query.handle_args('anyname', args) + expected_error = ( + 'Expected one of the options: --all, --format, --list-keys' + ' or varname\n') + assert expected_error in caplog.text + out, _err = capsys.readouterr() + assert 'usage: query' in out + assert 1 == m_cli_log.call_count + + @pytest.mark.parametrize( + "inst_data,varname,expected_error", ( + ( + '{"v1": {"key-2": "value-2"}}', + 'v1.absent_leaf', + "instance-data 'v1' has no 'absent_leaf'\n" + ), + ( + '{"v1": {"key-2": "value-2"}}', + 'absent_key', + "Undefined instance-data key 'absent_key'\n" + ), + ) + ) + def test_handle_args_error_on_invalid_vaname_paths( + self, inst_data, varname, expected_error, caplog, tmpdir + ): + """Error when varname is not a valid instance-data variable path.""" + instance_data = tmpdir.join('instance-data') + instance_data.write(inst_data) + args = self.args( + debug=False, dump_all=False, format=None, + instance_data=instance_data.strpath, + list_keys=False, user_data=None, vendor_data=None, varname=varname + ) + paths, _, _, _ = self._setup_paths(tmpdir) + with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + m_paths.return_value = paths + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ): + assert 1 == query.handle_args('anyname', args) + assert expected_error in caplog.text + + def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): + """When instance_data file path does not exist, log an error.""" + absent_fn = tmpdir.join('absent') + args = self.args( + debug=False, dump_all=True, format=None, + instance_data=absent_fn.strpath, + list_keys=False, user_data='ud', vendor_data='vd', varname=None) + assert 1 == query.handle_args('anyname', args) + + msg = 'Missing instance-data file: %s' % absent_fn + assert msg in caplog.text + + def test_handle_args_error_when_no_read_permission_instance_data( + self, caplog, tmpdir + ): + """When instance_data file is unreadable, log an error.""" + noread_fn = tmpdir.join('unreadable') + noread_fn.write('thou shall not pass') + args = self.args( + debug=False, dump_all=True, format=None, + instance_data=noread_fn.strpath, + list_keys=False, user_data='ud', vendor_data='vd', varname=None) + with mock.patch('cloudinit.cmd.query.util.load_file') as m_load: + m_load.side_effect = OSError(errno.EACCES, 'Not allowed') + assert 1 == query.handle_args('anyname', args) + msg = "No read permission on '%s'. Try sudo" % noread_fn + assert msg in caplog.text + + def test_handle_args_defaults_instance_data(self, caplog, tmpdir): + """When no instance_data argument, default to configured run_dir.""" + args = self.args( + debug=False, dump_all=True, format=None, instance_data=None, + list_keys=False, user_data=None, vendor_data=None, varname=None) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + m_paths.return_value = paths + assert 1 == query.handle_args('anyname', args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + msg = 'Missing instance-data file: %s' % json_file.strpath + assert msg in caplog.text + + def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): + """When no instance_data argument, root falls back to redacted json.""" + args = self.args( + debug=False, dump_all=True, format=None, instance_data=None, + list_keys=False, user_data=None, vendor_data=None, varname=None) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + m_paths.return_value = paths + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 0 + assert 1 == query.handle_args('anyname', args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + msg = ( + 'Missing root-readable %s. Using redacted %s instead.' % + ( + sensitive_file.strpath, json_file.strpath + ) + ) + assert msg in caplog.text + + @pytest.mark.parametrize( + 'ud_src,ud_expected,vd_src,vd_expected', + ( + ('hi mom', 'hi mom', 'hi pops', 'hi pops'), + ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'), + (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'), + (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'), + ) + ) + def test_handle_args_root_processes_user_data( + self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir + ): + """Support reading multiple user-data file content types""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val=ud_src, vd_val=vd_src + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, dump_all=True, format=None, instance_data=None, + list_keys=False, user_data=user_data.strpath, + vendor_data=vendor_data.strpath, varname=None) + with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + m_paths.return_value = paths + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + cmd_output = json.loads(out) + assert "it worked" == cmd_output['my-var'] + if ud_expected == "ci-b64:": + ud_expected = "ci-b64:{}".format(b64e(ud_src)) + if vd_expected == "ci-b64:": + vd_expected = "ci-b64:{}".format(b64e(vd_src)) + assert ud_expected == cmd_output['userdata'] + assert vd_expected == cmd_output['vendordata'] + + def test_handle_args_root_uses_instance_sensitive_data( + self, capsys, tmpdir + ): + """When no instance_data argument, root uses sensitive json.""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val='ud', vd_val='vd' + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, dump_all=True, format=None, instance_data=None, + list_keys=False, user_data=user_data.strpath, + vendor_data=vendor_data.strpath, varname=None) + with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + m_paths.return_value = paths + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args('anyname', args) + expected = ( + '{\n "my-var": "it worked",\n ' + '"userdata": "ud",\n "vendordata": "vd"\n}\n' + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): + """When --all is specified query will dump all instance data vars.""" + instance_data = tmpdir.join('instance-data') + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, dump_all=True, format=None, + instance_data=instance_data.strpath, list_keys=False, + user_data='ud', vendor_data='vd', varname=None) + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + expected = ( + '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n' + ' "vendordata": "<%s> file:vd"\n}\n' % ( + REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE + ) + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): + """When the argument varname is passed, report its value.""" + instance_data = tmpdir.join('instance-data') + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, dump_all=True, format=None, + instance_data=instance_data.strpath, list_keys=False, + user_data='ud', vendor_data='vd', varname='my_var') + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + assert 'it worked\n' == out + + @pytest.mark.parametrize( + 'inst_data,varname,expected', + ( + ( + '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}', + 'v1.key_2', + 'value-2\n' + ), + # Assert no jinja underscore-delimited aliases are reported on CLI + ( + '{"v1": {"something-hyphenated": {"no.underscores":"x",' + ' "no-alias": "y"}}, "my-var": "it worked"}', + 'v1.something_hyphenated', + '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n' + ), + ) + ) + def test_handle_args_returns_nested_varname( + self, inst_data, varname, expected, capsys, tmpdir + ): + """If user_data file is a jinja template render instance-data vars.""" + instance_data = tmpdir.join('instance-data') + instance_data.write(inst_data) + args = self.args( + debug=False, dump_all=False, format=None, + instance_data=instance_data.strpath, user_data='ud', + vendor_data='vd', list_keys=False, varname=varname) + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_standardized_vars_to_top_level_aliases( + self, capsys, tmpdir + ): + """Any standardized vars under v# are promoted as top-level aliases.""" + instance_data = tmpdir.join('instance-data') + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}') + expected = dedent("""\ + { + "top": "gun", + "userdata": "<redacted for non-root user> file:ud", + "v1": { + "v1_1": "val1.1" + }, + "v1_1": "val1.1", + "v2": { + "v2_2": "val2.2" + }, + "v2_2": "val2.2", + "vendordata": "<redacted for non-root user> file:vd" + } + """) + args = self.args( + debug=False, dump_all=True, format=None, + instance_data=instance_data.strpath, user_data='ud', + vendor_data='vd', list_keys=False, varname=None) + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname( + self, capsys, tmpdir + ): + """Sort all top-level keys when only --list-keys provided.""" + instance_data = tmpdir.join('instance-data') + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}') + expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n' + args = self.args( + debug=False, dump_all=False, format=None, + instance_data=instance_data.strpath, list_keys=True, + user_data='ud', vendor_data='vd', varname=None) + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_nested_keys_when_varname( + self, capsys, tmpdir + ): + """Sort all nested keys of varname object when --list-keys provided.""" + instance_data = tmpdir.join('instance-data') + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + + ' {"v2_2": "val2.2"}, "top": "gun"}') + expected = 'v1_1\nv1_2\n' + args = self.args( + debug=False, dump_all=False, format=None, + instance_data=instance_data.strpath, list_keys=True, + user_data='ud', vendor_data='vd', varname='v1') + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args('anyname', args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_errors_when_varname_is_not_a_dict( + self, caplog, tmpdir + ): + """Raise an error when --list-keys and varname specify a non-list.""" + instance_data = tmpdir.join('instance-data') + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + + '{"v2_2": "val2.2"}, "top": "gun"}') + expected_error = "--list-keys provided but 'top' is not a dict" + args = self.args( + debug=False, dump_all=False, format=None, + instance_data=instance_data.strpath, list_keys=True, + user_data='ud', vendor_data='vd', varname='top') + with mock.patch('os.getuid') as m_getuid: + m_getuid.return_value = 100 + assert 1 == query.handle_args('anyname', args) + assert expected_error in caplog.text + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py new file mode 100644 index 00000000..49eae043 --- /dev/null +++ b/tests/unittests/cmd/test_status.py @@ -0,0 +1,391 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from collections import namedtuple +import os +from io import StringIO +from textwrap import dedent + +from cloudinit.atomic_helper import write_json +from cloudinit.cmd import status +from cloudinit.util import ensure_file +from tests.unittests.helpers import CiTestCase, wrap_and_call, mock + +mypaths = namedtuple('MyPaths', 'run_dir') +myargs = namedtuple('MyArgs', 'long wait') + + +class TestStatus(CiTestCase): + + def setUp(self): + super(TestStatus, self).setUp() + self.new_root = self.tmp_dir() + self.status_file = self.tmp_path('status.json', self.new_root) + self.disable_file = self.tmp_path('cloudinit-disable', self.new_root) + self.paths = mypaths(run_dir=self.new_root) + + class FakeInit(object): + paths = self.paths + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + self.init_class = FakeInit + + def test__is_cloudinit_disabled_false_on_sysvinit(self): + '''When not in an environment using systemd, return False.''' + ensure_file(self.disable_file) # Create the ignored disable file + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': False, + 'get_cmdline': "root=/dev/my-root not-important"}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertFalse( + is_disabled, 'expected enabled cloud-init on sysvinit') + self.assertEqual('Cloud-init enabled on sysvinit', reason) + + def test__is_cloudinit_disabled_true_on_disable_file(self): + '''When using systemd and disable_file is present return disabled.''' + ensure_file(self.disable_file) # Create observed disable file + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': True, + 'get_cmdline': "root=/dev/my-root not-important"}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertTrue(is_disabled, 'expected disabled cloud-init') + self.assertEqual( + 'Cloud-init disabled by {0}'.format(self.disable_file), reason) + + def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): + '''Not disabled when using systemd and enabled via commandline.''' + ensure_file(self.disable_file) # Create ignored disable file + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': True, + 'get_cmdline': 'something cloud-init=enabled else'}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertFalse(is_disabled, 'expected enabled cloud-init') + self.assertEqual( + 'Cloud-init enabled by kernel command line cloud-init=enabled', + reason) + + def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): + '''When using systemd and disable_file is present return disabled.''' + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': True, + 'get_cmdline': 'something cloud-init=disabled else'}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertTrue(is_disabled, 'expected disabled cloud-init') + self.assertEqual( + 'Cloud-init disabled by kernel parameter cloud-init=disabled', + reason) + + def test__is_cloudinit_disabled_true_when_generator_disables(self): + '''When cloud-init-generator doesn't write enabled file return True.''' + enabled_file = os.path.join(self.paths.run_dir, 'enabled') + self.assertFalse(os.path.exists(enabled_file)) + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': True, + 'get_cmdline': 'something'}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertTrue(is_disabled, 'expected disabled cloud-init') + self.assertEqual('Cloud-init disabled by cloud-init-generator', reason) + + def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): + '''Report enabled when systemd generator creates the enabled file.''' + enabled_file = os.path.join(self.paths.run_dir, 'enabled') + ensure_file(enabled_file) + (is_disabled, reason) = wrap_and_call( + 'cloudinit.cmd.status', + {'uses_systemd': True, + 'get_cmdline': 'something ignored'}, + status._is_cloudinit_disabled, self.disable_file, self.paths) + self.assertFalse(is_disabled, 'expected enabled cloud-init') + self.assertEqual( + 'Cloud-init enabled by systemd cloud-init-generator', reason) + + def test_status_returns_not_run(self): + '''When status.json does not exist yet, return 'not run'.''' + self.assertFalse( + os.path.exists(self.status_file), 'Unexpected status.json found') + cmdargs = myargs(long=False, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual('status: not run\n', m_stdout.getvalue()) + + def test_status_returns_disabled_long_on_presence_of_disable_file(self): + '''When cloudinit is disabled, return disabled reason.''' + + checked_files = [] + + def fakeexists(filepath): + checked_files.append(filepath) + status_file = os.path.join(self.paths.run_dir, 'status.json') + return bool(not filepath == status_file) + + cmdargs = myargs(long=True, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'os.path.exists': {'side_effect': fakeexists}, + '_is_cloudinit_disabled': (True, 'disabled for some reason'), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual( + [os.path.join(self.paths.run_dir, 'status.json')], + checked_files) + expected = dedent('''\ + status: disabled + detail: + disabled for some reason + ''') + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_on_no_results_json(self): + '''Report running when status.json exists but result.json does not.''' + result_file = self.tmp_path('result.json', self.new_root) + write_json(self.status_file, {}) + self.assertFalse( + os.path.exists(result_file), 'Unexpected result.json found') + cmdargs = myargs(long=False, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual('status: running\n', m_stdout.getvalue()) + + def test_status_returns_running(self): + '''Report running when status exists with an unfinished stage.''' + ensure_file(self.tmp_path('result.json', self.new_root)) + write_json(self.status_file, + {'v1': {'init': {'start': 1, 'finished': None}}}) + cmdargs = myargs(long=False, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual('status: running\n', m_stdout.getvalue()) + + def test_status_returns_done(self): + '''Report done results.json exists no stages are unfinished.''' + ensure_file(self.tmp_path('result.json', self.new_root)) + write_json( + self.status_file, + {'v1': {'stage': None, # No current stage running + 'datasource': ( + 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' + '[dsmode=net]'), + 'blah': {'finished': 123.456}, + 'init': {'errors': [], 'start': 124.567, + 'finished': 125.678}, + 'init-local': {'start': 123.45, 'finished': 123.46}}}) + cmdargs = myargs(long=False, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual('status: done\n', m_stdout.getvalue()) + + def test_status_returns_done_long(self): + '''Long format of done status includes datasource info.''' + ensure_file(self.tmp_path('result.json', self.new_root)) + write_json( + self.status_file, + {'v1': {'stage': None, + 'datasource': ( + 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' + '[dsmode=net]'), + 'init': {'start': 124.567, 'finished': 125.678}, + 'init-local': {'start': 123.45, 'finished': 123.46}}}) + cmdargs = myargs(long=True, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + expected = dedent('''\ + status: done + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] + ''') + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_on_errors(self): + '''Reports error when any stage has errors.''' + write_json( + self.status_file, + {'v1': {'stage': None, + 'blah': {'errors': [], 'finished': 123.456}, + 'init': {'errors': ['error1'], 'start': 124.567, + 'finished': 125.678}, + 'init-local': {'start': 123.45, 'finished': 123.46}}}) + cmdargs = myargs(long=False, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(1, retcode) + self.assertEqual('status: error\n', m_stdout.getvalue()) + + def test_status_on_errors_long(self): + '''Long format of error status includes all error messages.''' + write_json( + self.status_file, + {'v1': {'stage': None, + 'datasource': ( + 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' + '[dsmode=net]'), + 'init': {'errors': ['error1'], 'start': 124.567, + 'finished': 125.678}, + 'init-local': {'errors': ['error2', 'error3'], + 'start': 123.45, 'finished': 123.46}}}) + cmdargs = myargs(long=True, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(1, retcode) + expected = dedent('''\ + status: error + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + error1 + error2 + error3 + ''') + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_long_format(self): + '''Long format reports the stage in which we are running.''' + write_json( + self.status_file, + {'v1': {'stage': 'init', + 'init': {'start': 124.456, 'finished': None}, + 'init-local': {'start': 123.45, 'finished': 123.46}}}) + cmdargs = myargs(long=True, wait=False) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + expected = dedent('''\ + status: running + time: Thu, 01 Jan 1970 00:02:04 +0000 + detail: + Running in stage: init + ''') + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_wait_blocks_until_done(self): + '''Specifying wait will poll every 1/4 second until done state.''' + running_json = { + 'v1': {'stage': 'init', + 'init': {'start': 124.456, 'finished': None}, + 'init-local': {'start': 123.45, 'finished': 123.46}}} + done_json = { + 'v1': {'stage': None, + 'init': {'start': 124.456, 'finished': 125.678}, + 'init-local': {'start': 123.45, 'finished': 123.46}}} + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, done_json) + result_file = self.tmp_path('result.json', self.new_root) + ensure_file(result_file) + + cmdargs = myargs(long=False, wait=True) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'sleep': {'side_effect': fake_sleep}, + '_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(0, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual('....\nstatus: done\n', m_stdout.getvalue()) + + def test_status_wait_blocks_until_error(self): + '''Specifying wait will poll every 1/4 second until error state.''' + running_json = { + 'v1': {'stage': 'init', + 'init': {'start': 124.456, 'finished': None}, + 'init-local': {'start': 123.45, 'finished': 123.46}}} + error_json = { + 'v1': {'stage': None, + 'init': {'errors': ['error1'], 'start': 124.456, + 'finished': 125.678}, + 'init-local': {'start': 123.45, 'finished': 123.46}}} + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, error_json) + + cmdargs = myargs(long=False, wait=True) + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + 'cloudinit.cmd.status', + {'sleep': {'side_effect': fake_sleep}, + '_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.handle_status_args, 'ignored', cmdargs) + self.assertEqual(1, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual('....\nstatus: error\n', m_stdout.getvalue()) + + def test_status_main(self): + '''status.main can be run as a standalone script.''' + write_json(self.status_file, + {'v1': {'init': {'start': 1, 'finished': None}}}) + with self.assertRaises(SystemExit) as context_manager: + with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + wrap_and_call( + 'cloudinit.cmd.status', + {'sys.argv': {'new': ['status']}, + '_is_cloudinit_disabled': (False, ''), + 'Init': {'side_effect': self.init_class}}, + status.main) + self.assertEqual(0, context_manager.exception.code) + self.assertEqual('status: running\n', m_stdout.getvalue()) + +# vi: ts=4 expandtab syntax=python |