# This file is part of cloud-init. See LICENSE file for license information. import os.path from unittest import mock from cloudinit.config import cc_mounts from cloudinit.tests import helpers as test_helpers class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): def setUp(self): super(TestSanitizeDevname, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) def _touch(self, path): path = os.path.join(self.new_root, path.lstrip('/')) basedir = os.path.dirname(path) if not os.path.exists(basedir): os.makedirs(basedir) open(path, 'a').close() def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def mock_existence_of_disk(self, disk_path): self._touch(disk_path) self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1])) def mock_existence_of_partition(self, disk_path, partition_number): self.mock_existence_of_disk(disk_path) self._touch(disk_path + str(partition_number)) disk_name = disk_path.split('/')[-1] self._makedirs(os.path.join('/sys/block', disk_name, disk_name + str(partition_number))) def test_existent_full_disk_path_is_returned(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock())) def test_existent_disk_name_returns_full_path(self): disk_name = 'sda' disk_path = '/dev/' + disk_name self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock())) def test_existent_meta_disk_is_returned(self): actual_disk_path = '/dev/sda' self.mock_existence_of_disk(actual_disk_path) self.assertEqual( actual_disk_path, cc_mounts.sanitize_devname('ephemeral0', lambda x: actual_disk_path, mock.Mock())) def test_existent_meta_partition_is_returned(self): disk_name, partition_part = '/dev/sda', '1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.1', lambda x: disk_name, mock.Mock())) def test_existent_meta_partition_with_p_is_returned(self): disk_name, partition_part = '/dev/sda', 'p1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.1', lambda x: disk_name, mock.Mock())) def test_first_partition_returned_if_existent_disk_is_partitioned(self): disk_name, partition_part = '/dev/sda', '1' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0', lambda x: disk_name, mock.Mock())) def test_nth_partition_returned_if_requested(self): disk_name, partition_part = '/dev/sda', '3' actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, cc_mounts.sanitize_devname('ephemeral0.3', lambda x: disk_name, mock.Mock())) def test_transformer_returning_none_returns_none(self): self.assertIsNone( cc_mounts.sanitize_devname( 'ephemeral0', lambda x: None, mock.Mock())) def test_missing_device_returns_none(self): self.assertIsNone( cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock())) def test_missing_sys_returns_none(self): disk_path = '/dev/sda' self._makedirs(disk_path) self.assertIsNone( cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) def test_existent_disk_but_missing_partition_returns_none(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertIsNone( cc_mounts.sanitize_devname( 'ephemeral0.1', lambda x: disk_path, mock.Mock())) def test_network_device_returns_network_device(self): disk_path = 'netdevice:/path' self.assertEqual( disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) def test_device_aliases_remapping(self): disk_path = '/dev/sda' self.mock_existence_of_disk(disk_path) self.assertEqual(disk_path, cc_mounts.sanitize_devname('mydata', lambda x: None, mock.Mock(), {'mydata': disk_path})) class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase): def setUp(self): super(TestSwapFileCreation, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) self.fstab_path = os.path.join(self.new_root, 'etc/fstab') self.swap_path = os.path.join(self.new_root, 'swap.img') self._makedirs('/etc') self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', 'mock_fstab_path', self.fstab_path, autospec=False) self.add_patch('cloudinit.config.cc_mounts.subp.subp', 'm_subp_subp') self.add_patch('cloudinit.config.cc_mounts.util.mounts', 'mock_util_mounts', return_value={ '/dev/sda1': {'fstype': 'ext4', 'mountpoint': '/', 'opts': 'rw,relatime,discard' }}) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device self.cc = { 'swap': { 'filename': self.swap_path, 'size': '512', 'maxsize': '512'}} def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): if path == 'swap': return self.swap_path else: dev = None return dev @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_fallocate_on_xfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_xfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (3, 18) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['dd', 'if=/dev/zero', 'of=' + self.swap_path, 'bs=1M', 'count=0'], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_btrfs(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "btrfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['dd', 'if=/dev/zero', 'of=' + self.swap_path, 'bs=1M', 'count=0'], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) @mock.patch('cloudinit.util.get_mount_info') @mock.patch('cloudinit.util.kernel_version') def test_swap_creation_method_ext4(self, m_kernel_version, m_get_mount_info): m_kernel_version.return_value = (5, 14) m_get_mount_info.return_value = ["", "ext4"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), mock.call(['mkswap', self.swap_path]), mock.call(['swapon', '-a'])]) class TestFstabHandling(test_helpers.FilesystemMockingTestCase): swap_path = '/dev/sdb1' def setUp(self): super(TestFstabHandling, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) self.fstab_path = os.path.join(self.new_root, 'etc/fstab') self._makedirs('/etc') self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', 'mock_fstab_path', self.fstab_path, autospec=False) self.add_patch('cloudinit.config.cc_mounts._is_block_device', 'mock_is_block_device', return_value=True) self.add_patch('cloudinit.config.cc_mounts.subp.subp', 'm_subp_subp') self.add_patch('cloudinit.config.cc_mounts.util.mounts', 'mock_util_mounts', return_value={ '/dev/sda1': {'fstype': 'ext4', 'mountpoint': '/', 'opts': 'rw,relatime,discard' }}) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device def _makedirs(self, directory): directory = os.path.join(self.new_root, directory.lstrip('/')) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): if path == 'swap': return self.swap_path else: dev = None return dev def test_no_fstab(self): """ Handle images which do not include an fstab. """ self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH)) fstab_expected_content = ( '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_swap_integrity(self): '''Ensure that the swap file is correctly created and can swapon successfully. Fixing the corner case of: kernel: swapon: swapfile has holes''' fstab = '/swap.img swap swap defaults 0 0\n' with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab) cc = {'swap': ['filename: /swap.img', 'size: 512', 'maxsize: 512']} cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) def test_fstab_no_swap_device(self): '''Ensure that cloud-init adds a discovered swap partition to /etc/fstab.''' fstab_original_content = '' fstab_expected_content = ( '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_same_swap_device_already_configured(self): '''Ensure that cloud-init will not add a swap device if the same device already exists in /etc/fstab.''' fstab_original_content = '%s swap swap defaults 0 0\n' % ( self.swap_path,) fstab_expected_content = fstab_original_content with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_alternate_swap_device_already_configured(self): '''Ensure that cloud-init will add a discovered swap device to /etc/fstab even when there exists a swap definition on another device.''' fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n' fstab_expected_content = ( fstab_original_content + '%s\tnone\tswap\tsw,comment=cloudconfig\t' '0\t0\n' % (self.swap_path,) ) with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_no_change_fstab_sets_needs_mount_all(self): '''verify unchanged fstab entries are mounted if not call mount -a''' fstab_original_content = ( 'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n' 'LABEL=UEFI /boot/efi vfat defaults 0 0\n' '/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n' ) fstab_expected_content = fstab_original_content cc = { 'mounts': [ ['/dev/vdb', '/mnt', 'auto', 'defaults,noexec'] ] } with open(cc_mounts.FSTAB_PATH, 'w') as fd: fd.write(fstab_original_content) with open(cc_mounts.FSTAB_PATH, 'r') as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) self.m_subp_subp.assert_has_calls([ mock.call(['mount', '-a']), mock.call(['systemctl', 'daemon-reload'])]) # vi: ts=4 expandtab