diff options
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 934 |
1 files changed, 455 insertions, 479 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 61b9e303..e2bfe9d2 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -3,33 +3,30 @@ """Tests for cloudinit.util""" import base64 -import logging -import json -import platform -import pytest - import io +import json +import logging import os +import platform import re import shutil import stat import tempfile -import yaml +from textwrap import dedent from unittest import mock -from cloudinit import subp -from cloudinit import importer, util -from tests.unittests import helpers - +import pytest +import yaml +from cloudinit import importer, subp, util +from tests.unittests import helpers from tests.unittests.helpers import CiTestCase -from textwrap import dedent LOG = logging.getLogger(__name__) MOUNT_INFO = [ - '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64', - '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2', + "68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64", + "153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2", ] OS_RELEASE_SLES = dedent( @@ -329,9 +326,9 @@ class FakeCloud(object): def get_hostname(self, fqdn=None, metadata_only=None): myargs = {} if fqdn is not None: - myargs['fqdn'] = fqdn + myargs["fqdn"] = fqdn if metadata_only is not None: - myargs['metadata_only'] = metadata_only + myargs["metadata_only"] = metadata_only self.calls.append(myargs) if fqdn: return self.fqdn @@ -340,34 +337,34 @@ class FakeCloud(object): class TestUtil(CiTestCase): def test_parse_mount_info_no_opts_no_arg(self): - result = util.parse_mount_info('/home', MOUNT_INFO, LOG) - self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) + result = util.parse_mount_info("/home", MOUNT_INFO, LOG) + self.assertEqual(("/dev/sda2", "xfs", "/home"), result) def test_parse_mount_info_no_opts_arg(self): - result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False) - self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) + result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False) + self.assertEqual(("/dev/sda2", "xfs", "/home"), result) def test_parse_mount_info_with_opts(self): - result = util.parse_mount_info('/', MOUNT_INFO, LOG, True) - self.assertEqual(('/dev/sda1', 'btrfs', '/', 'ro,relatime'), result) + result = util.parse_mount_info("/", MOUNT_INFO, LOG, True) + self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result) - @mock.patch('cloudinit.util.get_mount_info') + @mock.patch("cloudinit.util.get_mount_info") def test_mount_is_rw(self, m_mount_info): - m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime') - is_rw = util.mount_is_read_write('/') + m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime") + is_rw = util.mount_is_read_write("/") self.assertEqual(is_rw, True) - @mock.patch('cloudinit.util.get_mount_info') + @mock.patch("cloudinit.util.get_mount_info") def test_mount_is_ro(self, m_mount_info): - m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime') - is_rw = util.mount_is_read_write('/') + m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime") + is_rw = util.mount_is_read_write("/") self.assertEqual(is_rw, False) class TestUptime(CiTestCase): - @mock.patch('cloudinit.util.boottime') - @mock.patch('cloudinit.util.os.path.exists') - @mock.patch('cloudinit.util.time.time') + @mock.patch("cloudinit.util.boottime") + @mock.patch("cloudinit.util.os.path.exists") + @mock.patch("cloudinit.util.time.time") def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime): boottime = 1000.0 uptime = 10.0 @@ -382,24 +379,24 @@ class TestShellify(CiTestCase): def test_input_dict_raises_type_error(self): self.assertRaisesRegex( TypeError, - 'Input.*was.*dict.*xpected', + "Input.*was.*dict.*xpected", util.shellify, - {'mykey': 'myval'}, + {"mykey": "myval"}, ) def test_input_str_raises_type_error(self): self.assertRaisesRegex( - TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar" + TypeError, "Input.*was.*str.*xpected", util.shellify, "foobar" ) def test_value_with_int_raises_type_error(self): self.assertRaisesRegex( - TypeError, 'shellify.*int', util.shellify, ["foo", 1] + TypeError, "shellify.*int", util.shellify, ["foo", 1] ) def test_supports_strings_and_lists(self): self.assertEqual( - '\n'.join( + "\n".join( [ "#!/bin/sh", "echo hi mom", @@ -409,13 +406,13 @@ class TestShellify(CiTestCase): ] ), util.shellify( - ["echo hi mom", ["echo", "hi dad"], ('echo', 'hi', 'sis')] + ["echo hi mom", ["echo", "hi dad"], ("echo", "hi", "sis")] ), ) def test_supports_comments(self): self.assertEqual( - '\n'.join(["#!/bin/sh", "echo start", "echo end", ""]), + "\n".join(["#!/bin/sh", "echo start", "echo end", ""]), util.shellify(["echo start", None, "echo end"]), ) @@ -424,58 +421,58 @@ class TestGetHostnameFqdn(CiTestCase): def test_get_hostname_fqdn_from_only_cfg_fqdn(self): """When cfg only has the fqdn key, derive hostname and fqdn from it.""" hostname, fqdn = util.get_hostname_fqdn( - cfg={'fqdn': 'myhost.domain.com'}, cloud=None + cfg={"fqdn": "myhost.domain.com"}, cloud=None ) - self.assertEqual('myhost', hostname) - self.assertEqual('myhost.domain.com', fqdn) + self.assertEqual("myhost", hostname) + self.assertEqual("myhost.domain.com", fqdn) def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self): """When cfg has both fqdn and hostname keys, return them.""" hostname, fqdn = util.get_hostname_fqdn( - cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None + cfg={"fqdn": "myhost.domain.com", "hostname": "other"}, cloud=None ) - self.assertEqual('other', hostname) - self.assertEqual('myhost.domain.com', fqdn) + self.assertEqual("other", hostname) + self.assertEqual("myhost.domain.com", fqdn) def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self): """When cfg has only hostname key which represents a fqdn, use that.""" hostname, fqdn = util.get_hostname_fqdn( - cfg={'hostname': 'myhost.domain.com'}, cloud=None + cfg={"hostname": "myhost.domain.com"}, cloud=None ) - self.assertEqual('myhost', hostname) - self.assertEqual('myhost.domain.com', fqdn) + self.assertEqual("myhost", hostname) + self.assertEqual("myhost.domain.com", fqdn) def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self): """When cfg has a hostname without a '.' query cloud.get_hostname.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") hostname, fqdn = util.get_hostname_fqdn( - cfg={'hostname': 'myhost'}, cloud=mycloud + cfg={"hostname": "myhost"}, cloud=mycloud ) - self.assertEqual('myhost', hostname) - self.assertEqual('cloudhost.mycloud.com', fqdn) + self.assertEqual("myhost", hostname) + self.assertEqual("cloudhost.mycloud.com", fqdn) self.assertEqual( - [{'fqdn': True, 'metadata_only': False}], mycloud.calls + [{"fqdn": True, "metadata_only": False}], mycloud.calls ) def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self): """When cfg has neither hostname nor fqdn cloud.get_hostname.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud) - self.assertEqual('cloudhost', hostname) - self.assertEqual('cloudhost.mycloud.com', fqdn) + self.assertEqual("cloudhost", hostname) + self.assertEqual("cloudhost.mycloud.com", fqdn) self.assertEqual( - [{'fqdn': True, 'metadata_only': False}, {'metadata_only': False}], + [{"fqdn": True, "metadata_only": False}, {"metadata_only": False}], mycloud.calls, ) def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self): """Calls to cloud.get_hostname pass the metadata_only parameter.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') + mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com") _hn, _fqdn = util.get_hostname_fqdn( cfg={}, cloud=mycloud, metadata_only=True ) self.assertEqual( - [{'fqdn': True, 'metadata_only': True}, {'metadata_only': True}], + [{"fqdn": True, "metadata_only": True}, {"metadata_only": True}], mycloud.calls, ) @@ -565,19 +562,19 @@ class TestBlkid(CiTestCase): ) -@mock.patch('cloudinit.subp.subp') +@mock.patch("cloudinit.subp.subp") class TestUdevadmSettle(CiTestCase): def test_with_no_params(self, m_subp): """called with no parameters.""" util.udevadm_settle() - m_subp.called_once_with(mock.call(['udevadm', 'settle'])) + m_subp.called_once_with(mock.call(["udevadm", "settle"])) def test_with_exists_and_not_exists(self, m_subp): """with exists=file where file does not exist should invoke subp.""" mydev = self.tmp_path("mydev") util.udevadm_settle(exists=mydev) m_subp.called_once_with( - ['udevadm', 'settle', '--exit-if-exists=%s' % mydev] + ["udevadm", "settle", "--exit-if-exists=%s" % mydev] ) def test_with_exists_and_file_exists(self, m_subp): @@ -592,7 +589,7 @@ class TestUdevadmSettle(CiTestCase): timeout = 9 util.udevadm_settle(timeout=timeout) m_subp.called_once_with( - ['udevadm', 'settle', '--timeout=%s' % timeout] + ["udevadm", "settle", "--timeout=%s" % timeout] ) def test_with_timeout_string(self, m_subp): @@ -600,7 +597,7 @@ class TestUdevadmSettle(CiTestCase): timeout = "555" util.udevadm_settle(timeout=timeout) m_subp.assert_called_once_with( - ['udevadm', 'settle', '--timeout=%s' % timeout] + ["udevadm", "settle", "--timeout=%s" % timeout] ) def test_with_exists_and_timeout(self, m_subp): @@ -610,10 +607,10 @@ class TestUdevadmSettle(CiTestCase): util.udevadm_settle(exists=mydev) m_subp.called_once_with( [ - 'udevadm', - 'settle', - '--exit-if-exists=%s' % mydev, - '--timeout=%s' % timeout, + "udevadm", + "settle", + "--exit-if-exists=%s" % mydev, + "--timeout=%s" % timeout, ] ) @@ -622,7 +619,7 @@ class TestUdevadmSettle(CiTestCase): self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle) -@mock.patch('os.path.exists') +@mock.patch("os.path.exists") class TestGetLinuxDistro(CiTestCase): def setUp(self): # python2 has no lru_cache, and therefore, no cache_clear() @@ -632,36 +629,36 @@ class TestGetLinuxDistro(CiTestCase): @classmethod def os_release_exists(self, path): """Side effect function""" - if path == '/etc/os-release': + if path == "/etc/os-release": return 1 @classmethod def redhat_release_exists(self, path): """Side effect function""" - if path == '/etc/redhat-release': + if path == "/etc/redhat-release": return 1 - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file has the distro name in quotes""" m_os_release.return_value = OS_RELEASE_SLES m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('sles', '12.3', platform.machine()), dist) + self.assertEqual(("sles", "12.3", platform.machine()), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file does not have the distro name in quotes""" m_os_release.return_value = OS_RELEASE_UBUNTU m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('ubuntu', '16.04', 'xenial'), dist) + self.assertEqual(("ubuntu", "16.04", "xenial"), dist) - @mock.patch('platform.system') - @mock.patch('platform.release') - @mock.patch('cloudinit.util._parse_redhat_release') + @mock.patch("platform.system") + @mock.patch("platform.release") + @mock.patch("cloudinit.util._parse_redhat_release") def test_get_linux_freebsd( self, m_parse_redhat_release, @@ -671,192 +668,194 @@ class TestGetLinuxDistro(CiTestCase): ): """Verify we get the correct name and release name on FreeBSD.""" m_path_exists.return_value = False - m_platform_release.return_value = '12.0-RELEASE-p10' - m_platform_system.return_value = 'FreeBSD' + m_platform_release.return_value = "12.0-RELEASE-p10" + m_platform_system.return_value = "FreeBSD" m_parse_redhat_release.return_value = {} util.is_BSD.cache_clear() dist = util.get_linux_distro() - self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist) + self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_centos6(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on CentOS 6.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('centos', '6.10', 'Final'), dist) + self.assertEqual(("centos", "6.10", "Final"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): """Verify the correct release info on CentOS 7 without os-release.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('centos', '7.5.1804', 'Core'), dist) + self.assertEqual(("centos", "7.5.1804", "Core"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_REDHAT_7 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('redhat', '7.5', 'Maipo'), dist) + self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('redhat', '7.5', 'Maipo'), dist) + self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 6 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('redhat', '6.10', 'Santiago'), dist) + self.assertEqual(("redhat", "6.10", "Santiago"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_copr_centos(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on COPR CentOS.""" m_os_release.return_value = OS_RELEASE_CENTOS m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('centos', '7', 'Core'), dist) + self.assertEqual(("centos", "7", "Core"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('almalinux', '8.3', 'Purple Manul'), dist) + self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ALMALINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('almalinux', '8.3', 'Purple Manul'), dist) + self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('eurolinux', '7.9', 'Minsk'), dist) + self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_7 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('eurolinux', '7.9', 'Minsk'), dist) + self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('eurolinux', '8.4', 'Vaduz'), dist) + self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('eurolinux', '8.4', 'Vaduz'), dist) + self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch('cloudinit.util.load_file') - def test_get_linux_miraclelinux8_rhrelease(self, m_os_release, - m_path_exists): + @mock.patch("cloudinit.util.load_file") + def test_get_linux_miraclelinux8_rhrelease( + self, m_os_release, m_path_exists + ): """Verify miraclelinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_MIRACLELINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('miracle', '8.4', 'Peony'), dist) + self.assertEqual(("miracle", "8.4", "Peony"), dist) - @mock.patch('cloudinit.util.load_file') - def test_get_linux_miraclelinux8_osrelease(self, m_os_release, - m_path_exists): + @mock.patch("cloudinit.util.load_file") + def test_get_linux_miraclelinux8_osrelease( + self, m_os_release, m_path_exists + ): """Verify miraclelinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_MIRACLELINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('miraclelinux', '8', 'Peony'), dist) + self.assertEqual(("miraclelinux", "8", "Peony"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ROCKY_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('rocky', '8.3', 'Green Obsidian'), dist) + self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ROCKY_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('rocky', '8.3', 'Green Obsidian'), dist) + self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('virtuozzo', '8', 'Virtuozzo Linux'), dist) + self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('virtuozzo', '8', 'Virtuozzo Linux'), dist) + self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists dist = util.get_linux_distro() - self.assertEqual(('cloudlinux', '8.4', 'Valery Rozhdestvensky'), dist) + self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('cloudlinux', '8.4', 'Valery Rozhdestvensky'), dist) + self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_debian(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on Debian.""" m_os_release.return_value = OS_RELEASE_DEBIAN m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('debian', '9', 'stretch'), dist) + self.assertEqual(("debian", "9", "stretch"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_openeuler(self, m_os_release, m_path_exists): """Verify get the correct name and release name on Openeuler.""" m_os_release.return_value = OS_RELEASE_OPENEULER_20 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('openEuler', '20.03', 'LTS-SP2'), dist) + self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_opensuse(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE prior to openSUSE Leap 15. @@ -864,9 +863,9 @@ class TestGetLinuxDistro(CiTestCase): m_os_release.return_value = OS_RELEASE_OPENSUSE m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('opensuse', '42.3', platform.machine()), dist) + self.assertEqual(("opensuse", "42.3", platform.machine()), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Leap 15.0 and later. @@ -874,9 +873,9 @@ class TestGetLinuxDistro(CiTestCase): m_os_release.return_value = OS_RELEASE_OPENSUSE_L15 m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist) + self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Tumbleweed @@ -885,31 +884,31 @@ class TestGetLinuxDistro(CiTestCase): m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() self.assertEqual( - ('opensuse-tumbleweed', '20180920', platform.machine()), dist + ("opensuse-tumbleweed", "20180920", platform.machine()), dist ) - @mock.patch('cloudinit.util.load_file') + @mock.patch("cloudinit.util.load_file") def test_get_linux_photon_os_release(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on PhotonOS""" m_os_release.return_value = OS_RELEASE_PHOTON m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists dist = util.get_linux_distro() - self.assertEqual(('photon', '4.0', 'VMware Photon OS/Linux'), dist) + self.assertEqual(("photon", "4.0", "VMware Photon OS/Linux"), dist) - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) def test_get_linux_distro_no_data( self, m_platform_dist, m_platform_system, m_path_exists ): """Verify we get no information if os-release does not exist""" - m_platform_dist.return_value = ('', '', '') + m_platform_dist.return_value = ("", "", "") m_platform_system.return_value = "Linux" m_path_exists.return_value = 0 dist = util.get_linux_distro() - self.assertEqual(('', '', ''), dist) + self.assertEqual(("", "", ""), dist) - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) def test_get_linux_distro_no_impl( self, m_platform_dist, m_platform_system, m_path_exists ): @@ -919,55 +918,55 @@ class TestGetLinuxDistro(CiTestCase): m_platform_system.return_value = "Linux" m_path_exists.return_value = 0 dist = util.get_linux_distro() - self.assertEqual(('', '', ''), dist) + self.assertEqual(("", "", ""), dist) - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) + @mock.patch("platform.system") + @mock.patch("platform.dist", create=True) def test_get_linux_distro_plat_data( self, m_platform_dist, m_platform_system, m_path_exists ): """Verify we get the correct platform information""" - m_platform_dist.return_value = ('foo', '1.1', 'aarch64') + m_platform_dist.return_value = ("foo", "1.1", "aarch64") m_platform_system.return_value = "Linux" m_path_exists.return_value = 0 dist = util.get_linux_distro() - self.assertEqual(('foo', '1.1', 'aarch64'), dist) + self.assertEqual(("foo", "1.1", "aarch64"), dist) class TestGetVariant: @pytest.mark.parametrize( - 'info, expected_variant', + "info, expected_variant", [ - ({'system': 'Linux', 'dist': ('almalinux',)}, 'almalinux'), - ({'system': 'linux', 'dist': ('alpine',)}, 'alpine'), - ({'system': 'linux', 'dist': ('arch',)}, 'arch'), - ({'system': 'linux', 'dist': ('centos',)}, 'centos'), - ({'system': 'linux', 'dist': ('cloudlinux',)}, 'cloudlinux'), - ({'system': 'linux', 'dist': ('debian',)}, 'debian'), - ({'system': 'linux', 'dist': ('eurolinux',)}, 'eurolinux'), - ({'system': 'linux', 'dist': ('fedora',)}, 'fedora'), - ({'system': 'linux', 'dist': ('openEuler',)}, 'openeuler'), - ({'system': 'linux', 'dist': ('photon',)}, 'photon'), - ({'system': 'linux', 'dist': ('rhel',)}, 'rhel'), - ({'system': 'linux', 'dist': ('rocky',)}, 'rocky'), - ({'system': 'linux', 'dist': ('suse',)}, 'suse'), - ({'system': 'linux', 'dist': ('virtuozzo',)}, 'virtuozzo'), - ({'system': 'linux', 'dist': ('ubuntu',)}, 'ubuntu'), - ({'system': 'linux', 'dist': ('linuxmint',)}, 'ubuntu'), - ({'system': 'linux', 'dist': ('mint',)}, 'ubuntu'), - ({'system': 'linux', 'dist': ('redhat',)}, 'rhel'), - ({'system': 'linux', 'dist': ('opensuse',)}, 'suse'), - ({'system': 'linux', 'dist': ('opensuse-tumbleweed',)}, 'suse'), - ({'system': 'linux', 'dist': ('opensuse-leap',)}, 'suse'), - ({'system': 'linux', 'dist': ('sles',)}, 'suse'), - ({'system': 'linux', 'dist': ('sle_hpc',)}, 'suse'), - ({'system': 'linux', 'dist': ('my_distro',)}, 'linux'), - ({'system': 'Windows', 'dist': ('dontcare',)}, 'windows'), - ({'system': 'Darwin', 'dist': ('dontcare',)}, 'darwin'), - ({'system': 'Freebsd', 'dist': ('dontcare',)}, 'freebsd'), - ({'system': 'Netbsd', 'dist': ('dontcare',)}, 'netbsd'), - ({'system': 'Openbsd', 'dist': ('dontcare',)}, 'openbsd'), - ({'system': 'Dragonfly', 'dist': ('dontcare',)}, 'dragonfly'), + ({"system": "Linux", "dist": ("almalinux",)}, "almalinux"), + ({"system": "linux", "dist": ("alpine",)}, "alpine"), + ({"system": "linux", "dist": ("arch",)}, "arch"), + ({"system": "linux", "dist": ("centos",)}, "centos"), + ({"system": "linux", "dist": ("cloudlinux",)}, "cloudlinux"), + ({"system": "linux", "dist": ("debian",)}, "debian"), + ({"system": "linux", "dist": ("eurolinux",)}, "eurolinux"), + ({"system": "linux", "dist": ("fedora",)}, "fedora"), + ({"system": "linux", "dist": ("openEuler",)}, "openeuler"), + ({"system": "linux", "dist": ("photon",)}, "photon"), + ({"system": "linux", "dist": ("rhel",)}, "rhel"), + ({"system": "linux", "dist": ("rocky",)}, "rocky"), + ({"system": "linux", "dist": ("suse",)}, "suse"), + ({"system": "linux", "dist": ("virtuozzo",)}, "virtuozzo"), + ({"system": "linux", "dist": ("ubuntu",)}, "ubuntu"), + ({"system": "linux", "dist": ("linuxmint",)}, "ubuntu"), + ({"system": "linux", "dist": ("mint",)}, "ubuntu"), + ({"system": "linux", "dist": ("redhat",)}, "rhel"), + ({"system": "linux", "dist": ("opensuse",)}, "suse"), + ({"system": "linux", "dist": ("opensuse-tumbleweed",)}, "suse"), + ({"system": "linux", "dist": ("opensuse-leap",)}, "suse"), + ({"system": "linux", "dist": ("sles",)}, "suse"), + ({"system": "linux", "dist": ("sle_hpc",)}, "suse"), + ({"system": "linux", "dist": ("my_distro",)}, "linux"), + ({"system": "Windows", "dist": ("dontcare",)}, "windows"), + ({"system": "Darwin", "dist": ("dontcare",)}, "darwin"), + ({"system": "Freebsd", "dist": ("dontcare",)}, "freebsd"), + ({"system": "Netbsd", "dist": ("dontcare",)}, "netbsd"), + ({"system": "Openbsd", "dist": ("dontcare",)}, "openbsd"), + ({"system": "Dragonfly", "dist": ("dontcare",)}, "dragonfly"), ], ) def test_get_variant(self, info, expected_variant): @@ -978,41 +977,42 @@ class TestGetVariant: class TestJsonDumps(CiTestCase): def test_is_str(self): """json_dumps should return a string.""" - self.assertTrue(isinstance(util.json_dumps({'abc': '123'}), str)) + self.assertTrue(isinstance(util.json_dumps({"abc": "123"}), str)) def test_utf8(self): - smiley = '\\ud83d\\ude03' + smiley = "\\ud83d\\ude03" self.assertEqual( - {'smiley': smiley}, json.loads(util.json_dumps({'smiley': smiley})) + {"smiley": smiley}, json.loads(util.json_dumps({"smiley": smiley})) ) def test_non_utf8(self): - blob = b'\xba\x03Qx-#y\xea' + blob = b"\xba\x03Qx-#y\xea" self.assertEqual( - {'blob': 'ci-b64:' + base64.b64encode(blob).decode('utf-8')}, - json.loads(util.json_dumps({'blob': blob})), + {"blob": "ci-b64:" + base64.b64encode(blob).decode("utf-8")}, + json.loads(util.json_dumps({"blob": blob})), ) -@mock.patch('os.path.exists') +@mock.patch("os.path.exists") class TestIsLXD(CiTestCase): def test_is_lxd_true_on_sock_device(self, m_exists): """When lxd's /dev/lxd/sock exists, is_lxd returns true.""" m_exists.return_value = True self.assertTrue(util.is_lxd()) - m_exists.assert_called_once_with('/dev/lxd/sock') + m_exists.assert_called_once_with("/dev/lxd/sock") def test_is_lxd_false_when_sock_device_absent(self, m_exists): """When lxd's /dev/lxd/sock is absent, is_lxd returns false.""" m_exists.return_value = False self.assertFalse(util.is_lxd()) - m_exists.assert_called_once_with('/dev/lxd/sock') + m_exists.assert_called_once_with("/dev/lxd/sock") class TestReadCcFromCmdline: if hasattr(pytest, "param"): random_string = pytest.param( - CiTestCase.random_string(), None, id="random_string") + CiTestCase.random_string(), None, id="random_string" + ) else: random_string = (CiTestCase.random_string(), None) @@ -1022,55 +1022,51 @@ class TestReadCcFromCmdline: # Return None if cmdline has no cc:<YAML>end_cc content. random_string, # Return None if YAML content is empty string. - ('foo cc: end_cc bar', None), + ("foo cc: end_cc bar", None), # Return expected dictionary without trailing end_cc marker. - ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}), + ("foo cc: ssh_pwauth: true", {"ssh_pwauth": True}), # Return expected dictionary w escaped newline and no end_cc. - ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}), + ("foo cc: ssh_pwauth: true\\n", {"ssh_pwauth": True}), # Return expected dictionary of yaml between cc: and end_cc. - ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}), + ("foo cc: ssh_pwauth: true end_cc bar", {"ssh_pwauth": True}), # Return dict with list value w escaped newline, no end_cc. ( - 'cc: ssh_import_id: [smoser, kirkland]\\n', - {'ssh_import_id': ['smoser', 'kirkland']}, + "cc: ssh_import_id: [smoser, kirkland]\\n", + {"ssh_import_id": ["smoser", "kirkland"]}, ), # Parse urlencoded brackets in yaml content. ( - 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc', - {'ssh_import_id': ['smoser', 'kirkland']}, + "cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc", + {"ssh_import_id": ["smoser", "kirkland"]}, ), # Parse complete urlencoded yaml content. ( - 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc', - {'ssh_import_id': ['user1', 'user2']}, + "cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc", + {"ssh_import_id": ["user1", "user2"]}, ), # Parse nested dictionary in yaml content. ( - 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc', - {'ntp': {'enabled': True, 'ntp_client': 'myclient'}}, + "cc: ntp: {enabled: true, ntp_client: myclient} end_cc", + {"ntp": {"enabled": True, "ntp_client": "myclient"}}, ), # Parse single mapping value in yaml content. - ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}), + ("cc: ssh_import_id: smoser end_cc", {"ssh_import_id": "smoser"}), # Parse multiline content with multiple mapping and nested lists. ( - ( - 'cc: ssh_import_id: [smoser, bob]\\n' - 'runcmd: [ [ ls, -l ], echo hi ] end_cc' - ), + "cc: ssh_import_id: [smoser, bob]\\n" + "runcmd: [ [ ls, -l ], echo hi ] end_cc", { - 'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi'], + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], }, ), # Parse multiline encoded content w/ mappings and nested lists. ( - ( - 'cc: ssh_import_id: %5Bsmoser, bob%5D\\n' - 'runcmd: [ [ ls, -l ], echo hi ] end_cc' - ), + "cc: ssh_import_id: %5Bsmoser, bob%5D\\n" + "runcmd: [ [ ls, -l ], echo hi ] end_cc", { - 'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi'], + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], }, ), # test encoded escaped newlines work. @@ -1079,17 +1075,13 @@ class TestReadCcFromCmdline: # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]' ( ( - 'cc: ' - + ( - 'ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn' - 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C' - '%20echo%20hi%20%5D' - ) - + ' end_cc' + "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn" + "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C" + "%20echo%20hi%20%5D" + " end_cc" ), { - 'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi'], + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], }, ), # test encoded newlines work. @@ -1098,34 +1090,26 @@ class TestReadCcFromCmdline: # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]' ( ( - "cc: " - + ( - 'ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A' - 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C' - '%20echo%20hi%20%5D' - ) - + ' end_cc' + "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A" + "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C" + "%20echo%20hi%20%5D" + " end_cc" ), { - 'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi'], + "ssh_import_id": ["smoser", "bob"], + "runcmd": [["ls", "-l"], "echo hi"], }, ), # Parse and merge multiple yaml content sections. ( - ( - 'cc:ssh_import_id: [smoser, bob] end_cc ' - 'cc: runcmd: [ [ ls, -l ] ] end_cc' - ), - {'ssh_import_id': ['smoser', 'bob'], 'runcmd': [['ls', '-l']]}, + "cc:ssh_import_id: [smoser, bob] end_cc " + "cc: runcmd: [ [ ls, -l ] ] end_cc", + {"ssh_import_id": ["smoser", "bob"], "runcmd": [["ls", "-l"]]}, ), # Parse and merge multiple encoded yaml content sections. ( - ( - 'cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc ' - 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc' - ), - {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]}, + "cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc " + "cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc", + {"ssh_import_id": ["smoser"], "runcmd": [["ls", "-l"]]}, ), ], ) @@ -1189,7 +1173,7 @@ class TestMountCb: ) callback = mock.Mock(autospec=True) - util.mount_cb('/dev/fake0', callback, mtype=mtype) + util.mount_cb("/dev/fake0", callback, mtype=mtype) assert ( mock.call( [ @@ -1473,7 +1457,7 @@ class TestWriteFile(helpers.TestCase): path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - open(path, 'w').close() + open(path, "w").close() os.chmod(path, 0o666) util.write_file(path, contents, preserve_mode=True) @@ -1508,7 +1492,7 @@ class TestWriteFile(helpers.TestCase): fake_se = FakeSelinux(my_file) with mock.patch.object( - importer, 'import_module', return_value=fake_se + importer, "import_module", return_value=fake_se ) as mockobj: with util.SeLinuxGuard(my_file) as is_on: self.assertTrue(is_on) @@ -1516,7 +1500,7 @@ class TestWriteFile(helpers.TestCase): self.assertEqual(1, len(fake_se.restored)) self.assertEqual(my_file, fake_se.restored[0]) - mockobj.assert_called_once_with('selinux') + mockobj.assert_called_once_with("selinux") class TestDeleteDirContents(helpers.TestCase): @@ -1587,7 +1571,7 @@ class TestDeleteDirContents(helpers.TestCase): class TestKeyValStrings(helpers.TestCase): def test_keyval_str_to_dict(self): - expected = {'1': 'one', '2': 'one+one', 'ro': True} + expected = {"1": "one", "2": "one+one", "ro": True} cmdline = "1=one ro 2=one+one" self.assertEqual(expected, util.keyval_str_to_dict(cmdline)) @@ -1595,7 +1579,7 @@ class TestKeyValStrings(helpers.TestCase): class TestGetCmdline(helpers.TestCase): def test_cmdline_reads_debug_env(self): with mock.patch.dict( - "os.environ", values={'DEBUG_PROC_CMDLINE': 'abcd 123'} + "os.environ", values={"DEBUG_PROC_CMDLINE": "abcd 123"} ): ret = util.get_cmdline() self.assertEqual("abcd 123", ret) @@ -1606,13 +1590,13 @@ class TestLoadYaml(helpers.CiTestCase): with_logs = True def test_simple(self): - mydata = {'1': "one", '2': "two"} + mydata = {"1": "one", "2": "two"} self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) def test_nonallowed_returns_default(self): - '''Any unallowed types result in returning default; log the issue.''' + """Any unallowed types result in returning default; log the issue.""" # for now, anything not in the allowed list just returns the default. - myyaml = yaml.dump({'1': "one"}) + myyaml = yaml.dump({"1": "one"}) self.assertEqual( util.load_yaml( blob=myyaml, default=self.mydefault, allowed=(str,) @@ -1620,37 +1604,37 @@ class TestLoadYaml(helpers.CiTestCase): self.mydefault, ) regex = re.compile( - r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but' - r' got dict' + r"Yaml load allows \(<(class|type) \'str\'>,\) root types, but" + r" got dict" ) self.assertTrue( regex.search(self.logs.getvalue()), - msg='Missing expected yaml load error', + msg="Missing expected yaml load error", ) def test_bogus_scan_error_returns_default(self): - '''On Yaml scan error, load_yaml returns the default and logs issue.''' + """On Yaml scan error, load_yaml returns the default and logs issue.""" badyaml = "1\n 2:" self.assertEqual( util.load_yaml(blob=badyaml, default=self.mydefault), self.mydefault, ) self.assertIn( - 'Failed loading yaml blob. Invalid format at line 2 column 3:' + "Failed loading yaml blob. Invalid format at line 2 column 3:" ' "mapping values are not allowed here', self.logs.getvalue(), ) def test_bogus_parse_error_returns_default(self): - '''On Yaml parse error, load_yaml returns default and logs issue.''' + """On Yaml parse error, load_yaml returns default and logs issue.""" badyaml = "{}}" self.assertEqual( util.load_yaml(blob=badyaml, default=self.mydefault), self.mydefault, ) self.assertIn( - 'Failed loading yaml blob. Invalid format at line 1 column 3:' - " \"expected \'<document start>\', but found \'}\'", + "Failed loading yaml blob. Invalid format at line 1 column 3:" + " \"expected '<document start>', but found '}'", self.logs.getvalue(), ) @@ -1670,7 +1654,7 @@ class TestLoadYaml(helpers.CiTestCase): def test_python_unicode(self): # complex type of python/unicode is explicitly allowed - myobj = {'1': "FOOBAR"} + myobj = {"1": "FOOBAR"} safe_yaml = yaml.dump(myobj) self.assertEqual( util.load_yaml(blob=safe_yaml, default=self.mydefault), myobj @@ -1694,144 +1678,144 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ) elements = line.split() for i in range(len(elements) + 1): - lines = [' '.join(elements[0:i])] + lines = [" ".join(elements[0:i])] if i < 10: expected = None else: - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) def test_precise_ext4_root(self): - lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines() + lines = helpers.readResource("mountinfo_precise_ext4.txt").splitlines() - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines)) - expected = ('/dev/md0', 'ext4', '/boot') - self.assertEqual(expected, util.parse_mount_info('/boot', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + expected = ("/dev/md0", "ext4", "/boot") + self.assertEqual(expected, util.parse_mount_info("/boot", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines)) - expected = ('/dev/mapper/vg0-root', 'ext4', '/') - self.assertEqual(expected, util.parse_mount_info('/home', lines)) - self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + expected = ("/dev/mapper/vg0-root", "ext4", "/") + self.assertEqual(expected, util.parse_mount_info("/home", lines)) + self.assertEqual(expected, util.parse_mount_info("/home/me", lines)) - expected = ('tmpfs', 'tmpfs', '/run') - self.assertEqual(expected, util.parse_mount_info('/run', lines)) + expected = ("tmpfs", "tmpfs", "/run") + self.assertEqual(expected, util.parse_mount_info("/run", lines)) - expected = ('none', 'tmpfs', '/run/lock') - self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + expected = ("none", "tmpfs", "/run/lock") + self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) def test_raring_btrfs_root(self): - lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines() + lines = helpers.readResource("mountinfo_raring_btrfs.txt").splitlines() - expected = ('/dev/vda1', 'btrfs', '/') - self.assertEqual(expected, util.parse_mount_info('/', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr', lines)) - self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot', lines)) - self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + expected = ("/dev/vda1", "btrfs", "/") + self.assertEqual(expected, util.parse_mount_info("/", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr", lines)) + self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot", lines)) + self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines)) - expected = ('/dev/vda1', 'btrfs', '/home') - self.assertEqual(expected, util.parse_mount_info('/home', lines)) - self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + expected = ("/dev/vda1", "btrfs", "/home") + self.assertEqual(expected, util.parse_mount_info("/home", lines)) + self.assertEqual(expected, util.parse_mount_info("/home/me", lines)) - expected = ('tmpfs', 'tmpfs', '/run') - self.assertEqual(expected, util.parse_mount_info('/run', lines)) + expected = ("tmpfs", "tmpfs", "/run") + self.assertEqual(expected, util.parse_mount_info("/run", lines)) - expected = ('none', 'tmpfs', '/run/lock') - self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + expected = ("none", "tmpfs", "/run/lock") + self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - helpers.readResource('zpool_status_simple.txt'), - '', + helpers.readResource("zpool_status_simple.txt"), + "", ) # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') - self.assertEqual('gpt/system', ret) + ret = util.get_device_info_from_zpool("vmzroot") + self.assertEqual("gpt/system", ret) self.assertIsNotNone(ret) - m_os.path.exists.assert_called_with('/dev/zfs') + m_os.path.exists.assert_called_with("/dev/zfs") - @mock.patch('cloudinit.util.os') + @mock.patch("cloudinit.util.os") def test_get_device_info_from_zpool_no_dev_zfs(self, m_os): # mock /dev/zfs missing m_os.path.exists.return_value = False # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" # mock /dev/zfs exists m_os.path.exists.return_value = True m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd") - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.util.os") + @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - helpers.readResource('zpool_status_simple.txt'), - 'error', + helpers.readResource("zpool_status_simple.txt"), + "error", ) # save function return values and do asserts - ret = util.get_device_info_from_zpool('vmzroot') + ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_parse_mount_with_ext(self, mount_out): mount_out.return_value = ( - helpers.readResource('mount_parse_ext.txt'), - '', + helpers.readResource("mount_parse_ext.txt"), + "", ) # this one is valid and exists in mount_parse_ext.txt - ret = util.parse_mount('/var') - self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret) + ret = util.parse_mount("/var") + self.assertEqual(("/dev/mapper/vg00-lv_var", "ext4", "/var"), ret) # another one that is valid and exists - ret = util.parse_mount('/') - self.assertEqual(('/dev/mapper/vg00-lv_root', 'ext4', '/'), ret) + ret = util.parse_mount("/") + self.assertEqual(("/dev/mapper/vg00-lv_root", "ext4", "/"), ret) # this one exists in mount_parse_ext.txt - ret = util.parse_mount('/sys/kernel/debug') + ret = util.parse_mount("/sys/kernel/debug") self.assertIsNone(ret) # this one does not even exist in mount_parse_ext.txt - ret = util.parse_mount('/not/existing/mount') + ret = util.parse_mount("/not/existing/mount") self.assertIsNone(ret) - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_parse_mount_with_zfs(self, mount_out): mount_out.return_value = ( - helpers.readResource('mount_parse_zfs.txt'), - '', + helpers.readResource("mount_parse_zfs.txt"), + "", ) # this one is valid and exists in mount_parse_zfs.txt - ret = util.parse_mount('/var') - self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret) + ret = util.parse_mount("/var") + self.assertEqual(("vmzroot/ROOT/freebsd/var", "zfs", "/var"), ret) # this one is the root, valid and also exists in mount_parse_zfs.txt - ret = util.parse_mount('/') - self.assertEqual(('vmzroot/ROOT/freebsd', 'zfs', '/'), ret) + ret = util.parse_mount("/") + self.assertEqual(("vmzroot/ROOT/freebsd", "zfs", "/"), ret) # this one does not even exist in mount_parse_ext.txt - ret = util.parse_mount('/not/existing/mount') + ret = util.parse_mount("/not/existing/mount") self.assertIsNone(ret) class TestIsX86(helpers.CiTestCase): def test_is_x86_matches_x86_types(self): """is_x86 returns True if CPU architecture matches.""" - matched_arches = ['x86_64', 'i386', 'i586', 'i686'] + matched_arches = ["x86_64", "i386", "i586", "i686"] for arch in matched_arches: self.assertTrue( util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch @@ -1839,16 +1823,16 @@ class TestIsX86(helpers.CiTestCase): def test_is_x86_unmatched_types(self): """is_x86 returns Fale on non-intel x86 architectures.""" - unmatched_arches = ['ia64', '9000/800', 'arm64v71'] + unmatched_arches = ["ia64", "9000/800", "arm64v71"] for arch in unmatched_arches: self.assertFalse( util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch ) - @mock.patch('cloudinit.util.os.uname') + @mock.patch("cloudinit.util.os.uname") def test_is_x86_calls_uname_for_architecture(self, m_uname): """is_x86 returns True if platform from uname matches.""" - m_uname.return_value = [0, 1, 2, 3, 'x86_64'] + m_uname.return_value = [0, 1, 2, 3, "x86_64"] self.assertTrue(util.is_x86()) @@ -1861,18 +1845,18 @@ class TestGetConfigLogfiles(helpers.CiTestCase): def test_default_log_file_present(self): """When default_log_file is set get_config_logfiles finds it.""" self.assertEqual( - ['/my.log'], util.get_config_logfiles({'def_log_file': '/my.log'}) + ["/my.log"], util.get_config_logfiles({"def_log_file": "/my.log"}) ) def test_output_logs_parsed_when_teeing_files(self): """When output configuration is parsed when teeing files.""" self.assertEqual( - ['/himom.log', '/my.log'], + ["/himom.log", "/my.log"], sorted( util.get_config_logfiles( { - 'def_log_file': '/my.log', - 'output': {'all': '|tee -a /himom.log'}, + "def_log_file": "/my.log", + "output": {"all": "|tee -a /himom.log"}, } ) ), @@ -1881,12 +1865,12 @@ class TestGetConfigLogfiles(helpers.CiTestCase): def test_output_logs_parsed_when_redirecting(self): """When output configuration is parsed when redirecting to a file.""" self.assertEqual( - ['/my.log', '/test.log'], + ["/my.log", "/test.log"], sorted( util.get_config_logfiles( { - 'def_log_file': '/my.log', - 'output': {'all': '>/test.log'}, + "def_log_file": "/my.log", + "output": {"all": ">/test.log"}, } ) ), @@ -1895,12 +1879,12 @@ class TestGetConfigLogfiles(helpers.CiTestCase): def test_output_logs_parsed_when_appending(self): """When output configuration is parsed when appending to a file.""" self.assertEqual( - ['/my.log', '/test.log'], + ["/my.log", "/test.log"], sorted( util.get_config_logfiles( { - 'def_log_file': '/my.log', - 'output': {'all': '>> /test.log'}, + "def_log_file": "/my.log", + "output": {"all": ">> /test.log"}, } ) ), @@ -1909,8 +1893,8 @@ class TestGetConfigLogfiles(helpers.CiTestCase): class TestMultiLog(helpers.FilesystemMockingTestCase): def _createConsole(self, root): - os.mkdir(os.path.join(root, 'dev')) - open(os.path.join(root, 'dev', 'console'), 'a').close() + os.mkdir(os.path.join(root, "dev")) + open(os.path.join(root, "dev", "console"), "a").close() def setUp(self): super(TestMultiLog, self).setUp() @@ -1924,37 +1908,37 @@ class TestMultiLog(helpers.FilesystemMockingTestCase): self.patchStdoutAndStderr(self.stdout, self.stderr) def test_stderr_used_by_default(self): - logged_string = 'test stderr output' + logged_string = "test stderr output" util.multi_log(logged_string) self.assertEqual(logged_string, self.stderr.getvalue()) def test_stderr_not_used_if_false(self): - util.multi_log('should not see this', stderr=False) - self.assertEqual('', self.stderr.getvalue()) + util.multi_log("should not see this", stderr=False) + self.assertEqual("", self.stderr.getvalue()) def test_logs_go_to_console_by_default(self): self._createConsole(self.root) - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string) - self.assertEqual(logged_string, open('/dev/console').read()) + self.assertEqual(logged_string, open("/dev/console").read()) def test_logs_dont_go_to_stdout_if_console_exists(self): self._createConsole(self.root) - util.multi_log('something') - self.assertEqual('', self.stdout.getvalue()) + util.multi_log("something") + self.assertEqual("", self.stdout.getvalue()) def test_logs_go_to_stdout_if_console_does_not_exist(self): - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string) self.assertEqual(logged_string, self.stdout.getvalue()) def test_logs_dont_go_to_stdout_if_fallback_to_stdout_is_false(self): - util.multi_log('something', fallback_to_stdout=False) - self.assertEqual('', self.stdout.getvalue()) + util.multi_log("something", fallback_to_stdout=False) + self.assertEqual("", self.stdout.getvalue()) def test_logs_go_to_log_if_given(self): log = mock.MagicMock() - logged_string = 'something very important' + logged_string = "something very important" util.multi_log(logged_string, log=log) self.assertEqual( [((mock.ANY, logged_string), {})], log.log.call_args_list @@ -1962,26 +1946,26 @@ class TestMultiLog(helpers.FilesystemMockingTestCase): def test_newlines_stripped_from_log_call(self): log = mock.MagicMock() - expected_string = 'something very important' - util.multi_log('{0}\n'.format(expected_string), log=log) + expected_string = "something very important" + util.multi_log("{0}\n".format(expected_string), log=log) self.assertEqual((mock.ANY, expected_string), log.log.call_args[0]) def test_log_level_defaults_to_debug(self): log = mock.MagicMock() - util.multi_log('message', log=log) + util.multi_log("message", log=log) self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0]) def test_given_log_level_used(self): log = mock.MagicMock() log_level = mock.Mock() - util.multi_log('message', log=log, log_level=log_level) + util.multi_log("message", log=log, log_level=log_level) self.assertEqual((log_level, mock.ANY), log.log.call_args[0]) class TestMessageFromString(helpers.TestCase): def test_unicode_not_messed_up(self): - roundtripped = util.message_from_string('\n').as_string() - self.assertNotIn('\x00', roundtripped) + roundtripped = util.message_from_string("\n").as_string() + self.assertNotIn("\x00", roundtripped) class TestReadSeeded(helpers.TestCase): @@ -1995,12 +1979,12 @@ class TestReadSeeded(helpers.TestCase): vd = b"vendordatablob" helpers.populate_dir( self.tmp, - {'meta-data': "key1: val1", 'user-data': ud, 'vendor-data': vd}, + {"meta-data": "key1: val1", "user-data": ud, "vendor-data": vd}, ) sdir = self.tmp + os.path.sep (found_md, found_ud, found_vd) = util.read_seeded(sdir) - self.assertEqual(found_md, {'key1': 'val1'}) + self.assertEqual(found_md, {"key1": "val1"}) self.assertEqual(found_ud, ud) self.assertEqual(found_vd, vd) @@ -2015,12 +1999,12 @@ class TestReadSeededWithoutVendorData(helpers.TestCase): ud = b"userdatablob" vd = None helpers.populate_dir( - self.tmp, {'meta-data': "key1: val1", 'user-data': ud} + self.tmp, {"meta-data": "key1: val1", "user-data": ud} ) sdir = self.tmp + os.path.sep (found_md, found_ud, found_vd) = util.read_seeded(sdir) - self.assertEqual(found_md, {'key1': 'val1'}) + self.assertEqual(found_md, {"key1": "val1"}) self.assertEqual(found_ud, ud) self.assertEqual(found_vd, vd) @@ -2029,7 +2013,7 @@ class TestEncode(helpers.TestCase): """Test the encoding functions""" def test_decode_binary_plain_text_with_hex(self): - blob = 'BOOTABLE_FLAG=\x80init=/bin/systemd' + blob = "BOOTABLE_FLAG=\x80init=/bin/systemd" text = util.decode_binary(blob) self.assertEqual(text, blob) @@ -2037,20 +2021,20 @@ class TestEncode(helpers.TestCase): class TestProcessExecutionError(helpers.TestCase): template = ( - '{description}\n' - 'Command: {cmd}\n' - 'Exit code: {exit_code}\n' - 'Reason: {reason}\n' - 'Stdout: {stdout}\n' - 'Stderr: {stderr}' + "{description}\n" + "Command: {cmd}\n" + "Exit code: {exit_code}\n" + "Reason: {reason}\n" + "Stdout: {stdout}\n" + "Stderr: {stderr}" ) - empty_attr = '-' - empty_description = 'Unexpected error while running command.' + empty_attr = "-" + empty_description = "Unexpected error while running command." def test_pexec_error_indent_text(self): error = subp.ProcessExecutionError() - msg = 'abc\ndef' - formatted = 'abc\n{0}def'.format(' ' * 4) + msg = "abc\ndef" + formatted = "abc\n{0}def".format(" " * 4) self.assertEqual(error._indent_text(msg, indent_level=4), formatted) self.assertEqual( error._indent_text(msg.encode(), indent_level=4), @@ -2085,9 +2069,9 @@ class TestProcessExecutionError(helpers.TestCase): ) def test_pexec_error_single_line_msgs(self): - stdout_msg = 'out out' - stderr_msg = 'error error' - cmd = 'test command' + stdout_msg = "out out" + stderr_msg = "error error" + cmd = "test command" exit_code = 3 error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd @@ -2106,25 +2090,25 @@ class TestProcessExecutionError(helpers.TestCase): def test_pexec_error_multi_line_msgs(self): # make sure bytes is converted handled properly when formatting - stdout_msg = 'multi\nline\noutput message'.encode() - stderr_msg = 'multi\nline\nerror message\n\n\n' + stdout_msg = "multi\nline\noutput message".encode() + stderr_msg = "multi\nline\nerror message\n\n\n" error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg ) self.assertEqual( str(error), - '\n'.join( + "\n".join( ( - '{description}', - 'Command: {empty_attr}', - 'Exit code: {empty_attr}', - 'Reason: {empty_attr}', - 'Stdout: multi', - ' line', - ' output message', - 'Stderr: multi', - ' line', - ' error message', + "{description}", + "Command: {empty_attr}", + "Exit code: {empty_attr}", + "Reason: {empty_attr}", + "Stdout: multi", + " line", + " output message", + "Stderr: multi", + " line", + " error message", ) ).format( description=self.empty_description, empty_attr=self.empty_attr @@ -2135,31 +2119,31 @@ class TestProcessExecutionError(helpers.TestCase): class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): def test_id_in_os_release_quoted(self): """os-release containing ID="ubuntu-core" is snappy.""" - orcontent = '\n'.join(['ID="ubuntu-core"', '']) + orcontent = "\n".join(['ID="ubuntu-core"', ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) def test_id_in_os_release(self): """os-release containing ID=ubuntu-core is snappy.""" - orcontent = '\n'.join(['ID=ubuntu-core', '']) + orcontent = "\n".join(["ID=ubuntu-core", ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_bad_content_in_os_release_no_effect(self, m_cmdline): """malformed os-release should not raise exception.""" - m_cmdline.return_value = 'root=/dev/sda' - orcontent = '\n'.join(['IDubuntu-core', '']) + m_cmdline.return_value = "root=/dev/sda" + orcontent = "\n".join(["IDubuntu-core", ""]) root_d = self.tmp_dir() - helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + helpers.populate_dir(root_d, {"etc/os-release": orcontent}) self.reRoot() self.assertFalse(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): """The string snap_core= in kernel cmdline indicates snappy.""" cmdline = ( @@ -2172,31 +2156,31 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.assertTrue(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_nothing_found_is_not_snappy(self, m_cmdline): """If no positive identification, then not snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" self.reRoot() self.assertFalse(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): """A Channel.ini file with 'ubuntu-core' indicates snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" root_d = self.tmp_dir() - content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""]) - helpers.populate_dir(root_d, {'etc/system-image/channel.ini': content}) + content = "\n".join(["[Foo]", "source = 'ubuntu-core'", ""]) + helpers.populate_dir(root_d, {"etc/system-image/channel.ini": content}) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.util.get_cmdline") def test_system_image_config_dir_is_snappy(self, m_cmdline): """Existence of /etc/system-image/config.d indicates snappy.""" - m_cmdline.return_value = 'root=/dev/sda' + m_cmdline.return_value = "root=/dev/sda" root_d = self.tmp_dir() helpers.populate_dir( - root_d, {'etc/system-image/config.d/my.file': "_unused"} + root_d, {"etc/system-image/config.d/my.file": "_unused"} ) self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) @@ -2206,16 +2190,16 @@ class TestLoadShellContent(helpers.TestCase): def test_comments_handled_correctly(self): """Shell comments should be allowed in the content.""" self.assertEqual( - {'key1': 'val1', 'key2': 'val2', 'key3': 'val3 #tricky'}, + {"key1": "val1", "key2": "val2", "key3": "val3 #tricky"}, util.load_shell_content( - '\n'.join( + "\n".join( [ "#top of file comment", "key1=val1 #this is a comment", "# second comment", 'key2="val2" # inlin comment#badkey=wark', 'key3="val3 #tricky"', - '', + "", ] ) ), @@ -2225,15 +2209,15 @@ class TestLoadShellContent(helpers.TestCase): class TestGetProcEnv(helpers.TestCase): """test get_proc_env.""" - null = b'\x00' - simple1 = b'HOME=/' - simple2 = b'PATH=/bin:/sbin' - bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371 - mixed = b'MIXED=' + b'ab\xccde' + null = b"\x00" + simple1 = b"HOME=/" + simple2 = b"PATH=/bin:/sbin" + bootflag = b"BOOTABLE_FLAG=\x80" # from LP: #1775371 + mixed = b"MIXED=" + b"ab\xccde" - def _val_decoded(self, blob, encoding='utf-8', errors='replace'): + def _val_decoded(self, blob, encoding="utf-8", errors="replace"): # return the value portion of key=val decoded. - return blob.split(b'=', 1)[1].decode(encoding, errors) + return blob.split(b"=", 1)[1].decode(encoding, errors) @mock.patch("cloudinit.util.load_file") def test_non_utf8_in_environment(self, m_load_file): @@ -2245,10 +2229,10 @@ class TestGetProcEnv(helpers.TestCase): self.assertEqual( { - 'BOOTABLE_FLAG': self._val_decoded(self.bootflag), - 'HOME': '/', - 'PATH': '/bin:/sbin', - 'MIXED': self._val_decoded(self.mixed), + "BOOTABLE_FLAG": self._val_decoded(self.bootflag), + "HOME": "/", + "PATH": "/bin:/sbin", + "MIXED": self._val_decoded(self.mixed), }, util.get_proc_env(1), ) @@ -2262,7 +2246,7 @@ class TestGetProcEnv(helpers.TestCase): m_load_file.return_value = content self.assertEqual( - dict([t.split(b'=') for t in lines]), + dict([t.split(b"=") for t in lines]), util.get_proc_env(1, encoding=None), ) self.assertEqual(1, m_load_file.call_count) @@ -2273,7 +2257,7 @@ class TestGetProcEnv(helpers.TestCase): content = self.null.join((self.simple1, self.simple2)) m_load_file.return_value = content self.assertEqual( - {'HOME': '/', 'PATH': '/bin:/sbin'}, util.get_proc_env(1) + {"HOME": "/", "PATH": "/bin:/sbin"}, util.get_proc_env(1) ) self.assertEqual(1, m_load_file.call_count) @@ -2296,13 +2280,13 @@ class TestKernelVersion: """test kernel version function""" params = [ - ('5.6.19-300.fc32.x86_64', (5, 6)), - ('4.15.0-101-generic', (4, 15)), - ('3.10.0-1062.12.1.vz7.131.10', (3, 10)), - ('4.18.0-144.el8.x86_64', (4, 18)), + ("5.6.19-300.fc32.x86_64", (5, 6)), + ("4.15.0-101-generic", (4, 15)), + ("3.10.0-1062.12.1.vz7.131.10", (3, 10)), + ("4.18.0-144.el8.x86_64", (4, 18)), ] - @mock.patch('os.uname') + @mock.patch("os.uname") @pytest.mark.parametrize("uname_release,expected", params) def test_kernel_version(self, m_uname, uname_release, expected): m_uname.return_value.release = uname_release @@ -2310,11 +2294,11 @@ class TestKernelVersion: class TestFindDevs: - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with(self, m_subp): m_subp.return_value = ( '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"', - '', + "", ) devlist = util.find_devs_with() assert devlist == [ @@ -2326,32 +2310,32 @@ class TestFindDevs: '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"' ] - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with_openbsd(self, m_subp): - m_subp.return_value = ('cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '') + m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "") devlist = util.find_devs_with_openbsd() - assert devlist == ['/dev/cd0a', '/dev/sd1a', '/dev/sd1i'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] - @mock.patch('cloudinit.subp.subp') + @mock.patch("cloudinit.subp.subp") def test_find_devs_with_openbsd_with_criteria(self, m_subp): - m_subp.return_value = ('cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '') + m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "") devlist = util.find_devs_with_openbsd(criteria="TYPE=iso9660") - assert devlist == ['/dev/cd0a', '/dev/sd1a', '/dev/sd1i'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] # lp: #1841466 devlist = util.find_devs_with_openbsd(criteria="LABEL_FATBOOT=A_LABEL") - assert devlist == ['/dev/cd0a', '/dev/sd1a', '/dev/sd1i'] + assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"] @pytest.mark.parametrize( - 'criteria,expected_devlist', + "criteria,expected_devlist", ( - (None, ['/dev/msdosfs/EFISYS', '/dev/iso9660/config-2']), - ('TYPE=iso9660', ['/dev/iso9660/config-2']), - ('TYPE=vfat', ['/dev/msdosfs/EFISYS']), - ('LABEL_FATBOOT=A_LABEL', []), # lp: #1841466 + (None, ["/dev/msdosfs/EFISYS", "/dev/iso9660/config-2"]), + ("TYPE=iso9660", ["/dev/iso9660/config-2"]), + ("TYPE=vfat", ["/dev/msdosfs/EFISYS"]), + ("LABEL_FATBOOT=A_LABEL", []), # lp: #1841466 ), ) - @mock.patch('glob.glob') + @mock.patch("glob.glob") def test_find_devs_with_freebsd(self, m_glob, criteria, expected_devlist): def fake_glob(pattern): msdos = ["/dev/msdosfs/EFISYS"] @@ -2368,14 +2352,14 @@ class TestFindDevs: assert devlist == expected_devlist @pytest.mark.parametrize( - 'criteria,expected_devlist', + "criteria,expected_devlist", ( - (None, ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']), - ('TYPE=iso9660', ['/dev/cd0']), - ('TYPE=vfat', ["/dev/ld0", "/dev/dk0", "/dev/dk1"]), + (None, ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"]), + ("TYPE=iso9660", ["/dev/cd0"]), + ("TYPE=vfat", ["/dev/ld0", "/dev/dk0", "/dev/dk1"]), ( - 'LABEL_FATBOOT=A_LABEL', # lp: #1841466 - ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0'], + "LABEL_FATBOOT=A_LABEL", # lp: #1841466 + ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"], ), ), ) @@ -2384,39 +2368,31 @@ class TestFindDevs: side_effect_values = [ ("ld0 dk0 dk1 cd0", ""), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "mscdlabel: CDIOREADTOCHEADER: " - "Inappropriate ioctl for device\n" - "track (ctl=4) at sector 0\n" - "disklabel not written\n" - ), + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n", "", ), ( - ( - "track (ctl=4) at sector 0\n" - 'ISO filesystem, label "config-2", ' - "creation time: 2020/03/31 17:29\n" - "adding as 'a'\n" - ), + "track (ctl=4) at sector 0\n" + 'ISO filesystem, label "config-2", ' + "creation time: 2020/03/31 17:29\n" + "adding as 'a'\n", "", ), ] @@ -2425,14 +2401,14 @@ class TestFindDevs: assert devlist == expected_devlist @pytest.mark.parametrize( - 'criteria,expected_devlist', + "criteria,expected_devlist", ( - (None, ['/dev/vbd0', '/dev/cd0', '/dev/acd0']), - ('TYPE=iso9660', ['/dev/cd0', '/dev/acd0']), - ('TYPE=vfat', ['/dev/vbd0']), + (None, ["/dev/vbd0", "/dev/cd0", "/dev/acd0"]), + ("TYPE=iso9660", ["/dev/cd0", "/dev/acd0"]), + ("TYPE=vfat", ["/dev/vbd0"]), ( - 'LABEL_FATBOOT=A_LABEL', # lp: #1841466 - ['/dev/vbd0', '/dev/cd0', '/dev/acd0'], + "LABEL_FATBOOT=A_LABEL", # lp: #1841466 + ["/dev/vbd0", "/dev/cd0", "/dev/acd0"], ), ), ) @@ -2440,7 +2416,7 @@ class TestFindDevs: def test_find_devs_with_dragonflybsd( self, m_subp, criteria, expected_devlist ): - m_subp.return_value = ('md2 md1 cd0 vbd0 acd0 vn3 vn2 vn1 vn0 md0', '') + m_subp.return_value = ("md2 md1 cd0 vbd0 acd0 vn3 vn2 vn1 vn0 md0", "") devlist = util.find_devs_with_dragonflybsd(criteria=criteria) assert devlist == expected_devlist |