diff options
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 2034 |
1 files changed, 1730 insertions, 304 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 857629f1..3765511b 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,23 +1,1339 @@ # This file is part of cloud-init. See LICENSE file for license information. +"""Tests for cloudinit.util""" + +import base64 import io +import json import logging import os +import platform import re import shutil import stat import tempfile +from textwrap import dedent +from unittest import mock + import pytest import yaml -from unittest import mock -from cloudinit import subp -from cloudinit import importer, util -from cloudinit.tests import helpers +from cloudinit import importer, subp, util +from tests.unittests import helpers +from tests.unittests.helpers import CiTestCase + +LOG = logging.getLogger(__name__) + +MOUNT_INFO = [ + "68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64", + "153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2", +] + +OS_RELEASE_SLES = dedent( + """\ + NAME="SLES" + VERSION="12-SP3" + VERSION_ID="12.3" + PRETTY_NAME="SUSE Linux Enterprise Server 12 SP3" + ID="sles" + ANSI_COLOR="0;32" + CPE_NAME="cpe:/o:suse:sles:12:sp3" +""" +) + +OS_RELEASE_OPENSUSE = dedent( + """\ + NAME="openSUSE Leap" + VERSION="42.3" + ID=opensuse + ID_LIKE="suse" + VERSION_ID="42.3" + PRETTY_NAME="openSUSE Leap 42.3" + ANSI_COLOR="0;32" + CPE_NAME="cpe:/o:opensuse:leap:42.3" + BUG_REPORT_URL="https://bugs.opensuse.org" + HOME_URL="https://www.opensuse.org/" +""" +) + +OS_RELEASE_OPENSUSE_L15 = dedent( + """\ + NAME="openSUSE Leap" + VERSION="15.0" + ID="opensuse-leap" + ID_LIKE="suse opensuse" + VERSION_ID="15.0" + PRETTY_NAME="openSUSE Leap 15.0" + ANSI_COLOR="0;32" + CPE_NAME="cpe:/o:opensuse:leap:15.0" + BUG_REPORT_URL="https://bugs.opensuse.org" + HOME_URL="https://www.opensuse.org/" +""" +) + +OS_RELEASE_OPENSUSE_TW = dedent( + """\ + NAME="openSUSE Tumbleweed" + ID="opensuse-tumbleweed" + ID_LIKE="opensuse suse" + VERSION_ID="20180920" + PRETTY_NAME="openSUSE Tumbleweed" + ANSI_COLOR="0;32" + CPE_NAME="cpe:/o:opensuse:tumbleweed:20180920" + BUG_REPORT_URL="https://bugs.opensuse.org" + HOME_URL="https://www.opensuse.org/" +""" +) + +OS_RELEASE_CENTOS = dedent( + """\ + NAME="CentOS Linux" + VERSION="7 (Core)" + ID="centos" + ID_LIKE="rhel fedora" + VERSION_ID="7" + PRETTY_NAME="CentOS Linux 7 (Core)" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:centos:centos:7" + HOME_URL="https://www.centos.org/" + BUG_REPORT_URL="https://bugs.centos.org/" + + CENTOS_MANTISBT_PROJECT="CentOS-7" + CENTOS_MANTISBT_PROJECT_VERSION="7" + REDHAT_SUPPORT_PRODUCT="centos" + REDHAT_SUPPORT_PRODUCT_VERSION="7" +""" +) + +OS_RELEASE_REDHAT_7 = dedent( + """\ + NAME="Red Hat Enterprise Linux Server" + VERSION="7.5 (Maipo)" + ID="rhel" + ID_LIKE="fedora" + VARIANT="Server" + VARIANT_ID="server" + VERSION_ID="7.5" + PRETTY_NAME="Red Hat" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:redhat:enterprise_linux:7.5:GA:server" + HOME_URL="https://www.redhat.com/" + BUG_REPORT_URL="https://bugzilla.redhat.com/" + + REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 7" + REDHAT_BUGZILLA_PRODUCT_VERSION=7.5 + REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux" + REDHAT_SUPPORT_PRODUCT_VERSION="7.5" +""" +) + +OS_RELEASE_ALMALINUX_8 = dedent( + """\ + NAME="AlmaLinux" + VERSION="8.3 (Purple Manul)" + ID="almalinux" + ID_LIKE="rhel centos fedora" + VERSION_ID="8.3" + PLATFORM_ID="platform:el8" + PRETTY_NAME="AlmaLinux 8.3 (Purple Manul)" + ANSI_COLOR="0;34" + CPE_NAME="cpe:/o:almalinux:almalinux:8.3:GA" + HOME_URL="https://almalinux.org/" + BUG_REPORT_URL="https://bugs.almalinux.org/" + + ALMALINUX_MANTISBT_PROJECT="AlmaLinux-8" + ALMALINUX_MANTISBT_PROJECT_VERSION="8.3" +""" +) + +OS_RELEASE_EUROLINUX_7 = dedent( + """\ + VERSION="7.9 (Minsk)" + ID="eurolinux" + ID_LIKE="rhel scientific centos fedora" + VERSION_ID="7.9" + PRETTY_NAME="EuroLinux 7.9 (Minsk)" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:eurolinux:eurolinux:7.9:GA" + HOME_URL="http://www.euro-linux.com/" + BUG_REPORT_URL="mailto:support@euro-linux.com" + REDHAT_BUGZILLA_PRODUCT="EuroLinux 7" + REDHAT_BUGZILLA_PRODUCT_VERSION=7.9 + REDHAT_SUPPORT_PRODUCT="EuroLinux" + REDHAT_SUPPORT_PRODUCT_VERSION="7.9" +""" +) + +OS_RELEASE_EUROLINUX_8 = dedent( + """\ + NAME="EuroLinux" + VERSION="8.4 (Vaduz)" + ID="eurolinux" + ID_LIKE="rhel fedora centos" + VERSION_ID="8.4" + PLATFORM_ID="platform:el8" + PRETTY_NAME="EuroLinux 8.4 (Vaduz)" + ANSI_COLOR="0;34" + CPE_NAME="cpe:/o:eurolinux:eurolinux:8" + HOME_URL="https://www.euro-linux.com/" + BUG_REPORT_URL="https://github.com/EuroLinux/eurolinux-distro-bugs-and-rfc/" + REDHAT_SUPPORT_PRODUCT="EuroLinux" + REDHAT_SUPPORT_PRODUCT_VERSION="8" +""" +) + +OS_RELEASE_MIRACLELINUX_8 = dedent( + """\ + NAME="MIRACLE LINUX" + VERSION="8.4 (Peony)" + ID="miraclelinux" + ID_LIKE="rhel fedora" + PLATFORM_ID="platform:el8" + VERSION_ID="8" + PRETTY_NAME="MIRACLE LINUX 8.4 (Peony)" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:cybertrust_japan:miracle_linux:8" + HOME_URL="https://www.cybertrust.co.jp/miracle-linux/" + DOCUMENTATION_URL="https://www.miraclelinux.com/support/miraclelinux8" + BUG_REPORT_URL="https://bugzilla.asianux.com/" + MIRACLELINUX_SUPPORT_PRODUCT="MIRACLE LINUX" + MIRACLELINUX_SUPPORT_PRODUCT_VERSION="8" +""" +) + +OS_RELEASE_ROCKY_8 = dedent( + """\ + NAME="Rocky Linux" + VERSION="8.3 (Green Obsidian)" + ID="rocky" + ID_LIKE="rhel fedora" + VERSION_ID="8.3" + PLATFORM_ID="platform:el8" + PRETTY_NAME="Rocky Linux 8.3 (Green Obsidian)" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:rocky:rocky:8" + HOME_URL="https://rockylinux.org/" + BUG_REPORT_URL="https://bugs.rockylinux.org/" + ROCKY_SUPPORT_PRODUCT="Rocky Linux" + ROCKY_SUPPORT_PRODUCT_VERSION="8" +""" +) + +OS_RELEASE_VIRTUOZZO_8 = dedent( + """\ + NAME="Virtuozzo Linux" + VERSION="8" + ID="virtuozzo" + ID_LIKE="rhel fedora" + VERSION_ID="8" + PLATFORM_ID="platform:el8" + PRETTY_NAME="Virtuozzo Linux" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:virtuozzoproject:vzlinux:8" + HOME_URL="https://www.vzlinux.org" + BUG_REPORT_URL="https://bugs.openvz.org" +""" +) + +OS_RELEASE_CLOUDLINUX_8 = dedent( + """\ + NAME="CloudLinux" + VERSION="8.4 (Valery Rozhdestvensky)" + ID="cloudlinux" + ID_LIKE="rhel fedora centos" + VERSION_ID="8.4" + PLATFORM_ID="platform:el8" + PRETTY_NAME="CloudLinux 8.4 (Valery Rozhdestvensky)" + ANSI_COLOR="0;31" + CPE_NAME="cpe:/o:cloudlinux:cloudlinux:8.4:GA:server" + HOME_URL="https://www.cloudlinux.com/" + BUG_REPORT_URL="https://www.cloudlinux.com/support" +""" +) + +OS_RELEASE_OPENEULER_20 = dedent( + """\ + NAME="openEuler" + VERSION="20.03 (LTS-SP2)" + ID="openEuler" + VERSION_ID="20.03" + PRETTY_NAME="openEuler 20.03 (LTS-SP2)" + ANSI_COLOR="0;31" +""" +) + +REDHAT_RELEASE_CENTOS_6 = "CentOS release 6.10 (Final)" +REDHAT_RELEASE_CENTOS_7 = "CentOS Linux release 7.5.1804 (Core)" +REDHAT_RELEASE_REDHAT_6 = ( + "Red Hat Enterprise Linux Server release 6.10 (Santiago)" +) +REDHAT_RELEASE_REDHAT_7 = "Red Hat Enterprise Linux Server release 7.5 (Maipo)" +REDHAT_RELEASE_ALMALINUX_8 = "AlmaLinux release 8.3 (Purple Manul)" +REDHAT_RELEASE_EUROLINUX_7 = "EuroLinux release 7.9 (Minsk)" +REDHAT_RELEASE_EUROLINUX_8 = "EuroLinux release 8.4 (Vaduz)" +REDHAT_RELEASE_MIRACLELINUX_8 = "MIRACLE LINUX release 8.4 (Peony)" +REDHAT_RELEASE_ROCKY_8 = "Rocky Linux release 8.3 (Green Obsidian)" +REDHAT_RELEASE_VIRTUOZZO_8 = "Virtuozzo Linux release 8" +REDHAT_RELEASE_CLOUDLINUX_8 = "CloudLinux release 8.4 (Valery Rozhdestvensky)" +OS_RELEASE_DEBIAN = dedent( + """\ + PRETTY_NAME="Debian GNU/Linux 9 (stretch)" + NAME="Debian GNU/Linux" + VERSION_ID="9" + VERSION="9 (stretch)" + ID=debian + HOME_URL="https://www.debian.org/" + SUPPORT_URL="https://www.debian.org/support" + BUG_REPORT_URL="https://bugs.debian.org/" +""" +) + +OS_RELEASE_UBUNTU = dedent( + """\ + NAME="Ubuntu"\n + # comment test + VERSION="16.04.3 LTS (Xenial Xerus)"\n + ID=ubuntu\n + ID_LIKE=debian\n + PRETTY_NAME="Ubuntu 16.04.3 LTS"\n + VERSION_ID="16.04"\n + HOME_URL="http://www.ubuntu.com/"\n + SUPPORT_URL="http://help.ubuntu.com/"\n + BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"\n + VERSION_CODENAME=xenial\n + UBUNTU_CODENAME=xenial\n +""" +) + +OS_RELEASE_PHOTON = """\ + NAME="VMware Photon OS" + VERSION="4.0" + ID=photon + VERSION_ID=4.0 + PRETTY_NAME="VMware Photon OS/Linux" + ANSI_COLOR="1;34" + HOME_URL="https://vmware.github.io/photon/" + BUG_REPORT_URL="https://github.com/vmware/photon/issues" +""" + + +class FakeCloud(object): + def __init__(self, hostname, fqdn): + self.hostname = hostname + self.fqdn = fqdn + self.calls = [] + + def get_hostname(self, fqdn=None, metadata_only=None): + myargs = {} + if fqdn is not None: + myargs["fqdn"] = fqdn + if metadata_only is not None: + myargs["metadata_only"] = metadata_only + self.calls.append(myargs) + if fqdn: + return self.fqdn + return self.hostname + + +class TestUtil(CiTestCase): + def test_parse_mount_info_no_opts_no_arg(self): + result = util.parse_mount_info("/home", MOUNT_INFO, LOG) + self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + + def test_parse_mount_info_no_opts_arg(self): + result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False) + self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + + def test_parse_mount_info_with_opts(self): + result = util.parse_mount_info("/", MOUNT_INFO, LOG, True) + self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result) + + @mock.patch("cloudinit.util.get_mount_info") + def test_mount_is_rw(self, m_mount_info): + m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime") + is_rw = util.mount_is_read_write("/") + self.assertEqual(is_rw, True) + + @mock.patch("cloudinit.util.get_mount_info") + def test_mount_is_ro(self, m_mount_info): + m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime") + is_rw = util.mount_is_read_write("/") + self.assertEqual(is_rw, False) + + +class TestUptime(CiTestCase): + @mock.patch("cloudinit.util.boottime") + @mock.patch("cloudinit.util.os.path.exists") + @mock.patch("cloudinit.util.time.time") + def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime): + boottime = 1000.0 + uptime = 10.0 + m_boottime.return_value = boottime + m_time.return_value = boottime + uptime + m_exists.return_value = False + result = util.uptime() + self.assertEqual(str(uptime), result) + + +class TestShellify(CiTestCase): + def test_input_dict_raises_type_error(self): + self.assertRaisesRegex( + TypeError, + "Input.*was.*dict.*xpected", + util.shellify, + {"mykey": "myval"}, + ) + + def test_input_str_raises_type_error(self): + self.assertRaisesRegex( + TypeError, "Input.*was.*str.*xpected", util.shellify, "foobar" + ) + def test_value_with_int_raises_type_error(self): + self.assertRaisesRegex( + TypeError, "shellify.*int", util.shellify, ["foo", 1] + ) -class FakeSelinux(object): + def test_supports_strings_and_lists(self): + self.assertEqual( + "\n".join( + [ + "#!/bin/sh", + "echo hi mom", + "'echo' 'hi dad'", + "'echo' 'hi' 'sis'", + "", + ] + ), + util.shellify( + ["echo hi mom", ["echo", "hi dad"], ("echo", "hi", "sis")] + ), + ) + + def test_supports_comments(self): + self.assertEqual( + "\n".join(["#!/bin/sh", "echo start", "echo end", ""]), + util.shellify(["echo start", None, "echo end"]), + ) + + +class TestGetHostnameFqdn(CiTestCase): + def test_get_hostname_fqdn_from_only_cfg_fqdn(self): + """When cfg only has the fqdn key, derive hostname and fqdn from it.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={"fqdn": "myhost.domain.com"}, cloud=None + ) + self.assertEqual("myhost", hostname) + self.assertEqual("myhost.domain.com", fqdn) + + def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self): + """When cfg has both fqdn and hostname keys, return them.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={"fqdn": "myhost.domain.com", "hostname": "other"}, cloud=None + ) + self.assertEqual("other", hostname) + self.assertEqual("myhost.domain.com", fqdn) + + def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self): + """When cfg has only hostname key which represents a fqdn, use that.""" + hostname, fqdn = util.get_hostname_fqdn( + cfg={"hostname": "myhost.domain.com"}, cloud=None + ) + self.assertEqual("myhost", hostname) + self.assertEqual("myhost.domain.com", fqdn) + + def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self): + """When cfg has a hostname without a '.' query cloud.get_hostname.""" + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") + hostname, fqdn = util.get_hostname_fqdn( + cfg={"hostname": "myhost"}, cloud=mycloud + ) + self.assertEqual("myhost", hostname) + self.assertEqual("cloudhost.mycloud.com", fqdn) + self.assertEqual( + [{"fqdn": True, "metadata_only": False}], mycloud.calls + ) + + def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self): + """When cfg has neither hostname nor fqdn cloud.get_hostname.""" + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") + hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud) + self.assertEqual("cloudhost", hostname) + self.assertEqual("cloudhost.mycloud.com", fqdn) + self.assertEqual( + [{"fqdn": True, "metadata_only": False}, {"metadata_only": False}], + mycloud.calls, + ) + + def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self): + """Calls to cloud.get_hostname pass the metadata_only parameter.""" + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") + _hn, _fqdn = util.get_hostname_fqdn( + cfg={}, cloud=mycloud, metadata_only=True + ) + self.assertEqual( + [{"fqdn": True, "metadata_only": True}, {"metadata_only": True}], + mycloud.calls, + ) + + +class TestBlkid(CiTestCase): + ids = { + "id01": "1111-1111", + "id02": "22222222-2222", + "id03": "33333333-3333", + "id04": "44444444-4444", + "id05": "55555555-5555-5555-5555-555555555555", + "id06": "66666666-6666-6666-6666-666666666666", + "id07": "52894610484658920398", + "id08": "86753098675309867530", + "id09": "99999999-9999-9999-9999-999999999999", + } + + blkid_out = dedent( + """\ + /dev/loop0: TYPE="squashfs" + /dev/loop1: TYPE="squashfs" + /dev/loop2: TYPE="squashfs" + /dev/loop3: TYPE="squashfs" + /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}" + /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}" + /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}" + /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """ + """TYPE="zfs_member" PARTUUID="{id09}" + /dev/loop4: TYPE="squashfs" + """ + ) + + maxDiff = None + + def _get_expected(self): + return { + "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"}, + "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"}, + "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"}, + "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"}, + "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"}, + "/dev/sda1": { + "DEVNAME": "/dev/sda1", + "TYPE": "vfat", + "UUID": self.ids["id01"], + "PARTUUID": self.ids["id02"], + }, + "/dev/sda2": { + "DEVNAME": "/dev/sda2", + "TYPE": "ext4", + "UUID": self.ids["id03"], + "PARTUUID": self.ids["id04"], + }, + "/dev/sda3": { + "DEVNAME": "/dev/sda3", + "TYPE": "ext4", + "UUID": self.ids["id05"], + "PARTUUID": self.ids["id06"], + }, + "/dev/sda4": { + "DEVNAME": "/dev/sda4", + "TYPE": "zfs_member", + "LABEL": "default", + "UUID": self.ids["id07"], + "UUID_SUB": self.ids["id08"], + "PARTUUID": self.ids["id09"], + }, + } + + @mock.patch("cloudinit.subp.subp") + def test_functional_blkid(self, m_subp): + m_subp.return_value = (self.blkid_out.format(**self.ids), "") + self.assertEqual(self._get_expected(), util.blkid()) + m_subp.assert_called_with( + ["blkid", "-o", "full"], capture=True, decode="replace" + ) + + @mock.patch("cloudinit.subp.subp") + def test_blkid_no_cache_uses_no_cache(self, m_subp): + """blkid should turn off cache if disable_cache is true.""" + m_subp.return_value = (self.blkid_out.format(**self.ids), "") + self.assertEqual(self._get_expected(), util.blkid(disable_cache=True)) + m_subp.assert_called_with( + ["blkid", "-o", "full", "-c", "/dev/null"], + capture=True, + decode="replace", + ) + + +@mock.patch("cloudinit.subp.subp") +class TestUdevadmSettle(CiTestCase): + def test_with_no_params(self, m_subp): + """called with no parameters.""" + util.udevadm_settle() + m_subp.called_once_with(mock.call(["udevadm", "settle"])) + + def test_with_exists_and_not_exists(self, m_subp): + """with exists=file where file does not exist should invoke subp.""" + mydev = self.tmp_path("mydev") + util.udevadm_settle(exists=mydev) + m_subp.called_once_with( + ["udevadm", "settle", "--exit-if-exists=%s" % mydev] + ) + + def test_with_exists_and_file_exists(self, m_subp): + """with exists=file where file does exist should not invoke subp.""" + mydev = self.tmp_path("mydev") + util.write_file(mydev, "foo\n") + util.udevadm_settle(exists=mydev) + self.assertIsNone(m_subp.call_args) + + def test_with_timeout_int(self, m_subp): + """timeout can be an integer.""" + timeout = 9 + util.udevadm_settle(timeout=timeout) + m_subp.called_once_with( + ["udevadm", "settle", "--timeout=%s" % timeout] + ) + + def test_with_timeout_string(self, m_subp): + """timeout can be a string.""" + timeout = "555" + util.udevadm_settle(timeout=timeout) + m_subp.assert_called_once_with( + ["udevadm", "settle", "--timeout=%s" % timeout] + ) + + def test_with_exists_and_timeout(self, m_subp): + """test call with both exists and timeout.""" + mydev = self.tmp_path("mydev") + timeout = "3" + util.udevadm_settle(exists=mydev) + m_subp.called_once_with( + [ + "udevadm", + "settle", + "--exit-if-exists=%s" % mydev, + "--timeout=%s" % timeout, + ] + ) + + def test_subp_exception_raises_to_caller(self, m_subp): + m_subp.side_effect = subp.ProcessExecutionError("BOOM") + self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle) + + +@mock.patch("os.path.exists") +class TestGetLinuxDistro(CiTestCase): + def setUp(self): + # python2 has no lru_cache, and therefore, no cache_clear() + if hasattr(util.get_linux_distro, "cache_clear"): + util.get_linux_distro.cache_clear() + + @classmethod + def os_release_exists(self, path): + """Side effect function""" + if path == "/etc/os-release": + return 1 + + @classmethod + def redhat_release_exists(self, path): + """Side effect function""" + if path == "/etc/redhat-release": + return 1 + @mock.patch("cloudinit.util.load_file") + def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): + """Verify we get the correct name if the os-release file has + the distro name in quotes""" + m_os_release.return_value = OS_RELEASE_SLES + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("sles", "12.3", platform.machine()), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): + """Verify we get the correct name if the os-release file does not + have the distro name in quotes""" + m_os_release.return_value = OS_RELEASE_UBUNTU + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("ubuntu", "16.04", "xenial"), dist) + + @mock.patch("platform.system") + @mock.patch("platform.release") + @mock.patch("cloudinit.util._parse_redhat_release") + def test_get_linux_freebsd( + self, + m_parse_redhat_release, + m_platform_release, + m_platform_system, + m_path_exists, + ): + """Verify we get the correct name and release name on FreeBSD.""" + m_path_exists.return_value = False + m_platform_release.return_value = "12.0-RELEASE-p10" + m_platform_system.return_value = "FreeBSD" + m_parse_redhat_release.return_value = {} + util.is_BSD.cache_clear() + dist = util.get_linux_distro() + self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_centos6(self, m_os_release, m_path_exists): + """Verify we get the correct name and release name on CentOS 6.""" + m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("centos", "6.10", "Final"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): + """Verify the correct release info on CentOS 7 without os-release.""" + m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 + m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("centos", "7.5.1804", "Core"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): + """Verify redhat 7 read from os-release.""" + m_os_release.return_value = OS_RELEASE_REDHAT_7 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("redhat", "7.5", "Maipo"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): + """Verify redhat 7 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("redhat", "7.5", "Maipo"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): + """Verify redhat 6 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("redhat", "6.10", "Santiago"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_copr_centos(self, m_os_release, m_path_exists): + """Verify we get the correct name and release name on COPR CentOS.""" + m_os_release.return_value = OS_RELEASE_CENTOS + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("centos", "7", "Core"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists): + """Verify almalinux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists): + """Verify almalinux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_ALMALINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists): + """Verify eurolinux 7 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists): + """Verify eurolinux 7 read from os-release.""" + m_os_release.return_value = OS_RELEASE_EUROLINUX_7 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists): + """Verify eurolinux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists): + """Verify eurolinux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_EUROLINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_miraclelinux8_rhrelease( + self, m_os_release, m_path_exists + ): + """Verify miraclelinux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_MIRACLELINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("miracle", "8.4", "Peony"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_miraclelinux8_osrelease( + self, m_os_release, m_path_exists + ): + """Verify miraclelinux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_MIRACLELINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("miraclelinux", "8", "Peony"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists): + """Verify rocky linux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_ROCKY_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists): + """Verify rocky linux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_ROCKY_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists): + """Verify virtuozzo linux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists): + """Verify virtuozzo linux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists): + """Verify cloudlinux 8 read from redhat-release.""" + m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists + dist = util.get_linux_distro() + self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists): + """Verify cloudlinux 8 read from os-release.""" + m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_debian(self, m_os_release, m_path_exists): + """Verify we get the correct name and release name on Debian.""" + m_os_release.return_value = OS_RELEASE_DEBIAN + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("debian", "9", "stretch"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_openeuler(self, m_os_release, m_path_exists): + """Verify get the correct name and release name on Openeuler.""" + m_os_release.return_value = OS_RELEASE_OPENEULER_20 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_opensuse(self, m_os_release, m_path_exists): + """Verify we get the correct name and machine arch on openSUSE + prior to openSUSE Leap 15. + """ + m_os_release.return_value = OS_RELEASE_OPENSUSE + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("opensuse", "42.3", platform.machine()), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): + """Verify we get the correct name and machine arch on openSUSE + for openSUSE Leap 15.0 and later. + """ + m_os_release.return_value = OS_RELEASE_OPENSUSE_L15 + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): + """Verify we get the correct name and machine arch on openSUSE + for openSUSE Tumbleweed + """ + m_os_release.return_value = OS_RELEASE_OPENSUSE_TW + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual( + ("opensuse-tumbleweed", "20180920", platform.machine()), dist + ) + + @mock.patch("cloudinit.util.load_file") + def test_get_linux_photon_os_release(self, m_os_release, m_path_exists): + """Verify we get the correct name and machine arch on PhotonOS""" + m_os_release.return_value = OS_RELEASE_PHOTON + m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists + dist = util.get_linux_distro() + self.assertEqual(("photon", "4.0", "VMware Photon OS/Linux"), dist) + + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) + def test_get_linux_distro_no_data( + self, m_platform_dist, m_platform_system, m_path_exists + ): + """Verify we get no information if os-release does not exist""" + m_platform_dist.return_value = ("", "", "") + m_platform_system.return_value = "Linux" + m_path_exists.return_value = 0 + dist = util.get_linux_distro() + self.assertEqual(("", "", ""), dist) + + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) + def test_get_linux_distro_no_impl( + self, m_platform_dist, m_platform_system, m_path_exists + ): + """Verify we get an empty tuple when no information exists and + Exceptions are not propagated""" + m_platform_dist.side_effect = Exception() + m_platform_system.return_value = "Linux" + m_path_exists.return_value = 0 + dist = util.get_linux_distro() + self.assertEqual(("", "", ""), dist) + + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) + def test_get_linux_distro_plat_data( + self, m_platform_dist, m_platform_system, m_path_exists + ): + """Verify we get the correct platform information""" + m_platform_dist.return_value = ("foo", "1.1", "aarch64") + m_platform_system.return_value = "Linux" + m_path_exists.return_value = 0 + dist = util.get_linux_distro() + self.assertEqual(("foo", "1.1", "aarch64"), dist) + + +class TestGetVariant: + @pytest.mark.parametrize( + "info, expected_variant", + [ + ({"system": "Linux", "dist": ("almalinux",)}, "almalinux"), + ({"system": "linux", "dist": ("alpine",)}, "alpine"), + ({"system": "linux", "dist": ("arch",)}, "arch"), + ({"system": "linux", "dist": ("centos",)}, "centos"), + ({"system": "linux", "dist": ("cloudlinux",)}, "cloudlinux"), + ({"system": "linux", "dist": ("debian",)}, "debian"), + ({"system": "linux", "dist": ("eurolinux",)}, "eurolinux"), + ({"system": "linux", "dist": ("fedora",)}, "fedora"), + ({"system": "linux", "dist": ("openEuler",)}, "openeuler"), + ({"system": "linux", "dist": ("photon",)}, "photon"), + ({"system": "linux", "dist": ("rhel",)}, "rhel"), + ({"system": "linux", "dist": ("rocky",)}, "rocky"), + ({"system": "linux", "dist": ("suse",)}, "suse"), + ({"system": "linux", "dist": ("virtuozzo",)}, "virtuozzo"), + ({"system": "linux", "dist": ("ubuntu",)}, "ubuntu"), + ({"system": "linux", "dist": ("linuxmint",)}, "ubuntu"), + ({"system": "linux", "dist": ("mint",)}, "ubuntu"), + ({"system": "linux", "dist": ("redhat",)}, "rhel"), + ({"system": "linux", "dist": ("opensuse",)}, "suse"), + ({"system": "linux", "dist": ("opensuse-tumbleweed",)}, "suse"), + ({"system": "linux", "dist": ("opensuse-leap",)}, "suse"), + ({"system": "linux", "dist": ("sles",)}, "suse"), + ({"system": "linux", "dist": ("sle_hpc",)}, "suse"), + ({"system": "linux", "dist": ("my_distro",)}, "linux"), + ({"system": "Windows", "dist": ("dontcare",)}, "windows"), + ({"system": "Darwin", "dist": ("dontcare",)}, "darwin"), + ({"system": "Freebsd", "dist": ("dontcare",)}, "freebsd"), + ({"system": "Netbsd", "dist": ("dontcare",)}, "netbsd"), + ({"system": "Openbsd", "dist": ("dontcare",)}, "openbsd"), + ({"system": "Dragonfly", "dist": ("dontcare",)}, "dragonfly"), + ], + ) + def test_get_variant(self, info, expected_variant): + """Verify we get the correct variant name""" + assert util._get_variant(info) == expected_variant + + +class TestJsonDumps(CiTestCase): + def test_is_str(self): + """json_dumps should return a string.""" + self.assertTrue(isinstance(util.json_dumps({"abc": "123"}), str)) + + def test_utf8(self): + smiley = "\\ud83d\\ude03" + self.assertEqual( + {"smiley": smiley}, json.loads(util.json_dumps({"smiley": smiley})) + ) + + def test_non_utf8(self): + blob = b"\xba\x03Qx-#y\xea" + self.assertEqual( + {"blob": "ci-b64:" + base64.b64encode(blob).decode("utf-8")}, + json.loads(util.json_dumps({"blob": blob})), + ) + + +@mock.patch("os.path.exists") +class TestIsLXD(CiTestCase): + def test_is_lxd_true_on_sock_device(self, m_exists): + """When lxd's /dev/lxd/sock exists, is_lxd returns true.""" + m_exists.return_value = True + self.assertTrue(util.is_lxd()) + m_exists.assert_called_once_with("/dev/lxd/sock") + + def test_is_lxd_false_when_sock_device_absent(self, m_exists): + """When lxd's /dev/lxd/sock is absent, is_lxd returns false.""" + m_exists.return_value = False + self.assertFalse(util.is_lxd()) + m_exists.assert_called_once_with("/dev/lxd/sock") + + +class TestReadCcFromCmdline: + if hasattr(pytest, "param"): + random_string = pytest.param( + CiTestCase.random_string(), None, id="random_string" + ) + else: + random_string = (CiTestCase.random_string(), None) + + @pytest.mark.parametrize( + "cmdline,expected_cfg", + [ + # Return None if cmdline has no cc:<YAML>end_cc content. + random_string, + # Return None if YAML content is empty string. + ("foo cc: end_cc bar", None), + # Return expected dictionary without trailing end_cc marker. + ("foo cc: ssh_pwauth: true", {"ssh_pwauth": True}), + # Return expected dictionary w escaped newline and no end_cc. + ("foo cc: ssh_pwauth: true\\n", {"ssh_pwauth": True}), + # Return expected dictionary of yaml between cc: and end_cc. + ("foo cc: ssh_pwauth: true end_cc bar", {"ssh_pwauth": True}), + # Return dict with list value w escaped newline, no end_cc. + ( + "cc: ssh_import_id: [smoser, kirkland]\\n", + {"ssh_import_id": ["smoser", "kirkland"]}, + ), + # Parse urlencoded brackets in yaml content. + ( + "cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc", + {"ssh_import_id": ["smoser", "kirkland"]}, + ), + # Parse complete urlencoded yaml content. + ( + "cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc", + {"ssh_import_id": ["user1", "user2"]}, + ), + # Parse nested dictionary in yaml content. + ( + "cc: ntp: {enabled: true, ntp_client: myclient} end_cc", + {"ntp": {"enabled": True, "ntp_client": "myclient"}}, + ), + # Parse single mapping value in yaml content. + ("cc: ssh_import_id: smoser end_cc", {"ssh_import_id": "smoser"}), + # Parse multiline content with multiple mapping and nested lists. + ( + "cc: ssh_import_id: [smoser, bob]\\n" + "runcmd: [ [ ls, -l ], echo hi ] end_cc", + { + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], + }, + ), + # Parse multiline encoded content w/ mappings and nested lists. + ( + "cc: ssh_import_id: %5Bsmoser, bob%5D\\n" + "runcmd: [ [ ls, -l ], echo hi ] end_cc", + { + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], + }, + ), + # test encoded escaped newlines work. + # + # unquote(encoded_content) + # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]' + ( + ( + "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn" + "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C" + "%20echo%20hi%20%5D" + " end_cc" + ), + { + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], + }, + ), + # test encoded newlines work. + # + # unquote(encoded_content) + # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]' + ( + ( + "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A" + "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C" + "%20echo%20hi%20%5D" + " end_cc" + ), + { + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], + }, + ), + # Parse and merge multiple yaml content sections. + ( + "cc:ssh_import_id: [smoser, bob] end_cc " + "cc: runcmd: [ [ ls, -l ] ] end_cc", + {"ssh_import_id": ["smoser", "bob"], "runcmd": [["ls", "-l"]]}, + ), + # Parse and merge multiple encoded yaml content sections. + ( + "cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc " + "cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc", + {"ssh_import_id": ["smoser"], "runcmd": [["ls", "-l"]]}, + ), + ], + ) + def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline): + assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline) + + +class TestMountCb: + """Tests for ``util.mount_cb``. + + These tests consider the "unit" under test to be ``util.mount_cb`` and + ``util.unmounter``, which is only used by ``mount_cb``. + + TODO: Test default mtype determination + TODO: Test the if/else branch that actually performs the mounting operation + """ + + @pytest.fixture + def already_mounted_device_and_mountdict(self): + """Mock an already-mounted device, and yield (device, mount dict)""" + device = "/dev/fake0" + mountpoint = "/mnt/fake" + with mock.patch("cloudinit.util.subp.subp"): + with mock.patch("cloudinit.util.mounts") as m_mounts: + mounts = {device: {"mountpoint": mountpoint}} + m_mounts.return_value = mounts + yield device, mounts[device] + + @pytest.fixture + def already_mounted_device(self, already_mounted_device_and_mountdict): + """already_mounted_device_and_mountdict, but return only the device""" + return already_mounted_device_and_mountdict[0] + + @pytest.mark.parametrize( + "mtype,expected", + [ + # While the filesystem is called iso9660, the mount type is cd9660 + ("iso9660", "cd9660"), + # vfat is generally called "msdos" on BSD + ("vfat", "msdos"), + # judging from man pages, only FreeBSD has this alias + ("msdosfs", "msdos"), + # Test happy path + ("ufs", "ufs"), + ], + ) + @mock.patch("cloudinit.util.is_Linux", autospec=True) + @mock.patch("cloudinit.util.is_BSD", autospec=True) + @mock.patch("cloudinit.util.subp.subp") + @mock.patch("cloudinit.temp_utils.tempdir", autospec=True) + def test_normalize_mtype_on_bsd( + self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected + ): + m_is_BSD.return_value = True + m_is_Linux.return_value = False + m_tmpdir.return_value.__enter__ = mock.Mock( + autospec=True, return_value="/tmp/fake" + ) + m_tmpdir.return_value.__exit__ = mock.Mock( + autospec=True, return_value=True + ) + callback = mock.Mock(autospec=True) + + util.mount_cb("/dev/fake0", callback, mtype=mtype) + assert ( + mock.call( + [ + "mount", + "-o", + "ro", + "-t", + expected, + "/dev/fake0", + "/tmp/fake", + ], + update_env=None, + ) + in m_subp.call_args_list + ) + + @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()]) + def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype): + with pytest.raises(TypeError): + util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype) + + @mock.patch("cloudinit.util.subp.subp") + def test_already_mounted_does_not_mount_or_umount_anything( + self, m_subp, already_mounted_device + ): + util.mount_cb(already_mounted_device, mock.Mock()) + + assert 0 == m_subp.call_count + + @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""]) + def test_already_mounted_calls_callback( + self, trailing_slash_in_mounts, already_mounted_device_and_mountdict + ): + device, mount_dict = already_mounted_device_and_mountdict + mountpoint = mount_dict["mountpoint"] + mount_dict["mountpoint"] += trailing_slash_in_mounts + + callback = mock.Mock() + util.mount_cb(device, callback) + + # The mountpoint passed to callback should always have a trailing + # slash, regardless of the input + assert [mock.call(mountpoint + "/")] == callback.call_args_list + + def test_already_mounted_calls_callback_with_data( + self, already_mounted_device + ): + callback = mock.Mock() + util.mount_cb( + already_mounted_device, callback, data=mock.sentinel.data + ) + + assert [ + mock.call(mock.ANY, mock.sentinel.data) + ] == callback.call_args_list + + +@mock.patch("cloudinit.util.write_file") +class TestEnsureFile: + """Tests for ``cloudinit.util.ensure_file``.""" + + def test_parameters_passed_through(self, m_write_file): + """Test the parameters in the signature are passed to write_file.""" + util.ensure_file( + mock.sentinel.path, + mode=mock.sentinel.mode, + preserve_mode=mock.sentinel.preserve_mode, + ) + + assert 1 == m_write_file.call_count + args, kwargs = m_write_file.call_args + assert (mock.sentinel.path,) == args + assert mock.sentinel.mode == kwargs["mode"] + assert mock.sentinel.preserve_mode == kwargs["preserve_mode"] + + @pytest.mark.parametrize( + "kwarg,expected", + [ + # Files should be world-readable by default + ("mode", 0o644), + # The previous behaviour of not preserving mode should be retained + ("preserve_mode", False), + ], + ) + def test_defaults(self, m_write_file, kwarg, expected): + """Test that ensure_file defaults appropriately.""" + util.ensure_file(mock.sentinel.path) + + assert 1 == m_write_file.call_count + _args, kwargs = m_write_file.call_args + assert expected == kwargs[kwarg] + + def test_static_parameters_are_passed(self, m_write_file): + """Test that the static write_files parameters are passed correctly.""" + util.ensure_file(mock.sentinel.path) + + assert 1 == m_write_file.call_count + _args, kwargs = m_write_file.call_args + assert "" == kwargs["content"] + assert "ab" == kwargs["omode"] + + +@mock.patch("cloudinit.util.grp.getgrnam") +@mock.patch("cloudinit.util.os.setgid") +@mock.patch("cloudinit.util.os.umask") +class TestRedirectOutputPreexecFn: + """This tests specifically the preexec_fn used in redirect_output.""" + + @pytest.fixture(params=["outfmt", "errfmt"]) + def preexec_fn(self, request): + """A fixture to gather the preexec_fn used by redirect_output. + + This enables simpler direct testing of it, and parameterises any tests + using it to cover both the stdout and stderr code paths. + """ + test_string = "| piped output to invoke subprocess" + if request.param == "outfmt": + args = (test_string, None) + elif request.param == "errfmt": + args = (None, test_string) + with mock.patch("cloudinit.util.subprocess.Popen") as m_popen: + util.redirect_output(*args) + + assert 1 == m_popen.call_count + _args, kwargs = m_popen.call_args + assert "preexec_fn" in kwargs, "preexec_fn not passed to Popen" + return kwargs["preexec_fn"] + + def test_preexec_fn_sets_umask( + self, m_os_umask, _m_setgid, _m_getgrnam, preexec_fn + ): + """preexec_fn should set a mask that avoids world-readable files.""" + preexec_fn() + + assert [mock.call(0o037)] == m_os_umask.call_args_list + + def test_preexec_fn_sets_group_id_if_adm_group_present( + self, _m_os_umask, m_setgid, m_getgrnam, preexec_fn + ): + """We should setgrp to adm if present, so files are owned by them.""" + fake_group = mock.Mock(gr_gid=mock.sentinel.gr_gid) + m_getgrnam.return_value = fake_group + + preexec_fn() + + assert [mock.call("adm")] == m_getgrnam.call_args_list + assert [mock.call(mock.sentinel.gr_gid)] == m_setgid.call_args_list + + def test_preexec_fn_handles_absent_adm_group_gracefully( + self, _m_os_umask, m_setgid, m_getgrnam, preexec_fn + ): + """We should handle an absent adm group gracefully.""" + m_getgrnam.side_effect = KeyError("getgrnam(): name not found: 'adm'") + + preexec_fn() + + assert 0 == m_setgid.call_count + + +class FakeSelinux(object): def __init__(self, match_what): self.match_what = match_what self.restored = [] @@ -141,7 +1457,7 @@ class TestWriteFile(helpers.TestCase): path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - open(path, 'w').close() + open(path, "w").close() os.chmod(path, 0o666) util.write_file(path, contents, preserve_mode=True) @@ -175,15 +1491,16 @@ class TestWriteFile(helpers.TestCase): fake_se = FakeSelinux(my_file) - with mock.patch.object(importer, 'import_module', - return_value=fake_se) as mockobj: + with mock.patch.object( + importer, "import_module", return_value=fake_se + ) as mockobj: with util.SeLinuxGuard(my_file) as is_on: self.assertTrue(is_on) self.assertEqual(1, len(fake_se.restored)) self.assertEqual(my_file, fake_se.restored[0]) - mockobj.assert_called_once_with('selinux') + mockobj.assert_called_once_with("selinux") class TestDeleteDirContents(helpers.TestCase): @@ -254,15 +1571,16 @@ class TestDeleteDirContents(helpers.TestCase): class TestKeyValStrings(helpers.TestCase): def test_keyval_str_to_dict(self): - expected = {'1': 'one', '2': 'one+one', 'ro': True} + expected = {"1": "one", "2": "one+one", "ro": True} cmdline = "1=one ro 2=one+one" self.assertEqual(expected, util.keyval_str_to_dict(cmdline)) class TestGetCmdline(helpers.TestCase): def test_cmdline_reads_debug_env(self): - with mock.patch.dict("os.environ", - values={'DEBUG_PROC_CMDLINE': 'abcd 123'}): + with mock.patch.dict( + "os.environ", values={"DEBUG_PROC_CMDLINE": "abcd 123"} + ): ret = util.get_cmdline() self.assertEqual("abcd 123", ret) @@ -272,59 +1590,75 @@ class TestLoadYaml(helpers.CiTestCase): with_logs = True def test_simple(self): - mydata = {'1': "one", '2': "two"} + mydata = {"1": "one", "2": "two"} self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) def test_nonallowed_returns_default(self): - '''Any unallowed types result in returning default; log the issue.''' + """Any unallowed types result in returning default; log the issue.""" # for now, anything not in the allowed list just returns the default. - myyaml = yaml.dump({'1': "one"}) - self.assertEqual(util.load_yaml(blob=myyaml, - default=self.mydefault, - allowed=(str,)), - self.mydefault) + myyaml = yaml.dump({"1": "one"}) + self.assertEqual( + util.load_yaml( + blob=myyaml, default=self.mydefault, allowed=(str,) + ), + self.mydefault, + ) regex = re.compile( - r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but' - r' got dict') - self.assertTrue(regex.search(self.logs.getvalue()), - msg='Missing expected yaml load error') + r"Yaml load allows \(<(class|type) \'str\'>,\) root types, but" + r" got dict" + ) + self.assertTrue( + regex.search(self.logs.getvalue()), + msg="Missing expected yaml load error", + ) def test_bogus_scan_error_returns_default(self): - '''On Yaml scan error, load_yaml returns the default and logs issue.''' + """On Yaml scan error, load_yaml returns the default and logs issue.""" badyaml = "1\n 2:" - self.assertEqual(util.load_yaml(blob=badyaml, - default=self.mydefault), - self.mydefault) + self.assertEqual( + util.load_yaml(blob=badyaml, default=self.mydefault), + self.mydefault, + ) self.assertIn( - 'Failed loading yaml blob. Invalid format at line 2 column 3:' + "Failed loading yaml blob. Invalid format at line 2 column 3:" ' "mapping values are not allowed here', - self.logs.getvalue()) + self.logs.getvalue(), + ) def test_bogus_parse_error_returns_default(self): - '''On Yaml parse error, load_yaml returns default and logs issue.''' + """On Yaml parse error, load_yaml returns default and logs issue.""" badyaml = "{}}" - self.assertEqual(util.load_yaml(blob=badyaml, - default=self.mydefault), - self.mydefault) + self.assertEqual( + util.load_yaml(blob=badyaml, default=self.mydefault), + self.mydefault, + ) self.assertIn( - 'Failed loading yaml blob. Invalid format at line 1 column 3:' - " \"expected \'<document start>\', but found \'}\'", - self.logs.getvalue()) + "Failed loading yaml blob. Invalid format at line 1 column 3:" + " \"expected '<document start>', but found '}'", + self.logs.getvalue(), + ) def test_unsafe_types(self): # should not load complex types - unsafe_yaml = yaml.dump((1, 2, 3,)) - self.assertEqual(util.load_yaml(blob=unsafe_yaml, - default=self.mydefault), - self.mydefault) + unsafe_yaml = yaml.dump( + ( + 1, + 2, + 3, + ) + ) + self.assertEqual( + util.load_yaml(blob=unsafe_yaml, default=self.mydefault), + self.mydefault, + ) def test_python_unicode(self): # complex type of python/unicode is explicitly allowed - myobj = {'1': "FOOBAR"} + myobj = {"1": "FOOBAR"} safe_yaml = yaml.dump(myobj) - self.assertEqual(util.load_yaml(blob=safe_yaml, - default=self.mydefault), - myobj) + self.assertEqual( + util.load_yaml(blob=safe_yaml, default=self.mydefault), myobj + ) def test_none_returns_default(self): """If yaml.load returns None, then default should be returned.""" @@ -332,168 +1666,177 @@ class TestLoadYaml(helpers.CiTestCase): mdef = self.mydefault self.assertEqual( [(b, self.mydefault) for b in blobs], - [(b, util.load_yaml(blob=b, default=mdef)) for b in blobs]) + [(b, util.load_yaml(blob=b, default=mdef)) for b in blobs], + ) class TestMountinfoParsing(helpers.ResourceUsingTestCase): def test_invalid_mountinfo(self): - line = ("20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root" - "rw,errors=remount-ro,data=ordered") + line = ( + "20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root" + "rw,errors=remount-ro,data=ordered" + ) elements = line.split() for i in range(len(elements) + 1): - lines = [' '.join(elements[0:i])] + lines = [" ".join(elements[0:i])] if i < 10: expected = None else: - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) def test_precise_ext4_root(self): - lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines() + lines = helpers.readResource("mountinfo_precise_ext4.txt").splitlines() - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines)) - expected = ('/dev/md0', 'ext4', '/boot') - self.assertEqual(expected, util.parse_mount_info('/boot', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + expected = ("/dev/md0", "ext4", "/boot") + self.assertEqual(expected, util.parse_mount_info("/boot", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines)) - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/home', lines)) - self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/home", lines)) + self.assertEqual(expected, util.parse_mount_info("/home/me", lines)) - expected = ('tmpfs', 'tmpfs', '/run') - self.assertEqual(expected, util.parse_mount_info('/run', lines)) + expected = ("tmpfs", "tmpfs", "/run") + self.assertEqual(expected, util.parse_mount_info("/run", lines)) - expected = ('none', 'tmpfs', '/run/lock') - self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + expected = ("none", "tmpfs", "/run/lock") + self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) def test_raring_btrfs_root(self): - lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines() + lines = helpers.readResource("mountinfo_raring_btrfs.txt").splitlines() - expected = ('/dev/vda1', 'btrfs', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + expected = ("/dev/vda1", "btrfs", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines)) - expected = ('/dev/vda1', 'btrfs', '/home') - self.assertEqual(expected, util.parse_mount_info('/home', lines)) - self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + expected = ("/dev/vda1", "btrfs", "/home") + self.assertEqual(expected, util.parse_mount_info("/home", lines)) + self.assertEqual(expected, util.parse_mount_info("/home/me", lines)) - expected = ('tmpfs', 'tmpfs', '/run') - self.assertEqual(expected, util.parse_mount_info('/run', lines)) + expected = ("tmpfs", "tmpfs", "/run") + self.assertEqual(expected, util.parse_mount_info("/run", lines)) - expected = ('none', 'tmpfs', '/run/lock') - self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + expected = ("none", "tmpfs", "/run/lock") + self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - helpers.readResource('zpool_status_simple.txt'), '' + helpers.readResource("zpool_status_simple.txt"), + "", ) # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') - self.assertEqual('gpt/system', ret) + ret = util.get_device_info_from_zpool("vmzroot") + self.assertEqual("gpt/system", ret) self.assertIsNotNone(ret) - m_os.path.exists.assert_called_with('/dev/zfs') + m_os.path.exists.assert_called_with("/dev/zfs") - @mock.patch('cloudinit.util.os') + @mock.patch("cloudinit.util.os") def test_get_device_info_from_zpool_no_dev_zfs(self, m_os): # mock /dev/zfs missing m_os.path.exists.return_value = False # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" # mock /dev/zfs exists m_os.path.exists.return_value = True m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd") - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - helpers.readResource('zpool_status_simple.txt'), 'error' + helpers.readResource("zpool_status_simple.txt"), + "error", ) # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_parse_mount_with_ext(self, mount_out): mount_out.return_value = ( - helpers.readResource('mount_parse_ext.txt'), '') + helpers.readResource("mount_parse_ext.txt"), + "", + ) # this one is valid and exists in mount_parse_ext.txt - ret = util.parse_mount('/var') - self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret) + ret = util.parse_mount("/var") + self.assertEqual(("/dev/mapper/vg00-lv_var", "ext4", "/var"), ret) # another one that is valid and exists - ret = util.parse_mount('/') - self.assertEqual(('/dev/mapper/vg00-lv_root', 'ext4', '/'), ret) + ret = util.parse_mount("/") + self.assertEqual(("/dev/mapper/vg00-lv_root", "ext4", "/"), ret) # this one exists in mount_parse_ext.txt - ret = util.parse_mount('/sys/kernel/debug') + ret = util.parse_mount("/sys/kernel/debug") self.assertIsNone(ret) # this one does not even exist in mount_parse_ext.txt - ret = util.parse_mount('/not/existing/mount') + ret = util.parse_mount("/not/existing/mount") self.assertIsNone(ret) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_parse_mount_with_zfs(self, mount_out): mount_out.return_value = ( - helpers.readResource('mount_parse_zfs.txt'), '') + helpers.readResource("mount_parse_zfs.txt"), + "", + ) # this one is valid and exists in mount_parse_zfs.txt - ret = util.parse_mount('/var') - self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret) + ret = util.parse_mount("/var") + self.assertEqual(("vmzroot/ROOT/freebsd/var", "zfs", "/var"), ret) # this one is the root, valid and also exists in mount_parse_zfs.txt - ret = util.parse_mount('/') - self.assertEqual(('vmzroot/ROOT/freebsd', 'zfs', '/'), ret) + ret = util.parse_mount("/") + self.assertEqual(("vmzroot/ROOT/freebsd", "zfs", "/"), ret) # this one does not even exist in mount_parse_ext.txt - ret = util.parse_mount('/not/existing/mount') + ret = util.parse_mount("/not/existing/mount") self.assertIsNone(ret) class TestIsX86(helpers.CiTestCase): - def test_is_x86_matches_x86_types(self): """is_x86 returns True if CPU architecture matches.""" - matched_arches = ['x86_64', 'i386', 'i586', 'i686'] + matched_arches = ["x86_64", "i386", "i586", "i686"] for arch in matched_arches: self.assertTrue( - util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch) + util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch + ) def test_is_x86_unmatched_types(self): """is_x86 returns Fale on non-intel x86 architectures.""" - unmatched_arches = ['ia64', '9000/800', 'arm64v71'] + unmatched_arches = ["ia64", "9000/800", "arm64v71"] for arch in unmatched_arches: self.assertFalse( - util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch) + util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch + ) - @mock.patch('cloudinit.util.os.uname') + @mock.patch("cloudinit.util.os.uname") def test_is_x86_calls_uname_for_architecture(self, m_uname): """is_x86 returns True if platform from uname matches.""" - m_uname.return_value = [0, 1, 2, 3, 'x86_64'] + m_uname.return_value = [0, 1, 2, 3, "x86_64"] self.assertTrue(util.is_x86()) class TestGetConfigLogfiles(helpers.CiTestCase): - def test_empty_cfg_returns_empty_list(self): """An empty config passed to get_config_logfiles returns empty list.""" self.assertEqual([], util.get_config_logfiles(None)) @@ -502,39 +1845,56 @@ class TestGetConfigLogfiles(helpers.CiTestCase): def test_default_log_file_present(self): """When default_log_file is set get_config_logfiles finds it.""" self.assertEqual( - ['/my.log'], - util.get_config_logfiles({'def_log_file': '/my.log'})) + ["/my.log"], util.get_config_logfiles({"def_log_file": "/my.log"}) + ) def test_output_logs_parsed_when_teeing_files(self): """When output configuration is parsed when teeing files.""" self.assertEqual( - ['/himom.log', '/my.log'], - sorted(util.get_config_logfiles({ - 'def_log_file': '/my.log', - 'output': {'all': '|tee -a /himom.log'}}))) + ["/himom.log", "/my.log"], + sorted( + util.get_config_logfiles( + { + "def_log_file": "/my.log", + "output": {"all": "|tee -a /himom.log"}, + } + ) + ), + ) def test_output_logs_parsed_when_redirecting(self): """When output configuration is parsed when redirecting to a file.""" self.assertEqual( - ['/my.log', '/test.log'], - sorted(util.get_config_logfiles({ - 'def_log_file': '/my.log', - 'output': {'all': '>/test.log'}}))) + ["/my.log", "/test.log"], + sorted( + util.get_config_logfiles( + { + "def_log_file": "/my.log", + "output": {"all": ">/test.log"}, + } + ) + ), + ) def test_output_logs_parsed_when_appending(self): """When output configuration is parsed when appending to a file.""" self.assertEqual( - ['/my.log', '/test.log'], - sorted(util.get_config_logfiles({ - 'def_log_file': '/my.log', - 'output': {'all': '>> /test.log'}}))) + ["/my.log", "/test.log"], + sorted( + util.get_config_logfiles( + { + "def_log_file": "/my.log", + "output": {"all": ">> /test.log"}, + } + ) + ), + ) class TestMultiLog(helpers.FilesystemMockingTestCase): - def _createConsole(self, root): - os.mkdir(os.path.join(root, 'dev')) - open(os.path.join(root, 'dev', 'console'), 'a').close() + os.mkdir(os.path.join(root, "dev")) + open(os.path.join(root, "dev", "console"), "a").close() def setUp(self): super(TestMultiLog, self).setUp() @@ -548,60 +1908,64 @@ class TestMultiLog(helpers.FilesystemMockingTestCase): self.patchStdoutAndStderr(self.stdout, self.stderr) def test_stderr_used_by_default(self): - logged_string = 'test stderr output' + logged_string = "test stderr output" util.multi_log(logged_string) self.assertEqual(logged_string, self.stderr.getvalue()) def test_stderr_not_used_if_false(self): - util.multi_log('should not see this', stderr=False) - self.assertEqual('', self.stderr.getvalue()) + util.multi_log("should not see this", stderr=False) + self.assertEqual("", self.stderr.getvalue()) def test_logs_go_to_console_by_default(self): self._createConsole(self.root) - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string) - self.assertEqual(logged_string, open('/dev/console').read()) + self.assertEqual(logged_string, open("/dev/console").read()) def test_logs_dont_go_to_stdout_if_console_exists(self): self._createConsole(self.root) - util.multi_log('something') - self.assertEqual('', self.stdout.getvalue()) + util.multi_log("something") + self.assertEqual("", self.stdout.getvalue()) def test_logs_go_to_stdout_if_console_does_not_exist(self): - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string) self.assertEqual(logged_string, self.stdout.getvalue()) + def test_logs_dont_go_to_stdout_if_fallback_to_stdout_is_false(self): + util.multi_log("something", fallback_to_stdout=False) + self.assertEqual("", self.stdout.getvalue()) + def test_logs_go_to_log_if_given(self): log = mock.MagicMock() - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string, log=log) - self.assertEqual([((mock.ANY, logged_string), {})], - log.log.call_args_list) + self.assertEqual( + [((mock.ANY, logged_string), {})], log.log.call_args_list + ) def test_newlines_stripped_from_log_call(self): log = mock.MagicMock() - expected_string = 'something very important' - util.multi_log('{0}\n'.format(expected_string), log=log) + expected_string = "something very important" + util.multi_log("{0}\n".format(expected_string), log=log) self.assertEqual((mock.ANY, expected_string), log.log.call_args[0]) def test_log_level_defaults_to_debug(self): log = mock.MagicMock() - util.multi_log('message', log=log) + util.multi_log("message", log=log) self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0]) def test_given_log_level_used(self): log = mock.MagicMock() log_level = mock.Mock() - util.multi_log('message', log=log, log_level=log_level) + util.multi_log("message", log=log, log_level=log_level) self.assertEqual((log_level, mock.ANY), log.log.call_args[0]) class TestMessageFromString(helpers.TestCase): - def test_unicode_not_messed_up(self): - roundtripped = util.message_from_string(u'\n').as_string() - self.assertNotIn('\x00', roundtripped) + roundtripped = util.message_from_string("\n").as_string() + self.assertNotIn("\x00", roundtripped) class TestReadSeeded(helpers.TestCase): @@ -614,12 +1978,13 @@ class TestReadSeeded(helpers.TestCase): ud = b"userdatablob" vd = b"vendordatablob" helpers.populate_dir( - self.tmp, {'meta-data': "key1: val1", 'user-data': ud, - 'vendor-data': vd}) + self.tmp, + {"meta-data": "key1: val1", "user-data": ud, "vendor-data": vd}, + ) sdir = self.tmp + os.path.sep (found_md, found_ud, found_vd) = util.read_seeded(sdir) - self.assertEqual(found_md, {'key1': 'val1'}) + self.assertEqual(found_md, {"key1": "val1"}) self.assertEqual(found_ud, ud) self.assertEqual(found_vd, vd) @@ -634,157 +1999,189 @@ class TestReadSeededWithoutVendorData(helpers.TestCase): ud = b"userdatablob" vd = None helpers.populate_dir( - self.tmp, {'meta-data': "key1: val1", 'user-data': ud}) + self.tmp, {"meta-data": "key1: val1", "user-data": ud} + ) sdir = self.tmp + os.path.sep (found_md, found_ud, found_vd) = util.read_seeded(sdir) - self.assertEqual(found_md, {'key1': 'val1'}) + self.assertEqual(found_md, {"key1": "val1"}) self.assertEqual(found_ud, ud) self.assertEqual(found_vd, vd) class TestEncode(helpers.TestCase): """Test the encoding functions""" + def test_decode_binary_plain_text_with_hex(self): - blob = 'BOOTABLE_FLAG=\x80init=/bin/systemd' + blob = "BOOTABLE_FLAG=\x80init=/bin/systemd" text = util.decode_binary(blob) self.assertEqual(text, blob) class TestProcessExecutionError(helpers.TestCase): - template = ('{description}\n' - 'Command: {cmd}\n' - 'Exit code: {exit_code}\n' - 'Reason: {reason}\n' - 'Stdout: {stdout}\n' - 'Stderr: {stderr}') - empty_attr = '-' - empty_description = 'Unexpected error while running command.' + template = ( + "{description}\n" + "Command: {cmd}\n" + "Exit code: {exit_code}\n" + "Reason: {reason}\n" + "Stdout: {stdout}\n" + "Stderr: {stderr}" + ) + empty_attr = "-" + empty_description = "Unexpected error while running command." def test_pexec_error_indent_text(self): error = subp.ProcessExecutionError() - msg = 'abc\ndef' - formatted = 'abc\n{0}def'.format(' ' * 4) + msg = "abc\ndef" + formatted = "abc\n{0}def".format(" " * 4) self.assertEqual(error._indent_text(msg, indent_level=4), formatted) - self.assertEqual(error._indent_text(msg.encode(), indent_level=4), - formatted.encode()) + self.assertEqual( + error._indent_text(msg.encode(), indent_level=4), + formatted.encode(), + ) self.assertIsInstance( - error._indent_text(msg.encode()), type(msg.encode())) + error._indent_text(msg.encode()), type(msg.encode()) + ) def test_pexec_error_type(self): self.assertIsInstance(subp.ProcessExecutionError(), IOError) def test_pexec_error_empty_msgs(self): error = subp.ProcessExecutionError() - self.assertTrue(all(attr == self.empty_attr for attr in - (error.stderr, error.stdout, error.reason))) + self.assertTrue( + all( + attr == self.empty_attr + for attr in (error.stderr, error.stdout, error.reason) + ) + ) self.assertEqual(error.description, self.empty_description) - self.assertEqual(str(error), self.template.format( - description=self.empty_description, exit_code=self.empty_attr, - reason=self.empty_attr, stdout=self.empty_attr, - stderr=self.empty_attr, cmd=self.empty_attr)) + self.assertEqual( + str(error), + self.template.format( + description=self.empty_description, + exit_code=self.empty_attr, + reason=self.empty_attr, + stdout=self.empty_attr, + stderr=self.empty_attr, + cmd=self.empty_attr, + ), + ) def test_pexec_error_single_line_msgs(self): - stdout_msg = 'out out' - stderr_msg = 'error error' - cmd = 'test command' + stdout_msg = "out out" + stderr_msg = "error error" + cmd = "test command" exit_code = 3 error = subp.ProcessExecutionError( - stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd) - self.assertEqual(str(error), self.template.format( - description=self.empty_description, stdout=stdout_msg, - stderr=stderr_msg, exit_code=str(exit_code), - reason=self.empty_attr, cmd=cmd)) + stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd + ) + self.assertEqual( + str(error), + self.template.format( + description=self.empty_description, + stdout=stdout_msg, + stderr=stderr_msg, + exit_code=str(exit_code), + reason=self.empty_attr, + cmd=cmd, + ), + ) def test_pexec_error_multi_line_msgs(self): # make sure bytes is converted handled properly when formatting - stdout_msg = 'multi\nline\noutput message'.encode() - stderr_msg = 'multi\nline\nerror message\n\n\n' + stdout_msg = "multi\nline\noutput message".encode() + stderr_msg = "multi\nline\nerror message\n\n\n" error = subp.ProcessExecutionError( - stdout=stdout_msg, stderr=stderr_msg) + stdout=stdout_msg, stderr=stderr_msg + ) self.assertEqual( str(error), - '\n'.join(( - '{description}', - 'Command: {empty_attr}', - 'Exit code: {empty_attr}', - 'Reason: {empty_attr}', - 'Stdout: multi', - ' line', - ' output message', - 'Stderr: multi', - ' line', - ' error message', - )).format(description=self.empty_description, - empty_attr=self.empty_attr)) + "\n".join( + ( + "{description}", + "Command: {empty_attr}", + "Exit code: {empty_attr}", + "Reason: {empty_attr}", + "Stdout: multi", + " line", + " output message", + "Stderr: multi", + " line", + " error message", + ) + ).format( + description=self.empty_description, empty_attr=self.empty_attr + ), + ) class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): def test_id_in_os_release_quoted(self): """os-release containing ID="ubuntu-core" is snappy.""" - orcontent = '\n'.join(['ID="ubuntu-core"', '']) + orcontent = "\n".join(['ID="ubuntu-core"', ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) def test_id_in_os_release(self): """os-release containing ID=ubuntu-core is snappy.""" - orcontent = '\n'.join(['ID=ubuntu-core', '']) + orcontent = "\n".join(["ID=ubuntu-core", ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_bad_content_in_os_release_no_effect(self, m_cmdline): """malformed os-release should not raise exception.""" - m_cmdline.return_value = 'root=/dev/sda' - orcontent = '\n'.join(['IDubuntu-core', '']) + m_cmdline.return_value = "root=/dev/sda" + orcontent = "\n".join(["IDubuntu-core", ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot() self.assertFalse(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): """The string snap_core= in kernel cmdline indicates snappy.""" cmdline = ( "BOOT_IMAGE=(loop)/kernel.img root=LABEL=writable " "snap_core=core_x1.snap snap_kernel=pc-kernel_x1.snap ro " "net.ifnames=0 init=/lib/systemd/systemd console=tty1 " - "console=ttyS0 panic=-1") + "console=ttyS0 panic=-1" + ) m_cmdline.return_value = cmdline self.assertTrue(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_nothing_found_is_not_snappy(self, m_cmdline): """If no positive identification, then not snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" self.reRoot() self.assertFalse(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): """A Channel.ini file with 'ubuntu-core' indicates snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" root_d = self.tmp_dir() - content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""]) - helpers.populate_dir( - root_d, {'etc/system-image/channel.ini': content}) + content = "\n".join(["[Foo]", "source = 'ubuntu-core'", ""]) + helpers.populate_dir(root_d, {"etc/system-image/channel.ini": content}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_system_image_config_dir_is_snappy(self, m_cmdline): """Existence of /etc/system-image/config.d indicates snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" root_d = self.tmp_dir() helpers.populate_dir( - root_d, {'etc/system-image/config.d/my.file': "_unused"}) + root_d, {"etc/system-image/config.d/my.file": "_unused"} + ) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) @@ -793,41 +2190,52 @@ class TestLoadShellContent(helpers.TestCase): def test_comments_handled_correctly(self): """Shell comments should be allowed in the content.""" self.assertEqual( - {'key1': 'val1', 'key2': 'val2', 'key3': 'val3 #tricky'}, - util.load_shell_content('\n'.join([ - "#top of file comment", - "key1=val1 #this is a comment", - "# second comment", - 'key2="val2" # inlin comment' - '#badkey=wark', - 'key3="val3 #tricky"', - '']))) + {"key1": "val1", "key2": "val2", "key3": "val3 #tricky"}, + util.load_shell_content( + "\n".join( + [ + "#top of file comment", + "key1=val1 #this is a comment", + "# second comment", + 'key2="val2" # inlin comment#badkey=wark', + 'key3="val3 #tricky"', + "", + ] + ) + ), + ) class TestGetProcEnv(helpers.TestCase): """test get_proc_env.""" - null = b'\x00' - simple1 = b'HOME=/' - simple2 = b'PATH=/bin:/sbin' - bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371 - mixed = b'MIXED=' + b'ab\xccde' - def _val_decoded(self, blob, encoding='utf-8', errors='replace'): + null = b"\x00" + simple1 = b"HOME=/" + simple2 = b"PATH=/bin:/sbin" + bootflag = b"BOOTABLE_FLAG=\x80" # from LP: #1775371 + mixed = b"MIXED=" + b"ab\xccde" + + def _val_decoded(self, blob, encoding="utf-8", errors="replace"): # return the value portion of key=val decoded. - return blob.split(b'=', 1)[1].decode(encoding, errors) + return blob.split(b"=", 1)[1].decode(encoding, errors) @mock.patch("cloudinit.util.load_file") def test_non_utf8_in_environment(self, m_load_file): """env may have non utf-8 decodable content.""" content = self.null.join( - (self.bootflag, self.simple1, self.simple2, self.mixed)) + (self.bootflag, self.simple1, self.simple2, self.mixed) + ) m_load_file.return_value = content self.assertEqual( - {'BOOTABLE_FLAG': self._val_decoded(self.bootflag), - 'HOME': '/', 'PATH': '/bin:/sbin', - 'MIXED': self._val_decoded(self.mixed)}, - util.get_proc_env(1)) + { + "BOOTABLE_FLAG": self._val_decoded(self.bootflag), + "HOME": "/", + "PATH": "/bin:/sbin", + "MIXED": self._val_decoded(self.mixed), + }, + util.get_proc_env(1), + ) self.assertEqual(1, m_load_file.call_count) @mock.patch("cloudinit.util.load_file") @@ -838,8 +2246,9 @@ class TestGetProcEnv(helpers.TestCase): m_load_file.return_value = content self.assertEqual( - dict([t.split(b'=') for t in lines]), - util.get_proc_env(1, encoding=None)) + dict([t.split(b"=") for t in lines]), + util.get_proc_env(1, encoding=None), + ) self.assertEqual(1, m_load_file.call_count) @mock.patch("cloudinit.util.load_file") @@ -848,8 +2257,8 @@ class TestGetProcEnv(helpers.TestCase): content = self.null.join((self.simple1, self.simple2)) m_load_file.return_value = content self.assertEqual( - {'HOME': '/', 'PATH': '/bin:/sbin'}, - util.get_proc_env(1)) + {"HOME": "/", "PATH": "/bin:/sbin"}, util.get_proc_env(1) + ) self.assertEqual(1, m_load_file.call_count) @mock.patch("cloudinit.util.load_file") @@ -867,16 +2276,17 @@ class TestGetProcEnv(helpers.TestCase): self.assertEqual(my_ppid, util.get_proc_ppid(my_pid)) -class TestKernelVersion(): +class TestKernelVersion: """test kernel version function""" params = [ - ('5.6.19-300.fc32.x86_64', (5, 6)), - ('4.15.0-101-generic', (4, 15)), - ('3.10.0-1062.12.1.vz7.131.10', (3, 10)), - ('4.18.0-144.el8.x86_64', (4, 18))] + ("5.6.19-300.fc32.x86_64", (5, 6)), + ("4.15.0-101-generic", (4, 15)), + ("3.10.0-1062.12.1.vz7.131.10", (3, 10)), + ("4.18.0-144.el8.x86_64", (4, 18)), + ] - @mock.patch('os.uname') + @mock.patch("os.uname") @pytest.mark.parametrize("uname_release,expected", params) def test_kernel_version(self, m_uname, uname_release, expected): m_uname.return_value.release = uname_release @@ -884,49 +2294,48 @@ class TestKernelVersion(): class TestFindDevs: - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with(self, m_subp): m_subp.return_value = ( '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"', - '' + "", ) devlist = util.find_devs_with() assert devlist == [ - '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"'] + '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"' + ] devlist = util.find_devs_with("LABEL_FATBOOT=A_LABEL") assert devlist == [ - '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"'] + '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"' + ] - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with_openbsd(self, m_subp): - m_subp.return_value = ( - 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' - ) + m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "") devlist = util.find_devs_with_openbsd() - assert devlist == ['/dev/cd0a', '/dev/sd1i'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with_openbsd_with_criteria(self, m_subp): - m_subp.return_value = ( - 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' - ) + m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "") devlist = util.find_devs_with_openbsd(criteria="TYPE=iso9660") - assert devlist == ['/dev/cd0a'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] # lp: #1841466 devlist = util.find_devs_with_openbsd(criteria="LABEL_FATBOOT=A_LABEL") - assert devlist == ['/dev/cd0a', '/dev/sd1i'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] @pytest.mark.parametrize( - 'criteria,expected_devlist', ( - (None, ['/dev/msdosfs/EFISYS', '/dev/iso9660/config-2']), - ('TYPE=iso9660', ['/dev/iso9660/config-2']), - ('TYPE=vfat', ['/dev/msdosfs/EFISYS']), - ('LABEL_FATBOOT=A_LABEL', []), # lp: #1841466 + "criteria,expected_devlist", + ( + (None, ["/dev/msdosfs/EFISYS", "/dev/iso9660/config-2"]), + ("TYPE=iso9660", ["/dev/iso9660/config-2"]), + ("TYPE=vfat", ["/dev/msdosfs/EFISYS"]), + ("LABEL_FATBOOT=A_LABEL", []), # lp: #1841466 ), ) - @mock.patch('glob.glob') + @mock.patch("glob.glob") def test_find_devs_with_freebsd(self, m_glob, criteria, expected_devlist): def fake_glob(pattern): msdos = ["/dev/msdosfs/EFISYS"] @@ -936,58 +2345,54 @@ class TestFindDevs: elif pattern == "/dev/iso9660/*": return iso9660 raise Exception + m_glob.side_effect = fake_glob devlist = util.find_devs_with_freebsd(criteria=criteria) assert devlist == expected_devlist @pytest.mark.parametrize( - 'criteria,expected_devlist', ( - (None, ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']), - ('TYPE=iso9660', ['/dev/cd0']), - ('TYPE=vfat', ["/dev/ld0", "/dev/dk0", "/dev/dk1"]), - ('LABEL_FATBOOT=A_LABEL', # lp: #1841466 - ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']), - ) + "criteria,expected_devlist", + ( + (None, ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"]), + ("TYPE=iso9660", ["/dev/cd0"]), + ("TYPE=vfat", ["/dev/ld0", "/dev/dk0", "/dev/dk1"]), + ( + "LABEL_FATBOOT=A_LABEL", # lp: #1841466 + ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"], + ), + ), ) @mock.patch("cloudinit.subp.subp") def test_find_devs_with_netbsd(self, m_subp, criteria, expected_devlist): side_effect_values = [ ("ld0 dk0 dk1 cd0", ""), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "track (ctl=4) at sector 0\n" - 'ISO filesystem, label "config-2", ' - "creation time: 2020/03/31 17:29\n" - "adding as 'a'\n" - ), + "track (ctl=4) at sector 0\n" + 'ISO filesystem, label "config-2", ' + "creation time: 2020/03/31 17:29\n" + "adding as 'a'\n", "", ), ] @@ -995,4 +2400,25 @@ class TestFindDevs: devlist = util.find_devs_with_netbsd(criteria=criteria) assert devlist == expected_devlist + @pytest.mark.parametrize( + "criteria,expected_devlist", + ( + (None, ["/dev/vbd0", "/dev/cd0", "/dev/acd0"]), + ("TYPE=iso9660", ["/dev/cd0", "/dev/acd0"]), + ("TYPE=vfat", ["/dev/vbd0"]), + ( + "LABEL_FATBOOT=A_LABEL", # lp: #1841466 + ["/dev/vbd0", "/dev/cd0", "/dev/acd0"], + ), + ), + ) + @mock.patch("cloudinit.subp.subp") + def test_find_devs_with_dragonflybsd( + self, m_subp, criteria, expected_devlist + ): + m_subp.return_value = ("md2 md1 cd0 vbd0 acd0 vn3 vn2 vn1 vn0 md0", "") + devlist = util.find_devs_with_dragonflybsd(criteria=criteria) + assert devlist == expected_devlist + + # vi: ts=4 expandtab |