summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/config/cc_write_files.py10
-rw-r--r--cloudinit/util.py6
-rw-r--r--tests/unittests/helpers.py8
-rw-r--r--tests/unittests/test_datasource/test_azure.py3
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py3
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py31
6 files changed, 52 insertions, 9 deletions
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py
index 72e1cdd6..1835a31b 100644
--- a/cloudinit/config/cc_write_files.py
+++ b/cloudinit/config/cc_write_files.py
@@ -53,6 +53,7 @@ import six
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
+
frequency = PER_INSTANCE
DEFAULT_OWNER = "root:root"
@@ -119,7 +120,14 @@ def decode_perms(perm, default, log):
# Force to string and try octal conversion
return int(str(perm), 8)
except (TypeError, ValueError):
- log.warn("Undecodable permissions %s, assuming %s", perm, default)
+ reps = []
+ for r in (perm, default):
+ try:
+ reps.append("%o" % r)
+ except TypeError:
+ reps.append("%r" % r)
+ log.warning(
+ "Undecodable permissions {0}, returning default {1}".format(*reps))
return default
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 415ca374..ec68925e 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -1751,8 +1751,12 @@ def write_file(filename, content, mode=0o644, omode="wb", copy_mode=False):
else:
content = decode_binary(content)
write_type = 'characters'
+ try:
+ mode_r = "%o" % mode
+ except TypeError:
+ mode_r = "%r" % mode
LOG.debug("Writing to %s - %s: [%s] %s %s",
- filename, omode, mode, len(content), write_type)
+ filename, omode, mode_r, len(content), write_type)
with SeLinuxGuard(path=filename):
with open(filename, omode) as fh:
fh.write(content)
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index e78abce2..569f1aef 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -97,11 +97,13 @@ class CiTestCase(TestCase):
super(CiTestCase, self).setUp()
if self.with_logs:
# Create a log handler so unit tests can search expected logs.
- logger = logging.getLogger()
+ self.logger = logging.getLogger()
self.logs = six.StringIO()
+ formatter = logging.Formatter('%(levelname)s: %(message)s')
handler = logging.StreamHandler(self.logs)
- self.old_handlers = logger.handlers
- logger.handlers = [handler]
+ handler.setFormatter(formatter)
+ self.old_handlers = self.logger.handlers
+ self.logger.handlers = [handler]
def tearDown(self):
if self.with_logs:
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 42f49e06..b17f389c 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -263,7 +263,8 @@ fdescfs /dev/fd fdescfs rw 0 0
{}, distro=None, paths=self.paths)
self.assertFalse(dsrc.get_data())
self.assertEqual(
- "Non-Azure DMI asset tag '{0}' discovered.\n".format(nonazure_tag),
+ "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format(
+ nonazure_tag),
self.logs.getvalue())
def test_basic_seed_dir(self):
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
index 3a9f7f7e..c4299d94 100644
--- a/tests/unittests/test_handler/test_handler_ntp.py
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -216,7 +216,8 @@ class TestNtp(FilesystemMockingTestCase):
"""When no ntp section is defined handler logs a warning and noops."""
cc_ntp.handle('cc_ntp', {}, None, None, [])
self.assertEqual(
- 'Skipping module named cc_ntp, not present or disabled by cfg\n',
+ 'DEBUG: Skipping module named cc_ntp, '
+ 'not present or disabled by cfg\n',
self.logs.getvalue())
def test_ntp_handler_schema_validation_allows_empty_ntp_config(self):
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
index fb252d1d..88a4742b 100644
--- a/tests/unittests/test_handler/test_handler_write_files.py
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -1,10 +1,10 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.config.cc_write_files import write_files
+from cloudinit.config.cc_write_files import write_files, decode_perms
from cloudinit import log as logging
from cloudinit import util
-from ..helpers import FilesystemMockingTestCase
+from ..helpers import CiTestCase, FilesystemMockingTestCase
import base64
import gzip
@@ -98,6 +98,33 @@ class TestWriteFiles(FilesystemMockingTestCase):
self.assertEqual(len(expected), flen_expected)
+class TestDecodePerms(CiTestCase):
+
+ with_logs = True
+
+ def test_none_returns_default(self):
+ """If None is passed as perms, then default should be returned."""
+ default = object()
+ found = decode_perms(None, default, self.logger)
+ self.assertEqual(default, found)
+
+ def test_integer(self):
+ """A valid integer should return itself."""
+ found = decode_perms(0o755, None, self.logger)
+ self.assertEqual(0o755, found)
+
+ def test_valid_octal_string(self):
+ """A string should be read as octal."""
+ found = decode_perms("644", None, self.logger)
+ self.assertEqual(0o644, found)
+
+ def test_invalid_octal_string_returns_default_and_warns(self):
+ """A string with invalid octal should warn and return default."""
+ found = decode_perms("999", None, self.logger)
+ self.assertIsNone(found)
+ self.assertIn("WARNING: Undecodable", self.logs.getvalue())
+
+
def _gzip_bytes(data):
buf = six.BytesIO()
fp = None