diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 255 |
1 files changed, 131 insertions, 124 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 27912652..b828711c 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -1,6 +1,29 @@ -#! /usr/bin/env python +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' import os +import shutil +import tempfile from unittest import TestCase from cloudinit import helpers @@ -16,9 +39,9 @@ def _write_cloud_info_file(value): with a cloud backend identifier ImageFactory when building an image with ImageFactory. ''' - f = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') - f.write(value) - f.close() + cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile.write(value) + cifile.close() os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) def _remove_cloud_info_file(): @@ -35,14 +58,14 @@ def _write_user_data_files(mount_dir, value): deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' user_data_file = mount_dir + '/user-data.txt' - f = open(deltacloud_user_data_file, 'w') - f.write(value) - f.close() + udfile = open(deltacloud_user_data_file, 'w') + udfile.write(value) + udfile.close() os.chmod(deltacloud_user_data_file, 0664) - f = open(user_data_file, 'w') - f.write(value) - f.close() + udfile = open(user_data_file, 'w') + udfile.write(value) + udfile.close() os.chmod(user_data_file, 0664) def _remove_user_data_files(mount_dir, @@ -68,8 +91,7 @@ def _remove_user_data_files(mount_dir, except OSError: pass - -class TestDataSouceAltCloud_get_cloud_type(TestCase): +class TestGetCloudType(TestCase): ''' Test to exercise method: DataSourceAltCloud.get_cloud_type() ''' @@ -77,70 +99,66 @@ class TestDataSouceAltCloud_get_cloud_type(TestCase): def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 def tearDown(self): # Reset cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['dmidecode', '--string', 'system-product-name'] - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 - def test_get_cloud_type_RHEV(self): + def test_rhev(self): ''' Test method get_cloud_type() for RHEVm systems. Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'RHEV Hypervisor'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals('RHEV', \ - ds.get_cloud_type()) + dsrc.get_cloud_type()) - def test_get_cloud_type_VSPHERE(self): + def test_vsphere(self): ''' Test method get_cloud_type() for vSphere systems. Forcing dmidecode return to match a vSphere system: RHEV Hypervisor ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'VMware Virtual Platform'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals('VSPHERE', \ - ds.get_cloud_type()) + dsrc.get_cloud_type()) - def test_get_cloud_type_UNKNOWN(self): + def test_unknown(self): ''' Test method get_cloud_type() for unknown systems. Forcing dmidecode return to match an unrecognized return. ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'Unrecognized Platform'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals('UNKNOWN', \ - ds.get_cloud_type()) + dsrc.get_cloud_type()) - def test_get_cloud_type_exception1(self): + def test_exception1(self): ''' Test method get_cloud_type() where command dmidecode fails. ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['ls', 'bad command'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals('UNKNOWN', \ - ds.get_cloud_type()) + dsrc.get_cloud_type()) - def test_get_cloud_type_exception(self): + def test_exception2(self): ''' Test method get_cloud_type() where command dmidecode is not available. ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['bad command'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals('UNKNOWN', \ - ds.get_cloud_type()) + dsrc.get_cloud_type()) -class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase): +class TestGetDataCloudInfoFile(TestCase): ''' Test to exercise method: DataSourceAltCloud.get_data() With a contrived CLOUD_INFO_FILE @@ -148,58 +166,62 @@ class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase): def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.cloud_info_file = tempfile.mkstemp()[1] cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/tmp/cloudinit_test_etc_sysconfig_cloud-info' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 + self.cloud_info_file def tearDown(self): # Reset + + # Attempt to remove the temp file ignoring errors + try: + os.remove(self.cloud_info_file) + except OSError: + pass + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 - def test_get_data_RHEV_cloud_file(self): + def test_rhev(self): '''Success Test module get_data() forcing RHEV ''' _write_cloud_info_file('RHEV') - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_rhevm = lambda : True - self.assertEquals(True, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : True + self.assertEquals(True, dsrc.get_data()) - def test_get_data_VSPHERE_cloud_file(self): + def test_vsphere(self): '''Success Test module get_data() forcing VSPHERE ''' _write_cloud_info_file('VSPHERE') - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_vsphere = lambda : True - self.assertEquals(True, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : True + self.assertEquals(True, dsrc.get_data()) - def test_failure_get_data_RHEV_cloud_file(self): + def test_fail_rhev(self): '''Failure Test module get_data() forcing RHEV ''' _write_cloud_info_file('RHEV') - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_rhevm = lambda : False - self.assertEquals(False, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : False + self.assertEquals(False, dsrc.get_data()) - def test_failure_get_data_VSPHERE_cloud_file(self): + def test_fail_vsphere(self): '''Failure Test module get_data() forcing VSPHERE ''' _write_cloud_info_file('VSPHERE') - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_vsphere = lambda : False - self.assertEquals(False, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : False + self.assertEquals(False, dsrc.get_data()) - def test_failure_get_data_unrecognized_cloud_file(self): + def test_unrecognized(self): '''Failure Test module get_data() forcing unrecognized ''' _write_cloud_info_file('unrecognized') - ds = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) -class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): +class TestGetDataNoCloudInfoFile(TestCase): ''' Test to exercise method: DataSourceAltCloud.get_data() Without a CLOUD_INFO_FILE @@ -209,8 +231,6 @@ class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ 'no such file' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 def tearDown(self): # Reset @@ -218,55 +238,41 @@ class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): '/etc/sysconfig/cloud-info' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['dmidecode', '--string', 'system-product-name'] - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 - def test_get_data_RHEV_cloud_file(self): + def test_rhev_no_cloud_file(self): '''Test No cloud info file module get_data() forcing RHEV ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'RHEV Hypervisor'] - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_rhevm = lambda : True - self.assertEquals(True, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : True + self.assertEquals(True, dsrc.get_data()) - def test_get_data_VSPHERE_cloud_file(self): + def test_vsphere_no_cloud_file(self): '''Test No cloud info file module get_data() forcing VSPHERE ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'VMware Virtual Platform'] - ds = DataSourceAltCloud({}, None, self.paths) - ds.user_data_vsphere = lambda : True - self.assertEquals(True, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : True + self.assertEquals(True, dsrc.get_data()) - def test_failure_get_data_VSPHERE_cloud_file(self): + def test_failure_no_cloud_file(self): '''Test No cloud info file module get_data() forcing unrecognized ''' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['echo', 'Unrecognized Platform'] - ds = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.get_data()) + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) -class TestDataSouceAltCloud_user_data_rhevm(TestCase): +class TestUserDataRhevm(TestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_rhevm() ''' def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) - self.mount_dir = '/tmp/cloudinit_test_media' - - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/tmp/cloudinit_test_etc_sysconfig_cloud-info' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 - - try: - os.mkdir(self.mount_dir) - except OSError, (errno, strerror): - # Ignore OSError: [Errno 17] File exists: - if errno is not 17: - raise + self.mount_dir = tempfile.mkdtemp() _write_user_data_files(self.mount_dir, 'test user data') @@ -275,64 +281,55 @@ class TestDataSouceAltCloud_user_data_rhevm(TestCase): _remove_user_data_files(self.mount_dir) + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['/sbin/modprobe', 'floppy'] - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 - def test_user_data_rhevm(self): + def test_mount_cb_fails(self): '''Test user_data_rhevm() where mount_cb fails''' cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['echo', 'modprobe floppy'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.user_data_rhevm()) + self.assertEquals(False, dsrc.user_data_rhevm()) - def test_user_data_rhevm_modprobe_fails(self): + def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails. ''' cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['ls', 'modprobe floppy'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.user_data_rhevm()) + self.assertEquals(False, dsrc.user_data_rhevm()) - def test_user_data_rhevm_no_modprobe_cmd(self): + def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command. ''' cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['bad command', 'modprobe floppy'] - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.user_data_rhevm()) + self.assertEquals(False, dsrc.user_data_rhevm()) -class TestDataSouceAltCloud_user_data_vsphere(TestCase): +class TestUserDataVsphere(TestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_vsphere() ''' def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) - self.mount_dir = '/tmp/cloudinit_test_media' - - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/tmp/cloudinit_test_etc_sysconfig_cloud-info' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 - - try: - os.mkdir(self.mount_dir) - except OSError, (errno, strerror): - # Ignore OSError: [Errno 17] File exists: - if errno is not 17: - raise + self.mount_dir = tempfile.mkdtemp() _write_user_data_files(self.mount_dir, 'test user data') @@ -341,29 +338,32 @@ class TestDataSouceAltCloud_user_data_vsphere(TestCase): _remove_user_data_files(self.mount_dir) + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 - cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 def test_user_data_vsphere(self): '''Test user_data_vsphere() where mount_cb fails''' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir - ds = DataSourceAltCloud({}, None, self.paths) + dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, ds.user_data_vsphere()) + self.assertEquals(False, dsrc.user_data_vsphere()) -class TestDataSouceAltCloud_read_user_data_callback(TestCase): +class TestReadUserDataCallback(TestCase): ''' Test to exercise method: DataSourceAltCloud.read_user_data_callback() ''' def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) - self.mount_dir = '/tmp/cloudinit_test_media' + self.mount_dir = tempfile.mkdtemp() _write_user_data_files(self.mount_dir, 'test user data') @@ -372,13 +372,20 @@ class TestDataSouceAltCloud_read_user_data_callback(TestCase): _remove_user_data_files(self.mount_dir) - def test_read_user_data_callback_both(self): + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + + def test_callback_both(self): '''Test read_user_data_callback() with both files''' self.assertEquals('test user data', read_user_data_callback(self.mount_dir)) - def test_read_user_data_callback_dc(self): + def test_callback_dc(self): '''Test read_user_data_callback() with only DC file''' _remove_user_data_files(self.mount_dir, @@ -388,7 +395,7 @@ class TestDataSouceAltCloud_read_user_data_callback(TestCase): self.assertEquals('test user data', read_user_data_callback(self.mount_dir)) - def test_read_user_data_callback_non_dc(self): + def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file''' _remove_user_data_files(self.mount_dir, @@ -398,7 +405,7 @@ class TestDataSouceAltCloud_read_user_data_callback(TestCase): self.assertEquals('test user data', read_user_data_callback(self.mount_dir)) - def test_read_user_data_callback_none(self): + def test_callback_none(self): '''Test read_user_data_callback() no files are found''' _remove_user_data_files(self.mount_dir) |