path: root/tests/unittests/cmd/
diff options
Diffstat (limited to 'tests/unittests/cmd/')
1 files changed, 178 insertions, 0 deletions
diff --git a/tests/unittests/cmd/ b/tests/unittests/cmd/
new file mode 100644
index 00000000..81fc930e
--- /dev/null
+++ b/tests/unittests/cmd/
@@ -0,0 +1,178 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+from cloudinit.cmd import clean
+from cloudinit.util import ensure_dir, sym_link, write_file
+from tests.unittests.helpers import CiTestCase, wrap_and_call, mock
+from collections import namedtuple
+import os
+from io import StringIO
+mypaths = namedtuple('MyPaths', 'cloud_dir')
+class TestClean(CiTestCase):
+ def setUp(self):
+ super(TestClean, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.artifact_dir = self.tmp_path('artifacts', self.new_root)
+ self.log1 = self.tmp_path('cloud-init.log', self.new_root)
+ self.log2 = self.tmp_path('cloud-init-output.log', self.new_root)
+ class FakeInit(object):
+ cfg = {'def_log_file': self.log1,
+ 'output': {'all': '|tee -a {0}'.format(self.log2)}}
+ # Ensure cloud_dir has a trailing slash, to match real behaviour
+ paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir))
+ def __init__(self, ds_deps):
+ pass
+ def read_cfg(self):
+ pass
+ self.init_class = FakeInit
+ def test_remove_artifacts_removes_logs(self):
+ """remove_artifacts removes logs when remove_logs is True."""
+ write_file(self.log1, 'cloud-init-log')
+ write_file(self.log2, 'cloud-init-output-log')
+ self.assertFalse(
+ os.path.exists(self.artifact_dir), 'Unexpected artifacts dir')
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=True)
+ self.assertFalse(os.path.exists(self.log1), 'Unexpected file')
+ self.assertFalse(os.path.exists(self.log2), 'Unexpected file')
+ self.assertEqual(0, retcode)
+ def test_remove_artifacts_preserves_logs(self):
+ """remove_artifacts leaves logs when remove_logs is False."""
+ write_file(self.log1, 'cloud-init-log')
+ write_file(self.log2, 'cloud-init-output-log')
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=False)
+ self.assertTrue(os.path.exists(self.log1), 'Missing expected file')
+ self.assertTrue(os.path.exists(self.log2), 'Missing expected file')
+ self.assertEqual(0, retcode)
+ def test_remove_artifacts_removes_unlinks_symlinks(self):
+ """remove_artifacts cleans artifacts dir unlinking any symlinks."""
+ dir1 = os.path.join(self.artifact_dir, 'dir1')
+ ensure_dir(dir1)
+ symlink = os.path.join(self.artifact_dir, 'mylink')
+ sym_link(dir1, symlink)
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=False)
+ self.assertEqual(0, retcode)
+ for path in (dir1, symlink):
+ self.assertFalse(
+ os.path.exists(path),
+ 'Unexpected {0} dir'.format(path))
+ def test_remove_artifacts_removes_artifacts_skipping_seed(self):
+ """remove_artifacts cleans artifacts dir with exception of seed dir."""
+ dirs = [
+ self.artifact_dir,
+ os.path.join(self.artifact_dir, 'seed'),
+ os.path.join(self.artifact_dir, 'dir1'),
+ os.path.join(self.artifact_dir, 'dir2')]
+ for _dir in dirs:
+ ensure_dir(_dir)
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=False)
+ self.assertEqual(0, retcode)
+ for expected_dir in dirs[:2]:
+ self.assertTrue(
+ os.path.exists(expected_dir),
+ 'Missing {0} dir'.format(expected_dir))
+ for deleted_dir in dirs[2:]:
+ self.assertFalse(
+ os.path.exists(deleted_dir),
+ 'Unexpected {0} dir'.format(deleted_dir))
+ def test_remove_artifacts_removes_artifacts_removes_seed(self):
+ """remove_artifacts removes seed dir when remove_seed is True."""
+ dirs = [
+ self.artifact_dir,
+ os.path.join(self.artifact_dir, 'seed'),
+ os.path.join(self.artifact_dir, 'dir1'),
+ os.path.join(self.artifact_dir, 'dir2')]
+ for _dir in dirs:
+ ensure_dir(_dir)
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=False, remove_seed=True)
+ self.assertEqual(0, retcode)
+ self.assertTrue(
+ os.path.exists(self.artifact_dir), 'Missing artifact dir')
+ for deleted_dir in dirs[1:]:
+ self.assertFalse(
+ os.path.exists(deleted_dir),
+ 'Unexpected {0} dir'.format(deleted_dir))
+ def test_remove_artifacts_returns_one_on_errors(self):
+ """remove_artifacts returns non-zero on failure and prints an error."""
+ ensure_dir(self.artifact_dir)
+ ensure_dir(os.path.join(self.artifact_dir, 'dir1'))
+ with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'del_dir': {'side_effect': OSError('oops')},
+ 'Init': {'side_effect': self.init_class}},
+ clean.remove_artifacts, remove_logs=False)
+ self.assertEqual(1, retcode)
+ self.assertEqual(
+ 'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir,
+ m_stderr.getvalue())
+ def test_handle_clean_args_reboots(self):
+ """handle_clean_args_reboots when reboot arg is provided."""
+ called_cmds = []
+ def fake_subp(cmd, capture):
+ called_cmds.append((cmd, capture))
+ return '', ''
+ myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot')
+ cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True)
+ retcode = wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'subp': {'side_effect': fake_subp},
+ 'Init': {'side_effect': self.init_class}},
+ clean.handle_clean_args, name='does not matter', args=cmdargs)
+ self.assertEqual(0, retcode)
+ self.assertEqual(
+ [(['shutdown', '-r', 'now'], False)], called_cmds)
+ def test_status_main(self):
+ '''clean.main can be run as a standalone script.'''
+ write_file(self.log1, 'cloud-init-log')
+ with self.assertRaises(SystemExit) as context_manager:
+ wrap_and_call(
+ 'cloudinit.cmd.clean',
+ {'Init': {'side_effect': self.init_class},
+ 'sys.argv': {'new': ['clean', '--logs']}},
+ clean.main)
+ self.assertEqual(0, context_manager.exception.code)
+ self.assertFalse(
+ os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1))
+# vi: ts=4 expandtab syntax=python