diff options
-rw-r--r-- | cloudinit/config/cc_write_files.py | 27 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_write_files.py | 14 |
2 files changed, 22 insertions, 19 deletions
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py index 1835a31b..54ae3a68 100644 --- a/cloudinit/config/cc_write_files.py +++ b/cloudinit/config/cc_write_files.py @@ -50,6 +50,7 @@ import base64 import os import six +from cloudinit import log as logging from cloudinit.settings import PER_INSTANCE from cloudinit import util @@ -60,6 +61,8 @@ DEFAULT_OWNER = "root:root" DEFAULT_PERMS = 0o644 UNKNOWN_ENC = 'text/plain' +LOG = logging.getLogger(__name__) + def handle(name, cfg, _cloud, log, _args): files = cfg.get('write_files') @@ -67,10 +70,10 @@ def handle(name, cfg, _cloud, log, _args): log.debug(("Skipping module named %s," " no/empty 'write_files' key in configuration"), name) return - write_files(name, files, log) + write_files(name, files) -def canonicalize_extraction(encoding_type, log): +def canonicalize_extraction(encoding_type): if not encoding_type: encoding_type = '' encoding_type = encoding_type.lower().strip() @@ -85,31 +88,31 @@ def canonicalize_extraction(encoding_type, log): if encoding_type in ['b64', 'base64']: return ['application/base64'] if encoding_type: - log.warn("Unknown encoding type %s, assuming %s", - encoding_type, UNKNOWN_ENC) + LOG.warning("Unknown encoding type %s, assuming %s", + encoding_type, UNKNOWN_ENC) return [UNKNOWN_ENC] -def write_files(name, files, log): +def write_files(name, files): if not files: return for (i, f_info) in enumerate(files): path = f_info.get('path') if not path: - log.warn("No path provided to write for entry %s in module %s", - i + 1, name) + LOG.warning("No path provided to write for entry %s in module %s", + i + 1, name) continue path = os.path.abspath(path) - extractions = canonicalize_extraction(f_info.get('encoding'), log) + extractions = canonicalize_extraction(f_info.get('encoding')) contents = extract_contents(f_info.get('content', ''), extractions) (u, g) = util.extract_usergroup(f_info.get('owner', DEFAULT_OWNER)) - perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS, log) + perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS) util.write_file(path, contents, mode=perms) util.chownbyname(path, u, g) -def decode_perms(perm, default, log): +def decode_perms(perm, default): if perm is None: return default try: @@ -126,8 +129,8 @@ def decode_perms(perm, default, log): reps.append("%o" % r) except TypeError: reps.append("%r" % r) - log.warning( - "Undecodable permissions {0}, returning default {1}".format(*reps)) + LOG.warning( + "Undecodable permissions %s, returning default %s", *reps) return default diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py index 88a4742b..1129e77d 100644 --- a/tests/unittests/test_handler/test_handler_write_files.py +++ b/tests/unittests/test_handler/test_handler_write_files.py @@ -49,13 +49,13 @@ class TestWriteFiles(FilesystemMockingTestCase): expected = "hello world\n" filename = "/tmp/my.file" write_files( - "test_simple", [{"content": expected, "path": filename}], LOG) + "test_simple", [{"content": expected, "path": filename}]) self.assertEqual(util.load_file(filename), expected) def test_yaml_binary(self): self.patchUtils(self.tmp) data = util.load_yaml(YAML_TEXT) - write_files("testname", data['write_files'], LOG) + write_files("testname", data['write_files']) for path, content in YAML_CONTENT_EXPECTED.items(): self.assertEqual(util.load_file(path), content) @@ -87,7 +87,7 @@ class TestWriteFiles(FilesystemMockingTestCase): files.append(cur) expected.append((cur['path'], data)) - write_files("test_decoding", files, LOG) + write_files("test_decoding", files) for path, content in expected: self.assertEqual(util.load_file(path, decode=False), content) @@ -105,22 +105,22 @@ class TestDecodePerms(CiTestCase): def test_none_returns_default(self): """If None is passed as perms, then default should be returned.""" default = object() - found = decode_perms(None, default, self.logger) + found = decode_perms(None, default) self.assertEqual(default, found) def test_integer(self): """A valid integer should return itself.""" - found = decode_perms(0o755, None, self.logger) + found = decode_perms(0o755, None) self.assertEqual(0o755, found) def test_valid_octal_string(self): """A string should be read as octal.""" - found = decode_perms("644", None, self.logger) + found = decode_perms("644", None) self.assertEqual(0o644, found) def test_invalid_octal_string_returns_default_and_warns(self): """A string with invalid octal should warn and return default.""" - found = decode_perms("999", None, self.logger) + found = decode_perms("999", None) self.assertIsNone(found) self.assertIn("WARNING: Undecodable", self.logs.getvalue()) |