summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/config/cc_write_files.py27
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py14
2 files changed, 22 insertions, 19 deletions
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py
index 1835a31b..54ae3a68 100644
--- a/cloudinit/config/cc_write_files.py
+++ b/cloudinit/config/cc_write_files.py
@@ -50,6 +50,7 @@ import base64
import os
import six
+from cloudinit import log as logging
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
@@ -60,6 +61,8 @@ DEFAULT_OWNER = "root:root"
DEFAULT_PERMS = 0o644
UNKNOWN_ENC = 'text/plain'
+LOG = logging.getLogger(__name__)
+
def handle(name, cfg, _cloud, log, _args):
files = cfg.get('write_files')
@@ -67,10 +70,10 @@ def handle(name, cfg, _cloud, log, _args):
log.debug(("Skipping module named %s,"
" no/empty 'write_files' key in configuration"), name)
return
- write_files(name, files, log)
+ write_files(name, files)
-def canonicalize_extraction(encoding_type, log):
+def canonicalize_extraction(encoding_type):
if not encoding_type:
encoding_type = ''
encoding_type = encoding_type.lower().strip()
@@ -85,31 +88,31 @@ def canonicalize_extraction(encoding_type, log):
if encoding_type in ['b64', 'base64']:
return ['application/base64']
if encoding_type:
- log.warn("Unknown encoding type %s, assuming %s",
- encoding_type, UNKNOWN_ENC)
+ LOG.warning("Unknown encoding type %s, assuming %s",
+ encoding_type, UNKNOWN_ENC)
return [UNKNOWN_ENC]
-def write_files(name, files, log):
+def write_files(name, files):
if not files:
return
for (i, f_info) in enumerate(files):
path = f_info.get('path')
if not path:
- log.warn("No path provided to write for entry %s in module %s",
- i + 1, name)
+ LOG.warning("No path provided to write for entry %s in module %s",
+ i + 1, name)
continue
path = os.path.abspath(path)
- extractions = canonicalize_extraction(f_info.get('encoding'), log)
+ extractions = canonicalize_extraction(f_info.get('encoding'))
contents = extract_contents(f_info.get('content', ''), extractions)
(u, g) = util.extract_usergroup(f_info.get('owner', DEFAULT_OWNER))
- perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS, log)
+ perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS)
util.write_file(path, contents, mode=perms)
util.chownbyname(path, u, g)
-def decode_perms(perm, default, log):
+def decode_perms(perm, default):
if perm is None:
return default
try:
@@ -126,8 +129,8 @@ def decode_perms(perm, default, log):
reps.append("%o" % r)
except TypeError:
reps.append("%r" % r)
- log.warning(
- "Undecodable permissions {0}, returning default {1}".format(*reps))
+ LOG.warning(
+ "Undecodable permissions %s, returning default %s", *reps)
return default
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
index 88a4742b..1129e77d 100644
--- a/tests/unittests/test_handler/test_handler_write_files.py
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -49,13 +49,13 @@ class TestWriteFiles(FilesystemMockingTestCase):
expected = "hello world\n"
filename = "/tmp/my.file"
write_files(
- "test_simple", [{"content": expected, "path": filename}], LOG)
+ "test_simple", [{"content": expected, "path": filename}])
self.assertEqual(util.load_file(filename), expected)
def test_yaml_binary(self):
self.patchUtils(self.tmp)
data = util.load_yaml(YAML_TEXT)
- write_files("testname", data['write_files'], LOG)
+ write_files("testname", data['write_files'])
for path, content in YAML_CONTENT_EXPECTED.items():
self.assertEqual(util.load_file(path), content)
@@ -87,7 +87,7 @@ class TestWriteFiles(FilesystemMockingTestCase):
files.append(cur)
expected.append((cur['path'], data))
- write_files("test_decoding", files, LOG)
+ write_files("test_decoding", files)
for path, content in expected:
self.assertEqual(util.load_file(path, decode=False), content)
@@ -105,22 +105,22 @@ class TestDecodePerms(CiTestCase):
def test_none_returns_default(self):
"""If None is passed as perms, then default should be returned."""
default = object()
- found = decode_perms(None, default, self.logger)
+ found = decode_perms(None, default)
self.assertEqual(default, found)
def test_integer(self):
"""A valid integer should return itself."""
- found = decode_perms(0o755, None, self.logger)
+ found = decode_perms(0o755, None)
self.assertEqual(0o755, found)
def test_valid_octal_string(self):
"""A string should be read as octal."""
- found = decode_perms("644", None, self.logger)
+ found = decode_perms("644", None)
self.assertEqual(0o644, found)
def test_invalid_octal_string_returns_default_and_warns(self):
"""A string with invalid octal should warn and return default."""
- found = decode_perms("999", None, self.logger)
+ found = decode_perms("999", None)
self.assertIsNone(found)
self.assertIn("WARNING: Undecodable", self.logs.getvalue())