summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_mounts.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_mounts.py')
-rw-r--r--tests/unittests/config/test_cc_mounts.py449
1 files changed, 255 insertions, 194 deletions
diff --git a/tests/unittests/config/test_cc_mounts.py b/tests/unittests/config/test_cc_mounts.py
index fc65f108..084faacd 100644
--- a/tests/unittests/config/test_cc_mounts.py
+++ b/tests/unittests/config/test_cc_mounts.py
@@ -1,302 +1,363 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import pytest
import os.path
from unittest import mock
-from tests.unittests import helpers as test_helpers
+import pytest
+
from cloudinit.config import cc_mounts
from cloudinit.config.cc_mounts import create_swapfile
from cloudinit.subp import ProcessExecutionError
+from tests.unittests import helpers as test_helpers
-M_PATH = 'cloudinit.config.cc_mounts.'
+M_PATH = "cloudinit.config.cc_mounts."
class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
-
def setUp(self):
super(TestSanitizeDevname, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
def _touch(self, path):
- path = os.path.join(self.new_root, path.lstrip('/'))
+ path = os.path.join(self.new_root, path.lstrip("/"))
basedir = os.path.dirname(path)
if not os.path.exists(basedir):
os.makedirs(basedir)
- open(path, 'a').close()
+ open(path, "a").close()
def _makedirs(self, directory):
- directory = os.path.join(self.new_root, directory.lstrip('/'))
+ directory = os.path.join(self.new_root, directory.lstrip("/"))
if not os.path.exists(directory):
os.makedirs(directory)
def mock_existence_of_disk(self, disk_path):
self._touch(disk_path)
- self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1]))
+ self._makedirs(os.path.join("/sys/block", disk_path.split("/")[-1]))
def mock_existence_of_partition(self, disk_path, partition_number):
self.mock_existence_of_disk(disk_path)
self._touch(disk_path + str(partition_number))
- disk_name = disk_path.split('/')[-1]
- self._makedirs(os.path.join('/sys/block',
- disk_name,
- disk_name + str(partition_number)))
+ disk_name = disk_path.split("/")[-1]
+ self._makedirs(
+ os.path.join(
+ "/sys/block", disk_name, disk_name + str(partition_number)
+ )
+ )
def test_existent_full_disk_path_is_returned(self):
- disk_path = '/dev/sda'
+ disk_path = "/dev/sda"
self.mock_existence_of_disk(disk_path)
- self.assertEqual(disk_path,
- cc_mounts.sanitize_devname(disk_path,
- lambda x: None,
- mock.Mock()))
+ self.assertEqual(
+ disk_path,
+ cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock()),
+ )
def test_existent_disk_name_returns_full_path(self):
- disk_name = 'sda'
- disk_path = '/dev/' + disk_name
+ disk_name = "sda"
+ disk_path = "/dev/" + disk_name
self.mock_existence_of_disk(disk_path)
- self.assertEqual(disk_path,
- cc_mounts.sanitize_devname(disk_name,
- lambda x: None,
- mock.Mock()))
+ self.assertEqual(
+ disk_path,
+ cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock()),
+ )
def test_existent_meta_disk_is_returned(self):
- actual_disk_path = '/dev/sda'
+ actual_disk_path = "/dev/sda"
self.mock_existence_of_disk(actual_disk_path)
self.assertEqual(
actual_disk_path,
- cc_mounts.sanitize_devname('ephemeral0',
- lambda x: actual_disk_path,
- mock.Mock()))
+ cc_mounts.sanitize_devname(
+ "ephemeral0", lambda x: actual_disk_path, mock.Mock()
+ ),
+ )
def test_existent_meta_partition_is_returned(self):
- disk_name, partition_part = '/dev/sda', '1'
+ disk_name, partition_part = "/dev/sda", "1"
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.1',
- lambda x: disk_name,
- mock.Mock()))
+ cc_mounts.sanitize_devname(
+ "ephemeral0.1", lambda x: disk_name, mock.Mock()
+ ),
+ )
def test_existent_meta_partition_with_p_is_returned(self):
- disk_name, partition_part = '/dev/sda', 'p1'
+ disk_name, partition_part = "/dev/sda", "p1"
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.1',
- lambda x: disk_name,
- mock.Mock()))
+ cc_mounts.sanitize_devname(
+ "ephemeral0.1", lambda x: disk_name, mock.Mock()
+ ),
+ )
def test_first_partition_returned_if_existent_disk_is_partitioned(self):
- disk_name, partition_part = '/dev/sda', '1'
+ disk_name, partition_part = "/dev/sda", "1"
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0',
- lambda x: disk_name,
- mock.Mock()))
+ cc_mounts.sanitize_devname(
+ "ephemeral0", lambda x: disk_name, mock.Mock()
+ ),
+ )
def test_nth_partition_returned_if_requested(self):
- disk_name, partition_part = '/dev/sda', '3'
+ disk_name, partition_part = "/dev/sda", "3"
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.3',
- lambda x: disk_name,
- mock.Mock()))
+ cc_mounts.sanitize_devname(
+ "ephemeral0.3", lambda x: disk_name, mock.Mock()
+ ),
+ )
def test_transformer_returning_none_returns_none(self):
self.assertIsNone(
cc_mounts.sanitize_devname(
- 'ephemeral0', lambda x: None, mock.Mock()))
+ "ephemeral0", lambda x: None, mock.Mock()
+ )
+ )
def test_missing_device_returns_none(self):
self.assertIsNone(
- cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock()))
+ cc_mounts.sanitize_devname("/dev/sda", None, mock.Mock())
+ )
def test_missing_sys_returns_none(self):
- disk_path = '/dev/sda'
+ disk_path = "/dev/sda"
self._makedirs(disk_path)
self.assertIsNone(
- cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
+ cc_mounts.sanitize_devname(disk_path, None, mock.Mock())
+ )
def test_existent_disk_but_missing_partition_returns_none(self):
- disk_path = '/dev/sda'
+ disk_path = "/dev/sda"
self.mock_existence_of_disk(disk_path)
self.assertIsNone(
cc_mounts.sanitize_devname(
- 'ephemeral0.1', lambda x: disk_path, mock.Mock()))
+ "ephemeral0.1", lambda x: disk_path, mock.Mock()
+ )
+ )
def test_network_device_returns_network_device(self):
- disk_path = 'netdevice:/path'
+ disk_path = "netdevice:/path"
self.assertEqual(
- disk_path,
- cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
+ disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock())
+ )
def test_device_aliases_remapping(self):
- disk_path = '/dev/sda'
+ disk_path = "/dev/sda"
self.mock_existence_of_disk(disk_path)
- self.assertEqual(disk_path,
- cc_mounts.sanitize_devname('mydata',
- lambda x: None,
- mock.Mock(),
- {'mydata': disk_path}))
+ self.assertEqual(
+ disk_path,
+ cc_mounts.sanitize_devname(
+ "mydata", lambda x: None, mock.Mock(), {"mydata": disk_path}
+ ),
+ )
class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase):
-
def setUp(self):
super(TestSwapFileCreation, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
- self.fstab_path = os.path.join(self.new_root, 'etc/fstab')
- self.swap_path = os.path.join(self.new_root, 'swap.img')
- self._makedirs('/etc')
+ self.fstab_path = os.path.join(self.new_root, "etc/fstab")
+ self.swap_path = os.path.join(self.new_root, "swap.img")
+ self._makedirs("/etc")
- self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH',
- 'mock_fstab_path',
- self.fstab_path,
- autospec=False)
-
- self.add_patch('cloudinit.config.cc_mounts.subp.subp',
- 'm_subp_subp')
+ self.add_patch(
+ "cloudinit.config.cc_mounts.FSTAB_PATH",
+ "mock_fstab_path",
+ self.fstab_path,
+ autospec=False,
+ )
- self.add_patch('cloudinit.config.cc_mounts.util.mounts',
- 'mock_util_mounts',
- return_value={
- '/dev/sda1': {'fstype': 'ext4',
- 'mountpoint': '/',
- 'opts': 'rw,relatime,discard'
- }})
+ self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")
+
+ self.add_patch(
+ "cloudinit.config.cc_mounts.util.mounts",
+ "mock_util_mounts",
+ return_value={
+ "/dev/sda1": {
+ "fstype": "ext4",
+ "mountpoint": "/",
+ "opts": "rw,relatime,discard",
+ }
+ },
+ )
self.mock_cloud = mock.Mock()
self.mock_log = mock.Mock()
self.mock_cloud.device_name_to_device = self.device_name_to_device
self.cc = {
- 'swap': {
- 'filename': self.swap_path,
- 'size': '512',
- 'maxsize': '512'}}
+ "swap": {
+ "filename": self.swap_path,
+ "size": "512",
+ "maxsize": "512",
+ }
+ }
def _makedirs(self, directory):
- directory = os.path.join(self.new_root, directory.lstrip('/'))
+ directory = os.path.join(self.new_root, directory.lstrip("/"))
if not os.path.exists(directory):
os.makedirs(directory)
def device_name_to_device(self, path):
- if path == 'swap':
+ if path == "swap":
return self.swap_path
else:
dev = None
return dev
- @mock.patch('cloudinit.util.get_mount_info')
- @mock.patch('cloudinit.util.kernel_version')
- def test_swap_creation_method_fallocate_on_xfs(self, m_kernel_version,
- m_get_mount_info):
+ @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch("cloudinit.util.kernel_version")
+ def test_swap_creation_method_fallocate_on_xfs(
+ self, m_kernel_version, m_get_mount_info
+ ):
m_kernel_version.return_value = (4, 20)
m_get_mount_info.return_value = ["", "xfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
- self.m_subp_subp.assert_has_calls([
- mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True),
- mock.call(['mkswap', self.swap_path]),
- mock.call(['swapon', '-a'])])
-
- @mock.patch('cloudinit.util.get_mount_info')
- @mock.patch('cloudinit.util.kernel_version')
- def test_swap_creation_method_xfs(self, m_kernel_version,
- m_get_mount_info):
+ self.m_subp_subp.assert_has_calls(
+ [
+ mock.call(
+ ["fallocate", "-l", "0M", self.swap_path], capture=True
+ ),
+ mock.call(["mkswap", self.swap_path]),
+ mock.call(["swapon", "-a"]),
+ ]
+ )
+
+ @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch("cloudinit.util.kernel_version")
+ def test_swap_creation_method_xfs(
+ self, m_kernel_version, m_get_mount_info
+ ):
m_kernel_version.return_value = (3, 18)
m_get_mount_info.return_value = ["", "xfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
- self.m_subp_subp.assert_has_calls([
- mock.call(['dd', 'if=/dev/zero',
- 'of=' + self.swap_path,
- 'bs=1M', 'count=0'], capture=True),
- mock.call(['mkswap', self.swap_path]),
- mock.call(['swapon', '-a'])])
-
- @mock.patch('cloudinit.util.get_mount_info')
- @mock.patch('cloudinit.util.kernel_version')
- def test_swap_creation_method_btrfs(self, m_kernel_version,
- m_get_mount_info):
+ self.m_subp_subp.assert_has_calls(
+ [
+ mock.call(
+ [
+ "dd",
+ "if=/dev/zero",
+ "of=" + self.swap_path,
+ "bs=1M",
+ "count=0",
+ ],
+ capture=True,
+ ),
+ mock.call(["mkswap", self.swap_path]),
+ mock.call(["swapon", "-a"]),
+ ]
+ )
+
+ @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch("cloudinit.util.kernel_version")
+ def test_swap_creation_method_btrfs(
+ self, m_kernel_version, m_get_mount_info
+ ):
m_kernel_version.return_value = (4, 20)
m_get_mount_info.return_value = ["", "btrfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
- self.m_subp_subp.assert_has_calls([
- mock.call(['dd', 'if=/dev/zero',
- 'of=' + self.swap_path,
- 'bs=1M', 'count=0'], capture=True),
- mock.call(['mkswap', self.swap_path]),
- mock.call(['swapon', '-a'])])
-
- @mock.patch('cloudinit.util.get_mount_info')
- @mock.patch('cloudinit.util.kernel_version')
- def test_swap_creation_method_ext4(self, m_kernel_version,
- m_get_mount_info):
+ self.m_subp_subp.assert_has_calls(
+ [
+ mock.call(
+ [
+ "dd",
+ "if=/dev/zero",
+ "of=" + self.swap_path,
+ "bs=1M",
+ "count=0",
+ ],
+ capture=True,
+ ),
+ mock.call(["mkswap", self.swap_path]),
+ mock.call(["swapon", "-a"]),
+ ]
+ )
+
+ @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch("cloudinit.util.kernel_version")
+ def test_swap_creation_method_ext4(
+ self, m_kernel_version, m_get_mount_info
+ ):
m_kernel_version.return_value = (5, 14)
m_get_mount_info.return_value = ["", "ext4"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
- self.m_subp_subp.assert_has_calls([
- mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True),
- mock.call(['mkswap', self.swap_path]),
- mock.call(['swapon', '-a'])])
+ self.m_subp_subp.assert_has_calls(
+ [
+ mock.call(
+ ["fallocate", "-l", "0M", self.swap_path], capture=True
+ ),
+ mock.call(["mkswap", self.swap_path]),
+ mock.call(["swapon", "-a"]),
+ ]
+ )
class TestFstabHandling(test_helpers.FilesystemMockingTestCase):
- swap_path = '/dev/sdb1'
+ swap_path = "/dev/sdb1"
def setUp(self):
super(TestFstabHandling, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
- self.fstab_path = os.path.join(self.new_root, 'etc/fstab')
- self._makedirs('/etc')
-
- self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH',
- 'mock_fstab_path',
- self.fstab_path,
- autospec=False)
+ self.fstab_path = os.path.join(self.new_root, "etc/fstab")
+ self._makedirs("/etc")
- self.add_patch('cloudinit.config.cc_mounts._is_block_device',
- 'mock_is_block_device',
- return_value=True)
+ self.add_patch(
+ "cloudinit.config.cc_mounts.FSTAB_PATH",
+ "mock_fstab_path",
+ self.fstab_path,
+ autospec=False,
+ )
- self.add_patch('cloudinit.config.cc_mounts.subp.subp',
- 'm_subp_subp')
+ self.add_patch(
+ "cloudinit.config.cc_mounts._is_block_device",
+ "mock_is_block_device",
+ return_value=True,
+ )
- self.add_patch('cloudinit.config.cc_mounts.util.mounts',
- 'mock_util_mounts',
- return_value={
- '/dev/sda1': {'fstype': 'ext4',
- 'mountpoint': '/',
- 'opts': 'rw,relatime,discard'
- }})
+ self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")
+
+ self.add_patch(
+ "cloudinit.config.cc_mounts.util.mounts",
+ "mock_util_mounts",
+ return_value={
+ "/dev/sda1": {
+ "fstype": "ext4",
+ "mountpoint": "/",
+ "opts": "rw,relatime,discard",
+ }
+ },
+ )
self.mock_cloud = mock.Mock()
self.mock_log = mock.Mock()
self.mock_cloud.device_name_to_device = self.device_name_to_device
def _makedirs(self, directory):
- directory = os.path.join(self.new_root, directory.lstrip('/'))
+ directory = os.path.join(self.new_root, directory.lstrip("/"))
if not os.path.exists(directory):
os.makedirs(directory)
def device_name_to_device(self, path):
- if path == 'swap':
+ if path == "swap":
return self.swap_path
else:
dev = None
@@ -304,127 +365,126 @@ class TestFstabHandling(test_helpers.FilesystemMockingTestCase):
return dev
def test_no_fstab(self):
- """ Handle images which do not include an fstab. """
+ """Handle images which do not include an fstab."""
self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH))
fstab_expected_content = (
- '%s\tnone\tswap\tsw,comment=cloudconfig\t'
- '0\t0\n' % (self.swap_path,)
+ "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
+ % (self.swap_path,)
)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
- with open(cc_mounts.FSTAB_PATH, 'r') as fd:
+ with open(cc_mounts.FSTAB_PATH, "r") as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_swap_integrity(self):
- '''Ensure that the swap file is correctly created and can
+ """Ensure that the swap file is correctly created and can
swapon successfully. Fixing the corner case of:
- kernel: swapon: swapfile has holes'''
+ kernel: swapon: swapfile has holes"""
- fstab = '/swap.img swap swap defaults 0 0\n'
+ fstab = "/swap.img swap swap defaults 0 0\n"
- with open(cc_mounts.FSTAB_PATH, 'w') as fd:
+ with open(cc_mounts.FSTAB_PATH, "w") as fd:
fd.write(fstab)
- cc = {'swap': ['filename: /swap.img', 'size: 512', 'maxsize: 512']}
+ cc = {"swap": ["filename: /swap.img", "size: 512", "maxsize: 512"]}
cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])
def test_fstab_no_swap_device(self):
- '''Ensure that cloud-init adds a discovered swap partition
- to /etc/fstab.'''
+ """Ensure that cloud-init adds a discovered swap partition
+ to /etc/fstab."""
- fstab_original_content = ''
+ fstab_original_content = ""
fstab_expected_content = (
- '%s\tnone\tswap\tsw,comment=cloudconfig\t'
- '0\t0\n' % (self.swap_path,)
+ "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
+ % (self.swap_path,)
)
- with open(cc_mounts.FSTAB_PATH, 'w') as fd:
+ with open(cc_mounts.FSTAB_PATH, "w") as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
- with open(cc_mounts.FSTAB_PATH, 'r') as fd:
+ with open(cc_mounts.FSTAB_PATH, "r") as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_fstab_same_swap_device_already_configured(self):
- '''Ensure that cloud-init will not add a swap device if the same
- device already exists in /etc/fstab.'''
+ """Ensure that cloud-init will not add a swap device if the same
+ device already exists in /etc/fstab."""
- fstab_original_content = '%s swap swap defaults 0 0\n' % (
- self.swap_path,)
+ fstab_original_content = "%s swap swap defaults 0 0\n" % (
+ self.swap_path,
+ )
fstab_expected_content = fstab_original_content
- with open(cc_mounts.FSTAB_PATH, 'w') as fd:
+ with open(cc_mounts.FSTAB_PATH, "w") as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
- with open(cc_mounts.FSTAB_PATH, 'r') as fd:
+ with open(cc_mounts.FSTAB_PATH, "r") as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_fstab_alternate_swap_device_already_configured(self):
- '''Ensure that cloud-init will add a discovered swap device to
+ """Ensure that cloud-init will add a discovered swap device to
/etc/fstab even when there exists a swap definition on another
- device.'''
+ device."""
- fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n'
+ fstab_original_content = "/dev/sdc1 swap swap defaults 0 0\n"
fstab_expected_content = (
- fstab_original_content +
- '%s\tnone\tswap\tsw,comment=cloudconfig\t'
- '0\t0\n' % (self.swap_path,)
+ fstab_original_content
+ + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
+ % (self.swap_path,)
)
- with open(cc_mounts.FSTAB_PATH, 'w') as fd:
+ with open(cc_mounts.FSTAB_PATH, "w") as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
- with open(cc_mounts.FSTAB_PATH, 'r') as fd:
+ with open(cc_mounts.FSTAB_PATH, "r") as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_no_change_fstab_sets_needs_mount_all(self):
- '''verify unchanged fstab entries are mounted if not call mount -a'''
+ """verify unchanged fstab entries are mounted if not call mount -a"""
fstab_original_content = (
- 'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n'
- 'LABEL=UEFI /boot/efi vfat defaults 0 0\n'
- '/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n'
+ "LABEL=cloudimg-rootfs / ext4 defaults 0 0\n"
+ "LABEL=UEFI /boot/efi vfat defaults 0 0\n"
+ "/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n"
)
fstab_expected_content = fstab_original_content
- cc = {
- 'mounts': [
- ['/dev/vdb', '/mnt', 'auto', 'defaults,noexec']
- ]
- }
- with open(cc_mounts.FSTAB_PATH, 'w') as fd:
+ cc = {"mounts": [["/dev/vdb", "/mnt", "auto", "defaults,noexec"]]}
+ with open(cc_mounts.FSTAB_PATH, "w") as fd:
fd.write(fstab_original_content)
- with open(cc_mounts.FSTAB_PATH, 'r') as fd:
+ with open(cc_mounts.FSTAB_PATH, "r") as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])
- self.m_subp_subp.assert_has_calls([
- mock.call(['mount', '-a']),
- mock.call(['systemctl', 'daemon-reload'])])
+ self.m_subp_subp.assert_has_calls(
+ [
+ mock.call(["mount", "-a"]),
+ mock.call(["systemctl", "daemon-reload"]),
+ ]
+ )
class TestCreateSwapfile:
-
- @pytest.mark.parametrize('fstype', ('xfs', 'btrfs', 'ext4', 'other'))
- @mock.patch(M_PATH + 'util.get_mount_info')
- @mock.patch(M_PATH + 'subp.subp')
+ @pytest.mark.parametrize("fstype", ("xfs", "btrfs", "ext4", "other"))
+ @mock.patch(M_PATH + "util.get_mount_info")
+ @mock.patch(M_PATH + "subp.subp")
def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir):
swap_file = tmpdir.join("swap-file")
fname = str(swap_file)
# Some of the calls to subp.subp should create the swap file; this
# roughly approximates that
- m_subp.side_effect = lambda *args, **kwargs: swap_file.write('')
+ m_subp.side_effect = lambda *args, **kwargs: swap_file.write("")
m_get_mount_info.return_value = (mock.ANY, fstype)
- create_swapfile(fname, '')
- assert mock.call(['mkswap', fname]) in m_subp.call_args_list
+ create_swapfile(fname, "")
+ assert mock.call(["mkswap", fname]) in m_subp.call_args_list
@mock.patch(M_PATH + "util.get_mount_info")
@mock.patch(M_PATH + "subp.subp")
@@ -458,4 +518,5 @@ class TestCreateSwapfile:
msg = "fallocate swap creation failed, will attempt with dd"
assert msg in caplog.text
+
# vi: ts=4 expandtab