# This file is part of cloud-init. See LICENSE file for license information. """Tests for cloudinit.util""" import logging import platform import cloudinit.util as util from cloudinit.tests.helpers import CiTestCase, mock from textwrap import dedent LOG = logging.getLogger(__name__) MOUNT_INFO = [ '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64', '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2' ] OS_RELEASE_SLES = dedent("""\ NAME="SLES" VERSION="12-SP3" VERSION_ID="12.3" PRETTY_NAME="SUSE Linux Enterprise Server 12 SP3" ID="sles" ANSI_COLOR="0;32" CPE_NAME="cpe:/o:suse:sles:12:sp3" """) OS_RELEASE_OPENSUSE = dedent("""\ NAME="openSUSE Leap" VERSION="42.3" ID=opensuse ID_LIKE="suse" VERSION_ID="42.3" PRETTY_NAME="openSUSE Leap 42.3" ANSI_COLOR="0;32" CPE_NAME="cpe:/o:opensuse:leap:42.3" BUG_REPORT_URL="https://bugs.opensuse.org" HOME_URL="https://www.opensuse.org/" """) OS_RELEASE_OPENSUSE_L15 = dedent("""\ NAME="openSUSE Leap" VERSION="15.0" ID="opensuse-leap" ID_LIKE="suse opensuse" VERSION_ID="15.0" PRETTY_NAME="openSUSE Leap 15.0" ANSI_COLOR="0;32" CPE_NAME="cpe:/o:opensuse:leap:15.0" BUG_REPORT_URL="https://bugs.opensuse.org" HOME_URL="https://www.opensuse.org/" """) OS_RELEASE_OPENSUSE_TW = dedent("""\ NAME="openSUSE Tumbleweed" ID="opensuse-tumbleweed" ID_LIKE="opensuse suse" VERSION_ID="20180920" PRETTY_NAME="openSUSE Tumbleweed" ANSI_COLOR="0;32" CPE_NAME="cpe:/o:opensuse:tumbleweed:20180920" BUG_REPORT_URL="https://bugs.opensuse.org" HOME_URL="https://www.opensuse.org/" """) OS_RELEASE_CENTOS = dedent("""\ NAME="CentOS Linux" VERSION="7 (Core)" ID="centos" ID_LIKE="rhel fedora" VERSION_ID="7" PRETTY_NAME="CentOS Linux 7 (Core)" ANSI_COLOR="0;31" CPE_NAME="cpe:/o:centos:centos:7" HOME_URL="https://www.centos.org/" BUG_REPORT_URL="https://bugs.centos.org/" CENTOS_MANTISBT_PROJECT="CentOS-7" CENTOS_MANTISBT_PROJECT_VERSION="7" REDHAT_SUPPORT_PRODUCT="centos" REDHAT_SUPPORT_PRODUCT_VERSION="7" """) OS_RELEASE_REDHAT_7 = dedent("""\ NAME="Red Hat Enterprise Linux Server" VERSION="7.5 (Maipo)" ID="rhel" ID_LIKE="fedora" VARIANT="Server" VARIANT_ID="server" VERSION_ID="7.5" PRETTY_NAME="Red Hat" ANSI_COLOR="0;31" CPE_NAME="cpe:/o:redhat:enterprise_linux:7.5:GA:server" HOME_URL="https://www.redhat.com/" BUG_REPORT_URL="https://bugzilla.redhat.com/" REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 7" REDHAT_BUGZILLA_PRODUCT_VERSION=7.5 REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux" REDHAT_SUPPORT_PRODUCT_VERSION="7.5" """) REDHAT_RELEASE_CENTOS_6 = "CentOS release 6.10 (Final)" REDHAT_RELEASE_CENTOS_7 = "CentOS Linux release 7.5.1804 (Core)" REDHAT_RELEASE_REDHAT_6 = ( "Red Hat Enterprise Linux Server release 6.10 (Santiago)") REDHAT_RELEASE_REDHAT_7 = ( "Red Hat Enterprise Linux Server release 7.5 (Maipo)") OS_RELEASE_DEBIAN = dedent("""\ PRETTY_NAME="Debian GNU/Linux 9 (stretch)" NAME="Debian GNU/Linux" VERSION_ID="9" VERSION="9 (stretch)" ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/" """) OS_RELEASE_UBUNTU = dedent("""\ NAME="Ubuntu"\n # comment test VERSION="16.04.3 LTS (Xenial Xerus)"\n ID=ubuntu\n ID_LIKE=debian\n PRETTY_NAME="Ubuntu 16.04.3 LTS"\n VERSION_ID="16.04"\n HOME_URL="http://www.ubuntu.com/"\n SUPPORT_URL="http://help.ubuntu.com/"\n BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"\n VERSION_CODENAME=xenial\n UBUNTU_CODENAME=xenial\n """) class FakeCloud(object): def __init__(self, hostname, fqdn): self.hostname = hostname self.fqdn = fqdn self.calls = [] def get_hostname(self, fqdn=None, metadata_only=None): myargs = {} if fqdn is not None: myargs['fqdn'] = fqdn if metadata_only is not None: myargs['metadata_only'] = metadata_only self.calls.append(myargs) if fqdn: return self.fqdn return self.hostname class TestUtil(CiTestCase): def test_parse_mount_info_no_opts_no_arg(self): result = util.parse_mount_info('/home', MOUNT_INFO, LOG) self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) def test_parse_mount_info_no_opts_arg(self): result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False) self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) def test_parse_mount_info_with_opts(self): result = util.parse_mount_info('/', MOUNT_INFO, LOG, True) self.assertEqual( ('/dev/sda1', 'btrfs', '/', 'ro,relatime'), result ) @mock.patch('cloudinit.util.get_mount_info') def test_mount_is_rw(self, m_mount_info): m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime') is_rw = util.mount_is_read_write('/') self.assertEqual(is_rw, True) @mock.patch('cloudinit.util.get_mount_info') def test_mount_is_ro(self, m_mount_info): m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime') is_rw = util.mount_is_read_write('/') self.assertEqual(is_rw, False) class TestShellify(CiTestCase): def test_input_dict_raises_type_error(self): self.assertRaisesRegex( TypeError, 'Input.*was.*dict.*xpected', util.shellify, {'mykey': 'myval'}) def test_input_str_raises_type_error(self): self.assertRaisesRegex( TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar") def test_value_with_int_raises_type_error(self): self.assertRaisesRegex( TypeError, 'shellify.*int', util.shellify, ["foo", 1]) def test_supports_strings_and_lists(self): self.assertEqual( '\n'.join(["#!/bin/sh", "echo hi mom", "'echo' 'hi dad'", "'echo' 'hi' 'sis'", ""]), util.shellify(["echo hi mom", ["echo", "hi dad"], ('echo', 'hi', 'sis')])) class TestGetHostnameFqdn(CiTestCase): def test_get_hostname_fqdn_from_only_cfg_fqdn(self): """When cfg only has the fqdn key, derive hostname and fqdn from it.""" hostname, fqdn = util.get_hostname_fqdn( cfg={'fqdn': 'myhost.domain.com'}, cloud=None) self.assertEqual('myhost', hostname) self.assertEqual('myhost.domain.com', fqdn) def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self): """When cfg has both fqdn and hostname keys, return them.""" hostname, fqdn = util.get_hostname_fqdn( cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None) self.assertEqual('other', hostname) self.assertEqual('myhost.domain.com', fqdn) def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self): """When cfg has only hostname key which represents a fqdn, use that.""" hostname, fqdn = util.get_hostname_fqdn( cfg={'hostname': 'myhost.domain.com'}, cloud=None) self.assertEqual('myhost', hostname) self.assertEqual('myhost.domain.com', fqdn) def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self): """When cfg has a hostname without a '.' query cloud.get_hostname.""" mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') hostname, fqdn = util.get_hostname_fqdn( cfg={'hostname': 'myhost'}, cloud=mycloud) self.assertEqual('myhost', hostname) self.assertEqual('cloudhost.mycloud.com', fqdn) self.assertEqual( [{'fqdn': True, 'metadata_only': False}], mycloud.calls) def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self): """When cfg has neither hostname nor fqdn cloud.get_hostname.""" mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud) self.assertEqual('cloudhost', hostname) self.assertEqual('cloudhost.mycloud.com', fqdn) self.assertEqual( [{'fqdn': True, 'metadata_only': False}, {'metadata_only': False}], mycloud.calls) def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self): """Calls to cloud.get_hostname pass the metadata_only parameter.""" mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') _hn, _fqdn = util.get_hostname_fqdn( cfg={}, cloud=mycloud, metadata_only=True) self.assertEqual( [{'fqdn': True, 'metadata_only': True}, {'metadata_only': True}], mycloud.calls) class TestBlkid(CiTestCase): ids = { "id01": "1111-1111", "id02": "22222222-2222", "id03": "33333333-3333", "id04": "44444444-4444", "id05": "55555555-5555-5555-5555-555555555555", "id06": "66666666-6666-6666-6666-666666666666", "id07": "52894610484658920398", "id08": "86753098675309867530", "id09": "99999999-9999-9999-9999-999999999999", } blkid_out = dedent("""\ /dev/loop0: TYPE="squashfs" /dev/loop1: TYPE="squashfs" /dev/loop2: TYPE="squashfs" /dev/loop3: TYPE="squashfs" /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}" /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}" /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}" /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """ """TYPE="zfs_member" PARTUUID="{id09}" /dev/loop4: TYPE="squashfs" """) maxDiff = None def _get_expected(self): return ({ "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"}, "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"}, "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"}, "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"}, "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"}, "/dev/sda1": {"DEVNAME": "/dev/sda1", "TYPE": "vfat", "UUID": self.ids["id01"], "PARTUUID": self.ids["id02"]}, "/dev/sda2": {"DEVNAME": "/dev/sda2", "TYPE": "ext4", "UUID": self.ids["id03"], "PARTUUID": self.ids["id04"]}, "/dev/sda3": {"DEVNAME": "/dev/sda3", "TYPE": "ext4", "UUID": self.ids["id05"], "PARTUUID": self.ids["id06"]}, "/dev/sda4": {"DEVNAME": "/dev/sda4", "TYPE": "zfs_member", "LABEL": "default", "UUID": self.ids["id07"], "UUID_SUB": self.ids["id08"], "PARTUUID": self.ids["id09"]}, }) @mock.patch("cloudinit.util.subp") def test_functional_blkid(self, m_subp): m_subp.return_value = ( self.blkid_out.format(**self.ids), "") self.assertEqual(self._get_expected(), util.blkid()) m_subp.assert_called_with(["blkid", "-o", "full"], capture=True, decode="replace") @mock.patch("cloudinit.util.subp") def test_blkid_no_cache_uses_no_cache(self, m_subp): """blkid should turn off cache if disable_cache is true.""" m_subp.return_value = ( self.blkid_out.format(**self.ids), "") self.assertEqual(self._get_expected(), util.blkid(disable_cache=True)) m_subp.assert_called_with(["blkid", "-o", "full", "-c", "/dev/null"], capture=True, decode="replace") @mock.patch('cloudinit.util.subp') class TestUdevadmSettle(CiTestCase): def test_with_no_params(self, m_subp): """called with no parameters.""" util.udevadm_settle() m_subp.called_once_with(mock.call(['udevadm', 'settle'])) def test_with_exists_and_not_exists(self, m_subp): """with exists=file where file does not exist should invoke subp.""" mydev = self.tmp_path("mydev") util.udevadm_settle(exists=mydev) m_subp.called_once_with( ['udevadm', 'settle', '--exit-if-exists=%s' % mydev]) def test_with_exists_and_file_exists(self, m_subp): """with exists=file where file does exist should not invoke subp.""" mydev = self.tmp_path("mydev") util.write_file(mydev, "foo\n") util.udevadm_settle(exists=mydev) self.assertIsNone(m_subp.call_args) def test_with_timeout_int(self, m_subp): """timeout can be an integer.""" timeout = 9 util.udevadm_settle(timeout=timeout) m_subp.called_once_with( ['udevadm', 'settle', '--timeout=%s' % timeout]) def test_with_timeout_string(self, m_subp): """timeout can be a string.""" timeout = "555" util.udevadm_settle(timeout=timeout) m_subp.assert_called_once_with( ['udevadm', 'settle', '--timeout=%s' % timeout]) def test_with_exists_and_timeout(self, m_subp): """test call with both exists and timeout.""" mydev = self.tmp_path("mydev") timeout = "3" util.udevadm_settle(exists=mydev) m_subp.called_once_with( ['udevadm', 'settle', '--exit-if-exists=%s' % mydev, '--timeout=%s' % timeout]) def test_subp_exception_raises_to_caller(self, m_subp): m_subp.side_effect = util.ProcessExecutionError("BOOM") self.assertRaises(util.ProcessExecutionError, util.udevadm_settle) @mock.patch('os.path.exists') class TestGetLinuxDistro(CiTestCase): @classmethod def os_release_exists(self, path): """Side effect function""" if path == '/etc/os-release': return 1 @classmethod def redhat_release_exists(self, path): """Side effect function """ if path == '/etc/redhat-release': return 1 @mock.patch('cloudinit.util.load_file') def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file has the distro name in quotes""" m_os_release.return_value = OS_RELEASE_SLES m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('sles', '12.3', platform.machine()), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file does not have the distro name in quotes""" m_os_release.return_value = OS_RELEASE_UBUNTU m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('ubuntu', '16.04', 'xenial'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_centos6(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on CentOS 6.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() self.assertEqual(('centos', '6.10', 'Final'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): """Verify the correct release info on CentOS 7 without os-release.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() self.assertEqual(('centos', '7.5.1804', 'Core'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_REDHAT_7 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('redhat', '7.5', 'Maipo'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() self.assertEqual(('redhat', '7.5', 'Maipo'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 6 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() self.assertEqual(('redhat', '6.10', 'Santiago'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_copr_centos(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on COPR CentOS.""" m_os_release.return_value = OS_RELEASE_CENTOS m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('centos', '7', 'Core'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_debian(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on Debian.""" m_os_release.return_value = OS_RELEASE_DEBIAN m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('debian', '9', 'stretch'), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_opensuse(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE prior to openSUSE Leap 15. """ m_os_release.return_value = OS_RELEASE_OPENSUSE m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('opensuse', '42.3', platform.machine()), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Leap 15.0 and later. """ m_os_release.return_value = OS_RELEASE_OPENSUSE_L15 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist) @mock.patch('cloudinit.util.load_file') def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Tumbleweed """ m_os_release.return_value = OS_RELEASE_OPENSUSE_TW m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual( ('opensuse-tumbleweed', '20180920', platform.machine()), dist) @mock.patch('platform.dist') def test_get_linux_distro_no_data(self, m_platform_dist, m_path_exists): """Verify we get no information if os-release does not exist""" m_platform_dist.return_value = ('', '', '') m_path_exists.return_value = 0 dist = util.get_linux_distro() self.assertEqual(('', '', ''), dist) @mock.patch('platform.dist') def test_get_linux_distro_no_impl(self, m_platform_dist, m_path_exists): """Verify we get an empty tuple when no information exists and Exceptions are not propagated""" m_platform_dist.side_effect = Exception() m_path_exists.return_value = 0 dist = util.get_linux_distro() self.assertEqual(('', '', ''), dist) @mock.patch('platform.dist') def test_get_linux_distro_plat_data(self, m_platform_dist, m_path_exists): """Verify we get the correct platform information""" m_platform_dist.return_value = ('foo', '1.1', 'aarch64') m_path_exists.return_value = 0 dist = util.get_linux_distro() self.assertEqual(('foo', '1.1', 'aarch64'), dist) @mock.patch('os.path.exists') class TestIsLXD(CiTestCase): def test_is_lxd_true_on_sock_device(self, m_exists): """When lxd's /dev/lxd/sock exists, is_lxd returns true.""" m_exists.return_value = True self.assertTrue(util.is_lxd()) m_exists.assert_called_once_with('/dev/lxd/sock') def test_is_lxd_false_when_sock_device_absent(self, m_exists): """When lxd's /dev/lxd/sock is absent, is_lxd returns false.""" m_exists.return_value = False self.assertFalse(util.is_lxd()) m_exists.assert_called_once_with('/dev/lxd/sock') # vi: ts=4 expandtab