diff options
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | cloudinit/settings.py | 1 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceAltCloud.py | 299 | ||||
-rw-r--r-- | doc/altcloud/README | 65 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 436 |
5 files changed, 802 insertions, 0 deletions
@@ -1,4 +1,5 @@ 0.7.0: + - Added RHEVm and vSphere support as source AltCloud [Joseph VLcek] - add write-files module (LP: #1012854) - Add setuptools + cheetah to debian package build dependencies (LP: #1022101) - Adjust the sysvinit local script to provide 'cloud-init-local' and have diff --git a/cloudinit/settings.py b/cloudinit/settings.py index 2083cf60..cdfc31ae 100644 --- a/cloudinit/settings.py +++ b/cloudinit/settings.py @@ -31,6 +31,7 @@ CFG_BUILTIN = { 'datasource_list': [ 'NoCloud', 'ConfigDrive', + 'AltCloud', 'OVF', 'MAAS', 'Ec2', diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py new file mode 100644 index 00000000..69c376a5 --- /dev/null +++ b/cloudinit/sources/DataSourceAltCloud.py @@ -0,0 +1,299 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +''' +This file contains code used to gather the user data passed to an +instance on RHEVm and vSphere. +''' + +import errno +import os +import os.path + +from cloudinit import log as logging +from cloudinit import sources +from cloudinit import util +from cloudinit.util import ProcessExecutionError + +LOG = logging.getLogger(__name__) + +# Needed file paths +CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + +# Shell command lists +CMD_DMI_SYSTEM = ['/usr/sbin/dmidecode', '--string', 'system-product-name'] +CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy'] +CMD_UDEVADM_SETTLE = ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + +META_DATA_NOT_SUPPORTED = { + 'block-device-mapping': {}, + 'instance-id': 455, + 'local-hostname': 'localhost', + 'placement': {}, + } + + +def read_user_data_callback(mount_dir): + ''' + Description: + This callback will be applied by util.mount_cb() on the mounted + file. + + Deltacloud file name contains deltacloud. Those not using + Deltacloud but instead instrumenting the injection, could + drop deltacloud from the file name. + + Input: + mount_dir - Mount directory + + Returns: + User Data + + ''' + + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # First try deltacloud_user_data_file. On failure try user_data_file. + try: + with open(deltacloud_user_data_file, 'r') as user_data_f: + user_data = user_data_f.read().strip() + except: + try: + with open(user_data_file, 'r') as user_data_f: + user_data = user_data_f.read().strip() + except: + util.logexc(LOG, ('Failed accessing user data file.')) + return None + + return user_data + + +class DataSourceAltCloud(sources.DataSource): + def __init__(self, sys_cfg, distro, paths): + sources.DataSource.__init__(self, sys_cfg, distro, paths) + self.seed = None + self.supported_seed_starts = ("/", "file://") + + def __str__(self): + mstr = "%s [seed=%s]" % (util.obj_name(self), self.seed) + return mstr + + def get_cloud_type(self): + ''' + Description: + Get the type for the cloud back end this instance is running on + by examining the string returned by: + dmidecode --string system-product-name + + On VMWare/vSphere dmidecode returns: RHEV Hypervisor + On VMWare/vSphere dmidecode returns: VMware Virtual Platform + + Input: + None + + Returns: + One of the following strings: + 'RHEV', 'VSPHERE' or 'UNKNOWN' + + ''' + + cmd = CMD_DMI_SYSTEM + try: + (cmd_out, _err) = util.subp(cmd) + except ProcessExecutionError, _err: + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return 'UNKNOWN' + except OSError, _err: + LOG.debug(('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message)) + return 'UNKNOWN' + + if cmd_out.upper().startswith('RHEV'): + return 'RHEV' + + if cmd_out.upper().startswith('VMWARE'): + return 'VSPHERE' + + return 'UNKNOWN' + + def get_data(self): + ''' + Description: + User Data is passed to the launching instance which + is used to perform instance configuration. + + Cloud providers expose the user data differently. + It is necessary to determine which cloud provider + the current instance is running on to determine + how to access the user data. Images built with + image factory will contain a CLOUD_INFO_FILE which + contains a string identifying the cloud provider. + + Images not built with Imagefactory will try to + determine what the cloud provider is based on system + information. + ''' + + LOG.debug('Invoked get_data()') + + if os.path.exists(CLOUD_INFO_FILE): + try: + cloud_info = open(CLOUD_INFO_FILE) + cloud_type = cloud_info.read().strip().upper() + cloud_info.close() + except: + util.logexc(LOG, 'Unable to access cloud info file.') + return False + else: + cloud_type = self.get_cloud_type() + + LOG.debug('cloud_type: ' + str(cloud_type)) + + if 'RHEV' in cloud_type: + if self.user_data_rhevm(): + return True + elif 'VSPHERE' in cloud_type: + if self.user_data_vsphere(): + return True + else: + # there was no recognized alternate cloud type + # indicating this handler should not be used. + return False + + # No user data found + util.logexc(LOG, ('Failed accessing user data.')) + return False + + def user_data_rhevm(self): + ''' + RHEVM specific userdata read + + If on RHEV-M the user data will be contained on the + floppy device in file <user_data_file> + To access it: + modprobe floppy + + Leverage util.mount_cb to: + mkdir <tmp mount dir> + mount /dev/fd0 <tmp mount dir> + The call back passed to util.mount_cb will do: + read <tmp mount dir>/<user_data_file> + ''' + + return_str = None + + # modprobe floppy + try: + cmd = CMD_PROBE_FLOPPY + (cmd_out, _err) = util.subp(cmd) + LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out)) + except ProcessExecutionError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + except OSError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + + floppy_dev = '/dev/fd0' + + # udevadm settle for floppy device + try: + cmd = CMD_UDEVADM_SETTLE + cmd.append('--exit-if-exists=' + floppy_dev) + (cmd_out, _err) = util.subp(cmd) + LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out)) + except ProcessExecutionError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + except OSError, _err: + util.logexc(LOG, (('Failed command: %s\n%s') % \ + (' '.join(cmd), _err.message))) + return False + + try: + return_str = util.mount_cb(floppy_dev, read_user_data_callback) + except OSError as err: + if err.errno != errno.ENOENT: + raise + except util.MountFailedError: + util.logexc(LOG, ("Failed to mount %s" + " when looking for user data"), floppy_dev) + + self.userdata_raw = return_str + self.metadata = META_DATA_NOT_SUPPORTED + + if return_str: + return True + else: + return False + + def user_data_vsphere(self): + ''' + vSphere specific userdata read + + If on vSphere the user data will be contained on the + cdrom device in file <user_data_file> + To access it: + Leverage util.mount_cb to: + mkdir <tmp mount dir> + mount /dev/fd0 <tmp mount dir> + The call back passed to util.mount_cb will do: + read <tmp mount dir>/<user_data_file> + ''' + + return_str = None + cdrom_list = util.find_devs_with('LABEL=CDROM') + for cdrom_dev in cdrom_list: + try: + return_str = util.mount_cb(cdrom_dev, read_user_data_callback) + if return_str: + break + except OSError as err: + if err.errno != errno.ENOENT: + raise + except util.MountFailedError: + util.logexc(LOG, ("Failed to mount %s" + " when looking for user data"), cdrom_dev) + + self.userdata_raw = return_str + self.metadata = META_DATA_NOT_SUPPORTED + + if return_str: + return True + else: + return False + +# Used to match classes to dependencies +# Source DataSourceAltCloud does not really depend on networking. +# In the future 'dsmode' like behavior can be added to offer user +# the ability to run before networking. +datasources = [ + (DataSourceAltCloud, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)), +] + + +# Return a list of data sources that match this set of dependencies +def get_datasource_list(depends): + return sources.list_from_depends(depends, datasources) diff --git a/doc/altcloud/README b/doc/altcloud/README new file mode 100644 index 00000000..87d7949a --- /dev/null +++ b/doc/altcloud/README @@ -0,0 +1,65 @@ +Data souce AltCloud will be used to pick up user data on +RHEVm and vSphere. + +RHEVm: +====== +For REHVm v3.0 the userdata is injected into the VM using floppy +injection via the RHEVm dashboard "Custom Properties". The format +of the Custom Properties entry must be: +"floppyinject=user-data.txt:<base64 encoded data>" + +e.g.: To pass a simple bash script + +% cat simple_script.bash +#!/bin/bash +echo "Hello Joe!" >> /tmp/JJV_Joe_out.txt + +% base64 < simple_script.bash +IyEvYmluL2Jhc2gKZWNobyAiSGVsbG8gSm9lISIgPj4gL3RtcC9KSlZfSm9lX291dC50eHQK + +To pass this example script to cloud-init running in a RHEVm v3.0 VM +set the "Custom Properties" when creating the RHEMv v3.0 VM to: +floppyinject=user-data.txt:IyEvYmluL2Jhc2gKZWNobyAiSGVsbG8gSm9lISIgPj4gL3RtcC9KSlZfSm9lX291dC50eHQK + +NOTE: The prefix with file name must be: "floppyinject=user-data.txt:" + +It is also possible to launch a RHEVm v3.0 VM and pass optional user +data to it using the Delta Cloud. +For more inforation on Delta Cloud see: http://deltacloud.apache.org + +vSphere: +======== +For VMWare's vSphere the userdata is injected into the VM an ISO +via the cdrom. This can be done using the vSphere dashboard +by connecting an ISO image to the CD/DVD drive. + +To pass this example script to cloud-init running in a vSphere VM +set the CD/DVD drive when creating the vSphere VM to point to an +ISO on the data store. + +The ISO must contain the user data: + +For example, to pass the same simple_script.bash to vSphere: + +Create the ISO: +=============== +% mkdir my-iso + +NOTE: The file name on the ISO must be: "user-data.txt" +% cp simple_scirpt.bash my-iso/user-data.txt + +% genisoimage -o user-data.iso -r my-iso + +Verify the ISO: +=============== +% sudo mkdir /media/vsphere_iso +% sudo mount -o loop JoeV_CI_02.iso /media/vsphere_iso +% cat /media/vsphere_iso/user-data.txt +% sudo umount /media/vsphere_iso + +Then, launch the vSphere VM the ISO user-data.iso attached as a CDrom. + +It is also possible to launch a vSphere VM and pass optional user +data to it using the Delta Cloud. + +For more inforation on Delta Cloud see: http://deltacloud.apache.org diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py new file mode 100644 index 00000000..54e152e9 --- /dev/null +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -0,0 +1,436 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' + +import os +import shutil +import tempfile + +from unittest import TestCase +from cloudinit import helpers + +# Get the cloudinit.sources.DataSourceAltCloud import items needed. +import cloudinit.sources.DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import read_user_data_callback + +def _write_cloud_info_file(value): + ''' + Populate the CLOUD_INFO_FILE which would be populated + with a cloud backend identifier ImageFactory when building + an image with ImageFactory. + ''' + cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile.write(value) + cifile.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + +def _remove_cloud_info_file(): + ''' + Remove the test CLOUD_INFO_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + +def _write_user_data_files(mount_dir, value): + ''' + Populate the deltacloud_user_data_file the user_data_file + which would be populated with user data. + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + udfile = open(deltacloud_user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(deltacloud_user_data_file, 0664) + + udfile = open(user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(user_data_file, 0664) + +def _remove_user_data_files(mount_dir, + dc_file=True, + non_dc_file=True): + ''' + Remove the test files: deltacloud_user_data_file and + user_data_file + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # Ignore any failures removeing files that are already gone. + if dc_file: + try: + os.remove(deltacloud_user_data_file) + except OSError: + pass + + if non_dc_file: + try: + os.remove(user_data_file) + except OSError: + pass + +class TestGetCloudType(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_cloud_type() + ''' + + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('RHEV', \ + dsrc.get_cloud_type()) + + def test_vsphere(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('VSPHERE', \ + dsrc.get_cloud_type()) + + def test_unknown(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing dmidecode return to match an unrecognized return. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception1(self): + ''' + Test method get_cloud_type() where command dmidecode fails. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['ls', 'bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception2(self): + ''' + Test method get_cloud_type() where command dmidecode is not available. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + +class TestGetDataCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.cloud_info_file = tempfile.mkstemp()[1] + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + self.cloud_info_file + + def tearDown(self): + # Reset + + # Attempt to remove the temp file ignoring errors + try: + os.remove(self.cloud_info_file) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_rhev(self): + '''Success Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere(self): + '''Success Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : True + self.assertEquals(True, dsrc.get_data()) + + def test_fail_rhev(self): + '''Failure Test module get_data() forcing RHEV ''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : False + self.assertEquals(False, dsrc.get_data()) + + def test_fail_vsphere(self): + '''Failure Test module get_data() forcing VSPHERE ''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : False + self.assertEquals(False, dsrc.get_data()) + + def test_unrecognized(self): + '''Failure Test module get_data() forcing unrecognized ''' + + _write_cloud_info_file('unrecognized') + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + +class TestGetDataNoCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + 'no such file' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda : True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda : True + self.assertEquals(True, dsrc.get_data()) + + def test_failure_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized ''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + +class TestUserDataRhevm(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['/sbin/modprobe', 'floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + + def test_mount_cb_fails(self): + '''Test user_data_rhevm() where mount_cb fails''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails. ''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['ls', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command. ''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['bad command', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_udevadm_fails(self): + '''Test user_data_rhevm() where udevadm fails. ''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['ls', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_udevadm_cmd(self): + '''Test user_data_rhevm() with no udevadm command. ''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['bad command', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + +class TestUserDataVsphere(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_user_data_vsphere(self): + '''Test user_data_vsphere() where mount_cb fails''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_vsphere()) + +class TestReadUserDataCallback(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.read_user_data_callback() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + + def test_callback_both(self): + '''Test read_user_data_callback() with both files''' + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_dc(self): + '''Test read_user_data_callback() with only DC file''' + + _remove_user_data_files(self.mount_dir, + dc_file=False, + non_dc_file=True) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_non_dc(self): + '''Test read_user_data_callback() with only non-DC file''' + + _remove_user_data_files(self.mount_dir, + dc_file=True, + non_dc_file=False) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_none(self): + '''Test read_user_data_callback() no files are found''' + + _remove_user_data_files(self.mount_dir) + self.assertEquals(None, read_user_data_callback(self.mount_dir)) + +# vi: ts=4 expandtab |