summaryrefslogtreecommitdiff
path: root/cloudinit/tests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests/test_util.py')
-rw-r--r--cloudinit/tests/test_util.py210
1 files changed, 192 insertions, 18 deletions
diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py
index 11f37000..096a3037 100644
--- a/cloudinit/tests/test_util.py
+++ b/cloudinit/tests/test_util.py
@@ -6,8 +6,10 @@ import base64
import logging
import json
import platform
+import pytest
import cloudinit.util as util
+from cloudinit import subp
from cloudinit.tests.helpers import CiTestCase, mock
from textwrap import dedent
@@ -331,7 +333,7 @@ class TestBlkid(CiTestCase):
"PARTUUID": self.ids["id09"]},
})
- @mock.patch("cloudinit.util.subp")
+ @mock.patch("cloudinit.subp.subp")
def test_functional_blkid(self, m_subp):
m_subp.return_value = (
self.blkid_out.format(**self.ids), "")
@@ -339,7 +341,7 @@ class TestBlkid(CiTestCase):
m_subp.assert_called_with(["blkid", "-o", "full"], capture=True,
decode="replace")
- @mock.patch("cloudinit.util.subp")
+ @mock.patch("cloudinit.subp.subp")
def test_blkid_no_cache_uses_no_cache(self, m_subp):
"""blkid should turn off cache if disable_cache is true."""
m_subp.return_value = (
@@ -350,7 +352,7 @@ class TestBlkid(CiTestCase):
capture=True, decode="replace")
-@mock.patch('cloudinit.util.subp')
+@mock.patch('cloudinit.subp.subp')
class TestUdevadmSettle(CiTestCase):
def test_with_no_params(self, m_subp):
"""called with no parameters."""
@@ -395,8 +397,8 @@ class TestUdevadmSettle(CiTestCase):
'--timeout=%s' % timeout])
def test_subp_exception_raises_to_caller(self, m_subp):
- m_subp.side_effect = util.ProcessExecutionError("BOOM")
- self.assertRaises(util.ProcessExecutionError, util.udevadm_settle)
+ m_subp.side_effect = subp.ProcessExecutionError("BOOM")
+ self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle)
@mock.patch('os.path.exists')
@@ -419,12 +421,6 @@ class TestGetLinuxDistro(CiTestCase):
if path == '/etc/redhat-release':
return 1
- @classmethod
- def freebsd_version_exists(self, path):
- """Side effect function """
- if path == '/bin/freebsd-version':
- return 1
-
@mock.patch('cloudinit.util.load_file')
def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file has
@@ -443,11 +439,18 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(('ubuntu', '16.04', 'xenial'), dist)
- @mock.patch('cloudinit.util.subp')
- def test_get_linux_freebsd(self, m_subp, m_path_exists):
+ @mock.patch('platform.system')
+ @mock.patch('platform.release')
+ @mock.patch('cloudinit.util._parse_redhat_release')
+ def test_get_linux_freebsd(self, m_parse_redhat_release,
+ m_platform_release,
+ m_platform_system, m_path_exists):
"""Verify we get the correct name and release name on FreeBSD."""
- m_path_exists.side_effect = TestGetLinuxDistro.freebsd_version_exists
- m_subp.return_value = ("12.0-RELEASE-p10\n", '')
+ m_path_exists.return_value = False
+ m_platform_release.return_value = '12.0-RELEASE-p10'
+ m_platform_system.return_value = 'FreeBSD'
+ m_parse_redhat_release.return_value = {}
+ util.is_BSD.cache_clear()
dist = util.get_linux_distro()
self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist)
@@ -538,27 +541,36 @@ class TestGetLinuxDistro(CiTestCase):
self.assertEqual(
('opensuse-tumbleweed', '20180920', platform.machine()), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_data(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_no_data(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get no information if os-release does not exist"""
m_platform_dist.return_value = ('', '', '')
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('', '', ''), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_impl(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_no_impl(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get an empty tuple when no information exists and
Exceptions are not propagated"""
m_platform_dist.side_effect = Exception()
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('', '', ''), dist)
+ @mock.patch('platform.system')
@mock.patch('platform.dist', create=True)
- def test_get_linux_distro_plat_data(self, m_platform_dist, m_path_exists):
+ def test_get_linux_distro_plat_data(self, m_platform_dist,
+ m_platform_system, m_path_exists):
"""Verify we get the correct platform information"""
m_platform_dist.return_value = ('foo', '1.1', 'aarch64')
+ m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
self.assertEqual(('foo', '1.1', 'aarch64'), dist)
@@ -597,4 +609,166 @@ class TestIsLXD(CiTestCase):
self.assertFalse(util.is_lxd())
m_exists.assert_called_once_with('/dev/lxd/sock')
+
+class TestReadCcFromCmdline:
+
+ @pytest.mark.parametrize(
+ "cmdline,expected_cfg",
+ [
+ # Return None if cmdline has no cc:<YAML>end_cc content.
+ (CiTestCase.random_string(), None),
+ # Return None if YAML content is empty string.
+ ('foo cc: end_cc bar', None),
+ # Return expected dictionary without trailing end_cc marker.
+ ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}),
+ # Return expected dictionary w escaped newline and no end_cc.
+ ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}),
+ # Return expected dictionary of yaml between cc: and end_cc.
+ ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}),
+ # Return dict with list value w escaped newline, no end_cc.
+ (
+ 'cc: ssh_import_id: [smoser, kirkland]\\n',
+ {'ssh_import_id': ['smoser', 'kirkland']}
+ ),
+ # Parse urlencoded brackets in yaml content.
+ (
+ 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc',
+ {'ssh_import_id': ['smoser', 'kirkland']}
+ ),
+ # Parse complete urlencoded yaml content.
+ (
+ 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc',
+ {'ssh_import_id': ['user1', 'user2']}
+ ),
+ # Parse nested dictionary in yaml content.
+ (
+ 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc',
+ {'ntp': {'enabled': True, 'ntp_client': 'myclient'}}
+ ),
+ # Parse single mapping value in yaml content.
+ ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}),
+ # Parse multiline content with multiple mapping and nested lists.
+ (
+ ('cc: ssh_import_id: [smoser, bob]\\n'
+ 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # Parse multiline encoded content w/ mappings and nested lists.
+ (
+ ('cc: ssh_import_id: %5Bsmoser, bob%5D\\n'
+ 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # test encoded escaped newlines work.
+ #
+ # unquote(encoded_content)
+ # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]'
+ (
+ ('cc: ' +
+ ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn'
+ 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
+ '%20echo%20hi%20%5D') + ' end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # test encoded newlines work.
+ #
+ # unquote(encoded_content)
+ # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]'
+ (
+ ("cc: " +
+ ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A'
+ 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
+ '%20echo%20hi%20%5D') + ' end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l'], 'echo hi']}
+ ),
+ # Parse and merge multiple yaml content sections.
+ (
+ ('cc:ssh_import_id: [smoser, bob] end_cc '
+ 'cc: runcmd: [ [ ls, -l ] ] end_cc'),
+ {'ssh_import_id': ['smoser', 'bob'],
+ 'runcmd': [['ls', '-l']]}
+ ),
+ # Parse and merge multiple encoded yaml content sections.
+ (
+ ('cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc '
+ 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'),
+ {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]}
+ ),
+ ]
+ )
+ def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline):
+ assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline)
+
+
+class TestMountCb:
+ """Tests for ``util.mount_cb``.
+
+ These tests consider the "unit" under test to be ``util.mount_cb`` and
+ ``util.unmounter``, which is only used by ``mount_cb``.
+
+ TODO: Test default mtype determination
+ TODO: Test the if/else branch that actually performs the mounting operation
+ """
+
+ @pytest.yield_fixture
+ def already_mounted_device_and_mountdict(self):
+ """Mock an already-mounted device, and yield (device, mount dict)"""
+ device = "/dev/fake0"
+ mountpoint = "/mnt/fake"
+ with mock.patch("cloudinit.util.subp.subp"):
+ with mock.patch("cloudinit.util.mounts") as m_mounts:
+ mounts = {device: {"mountpoint": mountpoint}}
+ m_mounts.return_value = mounts
+ yield device, mounts[device]
+
+ @pytest.fixture
+ def already_mounted_device(self, already_mounted_device_and_mountdict):
+ """already_mounted_device_and_mountdict, but return only the device"""
+ return already_mounted_device_and_mountdict[0]
+
+ @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])
+ def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):
+ with pytest.raises(TypeError):
+ util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype)
+
+ @mock.patch("cloudinit.util.subp.subp")
+ def test_already_mounted_does_not_mount_or_umount_anything(
+ self, m_subp, already_mounted_device
+ ):
+ util.mount_cb(already_mounted_device, mock.Mock())
+
+ assert 0 == m_subp.call_count
+
+ @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""])
+ def test_already_mounted_calls_callback(
+ self, trailing_slash_in_mounts, already_mounted_device_and_mountdict
+ ):
+ device, mount_dict = already_mounted_device_and_mountdict
+ mountpoint = mount_dict["mountpoint"]
+ mount_dict["mountpoint"] += trailing_slash_in_mounts
+
+ callback = mock.Mock()
+ util.mount_cb(device, callback)
+
+ # The mountpoint passed to callback should always have a trailing
+ # slash, regardless of the input
+ assert [mock.call(mountpoint + "/")] == callback.call_args_list
+
+ def test_already_mounted_calls_callback_with_data(
+ self, already_mounted_device
+ ):
+ callback = mock.Mock()
+ util.mount_cb(
+ already_mounted_device, callback, data=mock.sentinel.data
+ )
+
+ assert [
+ mock.call(mock.ANY, mock.sentinel.data)
+ ] == callback.call_args_list
+
+
# vi: ts=4 expandtab