diff options
Diffstat (limited to 'tests/unittests/sources/test_altcloud.py')
-rw-r--r-- | tests/unittests/sources/test_altcloud.py | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_altcloud.py b/tests/unittests/sources/test_altcloud.py new file mode 100644 index 00000000..7384c104 --- /dev/null +++ b/tests/unittests/sources/test_altcloud.py @@ -0,0 +1,450 @@ +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' + +import os +import shutil +import tempfile + +from cloudinit import dmi +from cloudinit import helpers +from cloudinit import subp +from cloudinit import util + +from tests.unittests.helpers import CiTestCase, mock + +import cloudinit.sources.DataSourceAltCloud as dsac + +OS_UNAME_ORIG = getattr(os, 'uname') + + +def _write_user_data_files(mount_dir, value): + ''' + Populate the deltacloud_user_data_file the user_data_file + which would be populated with user data. + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + udfile = open(deltacloud_user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(deltacloud_user_data_file, 0o664) + + udfile = open(user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(user_data_file, 0o664) + + +def _remove_user_data_files(mount_dir, + dc_file=True, + non_dc_file=True): + ''' + Remove the test files: deltacloud_user_data_file and + user_data_file + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # Ignore any failures removeing files that are already gone. + if dc_file: + try: + os.remove(deltacloud_user_data_file) + except OSError: + pass + + if non_dc_file: + try: + os.remove(user_data_file) + except OSError: + pass + + +def _dmi_data(expected): + ''' + Spoof the data received over DMI + ''' + def _data(key): + return expected + + return _data + + +class TestGetCloudType(CiTestCase): + '''Test to exercise method: DataSourceAltCloud.get_cloud_type()''' + + with_logs = True + + def setUp(self): + '''Set up.''' + super(TestGetCloudType, self).setUp() + self.tmp = self.tmp_dir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + self.dmi_data = dmi.read_dmi_data + # We have a different code path for arm to deal with LP1243287 + # We have to switch arch to x86_64 to avoid test failure + force_arch('x86_64') + + def tearDown(self): + # Reset + dmi.read_dmi_data = self.dmi_data + force_arch() + + def test_cloud_info_file_ioerror(self): + """Return UNKNOWN when /etc/sysconfig/cloud-info exists but errors.""" + self.assertEqual('/etc/sysconfig/cloud-info', dsac.CLOUD_INFO_FILE) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + # Attempting to read the directory generates IOError + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.tmp): + self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) + self.assertIn( + "[Errno 21] Is a directory: '%s'" % self.tmp, + self.logs.getvalue()) + + def test_cloud_info_file(self): + """Return uppercase stripped content from /etc/sysconfig/cloud-info.""" + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + cloud_info = self.tmp_path('cloud-info', dir=self.tmp) + util.write_file(cloud_info, ' OverRiDdeN CloudType ') + # Attempting to read the directory generates IOError + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', cloud_info): + self.assertEqual('OVERRIDDEN CLOUDTYPE', dsrc.get_cloud_type()) + + def test_rhev(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor + ''' + dmi.read_dmi_data = _dmi_data('RHEV') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual('RHEV', dsrc.get_cloud_type()) + + def test_vsphere(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor + ''' + dmi.read_dmi_data = _dmi_data('VMware Virtual Platform') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual('VSPHERE', dsrc.get_cloud_type()) + + def test_unknown(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing read_dmi_data return to match an unrecognized return. + ''' + dmi.read_dmi_data = _dmi_data('Unrecognized Platform') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) + + +class TestGetDataCloudInfoFile(CiTestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.tmp = self.tmp_dir() + self.paths = helpers.Paths( + {'cloud_dir': self.tmp, 'run_dir': self.tmp}) + self.cloud_info_file = self.tmp_path('cloud-info', dir=self.tmp) + + def test_rhev(self): + '''Success Test module get_data() forcing RHEV.''' + + util.write_file(self.cloud_info_file, 'RHEV') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): + self.assertEqual(True, dsrc.get_data()) + self.assertEqual('altcloud', dsrc.cloud_name) + self.assertEqual('altcloud', dsrc.platform_type) + self.assertEqual('rhev (/dev/fd0)', dsrc.subplatform) + + def test_vsphere(self): + '''Success Test module get_data() forcing VSPHERE.''' + + util.write_file(self.cloud_info_file, 'VSPHERE') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): + self.assertEqual(True, dsrc.get_data()) + self.assertEqual('altcloud', dsrc.cloud_name) + self.assertEqual('altcloud', dsrc.platform_type) + self.assertEqual('vsphere (unknown)', dsrc.subplatform) + + def test_fail_rhev(self): + '''Failure Test module get_data() forcing RHEV.''' + + util.write_file(self.cloud_info_file, 'RHEV') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: False + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): + self.assertEqual(False, dsrc.get_data()) + + def test_fail_vsphere(self): + '''Failure Test module get_data() forcing VSPHERE.''' + + util.write_file(self.cloud_info_file, 'VSPHERE') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: False + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): + self.assertEqual(False, dsrc.get_data()) + + def test_unrecognized(self): + '''Failure Test module get_data() forcing unrecognized.''' + + util.write_file(self.cloud_info_file, 'unrecognized') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): + self.assertEqual(False, dsrc.get_data()) + + +class TestGetDataNoCloudInfoFile(CiTestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.tmp = self.tmp_dir() + self.paths = helpers.Paths( + {'cloud_dir': self.tmp, 'run_dir': self.tmp}) + self.dmi_data = dmi.read_dmi_data + dsac.CLOUD_INFO_FILE = \ + 'no such file' + # We have a different code path for arm to deal with LP1243287 + # We have to switch arch to x86_64 to avoid test failure + force_arch('x86_64') + + def tearDown(self): + # Reset + dsac.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + dmi.read_dmi_data = self.dmi_data + # Return back to original arch + force_arch() + + def test_rhev_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV.''' + + dmi.read_dmi_data = _dmi_data('RHEV Hypervisor') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + self.assertEqual(True, dsrc.get_data()) + + def test_vsphere_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE.''' + + dmi.read_dmi_data = _dmi_data('VMware Virtual Platform') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + self.assertEqual(True, dsrc.get_data()) + + def test_failure_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized.''' + + dmi.read_dmi_data = _dmi_data('Unrecognized Platform') + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.get_data()) + + +class TestUserDataRhevm(CiTestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = self.tmp_dir() + _write_user_data_files(self.mount_dir, 'test user data') + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy', + 'm_modprobe_floppy', return_value=None) + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle', + 'm_udevadm_settle', return_value=('', '')) + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.util.mount_cb', + 'm_mount_cb') + + def test_mount_cb_fails(self): + '''Test user_data_rhevm() where mount_cb fails.''' + + self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_rhevm()) + + def test_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails.''' + + self.m_modprobe_floppy.side_effect = subp.ProcessExecutionError( + "Failed modprobe") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_rhevm()) + + def test_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command.''' + + self.m_modprobe_floppy.side_effect = subp.ProcessExecutionError( + "No such file or dir") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_rhevm()) + + def test_udevadm_fails(self): + '''Test user_data_rhevm() where udevadm fails.''' + + self.m_udevadm_settle.side_effect = subp.ProcessExecutionError( + "Failed settle.") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_rhevm()) + + def test_no_udevadm_cmd(self): + '''Test user_data_rhevm() with no udevadm command.''' + + self.m_udevadm_settle.side_effect = OSError("No such file or dir") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_rhevm()) + + +class TestUserDataVsphere(CiTestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + '''Set up.''' + self.tmp = self.tmp_dir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + dsac.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with): + '''Test user_data_vsphere() where mount_cb fails.''' + + m_mount_cb.return_value = [] + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(0, m_mount_cb.call_count) + + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with): + '''Test user_data_vsphere() where mount_cb fails.''' + + m_find_devs_with.return_value = ["/dev/mock/cdrom"] + m_mount_cb.side_effect = util.MountFailedError("Unable To mount") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(1, m_find_devs_with.call_count) + self.assertEqual(1, m_mount_cb.call_count) + + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_success(self, m_mount_cb, m_find_devs_with): + """Test user_data_vsphere() where successful.""" + m_find_devs_with.return_value = ["/dev/mock/cdrom"] + m_mount_cb.return_value = 'raw userdata from cdrom' + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + cloud_info = self.tmp_path('cloud-info', dir=self.tmp) + util.write_file(cloud_info, 'VSPHERE') + self.assertEqual(True, dsrc.user_data_vsphere()) + m_find_devs_with.assert_called_once_with('LABEL=CDROM') + m_mount_cb.assert_called_once_with( + '/dev/mock/cdrom', dsac.read_user_data_callback) + with mock.patch.object(dsrc, 'get_cloud_type', return_value='VSPHERE'): + self.assertEqual('vsphere (/dev/mock/cdrom)', dsrc.subplatform) + + +class TestReadUserDataCallback(CiTestCase): + ''' + Test to exercise method: DataSourceAltCloud.read_user_data_callback() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + def test_callback_both(self): + '''Test read_user_data_callback() with both files.''' + + self.assertEqual('test user data', + dsac.read_user_data_callback(self.mount_dir)) + + def test_callback_dc(self): + '''Test read_user_data_callback() with only DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=False, + non_dc_file=True) + + self.assertEqual('test user data', + dsac.read_user_data_callback(self.mount_dir)) + + def test_callback_non_dc(self): + '''Test read_user_data_callback() with only non-DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=True, + non_dc_file=False) + + self.assertEqual('test user data', + dsac.read_user_data_callback(self.mount_dir)) + + def test_callback_none(self): + '''Test read_user_data_callback() no files are found.''' + + _remove_user_data_files(self.mount_dir) + self.assertIsNone(dsac.read_user_data_callback(self.mount_dir)) + + +def force_arch(arch=None): + + def _os_uname(): + return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', arch) + + if arch: + setattr(os, 'uname', _os_uname) + elif arch is None: + setattr(os, 'uname', OS_UNAME_ORIG) + +# vi: ts=4 expandtab |