# This file is part of cloud-init. See LICENSE file for license information. import os from collections import namedtuple from io import StringIO from cloudinit.cmd import clean from cloudinit.util import ensure_dir, sym_link, write_file from tests.unittests.helpers import CiTestCase, mock, wrap_and_call mypaths = namedtuple("MyPaths", "cloud_dir") class TestClean(CiTestCase): def setUp(self): super(TestClean, self).setUp() self.new_root = self.tmp_dir() self.artifact_dir = self.tmp_path("artifacts", self.new_root) self.log1 = self.tmp_path("cloud-init.log", self.new_root) self.log2 = self.tmp_path("cloud-init-output.log", self.new_root) class FakeInit(object): cfg = { "def_log_file": self.log1, "output": {"all": "|tee -a {0}".format(self.log2)}, } # Ensure cloud_dir has a trailing slash, to match real behaviour paths = mypaths(cloud_dir="{}/".format(self.artifact_dir)) def __init__(self, ds_deps): pass def read_cfg(self): pass self.init_class = FakeInit def test_remove_artifacts_removes_logs(self): """remove_artifacts removes logs when remove_logs is True.""" write_file(self.log1, "cloud-init-log") write_file(self.log2, "cloud-init-output-log") self.assertFalse( os.path.exists(self.artifact_dir), "Unexpected artifacts dir" ) retcode = wrap_and_call( "cloudinit.cmd.clean", {"Init": {"side_effect": self.init_class}}, clean.remove_artifacts, remove_logs=True, ) self.assertFalse(os.path.exists(self.log1), "Unexpected file") self.assertFalse(os.path.exists(self.log2), "Unexpected file") self.assertEqual(0, retcode) def test_remove_artifacts_preserves_logs(self): """remove_artifacts leaves logs when remove_logs is False.""" write_file(self.log1, "cloud-init-log") write_file(self.log2, "cloud-init-output-log") retcode = wrap_and_call( "cloudinit.cmd.clean", {"Init": {"side_effect": self.init_class}}, clean.remove_artifacts, remove_logs=False, ) self.assertTrue(os.path.exists(self.log1), "Missing expected file") self.assertTrue(os.path.exists(self.log2), "Missing expected file") self.assertEqual(0, retcode) def test_remove_artifacts_removes_unlinks_symlinks(self): """remove_artifacts cleans artifacts dir unlinking any symlinks.""" dir1 = os.path.join(self.artifact_dir, "dir1") ensure_dir(dir1) symlink = os.path.join(self.artifact_dir, "mylink") sym_link(dir1, symlink) retcode = wrap_and_call( "cloudinit.cmd.clean", {"Init": {"side_effect": self.init_class}}, clean.remove_artifacts, remove_logs=False, ) self.assertEqual(0, retcode) for path in (dir1, symlink): self.assertFalse( os.path.exists(path), "Unexpected {0} dir".format(path) ) def test_remove_artifacts_removes_artifacts_skipping_seed(self): """remove_artifacts cleans artifacts dir with exception of seed dir.""" dirs = [ self.artifact_dir, os.path.join(self.artifact_dir, "seed"), os.path.join(self.artifact_dir, "dir1"), os.path.join(self.artifact_dir, "dir2"), ] for _dir in dirs: ensure_dir(_dir) retcode = wrap_and_call( "cloudinit.cmd.clean", {"Init": {"side_effect": self.init_class}}, clean.remove_artifacts, remove_logs=False, ) self.assertEqual(0, retcode) for expected_dir in dirs[:2]: self.assertTrue( os.path.exists(expected_dir), "Missing {0} dir".format(expected_dir), ) for deleted_dir in dirs[2:]: self.assertFalse( os.path.exists(deleted_dir), "Unexpected {0} dir".format(deleted_dir), ) def test_remove_artifacts_removes_artifacts_removes_seed(self): """remove_artifacts removes seed dir when remove_seed is True.""" dirs = [ self.artifact_dir, os.path.join(self.artifact_dir, "seed"), os.path.join(self.artifact_dir, "dir1"), os.path.join(self.artifact_dir, "dir2"), ] for _dir in dirs: ensure_dir(_dir) retcode = wrap_and_call( "cloudinit.cmd.clean", {"Init": {"side_effect": self.init_class}}, clean.remove_artifacts, remove_logs=False, remove_seed=True, ) self.assertEqual(0, retcode) self.assertTrue( os.path.exists(self.artifact_dir), "Missing artifact dir" ) for deleted_dir in dirs[1:]: self.assertFalse( os.path.exists(deleted_dir), "Unexpected {0} dir".format(deleted_dir), ) def test_remove_artifacts_returns_one_on_errors(self): """remove_artifacts returns non-zero on failure and prints an error.""" ensure_dir(self.artifact_dir) ensure_dir(os.path.join(self.artifact_dir, "dir1")) with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: retcode = wrap_and_call( "cloudinit.cmd.clean", { "del_dir": {"side_effect": OSError("oops")}, "Init": {"side_effect": self.init_class}, }, clean.remove_artifacts, remove_logs=False, ) self.assertEqual(1, retcode) self.assertEqual( "Error:\nCould not remove %s/dir1: oops\n" % self.artifact_dir, m_stderr.getvalue(), ) def test_handle_clean_args_reboots(self): """handle_clean_args_reboots when reboot arg is provided.""" called_cmds = [] def fake_subp(cmd, capture): called_cmds.append((cmd, capture)) return "", "" myargs = namedtuple("MyArgs", "remove_logs remove_seed reboot") cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True) retcode = wrap_and_call( "cloudinit.cmd.clean", { "subp": {"side_effect": fake_subp}, "Init": {"side_effect": self.init_class}, }, clean.handle_clean_args, name="does not matter", args=cmdargs, ) self.assertEqual(0, retcode) self.assertEqual([(["shutdown", "-r", "now"], False)], called_cmds) def test_status_main(self): """clean.main can be run as a standalone script.""" write_file(self.log1, "cloud-init-log") with self.assertRaises(SystemExit) as context_manager: wrap_and_call( "cloudinit.cmd.clean", { "Init": {"side_effect": self.init_class}, "sys.argv": {"new": ["clean", "--logs"]}, }, clean.main, ) self.assertEqual(0, context_manager.exception.code) self.assertFalse( os.path.exists(self.log1), "Unexpected log {0}".format(self.log1) ) # vi: ts=4 expandtab syntax=python