diff options
-rw-r--r-- | cloudinit/config/cc_write_files.py | 10 | ||||
-rw-r--r-- | cloudinit/util.py | 6 | ||||
-rw-r--r-- | tests/unittests/helpers.py | 8 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 3 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ntp.py | 3 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_write_files.py | 31 |
6 files changed, 52 insertions, 9 deletions
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py index 72e1cdd6..1835a31b 100644 --- a/cloudinit/config/cc_write_files.py +++ b/cloudinit/config/cc_write_files.py @@ -53,6 +53,7 @@ import six from cloudinit.settings import PER_INSTANCE from cloudinit import util + frequency = PER_INSTANCE DEFAULT_OWNER = "root:root" @@ -119,7 +120,14 @@ def decode_perms(perm, default, log): # Force to string and try octal conversion return int(str(perm), 8) except (TypeError, ValueError): - log.warn("Undecodable permissions %s, assuming %s", perm, default) + reps = [] + for r in (perm, default): + try: + reps.append("%o" % r) + except TypeError: + reps.append("%r" % r) + log.warning( + "Undecodable permissions {0}, returning default {1}".format(*reps)) return default diff --git a/cloudinit/util.py b/cloudinit/util.py index 415ca374..ec68925e 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -1751,8 +1751,12 @@ def write_file(filename, content, mode=0o644, omode="wb", copy_mode=False): else: content = decode_binary(content) write_type = 'characters' + try: + mode_r = "%o" % mode + except TypeError: + mode_r = "%r" % mode LOG.debug("Writing to %s - %s: [%s] %s %s", - filename, omode, mode, len(content), write_type) + filename, omode, mode_r, len(content), write_type) with SeLinuxGuard(path=filename): with open(filename, omode) as fh: fh.write(content) diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index e78abce2..569f1aef 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -97,11 +97,13 @@ class CiTestCase(TestCase): super(CiTestCase, self).setUp() if self.with_logs: # Create a log handler so unit tests can search expected logs. - logger = logging.getLogger() + self.logger = logging.getLogger() self.logs = six.StringIO() + formatter = logging.Formatter('%(levelname)s: %(message)s') handler = logging.StreamHandler(self.logs) - self.old_handlers = logger.handlers - logger.handlers = [handler] + handler.setFormatter(formatter) + self.old_handlers = self.logger.handlers + self.logger.handlers = [handler] def tearDown(self): if self.with_logs: diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 42f49e06..b17f389c 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -263,7 +263,8 @@ fdescfs /dev/fd fdescfs rw 0 0 {}, distro=None, paths=self.paths) self.assertFalse(dsrc.get_data()) self.assertEqual( - "Non-Azure DMI asset tag '{0}' discovered.\n".format(nonazure_tag), + "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( + nonazure_tag), self.logs.getvalue()) def test_basic_seed_dir(self): diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py index 3a9f7f7e..c4299d94 100644 --- a/tests/unittests/test_handler/test_handler_ntp.py +++ b/tests/unittests/test_handler/test_handler_ntp.py @@ -216,7 +216,8 @@ class TestNtp(FilesystemMockingTestCase): """When no ntp section is defined handler logs a warning and noops.""" cc_ntp.handle('cc_ntp', {}, None, None, []) self.assertEqual( - 'Skipping module named cc_ntp, not present or disabled by cfg\n', + 'DEBUG: Skipping module named cc_ntp, ' + 'not present or disabled by cfg\n', self.logs.getvalue()) def test_ntp_handler_schema_validation_allows_empty_ntp_config(self): diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py index fb252d1d..88a4742b 100644 --- a/tests/unittests/test_handler/test_handler_write_files.py +++ b/tests/unittests/test_handler/test_handler_write_files.py @@ -1,10 +1,10 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.config.cc_write_files import write_files +from cloudinit.config.cc_write_files import write_files, decode_perms from cloudinit import log as logging from cloudinit import util -from ..helpers import FilesystemMockingTestCase +from ..helpers import CiTestCase, FilesystemMockingTestCase import base64 import gzip @@ -98,6 +98,33 @@ class TestWriteFiles(FilesystemMockingTestCase): self.assertEqual(len(expected), flen_expected) +class TestDecodePerms(CiTestCase): + + with_logs = True + + def test_none_returns_default(self): + """If None is passed as perms, then default should be returned.""" + default = object() + found = decode_perms(None, default, self.logger) + self.assertEqual(default, found) + + def test_integer(self): + """A valid integer should return itself.""" + found = decode_perms(0o755, None, self.logger) + self.assertEqual(0o755, found) + + def test_valid_octal_string(self): + """A string should be read as octal.""" + found = decode_perms("644", None, self.logger) + self.assertEqual(0o644, found) + + def test_invalid_octal_string_returns_default_and_warns(self): + """A string with invalid octal should warn and return default.""" + found = decode_perms("999", None, self.logger) + self.assertIsNone(found) + self.assertIn("WARNING: Undecodable", self.logs.getvalue()) + + def _gzip_bytes(data): buf = six.BytesIO() fp = None |