summaryrefslogtreecommitdiff
path: root/cloudinit/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests')
-rw-r--r--cloudinit/tests/helpers.py72
-rw-r--r--cloudinit/tests/test_conftest.py65
-rw-r--r--cloudinit/tests/test_features.py60
-rw-r--r--cloudinit/tests/test_gpg.py10
-rw-r--r--cloudinit/tests/test_netinfo.py40
-rw-r--r--cloudinit/tests/test_subp.py227
-rw-r--r--cloudinit/tests/test_url_helper.py34
-rw-r--r--cloudinit/tests/test_util.py210
8 files changed, 619 insertions, 99 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 70f6bad7..58f63b69 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -1,7 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from __future__ import print_function
-
import functools
import httpretty
import io
@@ -13,20 +11,10 @@ import string
import sys
import tempfile
import time
+import unittest
+from contextlib import ExitStack, contextmanager
from unittest import mock
-
-import unittest2
-from unittest2.util import strclass
-
-try:
- from contextlib import ExitStack, contextmanager
-except ImportError:
- from contextlib2 import ExitStack, contextmanager
-
-try:
- from configparser import ConfigParser
-except ImportError:
- from ConfigParser import ConfigParser
+from unittest.util import strclass
from cloudinit.config.schema import (
SchemaValidationError, validate_cloudconfig_schema)
@@ -35,13 +23,14 @@ from cloudinit import distros
from cloudinit import helpers as ch
from cloudinit.sources import DataSourceNone
from cloudinit.templater import JINJA_AVAILABLE
+from cloudinit import subp
from cloudinit import util
-_real_subp = util.subp
+_real_subp = subp.subp
# Used for skipping tests
-SkipTest = unittest2.SkipTest
-skipIf = unittest2.skipIf
+SkipTest = unittest.SkipTest
+skipIf = unittest.skipIf
# Makes the old path start
@@ -78,7 +67,7 @@ def retarget_many_wrapper(new_base, am, old_func):
return wrapper
-class TestCase(unittest2.TestCase):
+class TestCase(unittest.TestCase):
def reset_global_state(self):
"""Reset any global state to its original settings.
@@ -114,16 +103,6 @@ class TestCase(unittest2.TestCase):
self.addCleanup(m.stop)
setattr(self, attr, p)
- # prefer python3 read_file over readfp but allow fallback
- def parse_and_read(self, contents):
- parser = ConfigParser()
- if hasattr(parser, 'read_file'):
- parser.read_file(contents)
- elif hasattr(parser, 'readfp'):
- # pylint: disable=W1505
- parser.readfp(contents)
- return parser
-
class CiTestCase(TestCase):
"""This is the preferred test case base class unless user
@@ -156,14 +135,17 @@ class CiTestCase(TestCase):
self.old_handlers = self.logger.handlers
self.logger.handlers = [handler]
if self.allowed_subp is True:
- util.subp = _real_subp
+ subp.subp = _real_subp
else:
- util.subp = self._fake_subp
+ subp.subp = self._fake_subp
def _fake_subp(self, *args, **kwargs):
if 'args' in kwargs:
cmd = kwargs['args']
else:
+ if not args:
+ raise TypeError(
+ "subp() missing 1 required positional argument: 'args'")
cmd = args[0]
if not isinstance(cmd, str):
@@ -190,7 +172,7 @@ class CiTestCase(TestCase):
# Remove the handler we setup
logging.getLogger().handlers = self.old_handlers
logging.getLogger().level = None
- util.subp = _real_subp
+ subp.subp = _real_subp
super(CiTestCase, self).tearDown()
def tmp_dir(self, dir=None, cleanup=True):
@@ -212,16 +194,6 @@ class CiTestCase(TestCase):
dir = self.tmp_dir()
return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
- def sys_exit(self, code):
- """Provide a wrapper around sys.exit for python 2.6
-
- In 2.6, this code would produce 'cm.exception' with value int(2)
- rather than the SystemExit that was raised by sys.exit(2).
- with assertRaises(SystemExit) as cm:
- sys.exit(2)
- """
- raise SystemExit(code)
-
def tmp_cloud(self, distro, sys_cfg=None, metadata=None):
"""Create a cloud with tmp working directory paths.
@@ -309,13 +281,13 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
mock.patch.object(mod, f, trap_func))
# Handle subprocess calls
- func = getattr(util, 'subp')
+ func = getattr(subp, 'subp')
def nsubp(*_args, **_kwargs):
return ('', '')
self.patched_funcs.enter_context(
- mock.patch.object(util, 'subp', nsubp))
+ mock.patch.object(subp, 'subp', nsubp))
def null_func(*_args, **_kwargs):
return None
@@ -363,6 +335,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
root = self.tmp_dir()
self.patchUtils(root)
self.patchOS(root)
+ self.patchOpen(root)
return root
@contextmanager
@@ -396,7 +369,7 @@ class HttprettyTestCase(CiTestCase):
super(HttprettyTestCase, self).tearDown()
-class SchemaTestCaseMixin(unittest2.TestCase):
+class SchemaTestCaseMixin(unittest.TestCase):
def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."):
"""Assert the config is valid per self.schema.
@@ -528,13 +501,4 @@ if not hasattr(mock.Mock, 'assert_not_called'):
raise AssertionError(msg)
mock.Mock.assert_not_called = __mock_assert_not_called
-
-# older unittest2.TestCase (centos6) have only the now-deprecated
-# assertRaisesRegexp. Simple assignment makes pylint complain, about
-# users of assertRaisesRegex so we use getattr to trick it.
-# https://github.com/PyCQA/pylint/issues/1946
-if not hasattr(unittest2.TestCase, 'assertRaisesRegex'):
- unittest2.TestCase.assertRaisesRegex = (
- getattr(unittest2.TestCase, 'assertRaisesRegexp'))
-
# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_conftest.py b/cloudinit/tests/test_conftest.py
new file mode 100644
index 00000000..6f1263a5
--- /dev/null
+++ b/cloudinit/tests/test_conftest.py
@@ -0,0 +1,65 @@
+import pytest
+
+from cloudinit import subp
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestDisableSubpUsage:
+ """Test that the disable_subp_usage fixture behaves as expected."""
+
+ def test_using_subp_raises_assertion_error(self):
+ with pytest.raises(AssertionError):
+ subp.subp(["some", "args"])
+
+ def test_typeerrors_on_incorrect_usage(self):
+ with pytest.raises(TypeError):
+ # We are intentionally passing no value for a parameter, so:
+ # pylint: disable=no-value-for-parameter
+ subp.subp()
+
+ @pytest.mark.allow_all_subp
+ def test_subp_usage_can_be_reenabled(self):
+ subp.subp(['whoami'])
+
+ @pytest.mark.allow_subp_for("whoami")
+ def test_subp_usage_can_be_conditionally_reenabled(self):
+ # The two parameters test each potential invocation with a single
+ # argument
+ with pytest.raises(AssertionError) as excinfo:
+ subp.subp(["some", "args"])
+ assert "allowed: whoami" in str(excinfo.value)
+ subp.subp(['whoami'])
+
+ @pytest.mark.allow_subp_for("whoami", "bash")
+ def test_subp_usage_can_be_conditionally_reenabled_for_multiple_cmds(self):
+ with pytest.raises(AssertionError) as excinfo:
+ subp.subp(["some", "args"])
+ assert "allowed: whoami,bash" in str(excinfo.value)
+ subp.subp(['bash', '-c', 'true'])
+ subp.subp(['whoami'])
+
+ @pytest.mark.allow_all_subp
+ @pytest.mark.allow_subp_for("bash")
+ def test_both_marks_raise_an_error(self):
+ with pytest.raises(AssertionError, match="marked both"):
+ subp.subp(["bash"])
+
+
+class TestDisableSubpUsageInTestSubclass(CiTestCase):
+ """Test that disable_subp_usage doesn't impact CiTestCase's subp logic."""
+
+ def test_using_subp_raises_exception(self):
+ with pytest.raises(Exception):
+ subp.subp(["some", "args"])
+
+ def test_typeerrors_on_incorrect_usage(self):
+ with pytest.raises(TypeError):
+ subp.subp()
+
+ def test_subp_usage_can_be_reenabled(self):
+ _old_allowed_subp = self.allow_subp
+ self.allowed_subp = True
+ try:
+ subp.subp(['bash', '-c', 'true'])
+ finally:
+ self.allowed_subp = _old_allowed_subp
diff --git a/cloudinit/tests/test_features.py b/cloudinit/tests/test_features.py
new file mode 100644
index 00000000..d7a7226d
--- /dev/null
+++ b/cloudinit/tests/test_features.py
@@ -0,0 +1,60 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+# pylint: disable=no-member,no-name-in-module
+"""
+This file is for testing the feature flag functionality itself,
+NOT for testing any individual feature flag
+"""
+import pytest
+import sys
+from pathlib import Path
+
+import cloudinit
+
+
+@pytest.yield_fixture()
+def create_override(request):
+ """
+ Create a feature overrides file and do some module wizardry to make
+ it seem like we're importing the features file for the first time.
+
+ After creating the override file with the values passed by the test,
+ we need to reload cloudinit.features
+ to get all of the current features (including the overridden ones).
+ Once the test is complete, we remove the file we created and set
+ features and feature_overrides modules to how they were before
+ the test started
+ """
+ override_path = Path(cloudinit.__file__).parent / 'feature_overrides.py'
+ if override_path.exists():
+ raise Exception("feature_overrides.py unexpectedly exists! "
+ "Remove it to run this test.")
+ with override_path.open('w') as f:
+ for key, value in request.param.items():
+ f.write('{} = {}\n'.format(key, value))
+
+ sys.modules.pop('cloudinit.features', None)
+
+ yield
+
+ override_path.unlink()
+ sys.modules.pop('cloudinit.feature_overrides', None)
+
+
+class TestFeatures:
+ def test_feature_without_override(self):
+ from cloudinit.features import ERROR_ON_USER_DATA_FAILURE
+ assert ERROR_ON_USER_DATA_FAILURE is True
+
+ @pytest.mark.parametrize('create_override',
+ [{'ERROR_ON_USER_DATA_FAILURE': False}],
+ indirect=True)
+ def test_feature_with_override(self, create_override):
+ from cloudinit.features import ERROR_ON_USER_DATA_FAILURE
+ assert ERROR_ON_USER_DATA_FAILURE is False
+
+ @pytest.mark.parametrize('create_override',
+ [{'SPAM': True}],
+ indirect=True)
+ def test_feature_only_in_override(self, create_override):
+ from cloudinit.features import SPAM
+ assert SPAM is True
diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py
index 8dd57137..f96f5372 100644
--- a/cloudinit/tests/test_gpg.py
+++ b/cloudinit/tests/test_gpg.py
@@ -4,19 +4,19 @@
from unittest import mock
from cloudinit import gpg
-from cloudinit import util
+from cloudinit import subp
from cloudinit.tests.helpers import CiTestCase
@mock.patch("cloudinit.gpg.time.sleep")
-@mock.patch("cloudinit.gpg.util.subp")
+@mock.patch("cloudinit.gpg.subp.subp")
class TestReceiveKeys(CiTestCase):
"""Test the recv_key method."""
def test_retries_on_subp_exc(self, m_subp, m_sleep):
"""retry should be done on gpg receive keys failure."""
retries = (1, 2, 4)
- my_exc = util.ProcessExecutionError(
+ my_exc = subp.ProcessExecutionError(
stdout='', stderr='', exit_code=2, cmd=['mycmd'])
m_subp.side_effect = (my_exc, my_exc, ('', ''))
gpg.recv_key("ABCD", "keyserver.example.com", retries=retries)
@@ -26,7 +26,7 @@ class TestReceiveKeys(CiTestCase):
"""If the final run fails, error should be raised."""
naplen = 1
keyid, keyserver = ("ABCD", "keyserver.example.com")
- m_subp.side_effect = util.ProcessExecutionError(
+ m_subp.side_effect = subp.ProcessExecutionError(
stdout='', stderr='', exit_code=2, cmd=['mycmd'])
with self.assertRaises(ValueError) as rcm:
gpg.recv_key(keyid, keyserver, retries=(naplen,))
@@ -36,7 +36,7 @@ class TestReceiveKeys(CiTestCase):
def test_no_retries_on_none(self, m_subp, m_sleep):
"""retry should not be done if retries is None."""
- m_subp.side_effect = util.ProcessExecutionError(
+ m_subp.side_effect = subp.ProcessExecutionError(
stdout='', stderr='', exit_code=2, cmd=['mycmd'])
with self.assertRaises(ValueError):
gpg.recv_key("ABCD", "keyserver.example.com", retries=None)
diff --git a/cloudinit/tests/test_netinfo.py b/cloudinit/tests/test_netinfo.py
index 1c8a791e..e44b16d8 100644
--- a/cloudinit/tests/test_netinfo.py
+++ b/cloudinit/tests/test_netinfo.py
@@ -27,8 +27,8 @@ class TestNetInfo(CiTestCase):
maxDiff = None
with_logs = True
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_old_nettools_pformat(self, m_subp, m_which):
"""netdev_pformat properly rendering old nettools info."""
m_subp.return_value = (SAMPLE_OLD_IFCONFIG_OUT, '')
@@ -36,8 +36,8 @@ class TestNetInfo(CiTestCase):
content = netdev_pformat()
self.assertEqual(NETDEV_FORMATTED_OUT, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_new_nettools_pformat(self, m_subp, m_which):
"""netdev_pformat properly rendering netdev new nettools info."""
m_subp.return_value = (SAMPLE_NEW_IFCONFIG_OUT, '')
@@ -45,8 +45,8 @@ class TestNetInfo(CiTestCase):
content = netdev_pformat()
self.assertEqual(NETDEV_FORMATTED_OUT, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_freebsd_nettools_pformat(self, m_subp, m_which):
"""netdev_pformat properly rendering netdev new nettools info."""
m_subp.return_value = (SAMPLE_FREEBSD_IFCONFIG_OUT, '')
@@ -57,8 +57,8 @@ class TestNetInfo(CiTestCase):
print()
self.assertEqual(FREEBSD_NETDEV_OUT, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_iproute_pformat(self, m_subp, m_which):
"""netdev_pformat properly rendering ip route info."""
m_subp.return_value = (SAMPLE_IPADDRSHOW_OUT, '')
@@ -72,8 +72,8 @@ class TestNetInfo(CiTestCase):
'255.0.0.0 | . |', '255.0.0.0 | host |')
self.assertEqual(new_output, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_warn_on_missing_commands(self, m_subp, m_which):
"""netdev_pformat warns when missing both ip and 'netstat'."""
m_which.return_value = None # Niether ip nor netstat found
@@ -85,8 +85,8 @@ class TestNetInfo(CiTestCase):
self.logs.getvalue())
m_subp.assert_not_called()
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_info_nettools_down(self, m_subp, m_which):
"""test netdev_info using nettools and down interfaces."""
m_subp.return_value = (
@@ -100,8 +100,8 @@ class TestNetInfo(CiTestCase):
'hwaddr': '.', 'up': True}},
netdev_info("."))
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_netdev_info_iproute_down(self, m_subp, m_which):
"""Test netdev_info with ip and down interfaces."""
m_subp.return_value = (
@@ -130,8 +130,8 @@ class TestNetInfo(CiTestCase):
readResource("netinfo/netdev-formatted-output-down"),
netdev_pformat())
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_route_nettools_pformat(self, m_subp, m_which):
"""route_pformat properly rendering nettools route info."""
@@ -147,8 +147,8 @@ class TestNetInfo(CiTestCase):
content = route_pformat()
self.assertEqual(ROUTE_FORMATTED_OUT, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_route_iproute_pformat(self, m_subp, m_which):
"""route_pformat properly rendering ip route info."""
@@ -165,8 +165,8 @@ class TestNetInfo(CiTestCase):
content = route_pformat()
self.assertEqual(ROUTE_FORMATTED_OUT, content)
- @mock.patch('cloudinit.netinfo.util.which')
- @mock.patch('cloudinit.netinfo.util.subp')
+ @mock.patch('cloudinit.netinfo.subp.which')
+ @mock.patch('cloudinit.netinfo.subp.subp')
def test_route_warn_on_missing_commands(self, m_subp, m_which):
"""route_pformat warns when missing both ip and 'netstat'."""
m_which.return_value = None # Niether ip nor netstat found
diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py
index 448097d3..911c1f3d 100644
--- a/cloudinit/tests/test_subp.py
+++ b/cloudinit/tests/test_subp.py
@@ -2,10 +2,21 @@
"""Tests for cloudinit.subp utility functions"""
-from cloudinit import subp
+import json
+import os
+import sys
+import stat
+
+from unittest import mock
+
+from cloudinit import subp, util
from cloudinit.tests.helpers import CiTestCase
+BASH = subp.which('bash')
+BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
+
+
class TestPrependBaseCommands(CiTestCase):
with_logs = True
@@ -58,4 +69,218 @@ class TestPrependBaseCommands(CiTestCase):
self.assertEqual('', self.logs.getvalue())
self.assertEqual(expected, fixed_commands)
+
+class TestSubp(CiTestCase):
+ allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
+ BOGUS_COMMAND, sys.executable]
+
+ stdin2err = [BASH, '-c', 'cat >&2']
+ stdin2out = ['cat']
+ utf8_invalid = b'ab\xaadef'
+ utf8_valid = b'start \xc3\xa9 end'
+ utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
+ printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
+
+ def printf_cmd(self, *args):
+ # bash's printf supports \xaa. So does /usr/bin/printf
+ # but by using bash, we remove dependency on another program.
+ return([BASH, '-c', 'printf "$@"', 'printf'] + list(args))
+
+ def test_subp_handles_bytestrings(self):
+ """subp can run a bytestring command if shell is True."""
+ tmp_file = self.tmp_path('test.out')
+ cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
+ (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True)
+ self.assertEqual(u'', out)
+ self.assertEqual(u'', _err)
+ self.assertEqual('HI MOM\n', util.load_file(tmp_file))
+
+ def test_subp_handles_strings(self):
+ """subp can run a string command if shell is True."""
+ tmp_file = self.tmp_path('test.out')
+ cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
+ (out, _err) = subp.subp(cmd, shell=True)
+ self.assertEqual(u'', out)
+ self.assertEqual(u'', _err)
+ self.assertEqual('HI MOM\n', util.load_file(tmp_file))
+
+ def test_subp_handles_utf8(self):
+ # The given bytes contain utf-8 accented characters as seen in e.g.
+ # the "deja dup" package in Ubuntu.
+ cmd = self.printf_cmd(self.utf8_valid_2)
+ (out, _err) = subp.subp(cmd, capture=True)
+ self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))
+
+ def test_subp_respects_decode_false(self):
+ (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
+ data=self.utf8_valid)
+ self.assertTrue(isinstance(out, bytes))
+ self.assertTrue(isinstance(err, bytes))
+ self.assertEqual(out, self.utf8_valid)
+
+ def test_subp_decode_ignore(self):
+ # this executes a string that writes invalid utf-8 to stdout
+ (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
+ capture=True, decode='ignore')
+ self.assertEqual(out, 'abcdef')
+
+ def test_subp_decode_strict_valid_utf8(self):
+ (out, _err) = subp.subp(self.stdin2out, capture=True,
+ decode='strict', data=self.utf8_valid)
+ self.assertEqual(out, self.utf8_valid.decode('utf-8'))
+
+ def test_subp_decode_invalid_utf8_replaces(self):
+ (out, _err) = subp.subp(self.stdin2out, capture=True,
+ data=self.utf8_invalid)
+ expected = self.utf8_invalid.decode('utf-8', 'replace')
+ self.assertEqual(out, expected)
+
+ def test_subp_decode_strict_raises(self):
+ args = []
+ kwargs = {'args': self.stdin2out, 'capture': True,
+ 'decode': 'strict', 'data': self.utf8_invalid}
+ self.assertRaises(UnicodeDecodeError, subp.subp, *args, **kwargs)
+
+ def test_subp_capture_stderr(self):
+ data = b'hello world'
+ (out, err) = subp.subp(self.stdin2err, capture=True,
+ decode=False, data=data,
+ update_env={'LC_ALL': 'C'})
+ self.assertEqual(err, data)
+ self.assertEqual(out, b'')
+
+ def test_subp_reads_env(self):
+ with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
+ out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
+ self.assertEqual('FOO=BAR', out.splitlines()[0])
+
+ def test_subp_env_and_update_env(self):
+ out, _err = subp.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ env={'FOO': 'BAR'},
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())
+
+ def test_subp_update_env(self):
+ extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
+ with mock.patch.dict("os.environ", values=extra):
+ out, _err = subp.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
+
+ def test_subp_warn_missing_shebang(self):
+ """Warn on no #! in script"""
+ noshebang = self.tmp_path('noshebang')
+ util.write_file(noshebang, 'true\n')
+
+ print("os is %s" % os)
+ os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
+ with self.allow_subp([noshebang]):
+ self.assertRaisesRegex(subp.ProcessExecutionError,
+ r'Missing #! in script\?',
+ subp.subp, (noshebang,))
+
+ def test_subp_combined_stderr_stdout(self):
+ """Providing combine_capture as True redirects stderr to stdout."""
+ data = b'hello world'
+ (out, err) = subp.subp(self.stdin2err, capture=True,
+ combine_capture=True, decode=False, data=data)
+ self.assertEqual(b'', err)
+ self.assertEqual(data, out)
+
+ def test_returns_none_if_no_capture(self):
+ (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
+ self.assertIsNone(err)
+ self.assertIsNone(out)
+
+ def test_exception_has_out_err_are_bytes_if_decode_false(self):
+ """Raised exc should have stderr, stdout as bytes if no decode."""
+ with self.assertRaises(subp.ProcessExecutionError) as cm:
+ subp.subp([BOGUS_COMMAND], decode=False)
+ self.assertTrue(isinstance(cm.exception.stdout, bytes))
+ self.assertTrue(isinstance(cm.exception.stderr, bytes))
+
+ def test_exception_has_out_err_are_bytes_if_decode_true(self):
+ """Raised exc should have stderr, stdout as string if no decode."""
+ with self.assertRaises(subp.ProcessExecutionError) as cm:
+ subp.subp([BOGUS_COMMAND], decode=True)
+ self.assertTrue(isinstance(cm.exception.stdout, str))
+ self.assertTrue(isinstance(cm.exception.stderr, str))
+
+ def test_bunch_of_slashes_in_path(self):
+ self.assertEqual("/target/my/path/",
+ subp.target_path("/target/", "//my/path/"))
+ self.assertEqual("/target/my/path/",
+ subp.target_path("/target/", "///my/path/"))
+
+ def test_c_lang_can_take_utf8_args(self):
+ """Independent of system LC_CTYPE, args can contain utf-8 strings.
+
+ When python starts up, its default encoding gets set based on
+ the value of LC_CTYPE. If no system locale is set, the default
+ encoding for both python2 and python3 in some paths will end up
+ being ascii.
+
+ Attempts to use setlocale or patching (or changing) os.environ
+ in the current environment seem to not be effective.
+
+ This test starts up a python with LC_CTYPE set to C so that
+ the default encoding will be set to ascii. In such an environment
+ Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
+ """
+ python_prog = '\n'.join([
+ 'import json, sys',
+ 'from cloudinit.subp import subp',
+ 'data = sys.stdin.read()',
+ 'cmd = json.loads(data)',
+ 'subp(cmd, capture=False)',
+ ''])
+ cmd = [BASH, '-c', 'echo -n "$@"', '--',
+ self.utf8_valid.decode("utf-8")]
+ python_subp = [sys.executable, '-c', python_prog]
+
+ out, _err = subp.subp(
+ python_subp, update_env={'LC_CTYPE': 'C'},
+ data=json.dumps(cmd).encode("utf-8"),
+ decode=False)
+ self.assertEqual(self.utf8_valid, out)
+
+ def test_bogus_command_logs_status_messages(self):
+ """status_cb gets status messages logs on bogus commands provided."""
+ logs = []
+
+ def status_cb(log):
+ logs.append(log)
+
+ with self.assertRaises(subp.ProcessExecutionError):
+ subp.subp([BOGUS_COMMAND], status_cb=status_cb)
+
+ expected = [
+ 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
+ 'ERROR: End run command: invalid command provided\n']
+ self.assertEqual(expected, logs)
+
+ def test_command_logs_exit_codes_to_status_cb(self):
+ """status_cb gets status messages containing command exit code."""
+ logs = []
+
+ def status_cb(log):
+ logs.append(log)
+
+ with self.assertRaises(subp.ProcessExecutionError):
+ subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
+ subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
+
+ expected = [
+ 'Begin run command: %s -c exit 2\n' % BASH,
+ 'ERROR: End run command: exit(2)\n',
+ 'Begin run command: %s -c exit 0\n' % BASH,
+ 'End run command: exit(0)\n']
+ self.assertEqual(expected, logs)
+
+
# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_url_helper.py b/cloudinit/tests/test_url_helper.py
index 1674120f..364ec822 100644
--- a/cloudinit/tests/test_url_helper.py
+++ b/cloudinit/tests/test_url_helper.py
@@ -1,7 +1,8 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.url_helper import (
- NOT_FOUND, UrlError, oauth_headers, read_file_or_url, retry_on_url_exc)
+ NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url,
+ retry_on_url_exc)
from cloudinit.tests.helpers import CiTestCase, mock, skipIf
from cloudinit import util
from cloudinit import version
@@ -50,6 +51,9 @@ class TestOAuthHeaders(CiTestCase):
class TestReadFileOrUrl(CiTestCase):
+
+ with_logs = True
+
def test_read_file_or_url_str_from_file(self):
"""Test that str(result.contents) on file is text version of contents.
It should not be "b'data'", but just "'data'" """
@@ -71,6 +75,34 @@ class TestReadFileOrUrl(CiTestCase):
self.assertEqual(result.contents, data)
self.assertEqual(str(result), data.decode('utf-8'))
+ @httpretty.activate
+ def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self):
+ """Headers are redacted from logs but unredacted in requests."""
+ url = 'http://hostname/path'
+ headers = {'sensitive': 'sekret', 'server': 'blah'}
+ httpretty.register_uri(httpretty.GET, url)
+
+ read_file_or_url(url, headers=headers, headers_redact=['sensitive'])
+ logs = self.logs.getvalue()
+ for k in headers.keys():
+ self.assertEqual(headers[k], httpretty.last_request().headers[k])
+ self.assertIn(REDACTED, logs)
+ self.assertNotIn('sekret', logs)
+
+ @httpretty.activate
+ def test_read_file_or_url_str_from_url_redacts_noheaders(self):
+ """When no headers_redact, header values are in logs and requests."""
+ url = 'http://hostname/path'
+ headers = {'sensitive': 'sekret', 'server': 'blah'}
+ httpretty.register_uri(httpretty.GET, url)
+
+ read_file_or_url(url, headers=headers)
+ for k in headers.keys():
+ self.assertEqual(headers[k], httpretty.last_request().headers[k])
+ logs = self.logs.getvalue()
+ self.assertNotIn(REDACTED, logs)
+ self.assertIn('sekret', logs)
+
@mock.patch(M_PATH + 'readurl')
def test_read_file_or_url_passes_params_to_readurl(self, m_readurl):
"""read_file_or_url passes all params through to readurl."""
diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py
index 11f37000..096a3037 100644
--- a/cloudinit/tests/test_util.py
+++ b/cloudinit/tests/test_util.py
@@ -6,8 +6,10 @@ import base64
import logging
import json
import platform
+import pytest
import cloudinit.util as util
+from cloudinit import subp
from cloudinit.tests.helpers import CiTestCase, mock
from textwrap import dedent
@@ -331,7 +333,7 @@ class TestBlkid(CiTestCase):
"PARTUUID": self.ids["id09"]},
})
- @mock.patch("cloudinit.util.subp")
+ @mock.patch("cloudinit.subp.subp")
def test_functional_blkid(self, m_subp):
m_subp.return_value = (
self.blkid_out.format(**self.ids), "")
@@ -339,7 +341,7 @@ class TestBlkid(CiTestCase):
m_subp.assert_called_with(["blkid", "-o", "full"], capture=True,
decode="replace")
- @mock.patch("cloudinit.util.subp")
+ @mock.patch("cloudinit.subp.subp")
def test_blkid_no_cache_uses_no_cache(self, m_subp):
"""blkid should turn off cache if disable_cache is true."""
m_subp.return_value = (
@@ -350,7 +352,7 @@ class TestBlkid(CiTestCase):
capture=True, decode="replace")
-@mock.patch('cloudinit.util.subp')
+@mock.patch('cloudinit.subp.subp')
class TestUdevadmSettle(CiTestCase):
def test_with_no_params(self, m_subp):
"""called with no parameters."""
@@ -395,8 +397,8 @@ class TestUdevadmSettle(CiTestCase):
'--timeout=%s' % timeout])
def test_subp_exception_raises_to_caller(self, m_subp):
- m_subp.side_effect = util.ProcessExecutionError("BOOM")
- self.assertRaises(util.ProcessExecutionError, util.udevadm_settle)
+ m_subp.side_effect = subp.ProcessExecutionError("BOOM")
+ self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle)
@mock.patch('os.path.exists')
@@ -419,12 +421,6 @@ class TestGetLinuxDistro(CiTestCase):
if path == '/etc/redhat-release':
return 1
- @classmethod
- def freebsd_version_exists(self, path):
- """Side effect function """
- if path == '/bin/freebsd-version':
- return 1
-
@mock.patch('cloudinit.util.load_file')
def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file has
@@ -443,11 +439,18 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(('ubuntu', '16.04', 'xenial'), dist)
- @mock.patch('cloudinit.util.subp')
- def test_get_linux_freebsd(self, m_subp, m_path_exists):
+ @mock.patch('platform.system')
+ @mock.patch('platform.release')
+ @mock.patch('cloudinit.util._parse_redhat_release')
+ def test_get_linux_freebsd(self, m_parse_redhat_release,
+ m_platform_release,
+ m_platform_system, m_path_exists):
"""Verify we get the correct name and release name on FreeBSD."""
- m_path_exists.side_effect = TestGetLinuxDistro.freebsd_version_exists
- m_subp.return_value = ("12.0-RELEASE-p10\n", '')
+ m_path_exists.return_value = False
+ m_platform_release.return_value = '12.0-RELEASE-p10'
+ m_platform_system.return_value = 'FreeBSD'
+ m_parse_redhat_release.return_value = {}
+ util.is_BSD.cache_clear()
dist = util.get_linux_distro()
self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist)
@@ -538,27 +541,36 @@ class TestGetLinuxDistro(CiTestCase):
self.assertEqual(
('opensuse-tumbleweed', '20180920', platform.machine()), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_data(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_no_data(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get no information if os-release does not exist"""
m_platform_dist.return_value = ('', '', '')
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('', '', ''), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_impl(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_no_impl(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get an empty tuple when no information exists and
Exceptions are not propagated"""
m_platform_dist.side_effect = Exception()
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('', '', ''), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_plat_data(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_plat_data(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get the correct platform information"""
m_platform_dist.return_value = ('foo', '1.1', 'aarch64')
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('foo', '1.1', 'aarch64'), dist)
@@ -597,4 +609,166 @@ class TestIsLXD(CiTestCase):
self.assertFalse(util.is_lxd())
m_exists.assert_called_once_with('/dev/lxd/sock')
+
+class TestReadCcFromCmdline:
+
+ @pytest.mark.parametrize(
+ "cmdline,expected_cfg",
+ [
+ # Return None if cmdline has no cc:<YAML>end_cc content.
+ (CiTestCase.random_string(), None),
+ # Return None if YAML content is empty string.
+ ('foo cc: end_cc bar', None),
+ # Return expected dictionary without trailing end_cc marker.
+ ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}),
+ # Return expected dictionary w escaped newline and no end_cc.
+ ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}),
+ # Return expected dictionary of yaml between cc: and end_cc.
+ ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}),
+ # Return dict with list value w escaped newline, no end_cc.
+ (
+ 'cc: ssh_import_id: [smoser, kirkland]\\n',
+ {'ssh_import_id': ['smoser', 'kirkland']}
+ ),
+ # Parse urlencoded brackets in yaml content.
+ (
+ 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc',
+ {'ssh_import_id': ['smoser', 'kirkland']}
+ ),
+ # Parse complete urlencoded yaml content.
+ (
+ 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc',
+ {'ssh_import_id': ['user1', 'user2']}
+ ),
+ # Parse nested dictionary in yaml content.
+ (
+ 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc',
+ {'ntp': {'enabled': True, 'ntp_client': 'myclient'}}
+ ),
+ # Parse single mapping value in yaml content.
+ ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}),
+ # Parse multiline content with multiple mapping and nested lists.
+ (
+ ('cc: ssh_import_id: [smoser, bob]\\n'
+ 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # Parse multiline encoded content w/ mappings and nested lists.
+ (
+ ('cc: ssh_import_id: %5Bsmoser, bob%5D\\n'
+ 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # test encoded escaped newlines work.
+ #
+ # unquote(encoded_content)
+ # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]'
+ (
+ ('cc: ' +
+ ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn'
+ 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
+ '%20echo%20hi%20%5D') + ' end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # test encoded newlines work.
+ #
+ # unquote(encoded_content)
+ # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]'
+ (
+ ("cc: " +
+ ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A'
+ 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
+ '%20echo%20hi%20%5D') + ' end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # Parse and merge multiple yaml content sections.
+ (
+ ('cc:ssh_import_id: [smoser, bob] end_cc '
+ 'cc: runcmd: [ [ ls, -l ] ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l']]}
+ ),
+ # Parse and merge multiple encoded yaml content sections.
+ (
+ ('cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc '
+ 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'),
+ {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]}
+ ),
+ ]
+ )
+ def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline):
+ assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline)
+
+
+class TestMountCb:
+ """Tests for ``util.mount_cb``.
+
+ These tests consider the "unit" under test to be ``util.mount_cb`` and
+ ``util.unmounter``, which is only used by ``mount_cb``.
+
+ TODO: Test default mtype determination
+ TODO: Test the if/else branch that actually performs the mounting operation
+ """
+
+ @pytest.yield_fixture
+ def already_mounted_device_and_mountdict(self):
+ """Mock an already-mounted device, and yield (device, mount dict)"""
+ device = "/dev/fake0"
+ mountpoint = "/mnt/fake"
+ with mock.patch("cloudinit.util.subp.subp"):
+ with mock.patch("cloudinit.util.mounts") as m_mounts:
+ mounts = {device: {"mountpoint": mountpoint}}
+ m_mounts.return_value = mounts
+ yield device, mounts[device]
+
+ @pytest.fixture
+ def already_mounted_device(self, already_mounted_device_and_mountdict):
+ """already_mounted_device_and_mountdict, but return only the device"""
+ return already_mounted_device_and_mountdict[0]
+
+ @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])
+ def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):
+ with pytest.raises(TypeError):
+ util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype)
+
+ @mock.patch("cloudinit.util.subp.subp")
+ def test_already_mounted_does_not_mount_or_umount_anything(
+ self, m_subp, already_mounted_device
+ ):
+ util.mount_cb(already_mounted_device, mock.Mock())
+
+ assert 0 == m_subp.call_count
+
+ @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""])
+ def test_already_mounted_calls_callback(
+ self, trailing_slash_in_mounts, already_mounted_device_and_mountdict
+ ):
+ device, mount_dict = already_mounted_device_and_mountdict
+ mountpoint = mount_dict["mountpoint"]
+ mount_dict["mountpoint"] += trailing_slash_in_mounts
+
+ callback = mock.Mock()
+ util.mount_cb(device, callback)
+
+ # The mountpoint passed to callback should always have a trailing
+ # slash, regardless of the input
+ assert [mock.call(mountpoint + "/")] == callback.call_args_list
+
+ def test_already_mounted_calls_callback_with_data(
+ self, already_mounted_device
+ ):
+ callback = mock.Mock()
+ util.mount_cb(
+ already_mounted_device, callback, data=mock.sentinel.data
+ )
+
+ assert [
+ mock.call(mock.ANY, mock.sentinel.data)
+ ] == callback.call_args_list
+
+
# vi: ts=4 expandtab