summaryrefslogtreecommitdiff
path: root/cloudinit/tests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests/test_util.py')
-rw-r--r--cloudinit/tests/test_util.py854
1 files changed, 0 insertions, 854 deletions
diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py
deleted file mode 100644
index b7a302f1..00000000
--- a/cloudinit/tests/test_util.py
+++ /dev/null
@@ -1,854 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloudinit.util"""
-
-import base64
-import logging
-import json
-import platform
-import pytest
-
-import cloudinit.util as util
-from cloudinit import subp
-
-from cloudinit.tests.helpers import CiTestCase, mock
-from textwrap import dedent
-
-LOG = logging.getLogger(__name__)
-
-MOUNT_INFO = [
- '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64',
- '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2'
-]
-
-OS_RELEASE_SLES = dedent("""\
- NAME="SLES"
- VERSION="12-SP3"
- VERSION_ID="12.3"
- PRETTY_NAME="SUSE Linux Enterprise Server 12 SP3"
- ID="sles"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:suse:sles:12:sp3"
-""")
-
-OS_RELEASE_OPENSUSE = dedent("""\
- NAME="openSUSE Leap"
- VERSION="42.3"
- ID=opensuse
- ID_LIKE="suse"
- VERSION_ID="42.3"
- PRETTY_NAME="openSUSE Leap 42.3"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:leap:42.3"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_OPENSUSE_L15 = dedent("""\
- NAME="openSUSE Leap"
- VERSION="15.0"
- ID="opensuse-leap"
- ID_LIKE="suse opensuse"
- VERSION_ID="15.0"
- PRETTY_NAME="openSUSE Leap 15.0"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:leap:15.0"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_OPENSUSE_TW = dedent("""\
- NAME="openSUSE Tumbleweed"
- ID="opensuse-tumbleweed"
- ID_LIKE="opensuse suse"
- VERSION_ID="20180920"
- PRETTY_NAME="openSUSE Tumbleweed"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:tumbleweed:20180920"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_CENTOS = dedent("""\
- NAME="CentOS Linux"
- VERSION="7 (Core)"
- ID="centos"
- ID_LIKE="rhel fedora"
- VERSION_ID="7"
- PRETTY_NAME="CentOS Linux 7 (Core)"
- ANSI_COLOR="0;31"
- CPE_NAME="cpe:/o:centos:centos:7"
- HOME_URL="https://www.centos.org/"
- BUG_REPORT_URL="https://bugs.centos.org/"
-
- CENTOS_MANTISBT_PROJECT="CentOS-7"
- CENTOS_MANTISBT_PROJECT_VERSION="7"
- REDHAT_SUPPORT_PRODUCT="centos"
- REDHAT_SUPPORT_PRODUCT_VERSION="7"
-""")
-
-OS_RELEASE_REDHAT_7 = dedent("""\
- NAME="Red Hat Enterprise Linux Server"
- VERSION="7.5 (Maipo)"
- ID="rhel"
- ID_LIKE="fedora"
- VARIANT="Server"
- VARIANT_ID="server"
- VERSION_ID="7.5"
- PRETTY_NAME="Red Hat"
- ANSI_COLOR="0;31"
- CPE_NAME="cpe:/o:redhat:enterprise_linux:7.5:GA:server"
- HOME_URL="https://www.redhat.com/"
- BUG_REPORT_URL="https://bugzilla.redhat.com/"
-
- REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 7"
- REDHAT_BUGZILLA_PRODUCT_VERSION=7.5
- REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux"
- REDHAT_SUPPORT_PRODUCT_VERSION="7.5"
-""")
-
-REDHAT_RELEASE_CENTOS_6 = "CentOS release 6.10 (Final)"
-REDHAT_RELEASE_CENTOS_7 = "CentOS Linux release 7.5.1804 (Core)"
-REDHAT_RELEASE_REDHAT_6 = (
- "Red Hat Enterprise Linux Server release 6.10 (Santiago)")
-REDHAT_RELEASE_REDHAT_7 = (
- "Red Hat Enterprise Linux Server release 7.5 (Maipo)")
-
-
-OS_RELEASE_DEBIAN = dedent("""\
- PRETTY_NAME="Debian GNU/Linux 9 (stretch)"
- NAME="Debian GNU/Linux"
- VERSION_ID="9"
- VERSION="9 (stretch)"
- ID=debian
- HOME_URL="https://www.debian.org/"
- SUPPORT_URL="https://www.debian.org/support"
- BUG_REPORT_URL="https://bugs.debian.org/"
-""")
-
-OS_RELEASE_UBUNTU = dedent("""\
- NAME="Ubuntu"\n
- # comment test
- VERSION="16.04.3 LTS (Xenial Xerus)"\n
- ID=ubuntu\n
- ID_LIKE=debian\n
- PRETTY_NAME="Ubuntu 16.04.3 LTS"\n
- VERSION_ID="16.04"\n
- HOME_URL="http://www.ubuntu.com/"\n
- SUPPORT_URL="http://help.ubuntu.com/"\n
- BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"\n
- VERSION_CODENAME=xenial\n
- UBUNTU_CODENAME=xenial\n
-""")
-
-
-class FakeCloud(object):
-
- def __init__(self, hostname, fqdn):
- self.hostname = hostname
- self.fqdn = fqdn
- self.calls = []
-
- def get_hostname(self, fqdn=None, metadata_only=None):
- myargs = {}
- if fqdn is not None:
- myargs['fqdn'] = fqdn
- if metadata_only is not None:
- myargs['metadata_only'] = metadata_only
- self.calls.append(myargs)
- if fqdn:
- return self.fqdn
- return self.hostname
-
-
-class TestUtil(CiTestCase):
-
- def test_parse_mount_info_no_opts_no_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
-
- def test_parse_mount_info_no_opts_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
-
- def test_parse_mount_info_with_opts(self):
- result = util.parse_mount_info('/', MOUNT_INFO, LOG, True)
- self.assertEqual(
- ('/dev/sda1', 'btrfs', '/', 'ro,relatime'),
- result
- )
-
- @mock.patch('cloudinit.util.get_mount_info')
- def test_mount_is_rw(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime')
- is_rw = util.mount_is_read_write('/')
- self.assertEqual(is_rw, True)
-
- @mock.patch('cloudinit.util.get_mount_info')
- def test_mount_is_ro(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime')
- is_rw = util.mount_is_read_write('/')
- self.assertEqual(is_rw, False)
-
-
-class TestUptime(CiTestCase):
-
- @mock.patch('cloudinit.util.boottime')
- @mock.patch('cloudinit.util.os.path.exists')
- @mock.patch('cloudinit.util.time.time')
- def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime):
- boottime = 1000.0
- uptime = 10.0
- m_boottime.return_value = boottime
- m_time.return_value = boottime + uptime
- m_exists.return_value = False
- result = util.uptime()
- self.assertEqual(str(uptime), result)
-
-
-class TestShellify(CiTestCase):
-
- def test_input_dict_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'Input.*was.*dict.*xpected',
- util.shellify, {'mykey': 'myval'})
-
- def test_input_str_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar")
-
- def test_value_with_int_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'shellify.*int', util.shellify, ["foo", 1])
-
- def test_supports_strings_and_lists(self):
- self.assertEqual(
- '\n'.join(["#!/bin/sh", "echo hi mom", "'echo' 'hi dad'",
- "'echo' 'hi' 'sis'", ""]),
- util.shellify(["echo hi mom", ["echo", "hi dad"],
- ('echo', 'hi', 'sis')]))
-
-
-class TestGetHostnameFqdn(CiTestCase):
-
- def test_get_hostname_fqdn_from_only_cfg_fqdn(self):
- """When cfg only has the fqdn key, derive hostname and fqdn from it."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com'}, cloud=None)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self):
- """When cfg has both fqdn and hostname keys, return them."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None)
- self.assertEqual('other', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self):
- """When cfg has only hostname key which represents a fqdn, use that."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost.domain.com'}, cloud=None)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self):
- """When cfg has a hostname without a '.' query cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost'}, cloud=mycloud)
- self.assertEqual('myhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': False}], mycloud.calls)
-
- def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self):
- """When cfg has neither hostname nor fqdn cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud)
- self.assertEqual('cloudhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': False},
- {'metadata_only': False}], mycloud.calls)
-
- def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self):
- """Calls to cloud.get_hostname pass the metadata_only parameter."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- _hn, _fqdn = util.get_hostname_fqdn(
- cfg={}, cloud=mycloud, metadata_only=True)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': True},
- {'metadata_only': True}], mycloud.calls)
-
-
-class TestBlkid(CiTestCase):
- ids = {
- "id01": "1111-1111",
- "id02": "22222222-2222",
- "id03": "33333333-3333",
- "id04": "44444444-4444",
- "id05": "55555555-5555-5555-5555-555555555555",
- "id06": "66666666-6666-6666-6666-666666666666",
- "id07": "52894610484658920398",
- "id08": "86753098675309867530",
- "id09": "99999999-9999-9999-9999-999999999999",
- }
-
- blkid_out = dedent("""\
- /dev/loop0: TYPE="squashfs"
- /dev/loop1: TYPE="squashfs"
- /dev/loop2: TYPE="squashfs"
- /dev/loop3: TYPE="squashfs"
- /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}"
- /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}"
- /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}"
- /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """
- """TYPE="zfs_member" PARTUUID="{id09}"
- /dev/loop4: TYPE="squashfs"
- """)
-
- maxDiff = None
-
- def _get_expected(self):
- return ({
- "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"},
- "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"},
- "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"},
- "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"},
- "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"},
- "/dev/sda1": {"DEVNAME": "/dev/sda1", "TYPE": "vfat",
- "UUID": self.ids["id01"],
- "PARTUUID": self.ids["id02"]},
- "/dev/sda2": {"DEVNAME": "/dev/sda2", "TYPE": "ext4",
- "UUID": self.ids["id03"],
- "PARTUUID": self.ids["id04"]},
- "/dev/sda3": {"DEVNAME": "/dev/sda3", "TYPE": "ext4",
- "UUID": self.ids["id05"],
- "PARTUUID": self.ids["id06"]},
- "/dev/sda4": {"DEVNAME": "/dev/sda4", "TYPE": "zfs_member",
- "LABEL": "default",
- "UUID": self.ids["id07"],
- "UUID_SUB": self.ids["id08"],
- "PARTUUID": self.ids["id09"]},
- })
-
- @mock.patch("cloudinit.subp.subp")
- def test_functional_blkid(self, m_subp):
- m_subp.return_value = (
- self.blkid_out.format(**self.ids), "")
- self.assertEqual(self._get_expected(), util.blkid())
- m_subp.assert_called_with(["blkid", "-o", "full"], capture=True,
- decode="replace")
-
- @mock.patch("cloudinit.subp.subp")
- def test_blkid_no_cache_uses_no_cache(self, m_subp):
- """blkid should turn off cache if disable_cache is true."""
- m_subp.return_value = (
- self.blkid_out.format(**self.ids), "")
- self.assertEqual(self._get_expected(),
- util.blkid(disable_cache=True))
- m_subp.assert_called_with(["blkid", "-o", "full", "-c", "/dev/null"],
- capture=True, decode="replace")
-
-
-@mock.patch('cloudinit.subp.subp')
-class TestUdevadmSettle(CiTestCase):
- def test_with_no_params(self, m_subp):
- """called with no parameters."""
- util.udevadm_settle()
- m_subp.called_once_with(mock.call(['udevadm', 'settle']))
-
- def test_with_exists_and_not_exists(self, m_subp):
- """with exists=file where file does not exist should invoke subp."""
- mydev = self.tmp_path("mydev")
- util.udevadm_settle(exists=mydev)
- m_subp.called_once_with(
- ['udevadm', 'settle', '--exit-if-exists=%s' % mydev])
-
- def test_with_exists_and_file_exists(self, m_subp):
- """with exists=file where file does exist should not invoke subp."""
- mydev = self.tmp_path("mydev")
- util.write_file(mydev, "foo\n")
- util.udevadm_settle(exists=mydev)
- self.assertIsNone(m_subp.call_args)
-
- def test_with_timeout_int(self, m_subp):
- """timeout can be an integer."""
- timeout = 9
- util.udevadm_settle(timeout=timeout)
- m_subp.called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout])
-
- def test_with_timeout_string(self, m_subp):
- """timeout can be a string."""
- timeout = "555"
- util.udevadm_settle(timeout=timeout)
- m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout])
-
- def test_with_exists_and_timeout(self, m_subp):
- """test call with both exists and timeout."""
- mydev = self.tmp_path("mydev")
- timeout = "3"
- util.udevadm_settle(exists=mydev)
- m_subp.called_once_with(
- ['udevadm', 'settle', '--exit-if-exists=%s' % mydev,
- '--timeout=%s' % timeout])
-
- def test_subp_exception_raises_to_caller(self, m_subp):
- m_subp.side_effect = subp.ProcessExecutionError("BOOM")
- self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle)
-
-
-@mock.patch('os.path.exists')
-class TestGetLinuxDistro(CiTestCase):
-
- def setUp(self):
- # python2 has no lru_cache, and therefore, no cache_clear()
- if hasattr(util.get_linux_distro, "cache_clear"):
- util.get_linux_distro.cache_clear()
-
- @classmethod
- def os_release_exists(self, path):
- """Side effect function"""
- if path == '/etc/os-release':
- return 1
-
- @classmethod
- def redhat_release_exists(self, path):
- """Side effect function """
- if path == '/etc/redhat-release':
- return 1
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
- """Verify we get the correct name if the os-release file has
- the distro name in quotes"""
- m_os_release.return_value = OS_RELEASE_SLES
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('sles', '12.3', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists):
- """Verify we get the correct name if the os-release file does not
- have the distro name in quotes"""
- m_os_release.return_value = OS_RELEASE_UBUNTU
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('ubuntu', '16.04', 'xenial'), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.release')
- @mock.patch('cloudinit.util._parse_redhat_release')
- def test_get_linux_freebsd(self, m_parse_redhat_release,
- m_platform_release,
- m_platform_system, m_path_exists):
- """Verify we get the correct name and release name on FreeBSD."""
- m_path_exists.return_value = False
- m_platform_release.return_value = '12.0-RELEASE-p10'
- m_platform_system.return_value = 'FreeBSD'
- m_parse_redhat_release.return_value = {}
- util.is_BSD.cache_clear()
- dist = util.get_linux_distro()
- self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_centos6(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on CentOS 6."""
- m_os_release.return_value = REDHAT_RELEASE_CENTOS_6
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '6.10', 'Final'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists):
- """Verify the correct release info on CentOS 7 without os-release."""
- m_os_release.return_value = REDHAT_RELEASE_CENTOS_7
- m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '7.5.1804', 'Core'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists):
- """Verify redhat 7 read from os-release."""
- m_os_release.return_value = OS_RELEASE_REDHAT_7
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists):
- """Verify redhat 7 read from redhat-release."""
- m_os_release.return_value = REDHAT_RELEASE_REDHAT_7
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists):
- """Verify redhat 6 read from redhat-release."""
- m_os_release.return_value = REDHAT_RELEASE_REDHAT_6
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '6.10', 'Santiago'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_copr_centos(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on COPR CentOS."""
- m_os_release.return_value = OS_RELEASE_CENTOS
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '7', 'Core'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_debian(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on Debian."""
- m_os_release.return_value = OS_RELEASE_DEBIAN
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('debian', '9', 'stretch'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- prior to openSUSE Leap 15.
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('opensuse', '42.3', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- for openSUSE Leap 15.0 and later.
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE_L15
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- for openSUSE Tumbleweed
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE_TW
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(
- ('opensuse-tumbleweed', '20180920', platform.machine()), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_data(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get no information if os-release does not exist"""
- m_platform_dist.return_value = ('', '', '')
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_impl(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get an empty tuple when no information exists and
- Exceptions are not propagated"""
- m_platform_dist.side_effect = Exception()
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_plat_data(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get the correct platform information"""
- m_platform_dist.return_value = ('foo', '1.1', 'aarch64')
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('foo', '1.1', 'aarch64'), dist)
-
-
-class TestJsonDumps(CiTestCase):
- def test_is_str(self):
- """json_dumps should return a string."""
- self.assertTrue(isinstance(util.json_dumps({'abc': '123'}), str))
-
- def test_utf8(self):
- smiley = '\\ud83d\\ude03'
- self.assertEqual(
- {'smiley': smiley},
- json.loads(util.json_dumps({'smiley': smiley})))
-
- def test_non_utf8(self):
- blob = b'\xba\x03Qx-#y\xea'
- self.assertEqual(
- {'blob': 'ci-b64:' + base64.b64encode(blob).decode('utf-8')},
- json.loads(util.json_dumps({'blob': blob})))
-
-
-@mock.patch('os.path.exists')
-class TestIsLXD(CiTestCase):
-
- def test_is_lxd_true_on_sock_device(self, m_exists):
- """When lxd's /dev/lxd/sock exists, is_lxd returns true."""
- m_exists.return_value = True
- self.assertTrue(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
-
- def test_is_lxd_false_when_sock_device_absent(self, m_exists):
- """When lxd's /dev/lxd/sock is absent, is_lxd returns false."""
- m_exists.return_value = False
- self.assertFalse(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
-
-
-class TestReadCcFromCmdline:
-
- @pytest.mark.parametrize(
- "cmdline,expected_cfg",
- [
- # Return None if cmdline has no cc:<YAML>end_cc content.
- (CiTestCase.random_string(), None),
- # Return None if YAML content is empty string.
- ('foo cc: end_cc bar', None),
- # Return expected dictionary without trailing end_cc marker.
- ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}),
- # Return expected dictionary w escaped newline and no end_cc.
- ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}),
- # Return expected dictionary of yaml between cc: and end_cc.
- ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}),
- # Return dict with list value w escaped newline, no end_cc.
- (
- 'cc: ssh_import_id: [smoser, kirkland]\\n',
- {'ssh_import_id': ['smoser', 'kirkland']}
- ),
- # Parse urlencoded brackets in yaml content.
- (
- 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc',
- {'ssh_import_id': ['smoser', 'kirkland']}
- ),
- # Parse complete urlencoded yaml content.
- (
- 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc',
- {'ssh_import_id': ['user1', 'user2']}
- ),
- # Parse nested dictionary in yaml content.
- (
- 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc',
- {'ntp': {'enabled': True, 'ntp_client': 'myclient'}}
- ),
- # Parse single mapping value in yaml content.
- ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}),
- # Parse multiline content with multiple mapping and nested lists.
- (
- ('cc: ssh_import_id: [smoser, bob]\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # Parse multiline encoded content w/ mappings and nested lists.
- (
- ('cc: ssh_import_id: %5Bsmoser, bob%5D\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # test encoded escaped newlines work.
- #
- # unquote(encoded_content)
- # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]'
- (
- ('cc: ' +
- ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D') + ' end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # test encoded newlines work.
- #
- # unquote(encoded_content)
- # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]'
- (
- ("cc: " +
- ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D') + ' end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # Parse and merge multiple yaml content sections.
- (
- ('cc:ssh_import_id: [smoser, bob] end_cc '
- 'cc: runcmd: [ [ ls, -l ] ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l']]}
- ),
- # Parse and merge multiple encoded yaml content sections.
- (
- ('cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc '
- 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'),
- {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]}
- ),
- ]
- )
- def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline):
- assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline)
-
-
-class TestMountCb:
- """Tests for ``util.mount_cb``.
-
- These tests consider the "unit" under test to be ``util.mount_cb`` and
- ``util.unmounter``, which is only used by ``mount_cb``.
-
- TODO: Test default mtype determination
- TODO: Test the if/else branch that actually performs the mounting operation
- """
-
- @pytest.yield_fixture
- def already_mounted_device_and_mountdict(self):
- """Mock an already-mounted device, and yield (device, mount dict)"""
- device = "/dev/fake0"
- mountpoint = "/mnt/fake"
- with mock.patch("cloudinit.util.subp.subp"):
- with mock.patch("cloudinit.util.mounts") as m_mounts:
- mounts = {device: {"mountpoint": mountpoint}}
- m_mounts.return_value = mounts
- yield device, mounts[device]
-
- @pytest.fixture
- def already_mounted_device(self, already_mounted_device_and_mountdict):
- """already_mounted_device_and_mountdict, but return only the device"""
- return already_mounted_device_and_mountdict[0]
-
- @pytest.mark.parametrize(
- "mtype,expected",
- [
- # While the filesystem is called iso9660, the mount type is cd9660
- ("iso9660", "cd9660"),
- # vfat is generally called "msdos" on BSD
- ("vfat", "msdos"),
- # judging from man pages, only FreeBSD has this alias
- ("msdosfs", "msdos"),
- # Test happy path
- ("ufs", "ufs")
- ],
- )
- @mock.patch("cloudinit.util.is_Linux", autospec=True)
- @mock.patch("cloudinit.util.is_BSD", autospec=True)
- @mock.patch("cloudinit.util.subp.subp")
- @mock.patch("cloudinit.temp_utils.tempdir", autospec=True)
- def test_normalize_mtype_on_bsd(
- self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected
- ):
- m_is_BSD.return_value = True
- m_is_Linux.return_value = False
- m_tmpdir.return_value.__enter__ = mock.Mock(
- autospec=True, return_value="/tmp/fake"
- )
- m_tmpdir.return_value.__exit__ = mock.Mock(
- autospec=True, return_value=True
- )
- callback = mock.Mock(autospec=True)
-
- util.mount_cb('/dev/fake0', callback, mtype=mtype)
- assert mock.call(
- ["mount", "-o", "ro", "-t", expected, "/dev/fake0", "/tmp/fake"],
- update_env=None) in m_subp.call_args_list
-
- @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])
- def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):
- with pytest.raises(TypeError):
- util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype)
-
- @mock.patch("cloudinit.util.subp.subp")
- def test_already_mounted_does_not_mount_or_umount_anything(
- self, m_subp, already_mounted_device
- ):
- util.mount_cb(already_mounted_device, mock.Mock())
-
- assert 0 == m_subp.call_count
-
- @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""])
- def test_already_mounted_calls_callback(
- self, trailing_slash_in_mounts, already_mounted_device_and_mountdict
- ):
- device, mount_dict = already_mounted_device_and_mountdict
- mountpoint = mount_dict["mountpoint"]
- mount_dict["mountpoint"] += trailing_slash_in_mounts
-
- callback = mock.Mock()
- util.mount_cb(device, callback)
-
- # The mountpoint passed to callback should always have a trailing
- # slash, regardless of the input
- assert [mock.call(mountpoint + "/")] == callback.call_args_list
-
- def test_already_mounted_calls_callback_with_data(
- self, already_mounted_device
- ):
- callback = mock.Mock()
- util.mount_cb(
- already_mounted_device, callback, data=mock.sentinel.data
- )
-
- assert [
- mock.call(mock.ANY, mock.sentinel.data)
- ] == callback.call_args_list
-
-
-@mock.patch("cloudinit.util.write_file")
-class TestEnsureFile:
- """Tests for ``cloudinit.util.ensure_file``."""
-
- def test_parameters_passed_through(self, m_write_file):
- """Test the parameters in the signature are passed to write_file."""
- util.ensure_file(
- mock.sentinel.path,
- mode=mock.sentinel.mode,
- preserve_mode=mock.sentinel.preserve_mode,
- )
-
- assert 1 == m_write_file.call_count
- args, kwargs = m_write_file.call_args
- assert (mock.sentinel.path,) == args
- assert mock.sentinel.mode == kwargs["mode"]
- assert mock.sentinel.preserve_mode == kwargs["preserve_mode"]
-
- @pytest.mark.parametrize(
- "kwarg,expected",
- [
- # Files should be world-readable by default
- ("mode", 0o644),
- # The previous behaviour of not preserving mode should be retained
- ("preserve_mode", False),
- ],
- )
- def test_defaults(self, m_write_file, kwarg, expected):
- """Test that ensure_file defaults appropriately."""
- util.ensure_file(mock.sentinel.path)
-
- assert 1 == m_write_file.call_count
- _args, kwargs = m_write_file.call_args
- assert expected == kwargs[kwarg]
-
- def test_static_parameters_are_passed(self, m_write_file):
- """Test that the static write_files parameters are passed correctly."""
- util.ensure_file(mock.sentinel.path)
-
- assert 1 == m_write_file.call_count
- _args, kwargs = m_write_file.call_args
- assert "" == kwargs["content"]
- assert "ab" == kwargs["omode"]
-
-
-# vi: ts=4 expandtab