diff options
author | Joe VLcek <JoeV@RedHat.com> | 2012-07-17 15:10:10 -0400 |
---|---|---|
committer | Joe VLcek <JoeV@RedHat.com> | 2012-07-17 15:10:10 -0400 |
commit | 8822100158489d8f7fb5c38d440489df5df62996 (patch) | |
tree | c8fe668c927d147521edf49bac79ac9e23f38521 | |
parent | 285e127de45f0feed7170bafc79b502170d5b381 (diff) | |
download | vyos-cloud-init-8822100158489d8f7fb5c38d440489df5df62996.tar.gz vyos-cloud-init-8822100158489d8f7fb5c38d440489df5df62996.zip |
Added RHEVm and vSphere support as source AltCloud
-rw-r--r-- | cloudinit/settings.py | 1 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceAltCloud.py | 302 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 498 |
3 files changed, 801 insertions, 0 deletions
diff --git a/cloudinit/settings.py b/cloudinit/settings.py index 2083cf60..28340e29 100644 --- a/cloudinit/settings.py +++ b/cloudinit/settings.py @@ -29,6 +29,7 @@ CLOUD_CONFIG = '/etc/cloud/cloud.cfg' # What u get if no config is provided CFG_BUILTIN = { 'datasource_list': [ + 'AltCloud', 'NoCloud', 'ConfigDrive', 'OVF', diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py new file mode 100644 index 00000000..121a3c48 --- /dev/null +++ b/cloudinit/sources/DataSourceAltCloud.py @@ -0,0 +1,302 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Scott Moser <scott.moser@canonical.com> +# Author: Juerg Hafliger <juerg.haefliger@hp.com> +# Author: Joshua Harlow <harlowja@yahoo-inc.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import errno +import os +import os.path +import subprocess + +from cloudinit import log as logging +from cloudinit import sources +from cloudinit import util +from cloudinit.util import ProcessExecutionError + +LOG = logging.getLogger(__name__) + +''' +Needed file paths +''' +CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' +MEDIA_DIR = '/media/userdata' + +# Deltacloud file name contains deltacloud. Those not using +# Deltacloud but instead instrumenting the injection, could +# drop deltacloud from the file name. +DELTACLOUD_USER_DATA_FILE = MEDIA_DIR + '/deltacloud-user-data.txt' +USER_DATA_FILE = MEDIA_DIR + '/user-data.txt' + +''' +Shell command lists +''' +CMD_DMI_SYSTEM = ['/usr/sbin/dmidecode', '--string', 'system-product-name'] +CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy'] +CMD_MNT_FLOPPY = ['/bin/mount', '/dev/fd0', MEDIA_DIR] +CMD_MNT_CDROM = ['/bin/mount', '/dev/cdrom', MEDIA_DIR] + +META_DATA_NOT_SUPPORTED = { + 'block-device-mapping' : {}, + 'instance-id' : 455, + 'local-hostname' : 'localhost', + 'placement': {}, + } + +class DataSourceAltCloud(sources.DataSource): + def __init__(self, sys_cfg, distro, paths): + sources.DataSource.__init__(self, sys_cfg, distro, paths) + self.dsmode = 'local' + self.seed = None + self.cmdline_id = "ds=nocloud" + self.seed_dir = os.path.join(paths.seed_dir, 'nocloud') + self.supported_seed_starts = ("/", "file://") + + def __str__(self): + mstr = "%s [seed=%s][dsmode=%s]" % (util.obj_name(self), + self.seed, self.dsmode) + return mstr + + def get_cloud_type(self): + ''' + Description: + Get the type for the cloud back end this instance is running on + by examining the string returned by: + dmidecode --string system-product-name + + On VMWare/vSphere dmidecode returns: RHEV Hypervisor + On VMWare/vSphere dmidecode returns: VMware Virtual Platform + + Input: + None + + Returns: + One of the following strings: + 'RHEV', 'VSPHERE' or 'UNKNOWN' + + ''' + + cmd = CMD_DMI_SYSTEM + try: + (cmd_out, _err) = util.subp(cmd) + except ProcessExecutionError, _err: + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return 'UNKNOWN' + except OSError, _err: + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return 'UNKNOWN' + + if cmd_out.upper().startswith('RHEV'): + return 'RHEV' + + if cmd_out.upper().startswith('VMWARE'): + return 'VSPHERE' + + return 'UNKNOWN' + + def get_data(self): + ''' + Description: + User Data is passed to the launching instance which + + Cloud providers expose the user data differently. + It is necessary to determine which cloud provider + the current instance is running on to determine + how to access the user data. Images built with + image factory will contain a CLOUD_INFO_FILE which + contains a string identifying the cloud provider. + + Images not built with Imagefactory will try to + determine what the cloud provider is based on system + information. + ''' + + LOG.debug('Invoked get_data()') + + if os.path.exists(CLOUD_INFO_FILE): + try: + cloud_info = open(CLOUD_INFO_FILE) + cloud_type = cloud_info.read().strip().upper() + cloud_info.close() + except: + util.logexc(LOG, 'Unable to access cloud info file.') + return False + else: + cloud_type = self.get_cloud_type() + + LOG.debug('cloud_type: ' + str(cloud_type)) + + if 'RHEV' in cloud_type: + return self.user_data_rhevm() + elif 'VSPHERE' in cloud_type: + return self.user_data_vsphere() + else: + # there was no recognized alternate cloud type. + # suggesting this handler should not be used. + return False + + def user_data_rhevm(self): + ''' + RHEVM specific userdata read + + If on RHEV-M the user data will be contained on the + floppy device in file <USER_DATA_FILE> + To access it: + modprobe floppy + mkdir <MEDIA_DIR> + mount /dev/fd0 <MEDIA_DIR> + mount /dev/fd0 <MEDIA_DIR> # NOTE: -> /dev/ + read <MEDIA_DIR>/<USER_DATA_FILE> + ''' + + # modprobe floppy + try: + cmd = CMD_PROBE_FLOPPY + (cmd_out, _err) = util.subp(cmd) + except ProcessExecutionError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + except OSError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + + # mkdir <MEDIA_DIR> dir just in case it isn't already. + try: + os.makedirs(MEDIA_DIR) + except OSError, (errno, strerror): + if errno is not 17: + LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \ + (errno, strerror)) + return False + + # mount /dev/fd0 <MEDIA_DIR> + try: + cmd = CMD_MNT_FLOPPY + (cmd_out, _err) = util.subp(cmd) + except ProcessExecutionError, _err: + # Ignore failure: already mounted + if 'ALREADY MOUNTED' not in _err.message.upper(): + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + except OSError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + + # This could be done using "with open()" but that's not available + # in Python 2.4 as used on RHEL5 + # First try DELTACLOUD_USER_DATA_FILE. If that fails then try + # USER_DATA_FILE. + try: + user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r') + user_data = user_data_file.read().strip() + user_data_file.close() + except: + try: + user_data_file = open(USER_DATA_FILE, 'r') + user_data = user_data_file.read().strip() + user_data_file.close() + except: + util.logexc(LOG, ('Failed accessing RHEVm user data file.')) + try: + user_data_file.close() + except: + pass + return False + + self.userdata_raw = user_data + self.metadata = META_DATA_NOT_SUPPORTED + + return True + + def user_data_vsphere(self): + ''' + VSphere specific userdata read + + If on vSphere the user data will be contained on the + floppy device in file <USER_DATA_FILE> + To access it: + mkdir <MEDIA_DIR> dir just in case it isn't already. + mount /dev/cdrom <MEDIA_DIR> # NOTE: -> /dev/cdrom + read <MEDIA_DIR>/<USER_DATA_FILE> + ''' + + # mkdir <MEDIA_DIR> dir just in case it isn't already. + try: + os.makedirs(MEDIA_DIR) + except OSError, (errno, strerror): + if errno is not 17: + LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \ + (errno, strerror)) + return False + + # mount /dev/cdrom <MEDIA_DIR> + try: + cmd = CMD_MNT_CDROM + (cmd_out, _err) = util.subp(cmd) + except ProcessExecutionError, _err: + # Ignore failure: already mounted + if 'ALREADY MOUNTED' not in _err.message.upper(): + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return False + except OSError, _err: + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return False + + # This could be done using "with open()" but that's not available + # in Python 2.4 as used on RHEL5 + # First try DELTACLOUD_USER_DATA_FILE. If that fails then try + # USER_DATA_FILE. + try: + user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r') + user_data = user_data_file.read().strip() + user_data_file.close() + except: + try: + user_data_file = open(USER_DATA_FILE, 'r') + user_data = user_data_file.read().strip() + user_data_file.close() + except: + LOG.debug('Failed accessing vSphere user data file.') + try: + user_data_file.close() + except: + pass + return False + + self.userdata_raw = user_data + self.metadata = META_DATA_NOT_SUPPORTED + return True + +# Used to match classes to dependencies +datasources = [ + (DataSourceAltCloud, (sources.DEP_FILESYSTEM, )), +] + + +# Return a list of data sources that match this set of dependencies +def get_datasource_list(depends): + return sources.list_from_depends(depends, datasources) diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py new file mode 100644 index 00000000..d404fab9 --- /dev/null +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -0,0 +1,498 @@ +#! /usr/bin/env python + +import os +import stat +import tempfile + +from shutil import rmtree +from tempfile import mkdtemp +from unittest import TestCase + +from time import sleep + +from cloudinit import helpers + +# Get the cloudinit.sources.DataSourceAltCloud import items needed. +import cloudinit.sources.DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud + +def _write_cloud_info_file(value): + ''' + Populate the CLOUD_INFO_FILE which would be populated + with a cloud backend identifier ImageFactory when building + an image with ImageFactory. + ''' + f = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + +def _remove_cloud_info_file(): + ''' + Remove the test CLOUD_INFO_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + +def _write_user_data_files(value): + ''' + Populate the DELTACLOUD_USER_DATA_FILE the USER_DATA_FILE + which would be populated with user data. + ''' + f = open(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 0664) + + f = open(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 'w') + f.write(value) + f.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 0664) + +def _remove_user_data_files(): + ''' + Remove the test files: DELTACLOUD_USER_DATA_FILE and + USER_DATA_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE) + os.remove(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE) + +class TestDataSouceAltCloud_get_cloud_type(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_cloud_type() + ''' + + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_get_cloud_type_RHEV(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('RHEV', \ + ds.get_cloud_type()) + + def test_get_cloud_type_VSPHERE(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('VSPHERE', \ + ds.get_cloud_type()) + + def test_get_cloud_type_UNKNOWN(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing dmidecode return to match an unrecognized return. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + + def test_get_cloud_type_exception1(self): + ''' + Test method get_cloud_type() where command dmidecode fails. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['ls', 'bad command'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + + def test_get_cloud_type_exception(self): + ''' + Test method get_cloud_type() where command dmidecode is not available. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['bad command'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + ds.get_cloud_type()) + +class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + + def test_get_data_RHEV_cloud_file(self): + '''Success Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_get_data_VSPHERE_cloud_file(self): + '''Success Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_failure_get_data_RHEV_cloud_file(self): + '''Failure Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : False + self.assertEquals(False, ds.get_data()) + + def test_failure_get_data_VSPHERE_cloud_file(self): + '''Failure Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : False + self.assertEquals(False, ds.get_data()) + + def test_failure_get_data_unrecognized_cloud_file(self): + '''Failure Test module get_data() forcing unrecognized ''' + + _write_cloud_info_file('unrecognized') + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, ds.get_data()) + +class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + 'no such file' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_get_data_RHEV_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_rhevm = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_get_data_VSPHERE_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + ds.user_data_vsphere = lambda : True + self.assertEquals(True, ds.get_data()) + + def test_failure_get_data_VSPHERE_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, ds.get_data()) + +class TestDataSouceAltCloud_user_data_rhevm(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/user-data.txt' + + try: + os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + except OSError, (errno, strerror): + # Ignore OSError: [Errno 17] File exists: + if errno is not 17: + raise + + _write_user_data_files('test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files() + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['/sbin/modprobe', 'floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] + + def test_user_data_rhevm(self): + '''Test user_data_rhevm() ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_rhevm()) + + def test_user_data_rhevm_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['ls', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['bad command', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_mount_fails(self): + '''Test user_data_rhevm() where mount fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['ls', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + def test_user_data_rhevm_no_user_data_file(self): + '''Test user_data_rhevm() with no user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_rhevm()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + def test_user_data_rhevm_no_user_data_file(self): + '''Test user_data_rhevm() with no deltacloud user data file.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_rhevm()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + +class TestDataSouceAltCloud_user_data_vsphere(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/tmp/cloudinit_test_etc_sysconfig_cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ + '/user-data.txt' + + try: + os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + except OSError, (errno, strerror): + # Ignore OSError: [Errno 17] File exists: + if errno is not 17: + raise + + _write_user_data_files('test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files() + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['/bin/mount', '/dev/fd0', cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] + + + def test_user_data_vsphere(self): + '''Test user_data_vsphere() ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_vsphere()) + + def test_user_data_vsphere_mount_fails(self): + '''Test user_data_vsphere() where mount fails. ''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['ls', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_vsphere()) + + def test_user_data_vsphere_no_user_data_file(self): + '''Test user_data_vsphere() with no user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, ds.user_data_vsphere()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + + def test_user_data_vsphere_no_user_data_file(self): + '''Test user_data_vsphere() with no deltacloud user data files.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ + '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/not-user-data.txt' + + cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ + ['echo', 'floppy mounted'] + + ds = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(True, ds.user_data_vsphere()) + + cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/deltacloud-user-data.txt' + +# vi: ts=4 expandtab + |