summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/settings.py1
-rw-r--r--cloudinit/sources/DataSourceAltCloud.py302
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py498
3 files changed, 801 insertions, 0 deletions
diff --git a/cloudinit/settings.py b/cloudinit/settings.py
index 2083cf60..28340e29 100644
--- a/cloudinit/settings.py
+++ b/cloudinit/settings.py
@@ -29,6 +29,7 @@ CLOUD_CONFIG = '/etc/cloud/cloud.cfg'
# What u get if no config is provided
CFG_BUILTIN = {
'datasource_list': [
+ 'AltCloud',
'NoCloud',
'ConfigDrive',
'OVF',
diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py
new file mode 100644
index 00000000..121a3c48
--- /dev/null
+++ b/cloudinit/sources/DataSourceAltCloud.py
@@ -0,0 +1,302 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2009-2010 Canonical Ltd.
+# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2012 Yahoo! Inc.
+#
+# Author: Scott Moser <scott.moser@canonical.com>
+# Author: Juerg Hafliger <juerg.haefliger@hp.com>
+# Author: Joshua Harlow <harlowja@yahoo-inc.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import errno
+import os
+import os.path
+import subprocess
+
+from cloudinit import log as logging
+from cloudinit import sources
+from cloudinit import util
+from cloudinit.util import ProcessExecutionError
+
+LOG = logging.getLogger(__name__)
+
+'''
+Needed file paths
+'''
+CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+MEDIA_DIR = '/media/userdata'
+
+# Deltacloud file name contains deltacloud. Those not using
+# Deltacloud but instead instrumenting the injection, could
+# drop deltacloud from the file name.
+DELTACLOUD_USER_DATA_FILE = MEDIA_DIR + '/deltacloud-user-data.txt'
+USER_DATA_FILE = MEDIA_DIR + '/user-data.txt'
+
+'''
+Shell command lists
+'''
+CMD_DMI_SYSTEM = ['/usr/sbin/dmidecode', '--string', 'system-product-name']
+CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy']
+CMD_MNT_FLOPPY = ['/bin/mount', '/dev/fd0', MEDIA_DIR]
+CMD_MNT_CDROM = ['/bin/mount', '/dev/cdrom', MEDIA_DIR]
+
+META_DATA_NOT_SUPPORTED = {
+ 'block-device-mapping' : {},
+ 'instance-id' : 455,
+ 'local-hostname' : 'localhost',
+ 'placement': {},
+ }
+
+class DataSourceAltCloud(sources.DataSource):
+ def __init__(self, sys_cfg, distro, paths):
+ sources.DataSource.__init__(self, sys_cfg, distro, paths)
+ self.dsmode = 'local'
+ self.seed = None
+ self.cmdline_id = "ds=nocloud"
+ self.seed_dir = os.path.join(paths.seed_dir, 'nocloud')
+ self.supported_seed_starts = ("/", "file://")
+
+ def __str__(self):
+ mstr = "%s [seed=%s][dsmode=%s]" % (util.obj_name(self),
+ self.seed, self.dsmode)
+ return mstr
+
+ def get_cloud_type(self):
+ '''
+ Description:
+ Get the type for the cloud back end this instance is running on
+ by examining the string returned by:
+ dmidecode --string system-product-name
+
+ On VMWare/vSphere dmidecode returns: RHEV Hypervisor
+ On VMWare/vSphere dmidecode returns: VMware Virtual Platform
+
+ Input:
+ None
+
+ Returns:
+ One of the following strings:
+ 'RHEV', 'VSPHERE' or 'UNKNOWN'
+
+ '''
+
+ cmd = CMD_DMI_SYSTEM
+ try:
+ (cmd_out, _err) = util.subp(cmd)
+ except ProcessExecutionError, _err:
+ LOG.debug(('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message))
+ return 'UNKNOWN'
+ except OSError, _err:
+ LOG.debug(('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message))
+ return 'UNKNOWN'
+
+ if cmd_out.upper().startswith('RHEV'):
+ return 'RHEV'
+
+ if cmd_out.upper().startswith('VMWARE'):
+ return 'VSPHERE'
+
+ return 'UNKNOWN'
+
+ def get_data(self):
+ '''
+ Description:
+ User Data is passed to the launching instance which
+
+ Cloud providers expose the user data differently.
+ It is necessary to determine which cloud provider
+ the current instance is running on to determine
+ how to access the user data. Images built with
+ image factory will contain a CLOUD_INFO_FILE which
+ contains a string identifying the cloud provider.
+
+ Images not built with Imagefactory will try to
+ determine what the cloud provider is based on system
+ information.
+ '''
+
+ LOG.debug('Invoked get_data()')
+
+ if os.path.exists(CLOUD_INFO_FILE):
+ try:
+ cloud_info = open(CLOUD_INFO_FILE)
+ cloud_type = cloud_info.read().strip().upper()
+ cloud_info.close()
+ except:
+ util.logexc(LOG, 'Unable to access cloud info file.')
+ return False
+ else:
+ cloud_type = self.get_cloud_type()
+
+ LOG.debug('cloud_type: ' + str(cloud_type))
+
+ if 'RHEV' in cloud_type:
+ return self.user_data_rhevm()
+ elif 'VSPHERE' in cloud_type:
+ return self.user_data_vsphere()
+ else:
+ # there was no recognized alternate cloud type.
+ # suggesting this handler should not be used.
+ return False
+
+ def user_data_rhevm(self):
+ '''
+ RHEVM specific userdata read
+
+ If on RHEV-M the user data will be contained on the
+ floppy device in file <USER_DATA_FILE>
+ To access it:
+ modprobe floppy
+ mkdir <MEDIA_DIR>
+ mount /dev/fd0 <MEDIA_DIR>
+ mount /dev/fd0 <MEDIA_DIR> # NOTE: -> /dev/
+ read <MEDIA_DIR>/<USER_DATA_FILE>
+ '''
+
+ # modprobe floppy
+ try:
+ cmd = CMD_PROBE_FLOPPY
+ (cmd_out, _err) = util.subp(cmd)
+ except ProcessExecutionError, _err:
+ util.logexc(LOG, (('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message)))
+ return False
+ except OSError, _err:
+ util.logexc(LOG, (('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message)))
+ return False
+
+ # mkdir <MEDIA_DIR> dir just in case it isn't already.
+ try:
+ os.makedirs(MEDIA_DIR)
+ except OSError, (errno, strerror):
+ if errno is not 17:
+ LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \
+ (errno, strerror))
+ return False
+
+ # mount /dev/fd0 <MEDIA_DIR>
+ try:
+ cmd = CMD_MNT_FLOPPY
+ (cmd_out, _err) = util.subp(cmd)
+ except ProcessExecutionError, _err:
+ # Ignore failure: already mounted
+ if 'ALREADY MOUNTED' not in _err.message.upper():
+ util.logexc(LOG, (('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message)))
+ return False
+ except OSError, _err:
+ util.logexc(LOG, (('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message)))
+ return False
+
+ # This could be done using "with open()" but that's not available
+ # in Python 2.4 as used on RHEL5
+ # First try DELTACLOUD_USER_DATA_FILE. If that fails then try
+ # USER_DATA_FILE.
+ try:
+ user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r')
+ user_data = user_data_file.read().strip()
+ user_data_file.close()
+ except:
+ try:
+ user_data_file = open(USER_DATA_FILE, 'r')
+ user_data = user_data_file.read().strip()
+ user_data_file.close()
+ except:
+ util.logexc(LOG, ('Failed accessing RHEVm user data file.'))
+ try:
+ user_data_file.close()
+ except:
+ pass
+ return False
+
+ self.userdata_raw = user_data
+ self.metadata = META_DATA_NOT_SUPPORTED
+
+ return True
+
+ def user_data_vsphere(self):
+ '''
+ VSphere specific userdata read
+
+ If on vSphere the user data will be contained on the
+ floppy device in file <USER_DATA_FILE>
+ To access it:
+ mkdir <MEDIA_DIR> dir just in case it isn't already.
+ mount /dev/cdrom <MEDIA_DIR> # NOTE: -> /dev/cdrom
+ read <MEDIA_DIR>/<USER_DATA_FILE>
+ '''
+
+ # mkdir <MEDIA_DIR> dir just in case it isn't already.
+ try:
+ os.makedirs(MEDIA_DIR)
+ except OSError, (errno, strerror):
+ if errno is not 17:
+ LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \
+ (errno, strerror))
+ return False
+
+ # mount /dev/cdrom <MEDIA_DIR>
+ try:
+ cmd = CMD_MNT_CDROM
+ (cmd_out, _err) = util.subp(cmd)
+ except ProcessExecutionError, _err:
+ # Ignore failure: already mounted
+ if 'ALREADY MOUNTED' not in _err.message.upper():
+ LOG.debug(('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message))
+ return False
+ except OSError, _err:
+ LOG.debug(('Failed command: %s\n%s') % \
+ (' '.join(cmd), _err.message))
+ return False
+
+ # This could be done using "with open()" but that's not available
+ # in Python 2.4 as used on RHEL5
+ # First try DELTACLOUD_USER_DATA_FILE. If that fails then try
+ # USER_DATA_FILE.
+ try:
+ user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r')
+ user_data = user_data_file.read().strip()
+ user_data_file.close()
+ except:
+ try:
+ user_data_file = open(USER_DATA_FILE, 'r')
+ user_data = user_data_file.read().strip()
+ user_data_file.close()
+ except:
+ LOG.debug('Failed accessing vSphere user data file.')
+ try:
+ user_data_file.close()
+ except:
+ pass
+ return False
+
+ self.userdata_raw = user_data
+ self.metadata = META_DATA_NOT_SUPPORTED
+ return True
+
+# Used to match classes to dependencies
+datasources = [
+ (DataSourceAltCloud, (sources.DEP_FILESYSTEM, )),
+]
+
+
+# Return a list of data sources that match this set of dependencies
+def get_datasource_list(depends):
+ return sources.list_from_depends(depends, datasources)
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
new file mode 100644
index 00000000..d404fab9
--- /dev/null
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -0,0 +1,498 @@
+#! /usr/bin/env python
+
+import os
+import stat
+import tempfile
+
+from shutil import rmtree
+from tempfile import mkdtemp
+from unittest import TestCase
+
+from time import sleep
+
+from cloudinit import helpers
+
+# Get the cloudinit.sources.DataSourceAltCloud import items needed.
+import cloudinit.sources.DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
+
+def _write_cloud_info_file(value):
+ '''
+ Populate the CLOUD_INFO_FILE which would be populated
+ with a cloud backend identifier ImageFactory when building
+ an image with ImageFactory.
+ '''
+ f = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
+ f.write(value)
+ f.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+
+def _remove_cloud_info_file():
+ '''
+ Remove the test CLOUD_INFO_FILE
+ '''
+ os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
+
+def _write_user_data_files(value):
+ '''
+ Populate the DELTACLOUD_USER_DATA_FILE the USER_DATA_FILE
+ which would be populated with user data.
+ '''
+ f = open(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 'w')
+ f.write(value)
+ f.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 0664)
+
+ f = open(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 'w')
+ f.write(value)
+ f.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 0664)
+
+def _remove_user_data_files():
+ '''
+ Remove the test files: DELTACLOUD_USER_DATA_FILE and
+ USER_DATA_FILE
+ '''
+ os.remove(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE)
+ os.remove(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE)
+
+class TestDataSouceAltCloud_get_cloud_type(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_cloud_type()
+ '''
+
+ def setUp(self):
+ ''' Set up '''
+ self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_get_cloud_type_RHEV(self):
+ '''
+ Test method get_cloud_type() for RHEVm systems.
+ Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('RHEV', \
+ ds.get_cloud_type())
+
+ def test_get_cloud_type_VSPHERE(self):
+ '''
+ Test method get_cloud_type() for vSphere systems.
+ Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('VSPHERE', \
+ ds.get_cloud_type())
+
+ def test_get_cloud_type_UNKNOWN(self):
+ '''
+ Test method get_cloud_type() for unknown systems.
+ Forcing dmidecode return to match an unrecognized return.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ ds.get_cloud_type())
+
+ def test_get_cloud_type_exception1(self):
+ '''
+ Test method get_cloud_type() where command dmidecode fails.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['ls', 'bad command']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ ds.get_cloud_type())
+
+ def test_get_cloud_type_exception(self):
+ '''
+ Test method get_cloud_type() where command dmidecode is not available.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['bad command']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ ds.get_cloud_type())
+
+class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ With a contrived CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ ''' Set up '''
+ self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/tmp/cloudinit_test_etc_sysconfig_cloud-info'
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+
+ def test_get_data_RHEV_cloud_file(self):
+ '''Success Test module get_data() forcing RHEV '''
+
+ _write_cloud_info_file('RHEV')
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_rhevm = lambda : True
+ self.assertEquals(True, ds.get_data())
+
+ def test_get_data_VSPHERE_cloud_file(self):
+ '''Success Test module get_data() forcing VSPHERE '''
+
+ _write_cloud_info_file('VSPHERE')
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_vsphere = lambda : True
+ self.assertEquals(True, ds.get_data())
+
+ def test_failure_get_data_RHEV_cloud_file(self):
+ '''Failure Test module get_data() forcing RHEV '''
+
+ _write_cloud_info_file('RHEV')
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_rhevm = lambda : False
+ self.assertEquals(False, ds.get_data())
+
+ def test_failure_get_data_VSPHERE_cloud_file(self):
+ '''Failure Test module get_data() forcing VSPHERE '''
+
+ _write_cloud_info_file('VSPHERE')
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_vsphere = lambda : False
+ self.assertEquals(False, ds.get_data())
+
+ def test_failure_get_data_unrecognized_cloud_file(self):
+ '''Failure Test module get_data() forcing unrecognized '''
+
+ _write_cloud_info_file('unrecognized')
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, ds.get_data())
+
+class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ Without a CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ ''' Set up '''
+ self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ 'no such file'
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_get_data_RHEV_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing RHEV '''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_rhevm = lambda : True
+ self.assertEquals(True, ds.get_data())
+
+ def test_get_data_VSPHERE_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing VSPHERE '''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ ds.user_data_vsphere = lambda : True
+ self.assertEquals(True, ds.get_data())
+
+ def test_failure_get_data_VSPHERE_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing unrecognized '''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ ds = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, ds.get_data())
+
+class TestDataSouceAltCloud_user_data_rhevm(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_rhevm()
+ '''
+ def setUp(self):
+ ''' Set up '''
+ self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/tmp/cloudinit_test_etc_sysconfig_cloud-info'
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \
+ '/deltacloud-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \
+ '/user-data.txt'
+
+ try:
+ os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR)
+ except OSError, (errno, strerror):
+ # Ignore OSError: [Errno 17] File exists:
+ if errno is not 17:
+ raise
+
+ _write_user_data_files('test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files()
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media'
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['/sbin/modprobe', 'floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR]
+
+ def test_user_data_rhevm(self):
+ '''Test user_data_rhevm() '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(True, ds.user_data_rhevm())
+
+ def test_user_data_rhevm_modprobe_fails(self):
+ '''Test user_data_rhevm() where modprobe fails. '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['ls', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_rhevm())
+
+ def test_user_data_rhevm_no_modprobe_cmd(self):
+ '''Test user_data_rhevm() with no modprobe command. '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['bad command', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_rhevm())
+
+ def test_user_data_rhevm_mount_fails(self):
+ '''Test user_data_rhevm() where mount fails. '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['ls', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_rhevm())
+
+ def test_user_data_rhevm_no_user_data_file(self):
+ '''Test user_data_rhevm() with no user data files.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_rhevm())
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt'
+
+ def test_user_data_rhevm_no_user_data_file(self):
+ '''Test user_data_rhevm() with no deltacloud user data file.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(True, ds.user_data_rhevm())
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+class TestDataSouceAltCloud_user_data_vsphere(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_vsphere()
+ '''
+ def setUp(self):
+ ''' Set up '''
+ self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/tmp/cloudinit_test_etc_sysconfig_cloud-info'
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \
+ '/deltacloud-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \
+ '/user-data.txt'
+
+ try:
+ os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR)
+ except OSError, (errno, strerror):
+ # Ignore OSError: [Errno 17] File exists:
+ if errno is not 17:
+ raise
+
+ _write_user_data_files('test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files()
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media'
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \
+ ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR]
+
+
+ def test_user_data_vsphere(self):
+ '''Test user_data_vsphere() '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(True, ds.user_data_vsphere())
+
+ def test_user_data_vsphere_mount_fails(self):
+ '''Test user_data_vsphere() where mount fails. '''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \
+ ['ls', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_vsphere())
+
+ def test_user_data_vsphere_no_user_data_file(self):
+ '''Test user_data_vsphere() with no user data files.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, ds.user_data_vsphere())
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+ cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt'
+
+ def test_user_data_vsphere_no_user_data_file(self):
+ '''Test user_data_vsphere() with no deltacloud user data files.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \
+ '/tmp/cloudinit_test_media'
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt'
+
+ cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \
+ ['echo', 'floppy mounted']
+
+ ds = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(True, ds.user_data_vsphere())
+
+ cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt'
+
+# vi: ts=4 expandtab
+