summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_write_files.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_write_files.py')
-rw-r--r--tests/unittests/config/test_cc_write_files.py267
1 files changed, 267 insertions, 0 deletions
diff --git a/tests/unittests/config/test_cc_write_files.py b/tests/unittests/config/test_cc_write_files.py
new file mode 100644
index 00000000..faea5885
--- /dev/null
+++ b/tests/unittests/config/test_cc_write_files.py
@@ -0,0 +1,267 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import base64
+import copy
+import gzip
+import io
+import shutil
+import tempfile
+
+from cloudinit import log as logging
+from cloudinit import util
+from cloudinit.config.cc_write_files import decode_perms, handle, write_files
+from tests.unittests.helpers import (
+ CiTestCase,
+ FilesystemMockingTestCase,
+ mock,
+ skipUnlessJsonSchema,
+)
+
+LOG = logging.getLogger(__name__)
+
+YAML_TEXT = """
+write_files:
+ - encoding: gzip
+ content: !!binary |
+ H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
+ path: /usr/bin/hello
+ permissions: '0755'
+ - content: !!binary |
+ Zm9vYmFyCg==
+ path: /wark
+ permissions: '0755'
+ - content: |
+ hi mom line 1
+ hi mom line 2
+ path: /tmp/message
+"""
+
+YAML_CONTENT_EXPECTED = {
+ "/usr/bin/hello": "#!/bin/sh\necho hello world\n",
+ "/wark": "foobar\n",
+ "/tmp/message": "hi mom line 1\nhi mom line 2\n",
+}
+
+VALID_SCHEMA = {
+ "write_files": [
+ {
+ "append": False,
+ "content": "a",
+ "encoding": "gzip",
+ "owner": "jeff",
+ "path": "/some",
+ "permissions": "0777",
+ }
+ ]
+}
+
+INVALID_SCHEMA = { # Dropped required path key
+ "write_files": [
+ {
+ "append": False,
+ "content": "a",
+ "encoding": "gzip",
+ "owner": "jeff",
+ "permissions": "0777",
+ }
+ ]
+}
+
+
+@skipUnlessJsonSchema()
+@mock.patch("cloudinit.config.cc_write_files.write_files")
+class TestWriteFilesSchema(CiTestCase):
+
+ with_logs = True
+
+ def test_schema_validation_warns_missing_path(self, m_write_files):
+ """The only required file item property is 'path'."""
+ cc = self.tmp_cloud("ubuntu")
+ valid_config = {"write_files": [{"path": "/some/path"}]}
+ handle("cc_write_file", valid_config, cc, LOG, [])
+ self.assertNotIn(
+ "Invalid cloud-config provided:", self.logs.getvalue()
+ )
+ handle("cc_write_file", INVALID_SCHEMA, cc, LOG, [])
+ self.assertIn("Invalid cloud-config provided:", self.logs.getvalue())
+ self.assertIn("'path' is a required property", self.logs.getvalue())
+
+ def test_schema_validation_warns_non_string_type_for_files(
+ self, m_write_files
+ ):
+ """Schema validation warns of non-string values for each file item."""
+ cc = self.tmp_cloud("ubuntu")
+ for key in VALID_SCHEMA["write_files"][0].keys():
+ if key == "append":
+ key_type = "boolean"
+ else:
+ key_type = "string"
+ invalid_config = copy.deepcopy(VALID_SCHEMA)
+ invalid_config["write_files"][0][key] = 1
+ handle("cc_write_file", invalid_config, cc, LOG, [])
+ self.assertIn(
+ mock.call("cc_write_file", invalid_config["write_files"]),
+ m_write_files.call_args_list,
+ )
+ self.assertIn(
+ "write_files.0.%s: 1 is not of type '%s'" % (key, key_type),
+ self.logs.getvalue(),
+ )
+ self.assertIn("Invalid cloud-config provided:", self.logs.getvalue())
+
+ def test_schema_validation_warns_on_additional_undefined_propertes(
+ self, m_write_files
+ ):
+ """Schema validation warns on additional undefined file properties."""
+ cc = self.tmp_cloud("ubuntu")
+ invalid_config = copy.deepcopy(VALID_SCHEMA)
+ invalid_config["write_files"][0]["bogus"] = "value"
+ handle("cc_write_file", invalid_config, cc, LOG, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nwrite_files.0: Additional"
+ " properties are not allowed ('bogus' was unexpected)",
+ self.logs.getvalue(),
+ )
+
+
+class TestWriteFiles(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestWriteFiles, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ @skipUnlessJsonSchema()
+ def test_handler_schema_validation_warns_non_array_type(self):
+ """Schema validation warns of non-array value."""
+ invalid_config = {"write_files": 1}
+ cc = self.tmp_cloud("ubuntu")
+ with self.assertRaises(TypeError):
+ handle("cc_write_file", invalid_config, cc, LOG, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nwrite_files: 1 is not of type"
+ " 'array'",
+ self.logs.getvalue(),
+ )
+
+ def test_simple(self):
+ self.patchUtils(self.tmp)
+ expected = "hello world\n"
+ filename = "/tmp/my.file"
+ write_files("test_simple", [{"content": expected, "path": filename}])
+ self.assertEqual(util.load_file(filename), expected)
+
+ def test_append(self):
+ self.patchUtils(self.tmp)
+ existing = "hello "
+ added = "world\n"
+ expected = existing + added
+ filename = "/tmp/append.file"
+ util.write_file(filename, existing)
+ write_files(
+ "test_append",
+ [{"content": added, "path": filename, "append": "true"}],
+ )
+ self.assertEqual(util.load_file(filename), expected)
+
+ def test_yaml_binary(self):
+ self.patchUtils(self.tmp)
+ data = util.load_yaml(YAML_TEXT)
+ write_files("testname", data["write_files"])
+ for path, content in YAML_CONTENT_EXPECTED.items():
+ self.assertEqual(util.load_file(path), content)
+
+ def test_all_decodings(self):
+ self.patchUtils(self.tmp)
+
+ # build a 'files' array that has a dictionary of encodings
+ # for 'gz', 'gzip', 'gz+base64' ...
+ data = b"foobzr"
+ utf8_valid = b"foobzr"
+ utf8_invalid = b"ab\xaadef"
+ files = []
+ expected = []
+
+ gz_aliases = ("gz", "gzip")
+ gz_b64_aliases = ("gz+base64", "gzip+base64", "gz+b64", "gzip+b64")
+ b64_aliases = ("base64", "b64")
+
+ datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid))
+ for name, data in datum:
+ gz = (_gzip_bytes(data), gz_aliases)
+ gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases)
+ b64 = (base64.b64encode(data), b64_aliases)
+ for content, aliases in (gz, gz_b64, b64):
+ for enc in aliases:
+ cur = {
+ "content": content,
+ "path": "/tmp/file-%s-%s" % (name, enc),
+ "encoding": enc,
+ }
+ files.append(cur)
+ expected.append((cur["path"], data))
+
+ write_files("test_decoding", files)
+
+ for path, content in expected:
+ self.assertEqual(util.load_file(path, decode=False), content)
+
+ # make sure we actually wrote *some* files.
+ flen_expected = len(gz_aliases + gz_b64_aliases + b64_aliases) * len(
+ datum
+ )
+ self.assertEqual(len(expected), flen_expected)
+
+ def test_deferred(self):
+ self.patchUtils(self.tmp)
+ file_path = "/tmp/deferred.file"
+ config = {"write_files": [{"path": file_path, "defer": True}]}
+ cc = self.tmp_cloud("ubuntu")
+ handle("cc_write_file", config, cc, LOG, [])
+ with self.assertRaises(FileNotFoundError):
+ util.load_file(file_path)
+
+
+class TestDecodePerms(CiTestCase):
+
+ with_logs = True
+
+ def test_none_returns_default(self):
+ """If None is passed as perms, then default should be returned."""
+ default = object()
+ found = decode_perms(None, default)
+ self.assertEqual(default, found)
+
+ def test_integer(self):
+ """A valid integer should return itself."""
+ found = decode_perms(0o755, None)
+ self.assertEqual(0o755, found)
+
+ def test_valid_octal_string(self):
+ """A string should be read as octal."""
+ found = decode_perms("644", None)
+ self.assertEqual(0o644, found)
+
+ def test_invalid_octal_string_returns_default_and_warns(self):
+ """A string with invalid octal should warn and return default."""
+ found = decode_perms("999", None)
+ self.assertIsNone(found)
+ self.assertIn("WARNING: Undecodable", self.logs.getvalue())
+
+
+def _gzip_bytes(data):
+ buf = io.BytesIO()
+ fp = None
+ try:
+ fp = gzip.GzipFile(fileobj=buf, mode="wb")
+ fp.write(data)
+ fp.close()
+ return buf.getvalue()
+ finally:
+ if fp:
+ fp.close()
+
+
+# vi: ts=4 expandtab