diff options
author | Joe VLcek <JoeV@RedHat.com> | 2012-08-08 14:36:41 -0400 |
---|---|---|
committer | Joe VLcek <JoeV@RedHat.com> | 2012-08-08 14:36:41 -0400 |
commit | 6f2dc0d943d3f3527983b38f20085a6cf071c43e (patch) | |
tree | 9aec40e41a56e93cae0864658b4671eea5d05263 | |
parent | 5e6ee0444fa7d2acf688b152c7fd632c878e23c0 (diff) | |
download | vyos-cloud-init-6f2dc0d943d3f3527983b38f20085a6cf071c43e.tar.gz vyos-cloud-init-6f2dc0d943d3f3527983b38f20085a6cf071c43e.zip |
Address review feedback for:
https://code.launchpad.net/~joev-n/cloud-init/altcloud-changes/+merge/116542
-rw-r--r-- | cloudinit/settings.py | 2 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceAltCloud.py | 211 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 299 |
3 files changed, 173 insertions, 339 deletions
diff --git a/cloudinit/settings.py b/cloudinit/settings.py index 28340e29..cdfc31ae 100644 --- a/cloudinit/settings.py +++ b/cloudinit/settings.py @@ -29,9 +29,9 @@ CLOUD_CONFIG = '/etc/cloud/cloud.cfg' # What u get if no config is provided CFG_BUILTIN = { 'datasource_list': [ - 'AltCloud', 'NoCloud', 'ConfigDrive', + 'AltCloud', 'OVF', 'MAAS', 'Ec2', diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py index 27a67e6c..0f4cc8e0 100644 --- a/cloudinit/sources/DataSourceAltCloud.py +++ b/cloudinit/sources/DataSourceAltCloud.py @@ -4,9 +4,6 @@ # Copyright (C) 2012 Hewlett-Packard Development Company, L.P. # Copyright (C) 2012 Yahoo! Inc. # -# Author: Scott Moser <scott.moser@canonical.com> -# Author: Juerg Hafliger <juerg.haefliger@hp.com> -# Author: Joshua Harlow <harlowja@yahoo-inc.com> # Author: Joe VLcek <JVLcek@RedHat.com> # # This program is free software: you can redistribute it and/or modify @@ -21,9 +18,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import time +import errno import os import os.path +import time from cloudinit import log as logging from cloudinit import sources @@ -34,19 +32,10 @@ LOG = logging.getLogger(__name__) # Needed file paths CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' -MEDIA_DIR = '/media/userdata' - -# Deltacloud file name contains deltacloud. Those not using -# Deltacloud but instead instrumenting the injection, could -# drop deltacloud from the file name. -DELTACLOUD_USER_DATA_FILE = MEDIA_DIR + '/deltacloud-user-data.txt' -USER_DATA_FILE = MEDIA_DIR + '/user-data.txt' # Shell command lists CMD_DMI_SYSTEM = ['/usr/sbin/dmidecode', '--string', 'system-product-name'] CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy'] -CMD_MNT_FLOPPY = ['/bin/mount', '/dev/fd0', MEDIA_DIR] -CMD_MNT_CDROM = ['/bin/mount', '/dev/cdrom', MEDIA_DIR] # Retry times and sleep secs between each try RETRY_TIMES = 3 @@ -60,18 +49,50 @@ META_DATA_NOT_SUPPORTED = { } +def read_user_data_callback(mount_dir): + ''' + Description: + This callback will be applied by util.mount_cb() on the mounted + file. + + Deltacloud file name contains deltacloud. Those not using + Deltacloud but instead instrumenting the injection, could + drop deltacloud from the file name. + + Input: + mount_dir - Mount directory + + Returns: + User Data + + ''' + + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # First try deltacloud_user_data_file. On failure try user_data_file. + try: + with open(deltacloud_user_data_file, 'r') as user_data_f: + user_data = user_data_f.read().strip() + except: + try: + with open(user_data_file, 'r') as user_data_f: + user_data = user_data_f.read().strip() + except: + util.logexc(LOG, ('Failed accessing user data file.')) + return None + + return user_data + + class DataSourceAltCloud(sources.DataSource): def __init__(self, sys_cfg, distro, paths): sources.DataSource.__init__(self, sys_cfg, distro, paths) - self.dsmode = 'local' self.seed = None - self.cmdline_id = "ds=nocloud" - self.seed_dir = os.path.join(paths.seed_dir, 'nocloud') self.supported_seed_starts = ("/", "file://") def __str__(self): - mstr = "%s [seed=%s][dsmode=%s]" % (util.obj_name(self), - self.seed, self.dsmode) + mstr = "%s [seed=%s]" % (util.obj_name(self), self.seed) return mstr def get_cloud_type(self): @@ -117,6 +138,7 @@ class DataSourceAltCloud(sources.DataSource): ''' Description: User Data is passed to the launching instance which + is used to perform instance configuration. Cloud providers expose the user data differently. It is necessary to determine which cloud provider @@ -172,15 +194,19 @@ class DataSourceAltCloud(sources.DataSource): RHEVM specific userdata read If on RHEV-M the user data will be contained on the - floppy device in file <USER_DATA_FILE> + floppy device in file <user_data_file> To access it: modprobe floppy - mkdir <MEDIA_DIR> - mount /dev/fd0 <MEDIA_DIR> - mount /dev/fd0 <MEDIA_DIR> # NOTE: -> /dev/ - read <MEDIA_DIR>/<USER_DATA_FILE> + + Leverage util.mount_cb to: + mkdir <tmp mount dir> + mount /dev/fd0 <tmp mount dir> + The call back passed to util.mount_cb will do: + read <tmp mount dir>/<user_data_file> ''' + return_str = None + # modprobe floppy try: cmd = CMD_PROBE_FLOPPY @@ -195,118 +221,59 @@ class DataSourceAltCloud(sources.DataSource): (' '.join(cmd), _err.message))) return False - # mkdir <MEDIA_DIR> dir just in case it isn't already. - try: - os.makedirs(MEDIA_DIR) - except OSError, (_err, strerror): - if _err is not 17: - LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \ - (_err, strerror)) - return False - - # mount /dev/fd0 <MEDIA_DIR> - try: - cmd = CMD_MNT_FLOPPY - (cmd_out, _err) = util.subp(cmd) - LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out)) - except ProcessExecutionError, _err: - # Ignore failure: already mounted - if 'ALREADY MOUNTED' not in str(_err.message).upper(): - util.logexc(LOG, (('Failed command: %s\n%s') % \ - (' '.join(cmd), _err.message))) - return False - except OSError, _err: - util.logexc(LOG, (('Failed command: %s\n%s') % \ - (' '.join(cmd), _err.message))) - return False - - # This could be done using "with open()" but that's not available - # in Python 2.4 as used on RHEL5 - # First try DELTACLOUD_USER_DATA_FILE. If that fails then try - # USER_DATA_FILE. + floppy_dev = '/dev/fd0' try: - user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r') - user_data = user_data_file.read().strip() - user_data_file.close() - except: - try: - user_data_file = open(USER_DATA_FILE, 'r') - user_data = user_data_file.read().strip() - user_data_file.close() - except: - util.logexc(LOG, ('Failed accessing RHEVm user data file.')) - try: - user_data_file.close() - except: - pass - return False - - self.userdata_raw = user_data + return_str = util.mount_cb(floppy_dev, read_user_data_callback) + except OSError as err: + if err.errno != errno.ENOENT: + raise + except util.MountFailedError: + util.logexc(LOG, ("Failed to mount %s" + " when looking for user data"), floppy_dev) + + self.userdata_raw = return_str self.metadata = META_DATA_NOT_SUPPORTED - return True + if return_str: + return True + else: + return False def user_data_vsphere(self): ''' - VSphere specific userdata read + vSphere specific userdata read If on vSphere the user data will be contained on the - floppy device in file <USER_DATA_FILE> + cdrom device in file <user_data_file> To access it: - mkdir <MEDIA_DIR> dir just in case it isn't already. - mount /dev/cdrom <MEDIA_DIR> # NOTE: -> /dev/cdrom - read <MEDIA_DIR>/<USER_DATA_FILE> + Leverage util.mount_cb to: + mkdir <tmp mount dir> + mount /dev/fd0 <tmp mount dir> + The call back passed to util.mount_cb will do: + read <tmp mount dir>/<user_data_file> ''' - # mkdir <MEDIA_DIR> dir just in case it isn't already. - try: - os.makedirs(MEDIA_DIR) - except OSError, (_err, strerror): - if _err is not 17: - LOG.debug(('makedirs(<MEDIA_DIR>) failed: %s \nError: %s') % \ - (_err, strerror)) - return False - - # mount /dev/cdrom <MEDIA_DIR> - try: - cmd = CMD_MNT_CDROM - (cmd_out, _err) = util.subp(cmd) - LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out)) - except ProcessExecutionError, _err: - # Ignore failure: already mounted - if 'ALREADY MOUNTED' not in str(_err.message).upper(): - LOG.debug(('Failed command: %s\n%s') % \ - (' '.join(cmd), _err.message)) - return False - except OSError, _err: - LOG.debug(('Failed command: %s\n%s') % \ - (' '.join(cmd), _err.message)) - return False - - # This could be done using "with open()" but that's not available - # in Python 2.4 as used on RHEL5 - # First try DELTACLOUD_USER_DATA_FILE. If that fails then try - # USER_DATA_FILE. - try: - user_data_file = open(DELTACLOUD_USER_DATA_FILE, 'r') - user_data = user_data_file.read().strip() - user_data_file.close() - except: + return_str = None + cdrom_list = util.find_devs_with('LABEL=CDROM') + for cdrom_dev in cdrom_list: try: - user_data_file = open(USER_DATA_FILE, 'r') - user_data = user_data_file.read().strip() - user_data_file.close() - except: - LOG.debug('Failed accessing vSphere user data file.') - try: - user_data_file.close() - except: - pass - return False - - self.userdata_raw = user_data + return_str = util.mount_cb(cdrom_dev, read_user_data_callback) + if return_str: + break + except OSError as err: + if err.errno != errno.ENOENT: + raise + except util.MountFailedError: + util.logexc(LOG, ("Failed to mount %s" + " when looking for user data"), cdrom_dev) + + self.userdata_raw = return_str self.metadata = META_DATA_NOT_SUPPORTED - return True + + if return_str: + return True + else: + return False # Used to match classes to dependencies datasources = [ diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 5c3c8ddf..27912652 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -1,20 +1,14 @@ #! /usr/bin/env python import os -import stat -import tempfile -from shutil import rmtree -from tempfile import mkdtemp from unittest import TestCase - -from time import sleep - from cloudinit import helpers # Get the cloudinit.sources.DataSourceAltCloud import items needed. import cloudinit.sources.DataSourceAltCloud from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import read_user_data_callback def _write_cloud_info_file(value): ''' @@ -33,28 +27,47 @@ def _remove_cloud_info_file(): ''' os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) -def _write_user_data_files(value): +def _write_user_data_files(mount_dir, value): ''' - Populate the DELTACLOUD_USER_DATA_FILE the USER_DATA_FILE + Populate the deltacloud_user_data_file the user_data_file which would be populated with user data. ''' - f = open(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 'w') + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + f = open(deltacloud_user_data_file, 'w') f.write(value) f.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE, 0664) + os.chmod(deltacloud_user_data_file, 0664) - f = open(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 'w') + f = open(user_data_file, 'w') f.write(value) f.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE, 0664) + os.chmod(user_data_file, 0664) -def _remove_user_data_files(): +def _remove_user_data_files(mount_dir, + dc_file=True, + non_dc_file=True): ''' - Remove the test files: DELTACLOUD_USER_DATA_FILE and - USER_DATA_FILE + Remove the test files: deltacloud_user_data_file and + user_data_file ''' - os.remove(cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE) - os.remove(cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE) + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # Ignore any failures removeing files that are already gone. + if dc_file: + try: + os.remove(deltacloud_user_data_file) + except OSError: + pass + + if non_dc_file: + try: + os.remove(user_data_file) + except OSError: + pass + class TestDataSouceAltCloud_get_cloud_type(TestCase): ''' @@ -143,7 +156,7 @@ class TestDataSouceAltCloud_get_data_cloud_info_file(TestCase): def tearDown(self): # Reset cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + '/etc/sysconfig/cloud-info' cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 @@ -202,7 +215,7 @@ class TestDataSouceAltCloud_get_data_no_cloud_info_file(TestCase): def tearDown(self): # Reset cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + '/etc/sysconfig/cloud-info' cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ ['dmidecode', '--string', 'system-product-name'] cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 @@ -241,83 +254,50 @@ class TestDataSouceAltCloud_user_data_rhevm(TestCase): def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/tmp/cloudinit_test_etc_sysconfig_cloud-info' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/user-data.txt' cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 try: - os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + os.mkdir(self.mount_dir) except OSError, (errno, strerror): # Ignore OSError: [Errno 17] File exists: if errno is not 17: raise - _write_user_data_files('test user data') + _write_user_data_files(self.mount_dir, 'test user data') def tearDown(self): # Reset - _remove_user_data_files() + _remove_user_data_files(self.mount_dir) cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' - - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + '/user-data.txt' + '/etc/sysconfig/cloud-info' cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['/sbin/modprobe', 'floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['/bin/mount', '/dev/fd0', \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 def test_user_data_rhevm(self): - '''Test user_data_rhevm() ''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' + '''Test user_data_rhevm() where mount_cb fails''' cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['echo', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['echo', 'floppy mounted'] ds = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(True, ds.user_data_rhevm()) + self.assertEquals(False, ds.user_data_rhevm()) def test_user_data_rhevm_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails. ''' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['ls', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['echo', 'floppy mounted'] ds = DataSourceAltCloud({}, None, self.paths) @@ -326,89 +306,13 @@ class TestDataSouceAltCloud_user_data_rhevm(TestCase): def test_user_data_rhevm_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command. ''' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ ['bad command', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['echo', 'floppy mounted'] - - ds = DataSourceAltCloud({}, None, self.paths) - - self.assertEquals(False, ds.user_data_rhevm()) - - def test_user_data_rhevm_mount_fails(self): - '''Test user_data_rhevm() where mount fails. ''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['ls', 'floppy mounted'] ds = DataSourceAltCloud({}, None, self.paths) self.assertEquals(False, ds.user_data_rhevm()) - def test_user_data_rhevm_no_user_data_file(self): - '''Test user_data_rhevm() with no user data files.''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['echo', 'floppy mounted'] - - ds = DataSourceAltCloud({}, None, self.paths) - - self.assertEquals(False, ds.user_data_rhevm()) - - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/user-data.txt' - - def test_user_data_rhevm_no_user_data_file(self): - '''Test user_data_rhevm() with no deltacloud user data file.''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_MNT_FLOPPY = \ - ['echo', 'floppy mounted'] - - ds = DataSourceAltCloud({}, None, self.paths) - - self.assertEquals(True, ds.user_data_rhevm()) - - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - class TestDataSouceAltCloud_user_data_vsphere(TestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_vsphere() @@ -416,125 +320,88 @@ class TestDataSouceAltCloud_user_data_vsphere(TestCase): def setUp(self): ''' Set up ''' self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = '/tmp/cloudinit_test_media' + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/tmp/cloudinit_test_etc_sysconfig_cloud-info' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/user-data.txt' cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 1 cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 1 try: - os.mkdir(cloudinit.sources.DataSourceAltCloud.MEDIA_DIR) + os.mkdir(self.mount_dir) except OSError, (errno, strerror): # Ignore OSError: [Errno 17] File exists: if errno is not 17: raise - _write_user_data_files('test user data') + _write_user_data_files(self.mount_dir, 'test user data') def tearDown(self): # Reset - _remove_user_data_files() + _remove_user_data_files(self.mount_dir) cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = '/media' - - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ - ['/bin/mount', '/dev/fd0', \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR] - + '/etc/sysconfig/cloud-info' cloudinit.sources.DataSourceAltCloud.RETRY_TIMES = 3 cloudinit.sources.DataSourceAltCloud.SLEEP_SECS = 3 def test_user_data_vsphere(self): - '''Test user_data_vsphere() ''' + '''Test user_data_vsphere() where mount_cb fails''' cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ - ['echo', 'floppy mounted'] ds = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(True, ds.user_data_vsphere()) - - def test_user_data_vsphere_mount_fails(self): - '''Test user_data_vsphere() where mount fails. ''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - - cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ - ['ls', 'floppy mounted'] + self.assertEquals(False, ds.user_data_vsphere()) - ds = DataSourceAltCloud({}, None, self.paths) +class TestDataSouceAltCloud_read_user_data_callback(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.read_user_data_callback() + ''' + def setUp(self): + ''' Set up ''' + self.paths = helpers.Paths({ 'cloud_dir': '/tmp' }) + self.mount_dir = '/tmp/cloudinit_test_media' - self.assertEquals(False, ds.user_data_vsphere()) + _write_user_data_files(self.mount_dir, 'test user data') - def test_user_data_vsphere_no_user_data_file(self): - '''Test user_data_vsphere() with no user data files.''' + def tearDown(self): + # Reset - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' + _remove_user_data_files(self.mount_dir) - cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ - ['echo', 'floppy mounted'] + def test_read_user_data_callback_both(self): + '''Test read_user_data_callback() with both files''' - ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) - self.assertEquals(False, ds.user_data_vsphere()) + def test_read_user_data_callback_dc(self): + '''Test read_user_data_callback() with only DC file''' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' - cloudinit.sources.DataSourceAltCloud.USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/user-data.txt' + _remove_user_data_files(self.mount_dir, + dc_file=False, + non_dc_file=True) - def test_user_data_vsphere_no_user_data_file(self): - '''Test user_data_vsphere() with no deltacloud user data files.''' + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = \ - '/tmp/cloudinit_test_media' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/not-user-data.txt' + def test_read_user_data_callback_non_dc(self): + '''Test read_user_data_callback() with only non-DC file''' - cloudinit.sources.DataSourceAltCloud.CMD_MNT_CDROM = \ - ['echo', 'floppy mounted'] + _remove_user_data_files(self.mount_dir, + dc_file=True, + non_dc_file=False) - ds = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) - self.assertEquals(True, ds.user_data_vsphere()) + def test_read_user_data_callback_none(self): + '''Test read_user_data_callback() no files are found''' - cloudinit.sources.DataSourceAltCloud.DELTACLOUD_USER_DATA_FILE = \ - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR + \ - '/deltacloud-user-data.txt' + _remove_user_data_files(self.mount_dir) + self.assertEquals(None, read_user_data_callback(self.mount_dir)) # vi: ts=4 expandtab - |