summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_altcloud.py
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2018-10-09 21:46:35 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-10-09 21:46:35 +0000
commitf0bc02d7e221c9aa5982b267739481420c761ead (patch)
treefb45cab8d2e9a44564eea23db18b0c3566fabe2a /tests/unittests/test_datasource/test_altcloud.py
parent00e36d3ded0b0f81f352de993036fc9f89e14a7a (diff)
downloadvyos-cloud-init-f0bc02d7e221c9aa5982b267739481420c761ead.tar.gz
vyos-cloud-init-f0bc02d7e221c9aa5982b267739481420c761ead.zip
instance-data: Add standard keys platform and subplatform. Refactor ec2.
Add the following instance-data.json standardized keys: * v1._beta_keys: List any v1 keys in beta development, e.g. ['subplatform']. * v1.public_ssh_keys: List of any cloud-provided ssh keys for the instance. * v1.platform: String representing the cloud platform api supporting the datasource. For example: 'ec2' for aws, aliyun and brightbox cloud names. * v1.subplatform: String with more details about the source of the metadata consumed. For example, metadata uri, config drive device path or seed directory. To support the new platform and subplatform standardized instance-data, DataSource and its subclasses grew platform and subplatform attributes. The platform attribute defaults to the lowercase string datasource name at self.dsname. This method is overridden in NoCloud, Ec2 and ConfigDrive datasources. The subplatform attribute calls a _get_subplatform method which will return a string containing a simple slug for subplatform type such as metadata, seed-dir or config-drive followed by a detailed uri, device or directory path where the datasource consumed its configuration. As part of this work, DatasourceEC2 methods _get_data and _crawl_metadata have been refactored for a few reasons: - crawl_metadata is now a read-only operation, persisting no attributes on the datasource instance and returns a dictionary of consumed metadata. - crawl_metadata now closely represents the raw stucture of the ec2 metadata consumed, so that end-users can leverage public ec2 metadata documentation where possible. - crawl_metadata adds a '_metadata_api_version' key to the crawled ds.metadata to advertise what version of EC2's api was consumed by cloud-init. - _get_data now does all the processing of crawl_metadata and saves datasource instance attributes userdata_raw, metadata etc. Additional drive-bys: * unit test rework for test_altcloud and test_azure to simplify mocks and make use of existing util and test_helpers functions.
Diffstat (limited to 'tests/unittests/test_datasource/test_altcloud.py')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py118
1 files changed, 67 insertions, 51 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index ff35904e..3119bfac 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -10,7 +10,6 @@
This test file exercises the code in sources DataSourceAltCloud.py
'''
-import mock
import os
import shutil
import tempfile
@@ -18,32 +17,13 @@ import tempfile
from cloudinit import helpers
from cloudinit import util
-from cloudinit.tests.helpers import CiTestCase
+from cloudinit.tests.helpers import CiTestCase, mock
import cloudinit.sources.DataSourceAltCloud as dsac
OS_UNAME_ORIG = getattr(os, 'uname')
-def _write_cloud_info_file(value):
- '''
- Populate the CLOUD_INFO_FILE which would be populated
- with a cloud backend identifier ImageFactory when building
- an image with ImageFactory.
- '''
- cifile = open(dsac.CLOUD_INFO_FILE, 'w')
- cifile.write(value)
- cifile.close()
- os.chmod(dsac.CLOUD_INFO_FILE, 0o664)
-
-
-def _remove_cloud_info_file():
- '''
- Remove the test CLOUD_INFO_FILE
- '''
- os.remove(dsac.CLOUD_INFO_FILE)
-
-
def _write_user_data_files(mount_dir, value):
'''
Populate the deltacloud_user_data_file the user_data_file
@@ -98,13 +78,15 @@ def _dmi_data(expected):
class TestGetCloudType(CiTestCase):
- '''
- Test to exercise method: DataSourceAltCloud.get_cloud_type()
- '''
+ '''Test to exercise method: DataSourceAltCloud.get_cloud_type()'''
+
+ with_logs = True
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ super(TestGetCloudType, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.dmi_data = util.read_dmi_data
# We have a different code path for arm to deal with LP1243287
# We have to switch arch to x86_64 to avoid test failure
@@ -115,6 +97,26 @@ class TestGetCloudType(CiTestCase):
util.read_dmi_data = self.dmi_data
force_arch()
+ def test_cloud_info_file_ioerror(self):
+ """Return UNKNOWN when /etc/sysconfig/cloud-info exists but errors."""
+ self.assertEqual('/etc/sysconfig/cloud-info', dsac.CLOUD_INFO_FILE)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ # Attempting to read the directory generates IOError
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.tmp):
+ self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
+ self.assertIn(
+ "[Errno 21] Is a directory: '%s'" % self.tmp,
+ self.logs.getvalue())
+
+ def test_cloud_info_file(self):
+ """Return uppercase stripped content from /etc/sysconfig/cloud-info."""
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
+ util.write_file(cloud_info, ' OverRiDdeN CloudType ')
+ # Attempting to read the directory generates IOError
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', cloud_info):
+ self.assertEqual('OVERRIDDEN CLOUDTYPE', dsrc.get_cloud_type())
+
def test_rhev(self):
'''
Test method get_cloud_type() for RHEVm systems.
@@ -153,60 +155,57 @@ class TestGetDataCloudInfoFile(CiTestCase):
self.tmp = self.tmp_dir()
self.paths = helpers.Paths(
{'cloud_dir': self.tmp, 'run_dir': self.tmp})
- self.cloud_info_file = tempfile.mkstemp()[1]
- self.dmi_data = util.read_dmi_data
- dsac.CLOUD_INFO_FILE = self.cloud_info_file
-
- def tearDown(self):
- # Reset
-
- # Attempt to remove the temp file ignoring errors
- try:
- os.remove(self.cloud_info_file)
- except OSError:
- pass
-
- util.read_dmi_data = self.dmi_data
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ self.cloud_info_file = self.tmp_path('cloud-info', dir=self.tmp)
def test_rhev(self):
'''Success Test module get_data() forcing RHEV.'''
- _write_cloud_info_file('RHEV')
+ util.write_file(self.cloud_info_file, 'RHEV')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
- self.assertEqual(True, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(True, dsrc.get_data())
+ self.assertEqual('altcloud', dsrc.cloud_name)
+ self.assertEqual('altcloud', dsrc.platform_type)
+ self.assertEqual('rhev (/dev/fd0)', dsrc.subplatform)
def test_vsphere(self):
'''Success Test module get_data() forcing VSPHERE.'''
- _write_cloud_info_file('VSPHERE')
+ util.write_file(self.cloud_info_file, 'VSPHERE')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
- self.assertEqual(True, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(True, dsrc.get_data())
+ self.assertEqual('altcloud', dsrc.cloud_name)
+ self.assertEqual('altcloud', dsrc.platform_type)
+ self.assertEqual('vsphere (unknown)', dsrc.subplatform)
def test_fail_rhev(self):
'''Failure Test module get_data() forcing RHEV.'''
- _write_cloud_info_file('RHEV')
+ util.write_file(self.cloud_info_file, 'RHEV')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: False
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
def test_fail_vsphere(self):
'''Failure Test module get_data() forcing VSPHERE.'''
- _write_cloud_info_file('VSPHERE')
+ util.write_file(self.cloud_info_file, 'VSPHERE')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: False
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
def test_unrecognized(self):
'''Failure Test module get_data() forcing unrecognized.'''
- _write_cloud_info_file('unrecognized')
+ util.write_file(self.cloud_info_file, 'unrecognized')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
class TestGetDataNoCloudInfoFile(CiTestCase):
@@ -322,7 +321,8 @@ class TestUserDataVsphere(CiTestCase):
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
@@ -363,6 +363,22 @@ class TestUserDataVsphere(CiTestCase):
self.assertEqual(1, m_find_devs_with.call_count)
self.assertEqual(1, m_mount_cb.call_count)
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
+ def test_user_data_vsphere_success(self, m_mount_cb, m_find_devs_with):
+ """Test user_data_vsphere() where successful."""
+ m_find_devs_with.return_value = ["/dev/mock/cdrom"]
+ m_mount_cb.return_value = 'raw userdata from cdrom'
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
+ util.write_file(cloud_info, 'VSPHERE')
+ self.assertEqual(True, dsrc.user_data_vsphere())
+ m_find_devs_with.assert_called_once_with('LABEL=CDROM')
+ m_mount_cb.assert_called_once_with(
+ '/dev/mock/cdrom', dsac.read_user_data_callback)
+ with mock.patch.object(dsrc, 'get_cloud_type', return_value='VSPHERE'):
+ self.assertEqual('vsphere (/dev/mock/cdrom)', dsrc.subplatform)
+
class TestReadUserDataCallback(CiTestCase):
'''