# This file is part of cloud-init. See LICENSE file for license information. import pytest import os.path from unittest import mock from tests.unittests import helpers as test_helpers from cloudinit.config import cc_mounts from cloudinit.config.cc_mounts import create_swapfile from cloudinit.subp import ProcessExecutionError M_PATH = 'cloudinit.config.cc_mounts.' class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): def setUp(self): super(TestSanitizeDevname, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) def _touch(self, path): path = os.path.join(self.new_root, path.lstrip('/')) basedir = os.path.dirname(path) if not os.path.exists(basedir): os.makedirs(basedir) open(path, 'a').close() def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def mock_existence_of_disk(self, disk_path): self._touch(disk_path) self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1])) def mock_existence_of_partition(self, disk_path, partition_number): self.mock_existence_of_disk(disk_path) self._touch(disk_path + str(partition_number)) disk_name = disk_path.split('/')[-1] self._makedirs(os.path.join('/sys/block', disk_name, disk_name + str(partition_number))) def test_existent_full_disk_path_is_returned(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock())) def test_existent_disk_name_returns_full_path(self): disk_name = 'sda' disk_path = '/dev/' + disk_name self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock())) def test_existent_meta_disk_is_returned(self): actual_disk_path = '/dev/sda' self.mock_existence_of_disk(actual_disk_path) self.assertEqual( actual_disk_path, cc_mounts.sanitize_devname('ephemeral0', lambda x: actual_disk_path, mock.Mock())) def test_existent_meta_partition_is_returned(self): disk_name, partition_part = '/dev/sda', '1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.1', lambda x: disk_name, mock.Mock())) def test_existent_meta_partition_with_p_is_returned(self): disk_name, partition_part = '/dev/sda', 'p1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.1', lambda x: disk_name, mock.Mock())) def test_first_partition_returned_if_existent_disk_is_partitioned(self): disk_name, partition_part = '/dev/sda', '1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0', lambda x: disk_name, mock.Mock())) def test_nth_partition_returned_if_requested(self): disk_name, partition_part = '/dev/sda', '3' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.3', lambda x: disk_name, mock.Mock())) def test_transformer_returning_none_returns_none(self): self.assertIsNone( cc_mounts.sanitize_devname( 'ephemeral0', lambda x: None, mock.Mock())) def test_missing_device_returns_none(self): self.assertIsNone( cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock())) def test_missing_sys_returns_none(self): disk_path = '/dev/sda' self._makedirs(disk_path) self.assertIsNone( cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) def test_existent_disk_but_missing_partition_returns_none(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertIsNone( cc_mounts.sanitize_devname( 'ephemeral0.1', lambda x: disk_path, mock.Mock())) def test_network_device_returns_network_device(self): disk_path = 'netdevice:/path' self.assertEqual( disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) def test_device_aliases_remapping(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname('mydata', lambda x: None, mock.Mock(), {'mydata': disk_path})) class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase): def setUp(self): super(TestSwapFileCreation, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) self.fstab_path = os.path.join(self.new_root, 'etc/fstab') self.swap_path = os.path.join(self.new_root, 'swap.img') self._makedirs('/etc') self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', 'mock_fstab_path', self.fstab_path, autospec=False) self.add_patch('cloudinit.config.cc_mounts.subp.subp', 'm_subp_subp') self.add_patch('cloudinit.config.cc_mounts.util.mounts', 'mock_util_mounts', return_value={ '/dev/sda1': {'fstype': 'ext4', 'mountpoint': '/', 'opts': 'rw,relatime,discard' }}) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device self.cc = { 'swap': { 'filename': self.swap_path, 'size': '512', 'maxsize': '512'}} def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): if path == 'swap': return self.swap_path else: dev = None return dev @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_fallocate_on_xfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_xfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (3, 18) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['dd', 'if=/dev/zero', 'of=' + self.swap_path, 'bs=1M', 'count=0'], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_btrfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "btrfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['dd', 'if=/dev/zero', 'of=' + self.swap_path, 'bs=1M', 'count=0'], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_ext4(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (5, 14) m_get_mount_info.return_value = ["", "ext4"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) class TestFstabHandling(test_helpers.FilesystemMockingTestCase): swap_path = '/dev/sdb1' def setUp(self): super(TestFstabHandling, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) self.fstab_path = os.path.join(self.new_root, 'etc/fstab') self._makedirs('/etc') self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', 'mock_fstab_path', self.fstab_path, autospec=False) self.add_patch('cloudinit.config.cc_mounts._is_block_device', 'mock_is_block_device', return_value=True) self.add_patch('cloudinit.config.cc_mounts.subp.subp', 'm_subp_subp') self.add_patch('cloudinit.config.cc_mounts.util.mounts', 'mock_util_mounts', return_value={ '/dev/sda1': {'fstype': 'ext4', 'mountpoint': '/', 'opts': 'rw,relatime,discard' }}) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): if path == 'swap': return self.swap_path else: dev = None return dev def test_no_fstab(self): """ Handle images which do not include an fstab. """ self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH)) fstab_expected_content = ( '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_swap_integrity(self): '''Ensure that the swap file is correctly created and can swapon successfully. Fixing the corner case of: kernel: swapon: swapfile has holes''' fstab = '/swap.img swap swap defaults 0 0\n' with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab) cc = {'swap': ['filename: /swap.img', 'size: 512', 'maxsize: 512']} cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) def test_fstab_no_swap_device(self): '''Ensure that cloud-init adds a discovered swap partition to /etc/fstab.''' fstab_original_content = '' fstab_expected_content = ( '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_same_swap_device_already_configured(self): '''Ensure that cloud-init will not add a swap device if the same device already exists in /etc/fstab.''' fstab_original_content = '%s swap swap defaults 0 0\n' % ( self.swap_path,) fstab_expected_content = fstab_original_content with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_alternate_swap_device_already_configured(self): '''Ensure that cloud-init will add a discovered swap device to /etc/fstab even when there exists a swap definition on another device.''' fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n' fstab_expected_content = ( fstab_original_content + '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_no_change_fstab_sets_needs_mount_all(self): '''verify unchanged fstab entries are mounted if not call mount -a''' fstab_original_content = ( 'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n' 'LABEL=UEFI /boot/efi vfat defaults 0 0\n' '/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n' ) fstab_expected_content = fstab_original_content cc = { 'mounts': [ ['/dev/vdb', '/mnt', 'auto', 'defaults,noexec'] ] } with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['mount', '-a']), mock.call(['systemctl', 'daemon-reload'])]) class TestCreateSwapfile: @pytest.mark.parametrize('fstype', ('xfs', 'btrfs', 'ext4', 'other')) @mock.patch(M_PATH + 'util.get_mount_info') @mock.patch(M_PATH + 'subp.subp') def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir): swap_file = tmpdir.join("swap-file") fname = str(swap_file) # Some of the calls to subp.subp should create the swap file; this # roughly approximates that m_subp.side_effect = lambda *args, **kwargs: swap_file.write('') m_get_mount_info.return_value = (mock.ANY, fstype) create_swapfile(fname, '') assert mock.call(['mkswap', fname]) in m_subp.call_args_list @mock.patch(M_PATH + "util.get_mount_info") @mock.patch(M_PATH + "subp.subp") def test_fallback_from_fallocate_to_dd( self, m_subp, m_get_mount_info, caplog, tmpdir ): swap_file = tmpdir.join("swap-file") fname = str(swap_file) def subp_side_effect(cmd, *args, **kwargs): # Mock fallocate failing, to initiate fallback if cmd[0] == "fallocate": raise ProcessExecutionError() m_subp.side_effect = subp_side_effect # Use ext4 so both fallocate and dd are valid swap creation methods m_get_mount_info.return_value = (mock.ANY, "ext4") create_swapfile(fname, "") cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list] assert "fallocate" in cmds, "fallocate was not called" assert "dd" in cmds, "fallocate failure did not fallback to dd" assert cmds.index("dd") > cmds.index( "fallocate" ), "dd ran before fallocate" assert mock.call(["mkswap", fname]) in m_subp.call_args_list msg = "fallocate swap creation failed, will attempt with dd" assert msg in caplog.text # vi: ts=4 expandtab