summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_ibmcloud.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_ibmcloud.py')
-rw-r--r--tests/unittests/test_datasource/test_ibmcloud.py343
1 files changed, 0 insertions, 343 deletions
diff --git a/tests/unittests/test_datasource/test_ibmcloud.py b/tests/unittests/test_datasource/test_ibmcloud.py
deleted file mode 100644
index 9013ae9f..00000000
--- a/tests/unittests/test_datasource/test_ibmcloud.py
+++ /dev/null
@@ -1,343 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.helpers import Paths
-from cloudinit.sources import DataSourceIBMCloud as ibm
-from cloudinit.tests import helpers as test_helpers
-from cloudinit import util
-
-import base64
-import copy
-import json
-from textwrap import dedent
-
-mock = test_helpers.mock
-
-D_PATH = "cloudinit.sources.DataSourceIBMCloud."
-
-
-@mock.patch(D_PATH + "_is_xen", return_value=True)
-@mock.patch(D_PATH + "_is_ibm_provisioning")
-@mock.patch(D_PATH + "util.blkid")
-class TestGetIBMPlatform(test_helpers.CiTestCase):
- """Test the get_ibm_platform helper."""
-
- blkid_base = {
- "/dev/xvda1": {
- "DEVNAME": "/dev/xvda1", "LABEL": "cloudimg-bootfs",
- "TYPE": "ext3"},
- "/dev/xvda2": {
- "DEVNAME": "/dev/xvda2", "LABEL": "cloudimg-rootfs",
- "TYPE": "ext4"},
- }
-
- blkid_metadata_disk = {
- "/dev/xvdh1": {
- "DEVNAME": "/dev/xvdh1", "LABEL": "METADATA", "TYPE": "vfat",
- "SEC_TYPE": "msdos", "UUID": "681B-8C5D",
- "PARTUUID": "3d631e09-01"},
- }
-
- blkid_oscode_disk = {
- "/dev/xvdh": {
- "DEVNAME": "/dev/xvdh", "LABEL": "config-2", "TYPE": "vfat",
- "SEC_TYPE": "msdos", "UUID": ibm.IBM_CONFIG_UUID}
- }
-
- def setUp(self):
- self.blkid_metadata = copy.deepcopy(self.blkid_base)
- self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk))
-
- self.blkid_oscode = copy.deepcopy(self.blkid_base)
- self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk))
-
- def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen):
- """identify TEMPLATE_LIVE_METADATA."""
- m_blkid.return_value = self.blkid_metadata
- m_is_prov.return_value = False
- self.assertEqual(
- (ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"),
- ibm.get_ibm_platform())
-
- def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen):
- """identify TEMPLATE_PROVISIONING_METADATA."""
- m_blkid.return_value = self.blkid_metadata
- m_is_prov.return_value = True
- self.assertEqual(
- (ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"),
- ibm.get_ibm_platform())
-
- def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen):
- """identify TEMPLATE_PROVISIONING_NODATA."""
- m_blkid.return_value = self.blkid_base
- m_is_prov.return_value = True
- self.assertEqual(
- (ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None),
- ibm.get_ibm_platform())
-
- def test_id_os_code(self, m_blkid, m_is_prov, _m_xen):
- """Identify OS_CODE."""
- m_blkid.return_value = self.blkid_oscode
- m_is_prov.return_value = False
- self.assertEqual((ibm.Platforms.OS_CODE, "/dev/xvdh"),
- ibm.get_ibm_platform())
-
- def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen):
- """Test against false positive on openstack with non-ibm UUID."""
- blkid = self.blkid_oscode
- blkid["/dev/xvdh"]["UUID"] = "9999-9999"
- m_blkid.return_value = blkid
- m_is_prov.return_value = False
- self.assertEqual((None, None), ibm.get_ibm_platform())
-
-
-@mock.patch(D_PATH + "_read_system_uuid", return_value=None)
-@mock.patch(D_PATH + "get_ibm_platform")
-class TestReadMD(test_helpers.CiTestCase):
- """Test the read_datasource helper."""
-
- template_md = {
- "files": [],
- "network_config": {"content_path": "/content/interfaces"},
- "hostname": "ci-fond-ram",
- "name": "ci-fond-ram",
- "domain": "testing.ci.cloud-init.org",
- "meta": {"dsmode": "net"},
- "uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f",
- "public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"},
- }
-
- oscode_md = {
- "hostname": "ci-grand-gannet.testing.ci.cloud-init.org",
- "name": "ci-grand-gannet",
- "uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785",
- "random_seed": "bm90LXJhbmRvbQo=",
- "crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/",
- "configuration_token": "eyJhbGciOi..M3ZA",
- "public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"},
- }
-
- content_interfaces = dedent("""\
- auto lo
- iface lo inet loopback
-
- auto eth0
- allow-hotplug eth0
- iface eth0 inet static
- address 10.82.43.5
- netmask 255.255.255.192
- """)
-
- userdata = b"#!/bin/sh\necho hi mom\n"
- # meta.js file gets json encoded userdata as a list.
- meta_js = '["#!/bin/sh\necho hi mom\n"]'
- vendor_data = {
- "cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e"}
-
- network_data = {
- "links": [
- {"id": "interface_29402281", "name": "eth0", "mtu": None,
- "type": "phy", "ethernet_mac_address": "06:00:f1:bd:da:25"},
- {"id": "interface_29402279", "name": "eth1", "mtu": None,
- "type": "phy", "ethernet_mac_address": "06:98:5e:d0:7f:86"}
- ],
- "networks": [
- {"id": "network_109887563", "link": "interface_29402281",
- "type": "ipv4", "ip_address": "10.82.43.2",
- "netmask": "255.255.255.192",
- "routes": [
- {"network": "10.0.0.0", "netmask": "255.0.0.0",
- "gateway": "10.82.43.1"},
- {"network": "161.26.0.0", "netmask": "255.255.0.0",
- "gateway": "10.82.43.1"}]},
- {"id": "network_109887551", "link": "interface_29402279",
- "type": "ipv4", "ip_address": "108.168.194.252",
- "netmask": "255.255.255.248",
- "routes": [
- {"network": "0.0.0.0", "netmask": "0.0.0.0",
- "gateway": "108.168.194.249"}]}
- ],
- "services": [
- {"type": "dns", "address": "10.0.80.11"},
- {"type": "dns", "address": "10.0.80.12"}
- ],
- }
-
- sysuuid = '7f79ebf5-d791-43c3-a723-854e8389d59f'
-
- def _get_expected_metadata(self, os_md):
- """return expected 'metadata' for data loaded from meta_data.json."""
- os_md = copy.deepcopy(os_md)
- renames = (
- ('hostname', 'local-hostname'),
- ('uuid', 'instance-id'),
- ('public_keys', 'public-keys'))
- ret = {}
- for osname, mdname in renames:
- if osname in os_md:
- ret[mdname] = os_md[osname]
- if 'random_seed' in os_md:
- ret['random_seed'] = base64.b64decode(os_md['random_seed'])
-
- return ret
-
- def test_provisioning_md(self, m_platform, m_sysuuid):
- """Provisioning env with a metadata disk should return None."""
- m_platform.return_value = (
- ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh")
- self.assertIsNone(ibm.read_md())
-
- def test_provisioning_no_metadata(self, m_platform, m_sysuuid):
- """Provisioning env with no metadata disk should return None."""
- m_platform.return_value = (
- ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None)
- self.assertIsNone(ibm.read_md())
-
- def test_provisioning_not_ibm(self, m_platform, m_sysuuid):
- """Provisioning env but not identified as IBM should return None."""
- m_platform.return_value = (None, None)
- self.assertIsNone(ibm.read_md())
-
- def test_template_live(self, m_platform, m_sysuuid):
- """Template live environment should be identified."""
- tmpdir = self.tmp_dir()
- m_platform.return_value = (
- ibm.Platforms.TEMPLATE_LIVE_METADATA, tmpdir)
- m_sysuuid.return_value = self.sysuuid
-
- test_helpers.populate_dir(tmpdir, {
- 'openstack/latest/meta_data.json': json.dumps(self.template_md),
- 'openstack/latest/user_data': self.userdata,
- 'openstack/content/interfaces': self.content_interfaces,
- 'meta.js': self.meta_js})
-
- ret = ibm.read_md()
- self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA,
- ret['platform'])
- self.assertEqual(tmpdir, ret['source'])
- self.assertEqual(self.userdata, ret['userdata'])
- self.assertEqual(self._get_expected_metadata(self.template_md),
- ret['metadata'])
- self.assertEqual(self.sysuuid, ret['system-uuid'])
-
- def test_os_code_live(self, m_platform, m_sysuuid):
- """Verify an os_code metadata path."""
- tmpdir = self.tmp_dir()
- m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
- netdata = json.dumps(self.network_data)
- test_helpers.populate_dir(tmpdir, {
- 'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
- 'openstack/latest/user_data': self.userdata,
- 'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
- 'openstack/latest/network_data.json': netdata,
- })
-
- ret = ibm.read_md()
- self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
- self.assertEqual(tmpdir, ret['source'])
- self.assertEqual(self.userdata, ret['userdata'])
- self.assertEqual(self._get_expected_metadata(self.oscode_md),
- ret['metadata'])
-
- def test_os_code_live_no_userdata(self, m_platform, m_sysuuid):
- """Verify os_code without user-data."""
- tmpdir = self.tmp_dir()
- m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
- test_helpers.populate_dir(tmpdir, {
- 'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
- 'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
- })
-
- ret = ibm.read_md()
- self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
- self.assertEqual(tmpdir, ret['source'])
- self.assertIsNone(ret['userdata'])
- self.assertEqual(self._get_expected_metadata(self.oscode_md),
- ret['metadata'])
-
-
-class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
- """Test the _is_ibm_provisioning method."""
- inst_log = "/root/swinstall.log"
- prov_cfg = "/root/provisioningConfiguration.cfg"
- boot_ref = "/proc/1/environ"
- with_logs = True
-
- def _call_with_root(self, rootd):
- self.reRoot(rootd)
- return ibm._is_ibm_provisioning()
-
- def test_no_config(self):
- """No provisioning config means not provisioning."""
- self.assertFalse(self._call_with_root(self.tmp_dir()))
-
- def test_config_only(self):
- """A provisioning config without a log means provisioning."""
- rootd = self.tmp_dir()
- test_helpers.populate_dir(rootd, {self.prov_cfg: "key=value"})
- self.assertTrue(self._call_with_root(rootd))
-
- def test_config_with_old_log(self):
- """A config with a log from previous boot is not provisioning."""
- rootd = self.tmp_dir()
- data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
- self.inst_log: ("log data\n", -30),
- self.boot_ref: ("PWD=/", 0)}
- test_helpers.populate_dir_with_ts(rootd, data)
- self.assertFalse(self._call_with_root(rootd=rootd))
- self.assertIn("from previous boot", self.logs.getvalue())
-
- def test_config_with_new_log(self):
- """A config with a log from this boot is provisioning."""
- rootd = self.tmp_dir()
- data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
- self.inst_log: ("log data\n", 30),
- self.boot_ref: ("PWD=/", 0)}
- test_helpers.populate_dir_with_ts(rootd, data)
- self.assertTrue(self._call_with_root(rootd=rootd))
- self.assertIn("from current boot", self.logs.getvalue())
-
- def test_config_and_log_no_reference(self):
- """If the config and log existed, but no reference, assume not."""
- rootd = self.tmp_dir()
- test_helpers.populate_dir(
- rootd, {self.prov_cfg: "key=value", self.inst_log: "log data\n"})
- self.assertFalse(self._call_with_root(rootd=rootd))
- self.assertIn("no reference file", self.logs.getvalue())
-
-
-class TestDataSourceIBMCloud(test_helpers.CiTestCase):
-
- def setUp(self):
- super(TestDataSourceIBMCloud, self).setUp()
- self.tmp = self.tmp_dir()
- self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
- util.ensure_dir(self.cloud_dir)
- paths = Paths({'run_dir': self.tmp, 'cloud_dir': self.cloud_dir})
- self.ds = ibm.DataSourceIBMCloud(
- sys_cfg={}, distro=None, paths=paths)
-
- def test_get_data_false(self):
- """When read_md returns None, get_data returns False."""
- with mock.patch(D_PATH + 'read_md', return_value=None):
- self.assertFalse(self.ds.get_data())
-
- def test_get_data_processes_read_md(self):
- """get_data processes and caches content returned by read_md."""
- md = {
- 'metadata': {}, 'networkdata': 'net', 'platform': 'plat',
- 'source': 'src', 'system-uuid': 'uuid', 'userdata': 'ud',
- 'vendordata': 'vd'}
- with mock.patch(D_PATH + 'read_md', return_value=md):
- self.assertTrue(self.ds.get_data())
- self.assertEqual('src', self.ds.source)
- self.assertEqual('plat', self.ds.platform)
- self.assertEqual({}, self.ds.metadata)
- self.assertEqual('ud', self.ds.userdata_raw)
- self.assertEqual('net', self.ds.network_json)
- self.assertEqual('vd', self.ds.vendordata_pure)
- self.assertEqual('uuid', self.ds.system_uuid)
- self.assertEqual('ibmcloud', self.ds.cloud_name)
- self.assertEqual('ibmcloud', self.ds.platform_type)
- self.assertEqual('plat (src)', self.ds.subplatform)
-
-# vi: ts=4 expandtab