diff options
author | Joe VLcek <JoeV@RedHat.com> | 2012-07-17 15:10:10 -0400 |
---|---|---|
committer | Joe VLcek <JoeV@RedHat.com> | 2012-07-17 15:10:10 -0400 |
commit | 8822100158489d8f7fb5c38d440489df5df62996 (patch) | |
tree | c8fe668c927d147521edf49bac79ac9e23f38521 /tests/unittests/test_datasource | |
parent | 285e127de45f0feed7170bafc79b502170d5b381 (diff) | |
download | vyos-cloud-init-8822100158489d8f7fb5c38d440489df5df62996.tar.gz vyos-cloud-init-8822100158489d8f7fb5c38d440489df5df62996.zip |
Added RHEVm and vSphere support as source AltCloud
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 498 |
1 files changed, 498 insertions, 0 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py new file mode 100644 index 00000000..d404fab9 --- /dev/null +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -0,0 +1,498 @@ +#! /usr/bin/env python + +import os +import stat +import tempfile + +from shutil import rmtree +from tempfile import mkdtemp +from unittest import TestCase + +from time import sleep + +from cloudinit import helpers + +# Get the cloudinit.sources.DataSourceAltCloud import items needed. +import cloudinit.sources.DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud + +def _write_cloud_info_file(value): + ''' + Populate the CLOUD_INFO_FILE which would be populated + with a cloud backend identifier ImageFactory when building + an image with ImageFactory. + ''' + f = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + +def _remove_cloud_info_file(): + ''' + Remove the test CLOUD_INFO_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + +def _write_user_data_files(value): + ''' + Populate the DELTACLOUD_USER_DATA_FILE the USER_DATA_FILE + which would be populated with user data. + ''' + f = open(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 0664) + + f = open(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 0664) + +def _remove_user_data_files(): + ''' + Remove the test files: DELTACLOUD_USER_DATA_FILE and + USER_DATA_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE) + os.remove(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE) + +class TestDataSouceAltCloud_get_cloud_type(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_cloud_type() + ''' + + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_get_cloud_type_RHEV(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('RHEV', \ + ds.get_cloud_type()) + + def test_get_cloud_type_VSPHERE(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('VSPHERE', \ + ds.get_cloud_type()) + + def test_get_cloud_type_UNKNOWN(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing dmidecode return to match an unrecognized return. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + + def test_get_cloud_type_exception1(self): + ''' + Test method get_cloud_type() where command dmidecode fails. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['ls', 'bad command'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + + def test_get_cloud_type_exception(self): + ''' + Test method get_cloud_type() where command dmidecode is not available. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['bad command'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + +class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + + def test_get_data_RHEV_cloud_file(self): + '''Success Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_get_data_VSPHERE_cloud_file(self): + '''Success Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_failure_get_data_RHEV_cloud_file(self): + '''Failure Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : False + self.assertEquals(False, ds.get_data()) + + def test_failure_get_data_VSPHERE_cloud_file(self): + '''Failure Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : False + self.assertEquals(False, ds.get_data()) + + def test_failure_get_data_unrecognized_cloud_file(self): + '''Failure Test module get_data() forcing unrecognized ''' + + _write_cloud_info_file('unrecognized') + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, ds.get_data()) + +class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + 'no such file' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_get_data_RHEV_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_get_data_VSPHERE_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_failure_get_data_VSPHERE_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, ds.get_data()) + +class TestDataSouceAltCloud_user_data_rhevm(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/user-data.txt' + + try: + os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + except OSError, (errno, strerror): + # Ignore OSError: [Errno 17] File exists: + if errno is not 17: + raise + + _write_user_data_files('test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files() + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['/sbin/modprobe', 'floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] + + def test_user_data_rhevm(self): + '''Test user_data_rhevm() ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_rhevm()) + + def test_user_data_rhevm_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['ls', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['bad command', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_mount_fails(self): + '''Test user_data_rhevm() where mount fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['ls', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_no_user_data_file(self): + '''Test user_data_rhevm() with no user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + def test_user_data_rhevm_no_user_data_file(self): + '''Test user_data_rhevm() with no deltacloud user data file.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_rhevm()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + +class TestDataSouceAltCloud_user_data_vsphere(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/user-data.txt' + + try: + os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + except OSError, (errno, strerror): + # Ignore OSError: [Errno 17] File exists: + if errno is not 17: + raise + + _write_user_data_files('test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files() + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] + + + def test_user_data_vsphere(self): + '''Test user_data_vsphere() ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_vsphere()) + + def test_user_data_vsphere_mount_fails(self): + '''Test user_data_vsphere() where mount fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['ls', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_vsphere()) + + def test_user_data_vsphere_no_user_data_file(self): + '''Test user_data_vsphere() with no user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_vsphere()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + def test_user_data_vsphere_no_user_data_file(self): + '''Test user_data_vsphere() with no deltacloud user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_vsphere()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + +# vi: ts=4 expandtab + |